From 4673b3fe1de44a030ca53ced88bd8d0efe9f94d6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 10 Jan 2023 16:31:01 +0100 Subject: [PATCH] Revert "Revert "Custom reading for mutation"" --- src/Interpreters/Context.h | 6 - src/Interpreters/MutationsInterpreter.cpp | 418 +++++++++++++----- src/Interpreters/MutationsInterpreter.h | 72 ++- .../QueryPlan/ReadFromMergeTree.cpp | 1 - .../Sources/ThrowingExceptionSource.h | 32 ++ src/Storages/IStorage.h | 2 + src/Storages/MergeTree/MergeTreeData.h | 2 + .../MergeTree/MergeTreeDataSelectExecutor.h | 14 +- .../MergeTree/MergeTreeSequentialSource.cpp | 112 ++++- .../MergeTree/MergeTreeSequentialSource.h | 13 + src/Storages/MergeTree/MutateTask.cpp | 9 +- 11 files changed, 539 insertions(+), 142 deletions(-) create mode 100644 src/Processors/Sources/ThrowingExceptionSource.h diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 4b7d0685ba3..58478ab79b8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -377,9 +377,6 @@ private: inline static ContextPtr global_context_instance; - /// A flag, used to mark if reader needs to apply deleted rows mask. - bool apply_deleted_mask = true; - /// Temporary data for query execution accounting. TemporaryDataOnDiskScopePtr temp_data_on_disk; public: @@ -973,9 +970,6 @@ public: bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } - bool applyDeletedMask() const { return apply_deleted_mask; } - void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; } - ActionLocksManagerPtr getActionLocksManager() const; enum class ApplicationType diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 3960e0759d6..f8627f1ff85 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -30,6 +30,9 @@ #include #include #include +#include +#include +#include namespace DB @@ -190,7 +193,8 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_ bool isStorageTouchedByMutations( - const StoragePtr & storage, + MergeTreeData & storage, + MergeTreeData::DataPartPtr source_part, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextMutablePtr context_copy) @@ -199,19 +203,15 @@ bool isStorageTouchedByMutations( return false; bool all_commands_can_be_skipped = true; - auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast(storage); for (const MutationCommand & command : commands) { if (!command.predicate) /// The command touches all rows. return true; - if (command.partition && !storage_from_merge_tree_data_part) - throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED); - - if (command.partition && storage_from_merge_tree_data_part) + if (command.partition) { - const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy); - if (partition_id == storage_from_merge_tree_data_part->getPartitionId()) + const String partition_id = storage.getPartitionIDFromQuery(command.partition, context_copy); + if (partition_id == source_part->info.partition_id) all_commands_can_be_skipped = false; } else @@ -229,13 +229,15 @@ bool isStorageTouchedByMutations( context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false); context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0)); - ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy); + ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context_copy); + + auto storage_from_part = std::make_shared(source_part); /// Interpreter must be alive, when we use result of execute() method. /// For some reason it may copy context and give it into ExpressionTransform /// after that we will use context from destroyed stack frame in our stream. InterpreterSelectQuery interpreter( - select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections()); + select_query, context_copy, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections()); auto io = interpreter.execute(); PullingPipelineExecutor executor(io.pipeline); @@ -288,6 +290,57 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func; } +MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_)) +{ +} + +MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_) + : data(&storage_), part(std::move(source_part_)) +{ +} + +StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const +{ + if (data) + return data->getStorageSnapshot(snapshot_, context_); + + return storage->getStorageSnapshot(snapshot_, context_); +} + +StoragePtr MutationsInterpreter::Source::getStorage() const +{ + if (data) + return data->shared_from_this(); + + return storage; +} + +const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const +{ + if (data) + return data; + + return dynamic_cast(storage.get()); +} + +bool MutationsInterpreter::Source::supportsLightweightDelete() const +{ + if (part) + return part->supportLightweightDeleteMutate(); + + return storage->supportsLightweightDelete(); +} + + +bool MutationsInterpreter::Source::hasLightweightDeleteMask() const +{ + return part && part->hasLightweightDelete(); +} + +bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const +{ + return data && data->getSettings()->materialize_ttl_recalculate_only; +} MutationsInterpreter::MutationsInterpreter( StoragePtr storage_, @@ -297,7 +350,45 @@ MutationsInterpreter::MutationsInterpreter( bool can_execute_, bool return_all_columns_, bool return_deleted_rows_) - : storage(std::move(storage_)) + : MutationsInterpreter( + Source(std::move(storage_)), + metadata_snapshot_, std::move(commands_), std::move(context_), + can_execute_, return_all_columns_, return_deleted_rows_) +{ + if (can_execute_ && dynamic_cast(source.getStorage().get())) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot execute mutation for {}. Mutation should be applied to every part separately.", + source.getStorage()->getName()); + } +} + +MutationsInterpreter::MutationsInterpreter( + MergeTreeData & storage_, + MergeTreeData::DataPartPtr source_part_, + const StorageMetadataPtr & metadata_snapshot_, + MutationCommands commands_, + ContextPtr context_, + bool can_execute_, + bool return_all_columns_, + bool return_deleted_rows_) + : MutationsInterpreter( + Source(storage_, std::move(source_part_)), + metadata_snapshot_, std::move(commands_), std::move(context_), + can_execute_, return_all_columns_, return_deleted_rows_) +{ +} + +MutationsInterpreter::MutationsInterpreter( + Source source_, + const StorageMetadataPtr & metadata_snapshot_, + MutationCommands commands_, + ContextPtr context_, + bool can_execute_, + bool return_all_columns_, + bool return_deleted_rows_) + : source(std::move(source_)) , metadata_snapshot(metadata_snapshot_) , commands(std::move(commands_)) , context(Context::createCopy(context_)) @@ -306,12 +397,12 @@ MutationsInterpreter::MutationsInterpreter( , return_all_columns(return_all_columns_) , return_deleted_rows(return_deleted_rows_) { - mutation_ast = prepare(!can_execute); + prepare(!can_execute); } -static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot) +static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) { - const MergeTreeData * merge_tree_data = dynamic_cast(storage.get()); + const MergeTreeData * merge_tree_data = source.getMergeTreeData(); if (!merge_tree_data) return {}; @@ -333,21 +424,12 @@ static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPt return key_columns; } -static bool materializeTTLRecalculateOnly(const StoragePtr & storage) -{ - auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast(storage); - if (!storage_from_merge_tree_data_part) - return false; - - return storage_from_merge_tree_data_part->materializeTTLRecalculateOnly(); -} - static void validateUpdateColumns( - const StoragePtr & storage, + const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns, const std::unordered_map & column_to_affected_materialized) { - NameSet key_columns = getKeyColumns(storage, metadata_snapshot); + NameSet key_columns = getKeyColumns(source, metadata_snapshot); for (const String & column_name : updated_columns) { @@ -364,7 +446,7 @@ static void validateUpdateColumns( /// Allow to override value of lightweight delete filter virtual column if (!found && column_name == LightweightDeleteDescription::FILTER_COLUMN.name) { - if (!storage->supportsLightweightDelete()) + if (!source.supportsLightweightDelete()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Lightweight delete is not supported for table"); found = true; } @@ -427,7 +509,7 @@ static std::optional> getExpressionsOfUpdatedNestedSubcolumn return res; } -ASTPtr MutationsInterpreter::prepare(bool dry_run) +void MutationsInterpreter::prepare(bool dry_run) { if (is_prepared) throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR); @@ -448,7 +530,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) } NameSet updated_columns; - bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage); + bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly(); for (const MutationCommand & command : commands) { @@ -481,7 +563,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) } } - validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized); + validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized); } dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns); @@ -778,15 +860,10 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) stages_copy.back().filters = stage.filters; } - const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true); - InterpreterSelectQuery interpreter{ - select_query, context, storage, metadata_snapshot, - SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits().ignoreProjections()}; + prepareMutationStages(stages_copy, true); - auto first_stage_header = interpreter.getSampleBlock(); QueryPlan plan; - auto source = std::make_shared(first_stage_header); - plan.addStep(std::make_unique(Pipe(std::move(source)))); + initQueryPlan(stages_copy.front(), plan); auto pipeline = addStreamsForLaterStages(stages_copy, plan); updated_header = std::make_unique(pipeline.getHeader()); } @@ -801,21 +878,18 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) is_prepared = true; - return prepareInterpreterSelectQuery(stages, dry_run); + prepareMutationStages(stages, dry_run); } -ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & prepared_stages, bool dry_run) +void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) { - auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context); + auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects(); auto all_columns = storage_snapshot->getColumns(options); /// Add _row_exists column if it is present in the part - if (auto part_storage = dynamic_pointer_cast(storage)) - { - if (part_storage->hasLightweightDeletedMask()) - all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); - } + if (source.hasLightweightDeleteMask()) + all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) @@ -839,7 +913,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & /// Now, calculate `expressions_chain` for each stage except the first. /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. - for (size_t i = prepared_stages.size() - 1; i > 0; --i) + for (int64_t i = prepared_stages.size() - 1; i >= 0; --i) { auto & stage = prepared_stages[i]; @@ -859,7 +933,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & /// e.g. ALTER referencing the same table in scalar subquery bool execute_scalar_subqueries = !dry_run; auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, storage, storage_snapshot, + all_asts, all_columns, source.getStorage(), storage_snapshot, false, true, execute_scalar_subqueries); if (execute_scalar_subqueries && context->hasQueryContext()) @@ -897,6 +971,9 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & } } + if (i == 0 && actions_chain.steps.empty()) + actions_chain.lastStep(syntax_result->required_source_columns); + /// Remove all intermediate columns. actions_chain.addStep(); actions_chain.getLastStep().required_output.clear(); @@ -908,49 +985,198 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & actions_chain.finalize(); - /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) - prepared_stages[i - 1].output_columns.insert(column.name); - } - - /// Execute first stage as a SELECT statement. - - auto select = std::make_shared(); - - select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared()); - for (const auto & column_name : prepared_stages[0].output_columns) - select->select()->children.push_back(std::make_shared(column_name)); - - /// Don't let select list be empty. - if (select->select()->children.empty()) - select->select()->children.push_back(std::make_shared(Field(0))); - - if (!prepared_stages[0].filters.empty()) - { - ASTPtr where_expression; - if (prepared_stages[0].filters.size() == 1) - where_expression = prepared_stages[0].filters[0]; - else + if (i) { - auto coalesced_predicates = std::make_shared(); - coalesced_predicates->name = "and"; - coalesced_predicates->arguments = std::make_shared(); - coalesced_predicates->children.push_back(coalesced_predicates->arguments); - coalesced_predicates->arguments->children = prepared_stages[0].filters; - where_expression = std::move(coalesced_predicates); + /// Propagate information about columns needed as input. + for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) + prepared_stages[i - 1].output_columns.insert(column.name); + } + } +} + +/// This structure re-implements adding virtual columns while reading from MergeTree part. +/// It would be good to unify it with IMergeTreeSelectAlgorithm. +struct VirtualColumns +{ + struct ColumnAndPosition + { + ColumnWithTypeAndName column; + size_t position; + }; + + using Columns = std::vector; + + Columns virtuals; + Names columns_to_read; + + VirtualColumns(Names required_columns, const MergeTreeData::DataPartPtr & part) : columns_to_read(std::move(required_columns)) + { + for (size_t i = 0; i < columns_to_read.size(); ++i) + { + if (columns_to_read[i] == LightweightDeleteDescription::FILTER_COLUMN.name) + { + LoadedMergeTreeDataPartInfoForReader part_info_reader(part); + if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name)) + { + ColumnWithTypeAndName mask_column; + mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type; + mask_column.column = mask_column.type->createColumnConst(0, 1); + mask_column.name = std::move(columns_to_read[i]); + + virtuals.emplace_back(ColumnAndPosition{.column = std::move(mask_column), .position = i}); + } + } + else if (columns_to_read[i] == "_partition_id") + { + ColumnWithTypeAndName column; + column.type = std::make_shared(); + column.column = column.type->createColumnConst(0, part->info.partition_id); + column.name = std::move(columns_to_read[i]); + + virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i}); + } + } + + if (!virtuals.empty()) + { + Names columns_no_virtuals; + columns_no_virtuals.reserve(columns_to_read.size()); + size_t next_virtual = 0; + for (size_t i = 0; i < columns_to_read.size(); ++i) + { + if (next_virtual < virtuals.size() && i == virtuals[next_virtual].position) + ++next_virtual; + else + columns_no_virtuals.emplace_back(std::move(columns_to_read[i])); + } + + columns_to_read.swap(columns_no_virtuals); } - select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); } - return select; + void addVirtuals(QueryPlan & plan) + { + auto dag = std::make_unique(plan.getCurrentDataStream().header.getColumnsWithTypeAndName()); + + for (auto & column : virtuals) + { + const auto & adding_const = dag->addColumn(std::move(column.column)); + auto & outputs = dag->getOutputs(); + outputs.insert(outputs.begin() + column.position, &adding_const); + } + + auto step = std::make_unique(plan.getCurrentDataStream(), std::move(dag)); + plan.addStep(std::move(step)); + } +}; + +void MutationsInterpreter::Source::read( + Stage & first_stage, + QueryPlan & plan, + const StorageMetadataPtr & snapshot_, + const ContextPtr & context_, + bool apply_deleted_mask_, + bool can_execute_) const +{ + auto required_columns = first_stage.expressions_chain.steps.front()->getRequiredColumns().getNames(); + auto storage_snapshot = getStorageSnapshot(snapshot_, context_); + + if (!can_execute_) + { + auto header = storage_snapshot->getSampleBlockForColumns(required_columns); + auto callback = []() + { + return DB::Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute a mutation because can_execute flag set to false"); + }; + + Pipe pipe(std::make_shared(header, callback)); + + auto read_from_pipe = std::make_unique(std::move(pipe)); + plan.addStep(std::move(read_from_pipe)); + return; + } + + if (data) + { + const auto & steps = first_stage.expressions_chain.steps; + const auto & names = first_stage.filter_column_names; + size_t num_filters = names.size(); + + ActionsDAGPtr filter; + if (!first_stage.filter_column_names.empty()) + { + + ActionsDAG::NodeRawConstPtrs nodes(num_filters); + for (size_t i = 0; i < num_filters; ++i) + nodes[i] = &steps[i]->actions()->findInOutputs(names[i]); + + filter = ActionsDAG::buildFilterActionsDAG(nodes, {}, context_); + } + + VirtualColumns virtual_columns(std::move(required_columns), part); + + createMergeTreeSequentialSource( + plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_, + &Poco::Logger::get("MutationsInterpreter")); + + virtual_columns.addVirtuals(plan); + } + else + { + auto select = std::make_shared(); + + select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared()); + for (const auto & column_name : first_stage.output_columns) + select->select()->children.push_back(std::make_shared(column_name)); + + /// Don't let select list be empty. + if (select->select()->children.empty()) + select->select()->children.push_back(std::make_shared(Field(0))); + + if (!first_stage.filters.empty()) + { + ASTPtr where_expression; + if (first_stage.filters.size() == 1) + where_expression = first_stage.filters[0]; + else + { + auto coalesced_predicates = std::make_shared(); + coalesced_predicates->name = "and"; + coalesced_predicates->arguments = std::make_shared(); + coalesced_predicates->children.push_back(coalesced_predicates->arguments); + coalesced_predicates->arguments->children = first_stage.filters; + where_expression = std::move(coalesced_predicates); + } + select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); + } + + SelectQueryInfo query_info; + query_info.query = std::move(select); + + size_t max_block_size = context_->getSettingsRef().max_block_size; + size_t max_streams = 1; + storage->read(plan, required_columns, storage_snapshot, query_info, context_, QueryProcessingStage::FetchColumns, max_block_size, max_streams); + + if (!plan.isInitialized()) + { + /// It may be possible when there is nothing to read from storage. + auto header = storage_snapshot->getSampleBlockForColumns(required_columns); + auto read_from_pipe = std::make_unique(Pipe(std::make_shared(header))); + plan.addStep(std::move(read_from_pipe)); + } + } +} + +void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan) +{ + source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute); + addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context); } QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const { - for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage) + for (const Stage & stage : prepared_stages) { - const Stage & stage = prepared_stages[i_stage]; - for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) { const auto & step = stage.expressions_chain.steps[i]; @@ -988,14 +1214,11 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v void MutationsInterpreter::validate() { - if (!select_interpreter) - select_interpreter = std::make_unique(mutation_ast, context, storage, metadata_snapshot, select_limits); - const Settings & settings = context->getSettingsRef(); /// For Replicated* storages mutations cannot employ non-deterministic functions /// because that produces inconsistencies between replicas - if (startsWith(storage->getName(), "Replicated") && !settings.allow_nondeterministic_mutations) + if (startsWith(source.getStorage()->getName(), "Replicated") && !settings.allow_nondeterministic_mutations) { for (const auto & command : commands) { @@ -1012,7 +1235,7 @@ void MutationsInterpreter::validate() } QueryPlan plan; - select_interpreter->buildQueryPlan(plan); + initQueryPlan(stages.front(), plan); auto pipeline = addStreamsForLaterStages(stages, plan); } @@ -1021,23 +1244,8 @@ QueryPipelineBuilder MutationsInterpreter::execute() if (!can_execute) throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); - if (!select_interpreter) - { - /// Skip to apply deleted mask for MutateSomePartColumn cases when part has lightweight delete. - if (!apply_deleted_mask) - { - auto context_for_reading = Context::createCopy(context); - context_for_reading->setApplyDeletedMask(apply_deleted_mask); - select_interpreter = std::make_unique(mutation_ast, context_for_reading, storage, metadata_snapshot, select_limits); - } - else - select_interpreter = std::make_unique(mutation_ast, context, storage, metadata_snapshot, select_limits); - } - - QueryPlan plan; - select_interpreter->buildQueryPlan(plan); - + initQueryPlan(stages.front(), plan); auto builder = addStreamsForLaterStages(stages, plan); /// Sometimes we update just part of columns (for example UPDATE mutation) @@ -1069,11 +1277,7 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const size_t MutationsInterpreter::evaluateCommandsSize() { - for (const MutationCommand & command : commands) - if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows. - return mutation_ast->size(); - - return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size()); + return prepareQueryAffectedAST(commands, source.getStorage(), context)->size(); } std::optional MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const @@ -1096,7 +1300,7 @@ std::optional MutationsInterpreter::getStorageSortDescriptionIf ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const { - return DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context); + return DB::getPartitionAndPredicateExpressionForMutationCommand(command, source.getStorage(), context); } bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 336c5f11162..fbcb56fac6f 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -19,7 +19,8 @@ using QueryPipelineBuilderPtr = std::unique_ptr; /// Return false if the data isn't going to be changed by mutations. bool isStorageTouchedByMutations( - const StoragePtr & storage, + MergeTreeData & storage, + MergeTreeData::DataPartPtr source_part, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextMutablePtr context_copy @@ -35,6 +36,8 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( /// to this data. class MutationsInterpreter { + struct Stage; + public: /// Storage to mutate, array of mutations commands and context. If you really want to execute mutation /// use can_execute = true, in other cases (validation, amount of commands) it can be false @@ -47,8 +50,18 @@ public: bool return_all_columns_ = false, bool return_deleted_rows_ = false); - void validate(); + /// Special case for MergeTree + MutationsInterpreter( + MergeTreeData & storage_, + MergeTreeData::DataPartPtr source_part_, + const StorageMetadataPtr & metadata_snapshot_, + MutationCommands commands_, + ContextPtr context_, + bool can_execute_, + bool return_all_columns_ = false, + bool return_deleted_rows_ = false); + void validate(); size_t evaluateCommandsSize(); /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices. @@ -82,19 +95,60 @@ public: void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; } + /// Internal class which represents a data part for MergeTree + /// or just storage for other storages. + /// The main idea is to create a dedicated reading from MergeTree part. + /// Additionally we propagate some storage properties. + struct Source + { + StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const; + StoragePtr getStorage() const; + const MergeTreeData * getMergeTreeData() const; + + bool supportsLightweightDelete() const; + bool hasLightweightDeleteMask() const; + bool materializeTTLRecalculateOnly() const; + + void read( + Stage & first_stage, + QueryPlan & plan, + const StorageMetadataPtr & snapshot_, + const ContextPtr & context_, + bool apply_deleted_mask_, + bool can_execute_) const; + + explicit Source(StoragePtr storage_); + Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_); + + private: + StoragePtr storage; + + /// Special case for MergeTree. + MergeTreeData * data = nullptr; + MergeTreeData::DataPartPtr part; + }; + private: - ASTPtr prepare(bool dry_run); + MutationsInterpreter( + Source source_, + const StorageMetadataPtr & metadata_snapshot_, + MutationCommands commands_, + ContextPtr context_, + bool can_execute_, + bool return_all_columns_, + bool return_deleted_rows_); - struct Stage; + void prepare(bool dry_run); - ASTPtr prepareInterpreterSelectQuery(std::vector &prepared_stages, bool dry_run); + void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); + void prepareMutationStages(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; std::optional getStorageSortDescriptionIfPossible(const Block & header) const; ASTPtr getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const; - StoragePtr storage; + Source source; StorageMetadataPtr metadata_snapshot; MutationCommands commands; ContextPtr context; @@ -103,12 +157,6 @@ private: bool apply_deleted_mask = true; - ASTPtr mutation_ast; - - /// We have to store interpreter because it use own copy of context - /// and some streams from execute method may use it. - std::unique_ptr select_interpreter; - /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 0d8fe84f9d3..4765b2cbfbe 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -64,7 +64,6 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings( .save_marks_in_cache = true, .checksum_on_read = settings.checksum_on_read, .read_in_order = query_info.input_order_info != nullptr, - .apply_deleted_mask = context->applyDeletedMask(), .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree && (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1), }; diff --git a/src/Processors/Sources/ThrowingExceptionSource.h b/src/Processors/Sources/ThrowingExceptionSource.h new file mode 100644 index 00000000000..5abebd89d07 --- /dev/null +++ b/src/Processors/Sources/ThrowingExceptionSource.h @@ -0,0 +1,32 @@ +#pragma once +#include + + +namespace DB +{ + +/// This source is throwing exception at the first attempt to read from it. +/// Can be used as a additional check that pipeline (or its part) is never executed. +class ThrowingExceptionSource : public ISource +{ +public: + + using CallBack = std::function; + + explicit ThrowingExceptionSource(Block header, CallBack callback_) + : ISource(std::move(header)) + , callback(std::move(callback_)) + {} + + String getName() const override { return "ThrowingExceptionSource"; } + +protected: + Chunk generate() override + { + throw callback(); + } + + CallBack callback; +}; + +} diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index cdf273b47df..7d927b51e5f 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -110,6 +110,8 @@ public: /// The name of the table. StorageID getStorageID() const; + virtual bool isMergeTree() const { return false; } + /// Returns true if the storage receives data from a remote server or servers. virtual bool isRemote() const { return false; } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 670c755cf72..19efd8f908a 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -424,6 +424,8 @@ public: StoragePolicyPtr getStoragePolicy() const override; + bool isMergeTree() const override { return true; } + bool supportsPrewhere() const override { return true; } bool supportsFinal() const override; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index e302663597d..30d09312245 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -66,6 +66,13 @@ public: size_t num_streams, std::shared_ptr max_block_numbers_to_read = nullptr) const; + static MarkRanges markRangesFromPKRange( + const MergeTreeData::DataPartPtr & part, + const StorageMetadataPtr & metadata_snapshot, + const KeyCondition & key_condition, + const Settings & settings, + Poco::Logger * log); + private: const MergeTreeData & data; Poco::Logger * log; @@ -78,13 +85,6 @@ private: const Settings & settings, Poco::Logger * log); - static MarkRanges markRangesFromPKRange( - const MergeTreeData::DataPartPtr & part, - const StorageMetadataPtr & metadata_snapshot, - const KeyCondition & key_condition, - const Settings & settings, - Poco::Logger * log); - static MarkRanges filterMarksUsingIndex( MergeTreeIndexPtr index_helper, MergeTreeIndexConditionPtr condition, diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 9e0c96fd88a..4539e0b36c5 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -1,9 +1,14 @@ #include #include #include +#include #include +#include +#include #include #include +#include +#include namespace DB { @@ -25,6 +30,8 @@ public: const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, Names columns_to_read_, + std::optional mark_ranges_, + bool apply_deleted_mask, bool read_with_direct_io_, bool take_column_types_from_storage, bool quiet = false); @@ -56,6 +63,8 @@ private: Poco::Logger * log = &Poco::Logger::get("MergeTreeSequentialSource"); + std::optional mark_ranges; + std::shared_ptr mark_cache; using MergeTreeReaderPtr = std::unique_ptr; MergeTreeReaderPtr reader; @@ -76,6 +85,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, Names columns_to_read_, + std::optional mark_ranges_, + bool apply_deleted_mask, bool read_with_direct_io_, bool take_column_types_from_storage, bool quiet) @@ -85,6 +96,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( , data_part(std::move(data_part_)) , columns_to_read(std::move(columns_to_read_)) , read_with_direct_io(read_with_direct_io_) + , mark_ranges(std::move(mark_ranges_)) , mark_cache(storage.getContext()->getMarkCache()) { if (!quiet) @@ -126,11 +138,15 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( MergeTreeReaderSettings reader_settings = { .read_settings = read_settings, - .save_marks_in_cache = false + .save_marks_in_cache = false, + .apply_deleted_mask = apply_deleted_mask, }; + if (!mark_ranges) + mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())}); + reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata, - MarkRanges{MarkRange(0, data_part->getMarksCount())}, + *mark_ranges, /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {}); } @@ -224,8 +240,10 @@ Pipe createMergeTreeSequentialSource( if (need_to_filter_deleted_rows) columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name); + bool apply_deleted_mask = false; + auto column_part_source = std::make_shared( - storage, storage_snapshot, data_part, columns, read_with_direct_io, take_column_types_from_storage, quiet); + storage, storage_snapshot, data_part, columns, std::optional{}, apply_deleted_mask, read_with_direct_io, take_column_types_from_storage, quiet); Pipe pipe(std::move(column_part_source)); @@ -242,4 +260,92 @@ Pipe createMergeTreeSequentialSource( return pipe; } +/// A Query Plan step to read from a single Merge Tree part +/// using Merge Tree Sequential Source (which reads strictly sequentially in a single thread). +/// This step is used for mutations because the usual reading is too tricky. +/// Previously, sequential reading was achieved by changing some settings like max_threads, +/// however, this approach lead to data corruption after some new settings were introduced. +class ReadFromPart final : public ISourceStep +{ +public: + ReadFromPart( + const MergeTreeData & storage_, + const StorageSnapshotPtr & storage_snapshot_, + MergeTreeData::DataPartPtr data_part_, + Names columns_to_read_, + bool apply_deleted_mask_, + ActionsDAGPtr filter_, + ContextPtr context_, + Poco::Logger * log_) + : ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}) + , storage(storage_) + , storage_snapshot(storage_snapshot_) + , data_part(std::move(data_part_)) + , columns_to_read(std::move(columns_to_read_)) + , apply_deleted_mask(apply_deleted_mask_) + , filter(std::move(filter_)) + , context(std::move(context_)) + , log(log_) + { + } + + String getName() const override { return fmt::format("ReadFromPart({})", data_part->name); } + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override + { + std::optional mark_ranges; + + const auto & metadata_snapshot = storage_snapshot->metadata; + if (filter && metadata_snapshot->hasPrimaryKey()) + { + const auto & primary_key = storage_snapshot->metadata->getPrimaryKey(); + const Names & primary_key_column_names = primary_key.column_names; + KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{}); + LOG_DEBUG(log, "Key condition: {}", key_condition.toString()); + + if (!key_condition.alwaysFalse()) + mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( + data_part, metadata_snapshot, key_condition, context->getSettingsRef(), log); + + if (mark_ranges && mark_ranges->empty()) + { + pipeline.init(Pipe(std::make_unique(output_stream->header))); + return; + } + } + + auto source = std::make_unique( + storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), apply_deleted_mask, false, true); + + pipeline.init(Pipe(std::move(source))); + } + +private: + const MergeTreeData & storage; + StorageSnapshotPtr storage_snapshot; + MergeTreeData::DataPartPtr data_part; + Names columns_to_read; + bool apply_deleted_mask; + ActionsDAGPtr filter; + ContextPtr context; + Poco::Logger * log; +}; + +void createMergeTreeSequentialSource( + QueryPlan & plan, + const MergeTreeData & storage, + const StorageSnapshotPtr & storage_snapshot, + MergeTreeData::DataPartPtr data_part, + Names columns_to_read, + bool apply_deleted_mask, + ActionsDAGPtr filter, + ContextPtr context, + Poco::Logger * log) +{ + auto reading = std::make_unique( + storage, storage_snapshot, std::move(data_part), std::move(columns_to_read), apply_deleted_mask, filter, std::move(context), log); + + plan.addStep(std::move(reading)); +} + } diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index c6c29f9d49a..fb249568e8f 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -20,4 +20,17 @@ Pipe createMergeTreeSequentialSource( bool quiet, std::shared_ptr> filtered_rows_count); +class QueryPlan; + +void createMergeTreeSequentialSource( + QueryPlan & plan, + const MergeTreeData & storage, + const StorageSnapshotPtr & storage_snapshot, + MergeTreeData::DataPartPtr data_part, + Names columns_to_read, + bool apply_deleted_mask, + ActionsDAGPtr filter, + ContextPtr context, + Poco::Logger * log); + } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index de68cb6f0ba..3ecb790243d 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -714,8 +714,6 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; - - StoragePtr storage_from_source_part; StorageMetadataPtr metadata_snapshot; MutationCommandsConstPtr commands; @@ -1478,10 +1476,9 @@ MutateTask::MutateTask( ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical(); ctx->txn = txn; ctx->source_part = ctx->future_part->parts[0]; - ctx->storage_from_source_part = std::make_shared(ctx->source_part); ctx->need_prefix = need_prefix_; - auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_); + auto storage_snapshot = ctx->data->getStorageSnapshot(ctx->metadata_snapshot, context_); extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false); } @@ -1554,7 +1551,7 @@ bool MutateTask::prepare() } if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations( - ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) + *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) { NameSet files_to_copy_instead_of_hardlinks; auto settings_ptr = ctx->data->getSettings(); @@ -1597,7 +1594,7 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { ctx->interpreter = std::make_unique( - ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); + *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->mutation_kind = ctx->interpreter->getMutationKind();