MutationInterpreter prepare for proj

This commit is contained in:
jsc0218 2024-11-14 03:18:56 +00:00
parent af7e3640d1
commit 130a1151fe
3 changed files with 131 additions and 0 deletions

View File

@ -373,6 +373,14 @@ bool MutationsInterpreter::Source::isCompactPart() const
return part && part->getType() == MergeTreeDataPartType::Compact;
}
const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const
{
if (!part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug.");
return part->getColumns();
}
static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage)
{
auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical();
@ -1062,6 +1070,116 @@ void MutationsInterpreter::prepare(bool dry_run)
prepareMutationStages(stages, dry_run);
}
void MutationsInterpreter::prepareForProjection(bool dry_run)
{
if (is_prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug.");
if (commands.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list");
auto all_columns = source.getColumns();
NameSet available_columns_set(available_columns.begin(), available_columns.end());
/// Add _row_exists column if it is physically present in the part
if (source.hasLightweightDeleteMask())
{
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
available_columns_set.insert(RowExistsColumn::name);
}
NameSet updated_columns;
for (auto & command : commands)
{
for (const auto & [name, _] : command.column_to_update_expression)
{
if (!available_columns_set.contains(name) && name != RowExistsColumn::name)
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
"Column {} is updated but not requested to read", name);
updated_columns.insert(name);
}
}
std::vector<String> read_columns;
/// First, break a sequence of commands into stages.
for (const auto & command : commands)
{
if (command.type == MutationCommand::UPDATE)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (const auto & [column_name, update_expr] : command.column_to_update_expression)
{
/// When doing UPDATE column = expression WHERE condition
/// we will replace column to the result of the following expression:
///
/// CAST(if(condition, CAST(expression, type), column), type)
///
/// Inner CAST is needed to make 'if' work when branches have no common type,
/// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1.
///
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
DataTypePtr type;
if (column_name == RowExistsColumn::name)
{
type = RowExistsColumn::type;
deleted_mask_updated = true;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name);
}
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command);
auto updated_column = makeASTFunction("_CAST",
makeASTFunction("if",
condition,
makeASTFunction("_CAST",
update_expr->clone(),
type_literal),
std::make_shared<ASTIdentifier>(column_name)),
type_literal);
stages.back().column_to_updated.emplace(column_name, updated_column);
if (condition && settings.return_mutated_rows)
stages.back().filters.push_back(condition);
}
}
else if (command.type == MutationCommand::READ_COLUMN)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
read_columns.emplace_back(command.column_name);
}
else
throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString<int>(command.type));
}
if (!read_columns.empty())
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (auto & column_name : read_columns)
stages.back().column_to_updated.emplace(column_name, std::make_shared<ASTIdentifier>(column_name));
}
is_prepared = true;
prepareMutationStages(stages, dry_run);
}
void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run)
{
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);

View File

@ -129,6 +129,8 @@ public:
bool hasBrokenProjection(const String & name) const;
bool isCompactPart() const;
const NamesAndTypesList & getColumns() const;
void read(
Stage & first_stage,
QueryPlan & plan,
@ -160,6 +162,8 @@ private:
void prepare(bool dry_run);
void prepareForProjection(bool dry_run);
void initQueryPlan(Stage & first_stage, QueryPlan & query_plan);
void prepareMutationStages(std::vector<Stage> &prepared_stages, bool dry_run);
QueryPipelineBuilder addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;

View File

@ -2270,6 +2270,10 @@ bool MutateTask::prepare()
if (!ctx->for_interpreter.empty())
{
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second;
/// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the
/// columns and preserve the columns that are not affected, but after the update all columns must have the same number of row
MutationsInterpreter::Settings settings(true);
@ -2280,6 +2284,11 @@ bool MutateTask::prepare()
ctx->metadata_snapshot, ctx->for_interpreter,
ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings);
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, projection_part, alter_conversions,
ctx->metadata_snapshot, ctx->for_interpreter,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();