From af7e3640d196def9cb0d08365b5157b01ae14d1b Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Mon, 11 Nov 2024 01:52:19 +0000 Subject: [PATCH 1/7] preparaion --- src/Storages/MergeTree/MutateTask.cpp | 24 +++++++++++++++++++++++- src/Storages/ProjectionsDescription.cpp | 11 +++++++++++ src/Storages/ProjectionsDescription.h | 2 ++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 753b0c5d2fe..15abac099c6 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1028,6 +1028,8 @@ struct MutationContext std::unique_ptr num_mutations; QueryPipelineBuilder mutating_pipeline_builder; + /// source here is projection part instead of table part + QueryPipelineBuilder projection_mutating_pipeline_builder; QueryPipeline mutating_pipeline; // in std::unique_ptr mutating_executor; ProgressCallback progress_callback; @@ -1051,6 +1053,9 @@ struct MutationContext String mrk_extension; + /// Used in lightweight delete to bit mask the projection if possible, + /// preference is higher than rebuild as more performant. + std::vector projections_to_mask; std::vector projections_to_build; IMergeTreeDataPart::MinMaxIndexPtr minmax_idx; @@ -1144,6 +1149,14 @@ public: if (iterateThroughAllProjections()) return true; + state = State::NEED_MASK_PROJECTION_PARTS; + return true; + } + case State::NEED_MASK_PROJECTION_PARTS: + { + if (iterateThroughAllProjectionsToMask()) + return true; + state = State::SUCCESS; return true; } @@ -1163,6 +1176,7 @@ private: void finalizeTempProjections(); bool iterateThroughAllProjections(); void constructTaskForProjectionPartsMerge(); + bool iterateThroughAllProjectionsToMask(); void finalize(); enum class State : uint8_t @@ -1170,7 +1184,7 @@ private: NEED_PREPARE, NEED_MUTATE_ORIGINAL_PART, NEED_MERGE_PROJECTION_PARTS, - + NEED_MASK_PROJECTION_PARTS, SUCCESS }; @@ -1336,6 +1350,14 @@ bool PartMergerWriter::iterateThroughAllProjections() return true; } +bool PartMergerWriter::iterateThroughAllProjectionsToMask() +{ + for (size_t i = 0, size = ctx->projections_to_mask.size(); i < size; ++i) + { + } + return false; +} + void PartMergerWriter::finalize() { if (ctx->count_lightweight_deleted_rows) diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index 26c3238c940..3a4d8637a51 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -334,6 +334,17 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context) return ret; } +void ProjectionDescription::mask(const Block & block[[maybe_unused]], ContextPtr context[[maybe_unused]]) const +{ + // auto mut_context = Context::createCopy(context); + // query_ast_copy = query_ast->clone(); + // auto builder = InterpreterAlterQuery(query_ast_copy, mut_context, + // Pipe(std::make_shared(block))) + // .buildQueryPipeline(); + // auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); + // PullingPipelineExecutor executor(pipeline); + // executor.pull(ret); +} String ProjectionsDescription::toString() const { diff --git a/src/Storages/ProjectionsDescription.h b/src/Storages/ProjectionsDescription.h index 445a4828e31..f4068fdd403 100644 --- a/src/Storages/ProjectionsDescription.h +++ b/src/Storages/ProjectionsDescription.h @@ -93,6 +93,8 @@ struct ProjectionDescription Block calculate(const Block & block, ContextPtr context) const; + void mask(const Block & block, ContextPtr context) const; + String getDirectoryName() const { return name + ".proj"; } }; From 130a1151febcff22508db9a18e15169f08b4f57f Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Thu, 14 Nov 2024 03:18:56 +0000 Subject: [PATCH 2/7] MutationInterpreter prepare for proj --- src/Interpreters/MutationsInterpreter.cpp | 118 ++++++++++++++++++++++ src/Interpreters/MutationsInterpreter.h | 4 + src/Storages/MergeTree/MutateTask.cpp | 9 ++ 3 files changed, 131 insertions(+) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 0f25d5ac21c..2cd04de32f5 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -373,6 +373,14 @@ bool MutationsInterpreter::Source::isCompactPart() const return part && part->getType() == MergeTreeDataPartType::Compact; } +const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const +{ + if (!part) + throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug."); + + return part->getColumns(); +} + static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage) { auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -1062,6 +1070,116 @@ void MutationsInterpreter::prepare(bool dry_run) prepareMutationStages(stages, dry_run); } +void MutationsInterpreter::prepareForProjection(bool dry_run) +{ + if (is_prepared) + throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug."); + + if (commands.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list"); + + auto all_columns = source.getColumns(); + NameSet available_columns_set(available_columns.begin(), available_columns.end()); + + /// Add _row_exists column if it is physically present in the part + if (source.hasLightweightDeleteMask()) + { + all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); + available_columns_set.insert(RowExistsColumn::name); + } + + NameSet updated_columns; + + for (auto & command : commands) + { + for (const auto & [name, _] : command.column_to_update_expression) + { + if (!available_columns_set.contains(name) && name != RowExistsColumn::name) + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, + "Column {} is updated but not requested to read", name); + + updated_columns.insert(name); + } + } + + std::vector read_columns; + + /// First, break a sequence of commands into stages. + for (const auto & command : commands) + { + if (command.type == MutationCommand::UPDATE) + { + mutation_kind.set(MutationKind::MUTATE_OTHER); + if (stages.empty() || !stages.back().column_to_updated.empty()) + stages.emplace_back(context); + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + for (const auto & [column_name, update_expr] : command.column_to_update_expression) + { + /// When doing UPDATE column = expression WHERE condition + /// we will replace column to the result of the following expression: + /// + /// CAST(if(condition, CAST(expression, type), column), type) + /// + /// Inner CAST is needed to make 'if' work when branches have no common type, + /// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1. + /// + /// Outer CAST is added just in case if we don't trust the returning type of 'if'. + + DataTypePtr type; + if (column_name == RowExistsColumn::name) + { + type = RowExistsColumn::type; + deleted_mask_updated = true; + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name); + } + + auto type_literal = std::make_shared(type->getName()); + ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command); + + auto updated_column = makeASTFunction("_CAST", + makeASTFunction("if", + condition, + makeASTFunction("_CAST", + update_expr->clone(), + type_literal), + std::make_shared(column_name)), + type_literal); + + stages.back().column_to_updated.emplace(column_name, updated_column); + + if (condition && settings.return_mutated_rows) + stages.back().filters.push_back(condition); + } + } + else if (command.type == MutationCommand::READ_COLUMN) + { + mutation_kind.set(MutationKind::MUTATE_OTHER); + read_columns.emplace_back(command.column_name); + } + else + throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString(command.type)); + } + + if (!read_columns.empty()) + { + if (stages.empty() || !stages.back().column_to_updated.empty()) + stages.emplace_back(context); + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + for (auto & column_name : read_columns) + stages.back().column_to_updated.emplace(column_name, std::make_shared(column_name)); + } + + is_prepared = true; + prepareMutationStages(stages, dry_run); +} + void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) { auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 901cd13cd2f..ccffdba00e6 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -129,6 +129,8 @@ public: bool hasBrokenProjection(const String & name) const; bool isCompactPart() const; + const NamesAndTypesList & getColumns() const; + void read( Stage & first_stage, QueryPlan & plan, @@ -160,6 +162,8 @@ private: void prepare(bool dry_run); + void prepareForProjection(bool dry_run); + void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); void prepareMutationStages(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 15abac099c6..5ca5132f82f 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -2270,6 +2270,10 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { + const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + /// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the /// columns and preserve the columns that are not affected, but after the update all columns must have the same number of row MutationsInterpreter::Settings settings(true); @@ -2280,6 +2284,11 @@ bool MutateTask::prepare() ctx->metadata_snapshot, ctx->for_interpreter, ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings); + auto projection_interpreter = std::make_unique( + *ctx->data, projection_part, alter_conversions, + ctx->metadata_snapshot, ctx->for_interpreter, + proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); + ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); From 445879a2ac0ee290a00a49de65b5a5e62ab35c41 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Mon, 18 Nov 2024 02:09:50 +0000 Subject: [PATCH 3/7] prepare to add meta for proj part --- src/Interpreters/MutationsInterpreter.cpp | 130 +++++++++++++++++++++- src/Interpreters/MutationsInterpreter.h | 1 + src/Interpreters/TreeRewriter.cpp | 2 +- src/Storages/MergeTree/MutateTask.cpp | 23 ++-- 4 files changed, 144 insertions(+), 12 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 2cd04de32f5..c1834c83da7 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -450,7 +450,7 @@ MutationsInterpreter::MutationsInterpreter( } context = std::move(new_context); - prepare(!settings.can_execute); + metadata_snapshot ? prepare(!settings.can_execute) : prepareForProjection(!settings.can_execute); } static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) @@ -1177,7 +1177,7 @@ void MutationsInterpreter::prepareForProjection(bool dry_run) } is_prepared = true; - prepareMutationStages(stages, dry_run); + prepareMutationStagesForProjection(stages, dry_run); } void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) @@ -1309,6 +1309,132 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } } +void MutationsInterpreter::prepareMutationStagesForProjection(std::vector & prepared_stages, bool dry_run) +{ + auto all_columns = source.getColumns(); + + /// Add _row_exists column if it is present in the part + if (source.hasLightweightDeleteMask() || deleted_mask_updated) + all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); + + bool has_filters = false; + /// Next, for each stage calculate columns changed by this and previous stages. + for (size_t i = 0; i < prepared_stages.size(); ++i) + { + if (settings.return_all_columns || !prepared_stages[i].filters.empty()) + { + for (const auto & column : all_columns) + { + if (column.name == RowExistsColumn::name && !deleted_mask_updated) + continue; + + prepared_stages[i].output_columns.insert(column.name); + } + + has_filters = true; + settings.apply_deleted_mask = true; + } + else + { + if (i > 0) + prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; + + /// Make sure that all updated columns are included into output_columns set. + /// This is important for a "hidden" column like _row_exists gets because it is a virtual column + /// and so it is not in the list of AllPhysical columns. + for (const auto & [column_name, _] : prepared_stages[i].column_to_updated) + { + if (column_name == RowExistsColumn::name && has_filters && !deleted_mask_updated) + continue; + + prepared_stages[i].output_columns.insert(column_name); + } + } + } + + /// Now, calculate `expressions_chain` for each stage except the first. + /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. + for (int64_t i = prepared_stages.size() - 1; i >= 0; --i) + { + auto & stage = prepared_stages[i]; + + ASTPtr all_asts = std::make_shared(); + + for (const auto & ast : stage.filters) + all_asts->children.push_back(ast); + + for (const auto & kv : stage.column_to_updated) + all_asts->children.push_back(kv.second); + + /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. + for (const auto & column : stage.output_columns) + all_asts->children.push_back(std::make_shared(column)); + + /// Executing scalar subquery on that stage can lead to deadlock + /// e.g. ALTER referencing the same table in scalar subquery + bool execute_scalar_subqueries = !dry_run; + auto syntax_result = TreeRewriter(context).analyze( + all_asts, all_columns, source.getStorage(), nullptr, + false, true, execute_scalar_subqueries); + + stage.analyzer = std::make_unique(all_asts, syntax_result, context); + + ExpressionActionsChain & actions_chain = stage.expressions_chain; + + if (!stage.filters.empty()) + { + auto ast = stage.filters.front(); + if (stage.filters.size() > 1) + ast = makeASTForLogicalAnd(std::move(stage.filters)); + + if (!actions_chain.steps.empty()) + actions_chain.addStep(); + + stage.analyzer->appendExpression(actions_chain, ast, dry_run); + stage.filter_column_names.push_back(ast->getColumnName()); + } + + if (!stage.column_to_updated.empty()) + { + if (!actions_chain.steps.empty()) + actions_chain.addStep(); + + for (const auto & kv : stage.column_to_updated) + stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); + + auto & actions = actions_chain.getLastStep().actions(); + + for (const auto & kv : stage.column_to_updated) + { + auto column_name = kv.second->getColumnName(); + const auto & dag_node = actions->dag.findInOutputs(column_name); + const auto & alias = actions->dag.addAlias(dag_node, kv.first); + actions->dag.addOrReplaceInOutputs(alias); + } + } + + if (i == 0 && actions_chain.steps.empty()) + actions_chain.lastStep(syntax_result->required_source_columns); + + /// Remove all intermediate columns. + actions_chain.addStep(); + actions_chain.getLastStep().required_output.clear(); + ActionsDAG::NodeRawConstPtrs new_index; + for (const auto & name : stage.output_columns) + actions_chain.getLastStep().addRequiredOutput(name); + + actions_chain.getLastActions(); + actions_chain.finalize(); + + if (i) + { + /// Propagate information about columns needed as input. + for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) + prepared_stages[i - 1].output_columns.insert(column.name); + } + } +} + void MutationsInterpreter::Source::read( Stage & first_stage, QueryPlan & plan, diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index ccffdba00e6..3464fed9f33 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -166,6 +166,7 @@ private: void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); void prepareMutationStages(std::vector &prepared_stages, bool dry_run); + void prepareMutationStagesForProjection(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; std::optional getStorageSortDescriptionIfPossible(const Block & header) const; diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index 28e11166762..c261990a5df 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1008,7 +1008,7 @@ TreeRewriterResult::TreeRewriterResult( /// Special columns are non physical columns, for example ALIAS void TreeRewriterResult::collectSourceColumns(bool add_special) { - if (storage) + if (storage && storage_snapshot) { auto options = GetColumnsOptions(add_special ? GetColumnsOptions::All : GetColumnsOptions::AllPhysical); options.withExtendedObjects(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5ca5132f82f..846658c3144 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1017,6 +1017,8 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; StorageMetadataPtr metadata_snapshot; + /// for projection part only, changed from table part + StorageMetadataPtr projection_metadata_snapshot; MutationCommandsConstPtr commands; time_t time_of_mutation; @@ -2270,10 +2272,6 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { - const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); - const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); - MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; - /// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the /// columns and preserve the columns that are not affected, but after the update all columns must have the same number of row MutationsInterpreter::Settings settings(true); @@ -2284,11 +2282,6 @@ bool MutateTask::prepare() ctx->metadata_snapshot, ctx->for_interpreter, ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings); - auto projection_interpreter = std::make_unique( - *ctx->data, projection_part, alter_conversions, - ctx->metadata_snapshot, ctx->for_interpreter, - proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); - ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); @@ -2296,6 +2289,18 @@ bool MutateTask::prepare() ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); + /// might be better to create metadata_snapshot for projection part. + const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + + auto projection_interpreter = std::make_unique( + *ctx->data, projection_part, alter_conversions, + nullptr, ctx->for_interpreter, + proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); + + ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); + lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name); /// If under the condition of lightweight delete mode with rebuild option, add projections again here as we can only know /// the condition as early as from here. From 8fa8d5ac0ffcde885716ec27999895913e9dbdaa Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Tue, 19 Nov 2024 00:08:14 +0000 Subject: [PATCH 4/7] add metadata for projpart and remove duplicated code --- src/Interpreters/MutationsInterpreter.cpp | 238 +--------------------- src/Interpreters/MutationsInterpreter.h | 3 - src/Interpreters/TreeRewriter.cpp | 2 +- src/Storages/MergeTree/MutateTask.cpp | 15 +- 4 files changed, 15 insertions(+), 243 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index c1834c83da7..7e075c99521 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -450,7 +450,7 @@ MutationsInterpreter::MutationsInterpreter( } context = std::move(new_context); - metadata_snapshot ? prepare(!settings.can_execute) : prepareForProjection(!settings.can_execute); + prepare(!settings.can_execute); } static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) @@ -1070,116 +1070,6 @@ void MutationsInterpreter::prepare(bool dry_run) prepareMutationStages(stages, dry_run); } -void MutationsInterpreter::prepareForProjection(bool dry_run) -{ - if (is_prepared) - throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug."); - - if (commands.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list"); - - auto all_columns = source.getColumns(); - NameSet available_columns_set(available_columns.begin(), available_columns.end()); - - /// Add _row_exists column if it is physically present in the part - if (source.hasLightweightDeleteMask()) - { - all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); - available_columns_set.insert(RowExistsColumn::name); - } - - NameSet updated_columns; - - for (auto & command : commands) - { - for (const auto & [name, _] : command.column_to_update_expression) - { - if (!available_columns_set.contains(name) && name != RowExistsColumn::name) - throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, - "Column {} is updated but not requested to read", name); - - updated_columns.insert(name); - } - } - - std::vector read_columns; - - /// First, break a sequence of commands into stages. - for (const auto & command : commands) - { - if (command.type == MutationCommand::UPDATE) - { - mutation_kind.set(MutationKind::MUTATE_OTHER); - if (stages.empty() || !stages.back().column_to_updated.empty()) - stages.emplace_back(context); - if (stages.size() == 1) /// First stage only supports filtering and can't update columns. - stages.emplace_back(context); - - for (const auto & [column_name, update_expr] : command.column_to_update_expression) - { - /// When doing UPDATE column = expression WHERE condition - /// we will replace column to the result of the following expression: - /// - /// CAST(if(condition, CAST(expression, type), column), type) - /// - /// Inner CAST is needed to make 'if' work when branches have no common type, - /// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1. - /// - /// Outer CAST is added just in case if we don't trust the returning type of 'if'. - - DataTypePtr type; - if (column_name == RowExistsColumn::name) - { - type = RowExistsColumn::type; - deleted_mask_updated = true; - } - else - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name); - } - - auto type_literal = std::make_shared(type->getName()); - ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command); - - auto updated_column = makeASTFunction("_CAST", - makeASTFunction("if", - condition, - makeASTFunction("_CAST", - update_expr->clone(), - type_literal), - std::make_shared(column_name)), - type_literal); - - stages.back().column_to_updated.emplace(column_name, updated_column); - - if (condition && settings.return_mutated_rows) - stages.back().filters.push_back(condition); - } - } - else if (command.type == MutationCommand::READ_COLUMN) - { - mutation_kind.set(MutationKind::MUTATE_OTHER); - read_columns.emplace_back(command.column_name); - } - else - throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString(command.type)); - } - - if (!read_columns.empty()) - { - if (stages.empty() || !stages.back().column_to_updated.empty()) - stages.emplace_back(context); - if (stages.size() == 1) /// First stage only supports filtering and can't update columns. - stages.emplace_back(context); - - for (auto & column_name : read_columns) - stages.back().column_to_updated.emplace(column_name, std::make_shared(column_name)); - } - - is_prepared = true; - prepareMutationStagesForProjection(stages, dry_run); -} - void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) { auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); @@ -1309,132 +1199,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } } -void MutationsInterpreter::prepareMutationStagesForProjection(std::vector & prepared_stages, bool dry_run) -{ - auto all_columns = source.getColumns(); - - /// Add _row_exists column if it is present in the part - if (source.hasLightweightDeleteMask() || deleted_mask_updated) - all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type); - - bool has_filters = false; - /// Next, for each stage calculate columns changed by this and previous stages. - for (size_t i = 0; i < prepared_stages.size(); ++i) - { - if (settings.return_all_columns || !prepared_stages[i].filters.empty()) - { - for (const auto & column : all_columns) - { - if (column.name == RowExistsColumn::name && !deleted_mask_updated) - continue; - - prepared_stages[i].output_columns.insert(column.name); - } - - has_filters = true; - settings.apply_deleted_mask = true; - } - else - { - if (i > 0) - prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; - - /// Make sure that all updated columns are included into output_columns set. - /// This is important for a "hidden" column like _row_exists gets because it is a virtual column - /// and so it is not in the list of AllPhysical columns. - for (const auto & [column_name, _] : prepared_stages[i].column_to_updated) - { - if (column_name == RowExistsColumn::name && has_filters && !deleted_mask_updated) - continue; - - prepared_stages[i].output_columns.insert(column_name); - } - } - } - - /// Now, calculate `expressions_chain` for each stage except the first. - /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. - for (int64_t i = prepared_stages.size() - 1; i >= 0; --i) - { - auto & stage = prepared_stages[i]; - - ASTPtr all_asts = std::make_shared(); - - for (const auto & ast : stage.filters) - all_asts->children.push_back(ast); - - for (const auto & kv : stage.column_to_updated) - all_asts->children.push_back(kv.second); - - /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. - for (const auto & column : stage.output_columns) - all_asts->children.push_back(std::make_shared(column)); - - /// Executing scalar subquery on that stage can lead to deadlock - /// e.g. ALTER referencing the same table in scalar subquery - bool execute_scalar_subqueries = !dry_run; - auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, source.getStorage(), nullptr, - false, true, execute_scalar_subqueries); - - stage.analyzer = std::make_unique(all_asts, syntax_result, context); - - ExpressionActionsChain & actions_chain = stage.expressions_chain; - - if (!stage.filters.empty()) - { - auto ast = stage.filters.front(); - if (stage.filters.size() > 1) - ast = makeASTForLogicalAnd(std::move(stage.filters)); - - if (!actions_chain.steps.empty()) - actions_chain.addStep(); - - stage.analyzer->appendExpression(actions_chain, ast, dry_run); - stage.filter_column_names.push_back(ast->getColumnName()); - } - - if (!stage.column_to_updated.empty()) - { - if (!actions_chain.steps.empty()) - actions_chain.addStep(); - - for (const auto & kv : stage.column_to_updated) - stage.analyzer->appendExpression(actions_chain, kv.second, dry_run); - - auto & actions = actions_chain.getLastStep().actions(); - - for (const auto & kv : stage.column_to_updated) - { - auto column_name = kv.second->getColumnName(); - const auto & dag_node = actions->dag.findInOutputs(column_name); - const auto & alias = actions->dag.addAlias(dag_node, kv.first); - actions->dag.addOrReplaceInOutputs(alias); - } - } - - if (i == 0 && actions_chain.steps.empty()) - actions_chain.lastStep(syntax_result->required_source_columns); - - /// Remove all intermediate columns. - actions_chain.addStep(); - actions_chain.getLastStep().required_output.clear(); - ActionsDAG::NodeRawConstPtrs new_index; - for (const auto & name : stage.output_columns) - actions_chain.getLastStep().addRequiredOutput(name); - - actions_chain.getLastActions(); - actions_chain.finalize(); - - if (i) - { - /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) - prepared_stages[i - 1].output_columns.insert(column.name); - } - } -} - void MutationsInterpreter::Source::read( Stage & first_stage, QueryPlan & plan, diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 3464fed9f33..145774579da 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -162,11 +162,8 @@ private: void prepare(bool dry_run); - void prepareForProjection(bool dry_run); - void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); void prepareMutationStages(std::vector &prepared_stages, bool dry_run); - void prepareMutationStagesForProjection(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; std::optional getStorageSortDescriptionIfPossible(const Block & header) const; diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index c261990a5df..28e11166762 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1008,7 +1008,7 @@ TreeRewriterResult::TreeRewriterResult( /// Special columns are non physical columns, for example ALIAS void TreeRewriterResult::collectSourceColumns(bool add_special) { - if (storage && storage_snapshot) + if (storage) { auto options = GetColumnsOptions(add_special ? GetColumnsOptions::All : GetColumnsOptions::AllPhysical); options.withExtendedObjects(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 846658c3144..997f48707ed 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1002,6 +1002,16 @@ void finalizeMutatedPart( new_data_part->default_codec = codec; } +static StorageMetadataPtr generateProjectionPartMetadata( + MergeTreeData::DataPartPtr projection_part, + StorageMetadataPtr table_metadata[[maybe_unused]]) +{ + auto res = std::make_shared(); + /// Currently only ColumnsDescription, later add as needed. + res->columns = projection_part->getColumnsDescription(); + return res; +} + } struct MutationContext @@ -2289,14 +2299,15 @@ bool MutateTask::prepare() ctx->updated_header = ctx->interpreter->getUpdatedHeader(); ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); - /// might be better to create metadata_snapshot for projection part. const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + ctx->projection_metadata_snapshot = MutationHelpers::generateProjectionPartMetadata(projection_part, ctx->metadata_snapshot); + auto projection_interpreter = std::make_unique( *ctx->data, projection_part, alter_conversions, - nullptr, ctx->for_interpreter, + ctx->projection_metadata_snapshot, ctx->for_interpreter, proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); From 0ee0ab91323f9e870f87f6c0895b8df97f2fd15d Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Tue, 19 Nov 2024 00:35:46 +0000 Subject: [PATCH 5/7] use existing projpart --- src/Storages/MergeTree/MutateTask.cpp | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 997f48707ed..712835ede1c 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1001,17 +1001,6 @@ void finalizeMutatedPart( new_data_part->default_codec = codec; } - -static StorageMetadataPtr generateProjectionPartMetadata( - MergeTreeData::DataPartPtr projection_part, - StorageMetadataPtr table_metadata[[maybe_unused]]) -{ - auto res = std::make_shared(); - /// Currently only ColumnsDescription, later add as needed. - res->columns = projection_part->getColumnsDescription(); - return res; -} - } struct MutationContext @@ -1027,8 +1016,6 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; StorageMetadataPtr metadata_snapshot; - /// for projection part only, changed from table part - StorageMetadataPtr projection_metadata_snapshot; MutationCommandsConstPtr commands; time_t time_of_mutation; @@ -2303,11 +2290,9 @@ bool MutateTask::prepare() const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; - ctx->projection_metadata_snapshot = MutationHelpers::generateProjectionPartMetadata(projection_part, ctx->metadata_snapshot); - auto projection_interpreter = std::make_unique( *ctx->data, projection_part, alter_conversions, - ctx->projection_metadata_snapshot, ctx->for_interpreter, + proj_desc.metadata, ctx->for_interpreter, proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); From 190651cf23a3203c4c0f37f6634885dfa63584d1 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Tue, 19 Nov 2024 20:44:19 +0000 Subject: [PATCH 6/7] add test --- src/Storages/MergeTree/MutateTask.cpp | 49 +++++++++++++++++-- .../03261_projection_mask.reference | 0 .../0_stateless/03261_projection_mask.sql | 29 +++++++++++ 3 files changed, 73 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/03261_projection_mask.reference create mode 100644 tests/queries/0_stateless/03261_projection_mask.sql diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 712835ede1c..5e30cafdee4 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1015,6 +1015,7 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; + MergeTreeData::DataPartPtr projection_part; StorageMetadataPtr metadata_snapshot; MutationCommandsConstPtr commands; @@ -1027,13 +1028,16 @@ struct MutationContext std::unique_ptr num_mutations; QueryPipelineBuilder mutating_pipeline_builder; - /// source here is projection part instead of table part - QueryPipelineBuilder projection_mutating_pipeline_builder; QueryPipeline mutating_pipeline; // in std::unique_ptr mutating_executor; ProgressCallback progress_callback; Block updated_header; + /// source here is projection part instead of table part + QueryPipelineBuilder projection_mutating_pipeline_builder; + QueryPipeline projection_mutating_pipeline; + std::unique_ptr projection_mutating_executor; + std::unique_ptr interpreter; UInt64 watch_prev_elapsed = 0; std::unique_ptr stage_progress; @@ -1050,6 +1054,9 @@ struct MutationContext MergeTreeData::MutableDataPartPtr new_data_part; IMergedBlockOutputStreamPtr out; + MergeTreeData::MutableDataPartPtr new_projection_part; + IMergedBlockOutputStreamPtr projection_out; + String mrk_extension; /// Used in lightweight delete to bit mask the projection if possible, @@ -1886,6 +1893,26 @@ private: ctx->projections_to_build = std::vector{ctx->projections_to_recalc.begin(), ctx->projections_to_recalc.end()}; + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + // ctx->projection_out = std::make_shared( + // ctx->new_projection_part, + // ctx->metadata_snapshot, + // ctx->updated_header.getNamesAndTypesList(), + // ctx->compression_codec, + // std::vector(), + // std::vector(), + // nullptr, + // /*save_marks_in_cache=*/ false, + // ctx->projection_part->index_granularity, + // &ctx->projection_part->index_granularity_info + // ); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } } @@ -2288,11 +2315,23 @@ bool MutateTask::prepare() const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); - MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second; + ctx->projection_part = projections_name_and_part.begin()->second; + + MutationCommands projection_commands; + + MutationHelpers::splitAndModifyMutationCommands( + ctx->projection_part, + proj_desc.metadata, + alter_conversions, + ctx->commands_for_part, + projection_commands, + ctx->for_file_renames, + false, + ctx->log); auto projection_interpreter = std::make_unique( - *ctx->data, projection_part, alter_conversions, - proj_desc.metadata, ctx->for_interpreter, + *ctx->data, ctx->projection_part, alter_conversions, + proj_desc.metadata, projection_commands, proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->projection_mutating_pipeline_builder = projection_interpreter->execute(); diff --git a/tests/queries/0_stateless/03261_projection_mask.reference b/tests/queries/0_stateless/03261_projection_mask.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03261_projection_mask.sql b/tests/queries/0_stateless/03261_projection_mask.sql new file mode 100644 index 00000000000..aeba53700dc --- /dev/null +++ b/tests/queries/0_stateless/03261_projection_mask.sql @@ -0,0 +1,29 @@ +-- compact +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 10485760; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34; + +-- wide +DROP TABLE IF EXISTS users; + +CREATE TABLE users ( + uid Int16, + name String, + age Int16, + projection p1 (select age, count() group by age), +) ENGINE = MergeTree order by uid +SETTINGS lightweight_mutation_projection_mode = 'rebuild', min_bytes_for_wide_part = 0; + +INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34); + +DELETE FROM users WHERE age = 34; From 5231d00616e36fa7a85374be74f5d8d6b06870e1 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Wed, 20 Nov 2024 03:11:36 +0000 Subject: [PATCH 7/7] remove unused and add compact projout --- src/Interpreters/MutationsInterpreter.cpp | 8 ------ src/Interpreters/MutationsInterpreter.h | 2 -- .../MergeTree/MergeTreeDataWriter.cpp | 3 +- src/Storages/MergeTree/MutateTask.cpp | 28 +++++++++++++++++++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 7e075c99521..0f25d5ac21c 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -373,14 +373,6 @@ bool MutationsInterpreter::Source::isCompactPart() const return part && part->getType() == MergeTreeDataPartType::Compact; } -const NamesAndTypesList & MutationsInterpreter::Source::getColumns() const -{ - if (!part) - throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter source part is nullptr. It is a bug."); - - return part->getColumns(); -} - static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage) { auto all_columns = metadata_snapshot->getColumns().getNamesOfPhysical(); diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 145774579da..901cd13cd2f 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -129,8 +129,6 @@ public: bool hasBrokenProjection(const String & name) const; bool isCompactPart() const; - const NamesAndTypesList & getColumns() const; - void read( Stage & first_stage, QueryPlan & plan, diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index ac29a9244b0..5d1ce7bd021 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -749,12 +749,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( TemporaryPart temp_part; const auto & metadata_snapshot = projection.metadata; - MergeTreeDataPartType part_type; /// Size of part would not be greater than block.bytes() + epsilon size_t expected_size = block.bytes(); // just check if there is enough space on parent volume MergeTreeData::reserveSpace(expected_size, parent_part->getDataPartStorage()); - part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; + MergeTreeDataPartType part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type; auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build(); auto projection_part_storage = new_data_part->getDataPartStoragePtr(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5e30cafdee4..f5cbf0bbc30 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1017,6 +1017,7 @@ struct MutationContext MergeTreeData::DataPartPtr source_part; MergeTreeData::DataPartPtr projection_part; StorageMetadataPtr metadata_snapshot; + StorageMetadataPtr projection_metadata_snapshot; MutationCommandsConstPtr commands; time_t time_of_mutation; @@ -1662,6 +1663,30 @@ private: ctx->mutating_pipeline.disableProfileEventUpdate(); ctx->mutating_executor = std::make_unique(ctx->mutating_pipeline); + chassert (ctx->projection_mutating_pipeline_builder.initialized()); + builder = std::make_unique(std::move(ctx->projection_mutating_pipeline_builder)); + + const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); + auto part_name = fmt::format("{}", projections_name_and_part.begin()->first); + auto new_data_part = ctx->new_data_part->getProjectionPartBuilder(part_name, true).withPartType(ctx->projection_part->getType()).build(); + + ctx->projection_out = std::make_shared( + new_data_part, + ctx->projection_metadata_snapshot, + ctx->projection_metadata_snapshot->getColumns().getAllPhysical(), + MergeTreeIndices{}, + ColumnsStatistics{}, + ctx->data->getContext()->chooseCompressionCodec(0, 0), + Tx::PrehistoricTID, + /*reset_columns=*/ false, + /*save_marks_in_cache=*/ false, + /*blocks_are_granules_size=*/ false, + ctx->data->getContext()->getWriteSettings()); + + ctx->projection_mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + ctx->projection_mutating_pipeline.disableProfileEventUpdate(); + ctx->projection_mutating_executor = std::make_unique(ctx->projection_mutating_pipeline); + part_merger_writer_task = std::make_unique(ctx); } @@ -2314,6 +2339,7 @@ bool MutateTask::prepare() ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress); const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin()); + ctx->projection_metadata_snapshot = proj_desc.metadata; const auto & projections_name_and_part = ctx->source_part->getProjectionParts(); ctx->projection_part = projections_name_and_part.begin()->second; @@ -2329,6 +2355,8 @@ bool MutateTask::prepare() false, ctx->log); + chassert(!projection_commands.empty()); + auto projection_interpreter = std::make_unique( *ctx->data, ctx->projection_part, alter_conversions, proj_desc.metadata, projection_commands,