diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index c1834c83da7..7e075c99521 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -450,7 +450,7 @@ MutationsInterpreter::MutationsInterpreter( } context = std::move(new_context); - metadata_snapshot ? prepare(!settings.can_execute) : prepareForProjection(!settings.can_execute); + prepare(!settings.can_execute); } static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) @@ -1070,116 +1070,6 @@ void MutationsInterpreter::prepare(bool dry_run) prepareMutationStages(stages, dry_run); } -void MutationsInterpreter::prepareForProjection(bool dry_run) -{ - if (is_prepared) - throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug."); - - if (commands.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list"); - - auto all_columns = source.getColumns(); - NameSet available_columns_set(available_columns.begin(), available_columns.end()); - - /// Add _row_exists column if it is physically present in the part - if (source.hasLightweightDeleteMask()) - { - all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); - available_columns_set.insert(RowExistsColumn::name); - } - - NameSet updated_columns; - - for (auto & command : commands) - { - for (const auto & [name, _] : command.column_to_update_expression) - { - if (!available_columns_set.contains(name) && name != RowExistsColumn::name) - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, - "Column {} is updated but not requested to read", name); - - updated_columns.insert(name); - } - } - - std::vector read_columns; - - /// First, break a sequence of commands into stages. - for (const auto & command : commands) - { - if (command.type == MutationCommand::UPDATE) - { - mutation_kind.set(MutationKind::MUTATE_OTHER); - if (stages.empty() || !stages.back().column_to_updated.empty()) - stages.emplace_back(context); - if (stages.size() == 1) /// First stage only supports filtering and can't update columns. - stages.emplace_back(context); - - for (const auto & [column_name, update_expr] : command.column_to_update_expression) - { - /// When doing UPDATE column = expression WHERE condition - /// we will replace column to the result of the following expression: - /// - /// CAST(if(condition, CAST(expression, type), column), type) - /// - /// Inner CAST is needed to make 'if' work when branches have no common type, - /// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1. - /// - /// Outer CAST is added just in case if we don't trust the returning type of 'if'. - - DataTypePtr type; - if (column_name == RowExistsColumn::name) - { - type = RowExistsColumn::type; - deleted_mask_updated = true; - } - else - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name); - } - - auto type_literal = std::make_shared(type->getName()); - ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command); - - auto updated_column = makeASTFunction("_CAST", - makeASTFunction("if", - condition, - makeASTFunction("_CAST", - update_expr->clone(), - type_literal), - std::make_shared(column_name)), - type_literal); - - stages.back().column_to_updated.emplace(column_name, updated_column); - - if (condition && settings.return_mutated_rows) - stages.back().filters.push_back(condition); - } - } - else if (command.type == MutationCommand::READ_COLUMN) - { - mutation_kind.set(MutationKind::MUTATE_OTHER); - read_columns.emplace_back(command.column_name); - } - else - throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString(command.type)); - } - - if (!read_columns.empty()) - { - if (stages.empty() || !stages.back().column_to_updated.empty()) - stages.emplace_back(context); - if (stages.size() == 1) /// First stage only supports filtering and can't update columns. - stages.emplace_back(context); - - for (auto & column_name : read_columns) - stages.back().column_to_updated.emplace(column_name, std::make_shared(column_name)); - } - - is_prepared = true; - prepareMutationStagesForProjection(stages, dry_run); -} - void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) { auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); @@ -1309,132 +1199,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } } -void MutationsInterpreter::prepareMutationStagesForProjection(std::vector & prepared_stages, bool dry_run) -{ - auto all_columns = source.getColumns(); - - /// Add _row_exists column if it is present in the part - if (source.hasLightweightDeleteMask() || deleted_mask_updated) - all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); - - bool has_filters = false; - /// Next, for each stage calculate columns changed by this and previous stages. - for (size_t i = 0; i < prepared_stages.size(); ++i) - { - if (settings.return_all_columns || !prepared_stages[i].filters.empty()) - { - for (const auto & column : all_columns) - { - if (column.name == RowExistsColumn::name && !deleted_mask_updated) - continue; - - prepared_stages[i].output_columns.insert(column.name); - } - - has_filters = true; - settings.apply_deleted_mask = true; - } - else - { - if (i > 0) - prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; - - /// Make sure that all updated columns are included into output_columns set. - /// This is important for a "hidden" column like _row_exists gets because it is a virtual column - /// and so it is not in the list of AllPhysical columns. - for (const auto & [column_name, _] : prepared_stages[i].column_to_updated) - { - if (column_name == RowExistsColumn::name && has_filters && !deleted_mask_updated) - continue; - - prepared_stages[i].output_columns.insert(column_name); - } - } - } - - /// Now, calculate `expressions_chain` for each stage except the first. - /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. - for (int64_t i = prepared_stages.size() - 1; i >= 0; --i) - { - auto & stage = prepared_stages[i]; - - ASTPtr all_asts = std::make_shared(); - - for (const auto & ast : stage.filters) - all_asts->children.push_back(ast); - - for (const auto & kv : stage.column_to_updated) - all_asts->children.push_back(kv.second); - - /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. - for (const auto & column : stage.output_columns) - all_asts->children.push_back(std::make_shared(column)); - - /// Executing scalar subquery on that stage can lead to deadlock - /// e.g. ALTER referencing the same table in scalar subquery - bool execute_scalar_subqueries = !dry_run; - auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, source.getStorage(), nullptr, - false, true, execute_scalar_subqueries); - - stage.analyzer = std::make_unique(all_asts, syntax_result, context); - - ExpressionActionsChain & actions_chain = stage.expressions_chain; - - if (!stage.filters.empty()) - { - auto ast = stage.filters.front(); - if (stage.filters.size() > 1) - ast = makeASTForLogicalAnd(std::move(stage.filters)); - - if (!actions_chain.steps.empty()) - actions_chain.addStep(); - - stage.analyzer->appendExpression(actions_chain, ast, dry_run); - stage.filter_column_names.push_back(ast->getColumnName()); - } - - if (!stage.column_to_updated.empty()) - { - if (!actions_chain.steps.empty()) - actions_chain.addStep(); - - for (const auto & kv : stage.column_to_updated) - stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); - - auto & actions = actions_chain.getLastStep().actions(); - - for (const auto & kv : stage.column_to_updated) - { - auto column_name = kv.second->getColumnName(); - const auto & dag_node = actions->dag.findInOutputs(column_name); - const auto & alias = actions->dag.addAlias(dag_node, kv.first); - actions->dag.addOrReplaceInOutputs(alias); - } - } - - if (i == 0 && actions_chain.steps.empty()) - actions_chain.lastStep(syntax_result->required_source_columns); - - /// Remove all intermediate columns. - actions_chain.addStep(); - actions_chain.getLastStep().required_output.clear(); - ActionsDAG::NodeRawConstPtrs new_index; - for (const auto & name : stage.output_columns) - actions_chain.getLastStep().addRequiredOutput(name); - - actions_chain.getLastActions(); - actions_chain.finalize(); - - if (i) - { - /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) - prepared_stages[i - 1].output_columns.insert(column.name); - } - } -} - void MutationsInterpreter::Source::read( Stage & first_stage, QueryPlan & plan, diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 3464fed9f33..145774579da 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -162,11 +162,8 @@ private: void prepare(bool dry_run); - void prepareForProjection(bool dry_run); - void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); void prepareMutationStages(std::vector &prepared_stages, bool dry_run); - void prepareMutationStagesForProjection(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; std::optional getStorageSortDescriptionIfPossible(const Block & header) const; diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index c261990a5df..28e11166762 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1008,7 +1008,7 @@ TreeRewriterResult::TreeRewriterResult( /// Special columns are non physical columns, for example ALIAS void TreeRewriterResult::collectSourceColumns(bool add_special) { - if (storage && storage_snapshot) + if (storage) { auto options = GetColumnsOptions(add_special ? GetColumnsOptions::All : GetColumnsOptions::AllPhysical); options.withExtendedObjects(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 846658c3144..997f48707ed 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1002,6 +1002,16 @@ void finalizeMutatedPart( new_data_part->default_codec = codec; } +static StorageMetadataPtr generateProjectionPartMetadata( + MergeTreeData::DataPartPtr projection_part, + StorageMetadataPtr table_metadata[[maybe_unused]]) +{ + auto res = std::make_shared(); + /// Currently only ColumnsDescription, later add as needed. + res->columns = projection_part->getColumnsDescription(); + return res; +} + } struct MutationContext @@ -2289,14 +2299,15 @@ bool MutateTask::prepare() ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); - /// might be better to create metadata_snapshot for projection part. const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + ctx->projection_metadata_snapshot = MutationHelpers::generateProjectionPartMetadata(projection_part, ctx->metadata_snapshot); + auto projection_interpreter = std::make_unique( *ctx->data, projection_part, alter_conversions, - nullptr, ctx->for_interpreter, + ctx->projection_metadata_snapshot, ctx->for_interpreter, proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();