From 130a1151febcff22508db9a18e15169f08b4f57f Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Thu, 14 Nov 2024 03:18:56 +0000 Subject: [PATCH] MutationInterpreter prepare for proj --- src/Interpreters/MutationsInterpreter.cpp | 118 ++++++++++++++++++++++ src/Interpreters/MutationsInterpreter.h | 4 + src/Storages/MergeTree/MutateTask.cpp | 9 ++ 3 files changed, 131 insertions(+) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 0f25d5ac21c..2cd04de32f5 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -373,6 +373,14 @@ bool MutationsInterpreter::Source::isCompactPart() const return part && part->getType() == MergeTreeDataPartType::Compact; } +const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const +{ + if (!part) + throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug."); + + return part->getColumns(); +} + static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage) { auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -1062,6 +1070,116 @@ void MutationsInterpreter::prepare(bool dry_run) prepareMutationStages(stages, dry_run); } +void MutationsInterpreter::prepareForProjection(bool dry_run) +{ + if (is_prepared) + throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug."); + + if (commands.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list"); + + auto all_columns = source.getColumns(); + NameSet available_columns_set(available_columns.begin(), available_columns.end()); + + /// Add _row_exists column if it is physically present in the part + if (source.hasLightweightDeleteMask()) + { + all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); + available_columns_set.insert(RowExistsColumn::name); + } + + NameSet updated_columns; + + for (auto & command : commands) + { + for (const auto & [name, _] : command.column_to_update_expression) + { + if (!available_columns_set.contains(name) && name != RowExistsColumn::name) + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, + "Column {} is updated but not requested to read", name); + + updated_columns.insert(name); + } + } + + std::vector read_columns; + + /// First, break a sequence of commands into stages. + for (const auto & command : commands) + { + if (command.type == MutationCommand::UPDATE) + { + mutation_kind.set(MutationKind::MUTATE_OTHER); + if (stages.empty() || !stages.back().column_to_updated.empty()) + stages.emplace_back(context); + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + for (const auto & [column_name, update_expr] : command.column_to_update_expression) + { + /// When doing UPDATE column = expression WHERE condition + /// we will replace column to the result of the following expression: + /// + /// CAST(if(condition, CAST(expression, type), column), type) + /// + /// Inner CAST is needed to make 'if' work when branches have no common type, + /// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1. + /// + /// Outer CAST is added just in case if we don't trust the returning type of 'if'. + + DataTypePtr type; + if (column_name == RowExistsColumn::name) + { + type = RowExistsColumn::type; + deleted_mask_updated = true; + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name); + } + + auto type_literal = std::make_shared(type->getName()); + ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command); + + auto updated_column = makeASTFunction("_CAST", + makeASTFunction("if", + condition, + makeASTFunction("_CAST", + update_expr->clone(), + type_literal), + std::make_shared(column_name)), + type_literal); + + stages.back().column_to_updated.emplace(column_name, updated_column); + + if (condition && settings.return_mutated_rows) + stages.back().filters.push_back(condition); + } + } + else if (command.type == MutationCommand::READ_COLUMN) + { + mutation_kind.set(MutationKind::MUTATE_OTHER); + read_columns.emplace_back(command.column_name); + } + else + throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString(command.type)); + } + + if (!read_columns.empty()) + { + if (stages.empty() || !stages.back().column_to_updated.empty()) + stages.emplace_back(context); + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + for (auto & column_name : read_columns) + stages.back().column_to_updated.emplace(column_name, std::make_shared(column_name)); + } + + is_prepared = true; + prepareMutationStages(stages, dry_run); +} + void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) { auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 901cd13cd2f..ccffdba00e6 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -129,6 +129,8 @@ public: bool hasBrokenProjection(const String & name) const; bool isCompactPart() const; + const NamesAndTypesList & getColumns() const; + void read( Stage & first_stage, QueryPlan & plan, @@ -160,6 +162,8 @@ private: void prepare(bool dry_run); + void prepareForProjection(bool dry_run); + void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); void prepareMutationStages(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 15abac099c6..5ca5132f82f 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -2270,6 +2270,10 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { + const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + /// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the /// columns and preserve the columns that are not affected, but after the update all columns must have the same number of row MutationsInterpreter::Settings settings(true); @@ -2280,6 +2284,11 @@ bool MutateTask::prepare() ctx->metadata_snapshot, ctx->for_interpreter, ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings); + auto projection_interpreter = std::make_unique( + *ctx->data, projection_part, alter_conversions, + ctx->metadata_snapshot, ctx->for_interpreter, + proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); + ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();