diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 58478ab79b8..4b7d0685ba3 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -377,6 +377,9 @@ private: inline static ContextPtr global_context_instance; + /// A flag, used to mark if reader needs to apply deleted rows mask. + bool apply_deleted_mask = true; + /// Temporary data for query execution accounting. TemporaryDataOnDiskScopePtr temp_data_on_disk; public: @@ -970,6 +973,9 @@ public: bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } + bool applyDeletedMask() const { return apply_deleted_mask; } + void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; } + ActionLocksManagerPtr getActionLocksManager() const; enum class ApplicationType diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index f8627f1ff85..3960e0759d6 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -30,9 +30,6 @@ #include #include #include -#include -#include -#include namespace DB @@ -193,8 +190,7 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_ bool isStorageTouchedByMutations( - MergeTreeData & storage, - MergeTreeData::DataPartPtr source_part, + const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextMutablePtr context_copy) @@ -203,15 +199,19 @@ bool isStorageTouchedByMutations( return false; bool all_commands_can_be_skipped = true; + auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast(storage); for (const MutationCommand & command : commands) { if (!command.predicate) /// The command touches all rows. return true; - if (command.partition) + if (command.partition && !storage_from_merge_tree_data_part) + throw Exception("ALTER UPDATE/DELETE ... 
IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED); + + if (command.partition && storage_from_merge_tree_data_part) { - const String partition_id = storage.getPartitionIDFromQuery(command.partition, context_copy); - if (partition_id == source_part->info.partition_id) + const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy); + if (partition_id == storage_from_merge_tree_data_part->getPartitionId()) all_commands_can_be_skipped = false; } else @@ -229,15 +229,13 @@ bool isStorageTouchedByMutations( context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false); context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0)); - ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context_copy); - - auto storage_from_part = std::make_shared(source_part); + ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy); /// Interpreter must be alive, when we use result of execute() method. /// For some reason it may copy context and give it into ExpressionTransform /// after that we will use context from destroyed stack frame in our stream. InterpreterSelectQuery interpreter( - select_query, context_copy, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections()); + select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections()); auto io = interpreter.execute(); PullingPipelineExecutor executor(io.pipeline); @@ -290,57 +288,6 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( return command.predicate ? 
command.predicate->clone() : partition_predicate_as_ast_func; } -MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_)) -{ -} - -MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_) - : data(&storage_), part(std::move(source_part_)) -{ -} - -StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const -{ - if (data) - return data->getStorageSnapshot(snapshot_, context_); - - return storage->getStorageSnapshot(snapshot_, context_); -} - -StoragePtr MutationsInterpreter::Source::getStorage() const -{ - if (data) - return data->shared_from_this(); - - return storage; -} - -const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const -{ - if (data) - return data; - - return dynamic_cast(storage.get()); -} - -bool MutationsInterpreter::Source::supportsLightweightDelete() const -{ - if (part) - return part->supportLightweightDeleteMutate(); - - return storage->supportsLightweightDelete(); -} - - -bool MutationsInterpreter::Source::hasLightweightDeleteMask() const -{ - return part && part->hasLightweightDelete(); -} - -bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const -{ - return data && data->getSettings()->materialize_ttl_recalculate_only; -} MutationsInterpreter::MutationsInterpreter( StoragePtr storage_, @@ -350,45 +297,7 @@ MutationsInterpreter::MutationsInterpreter( bool can_execute_, bool return_all_columns_, bool return_deleted_rows_) - : MutationsInterpreter( - Source(std::move(storage_)), - metadata_snapshot_, std::move(commands_), std::move(context_), - can_execute_, return_all_columns_, return_deleted_rows_) -{ - if (can_execute_ && dynamic_cast(source.getStorage().get())) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cannot execute mutation for {}. 
Mutation should be applied to every part separately.", - source.getStorage()->getName()); - } -} - -MutationsInterpreter::MutationsInterpreter( - MergeTreeData & storage_, - MergeTreeData::DataPartPtr source_part_, - const StorageMetadataPtr & metadata_snapshot_, - MutationCommands commands_, - ContextPtr context_, - bool can_execute_, - bool return_all_columns_, - bool return_deleted_rows_) - : MutationsInterpreter( - Source(storage_, std::move(source_part_)), - metadata_snapshot_, std::move(commands_), std::move(context_), - can_execute_, return_all_columns_, return_deleted_rows_) -{ -} - -MutationsInterpreter::MutationsInterpreter( - Source source_, - const StorageMetadataPtr & metadata_snapshot_, - MutationCommands commands_, - ContextPtr context_, - bool can_execute_, - bool return_all_columns_, - bool return_deleted_rows_) - : source(std::move(source_)) + : storage(std::move(storage_)) , metadata_snapshot(metadata_snapshot_) , commands(std::move(commands_)) , context(Context::createCopy(context_)) @@ -397,12 +306,12 @@ MutationsInterpreter::MutationsInterpreter( , return_all_columns(return_all_columns_) , return_deleted_rows(return_deleted_rows_) { - prepare(!can_execute); + mutation_ast = prepare(!can_execute); } -static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) +static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot) { - const MergeTreeData * merge_tree_data = source.getMergeTreeData(); + const MergeTreeData * merge_tree_data = dynamic_cast(storage.get()); if (!merge_tree_data) return {}; @@ -424,12 +333,21 @@ static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const return key_columns; } +static bool materializeTTLRecalculateOnly(const StoragePtr & storage) +{ + auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast(storage); + if (!storage_from_merge_tree_data_part) + return false; + + return 
storage_from_merge_tree_data_part->materializeTTLRecalculateOnly(); +} + static void validateUpdateColumns( - const MutationsInterpreter::Source & source, + const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns, const std::unordered_map & column_to_affected_materialized) { - NameSet key_columns = getKeyColumns(source, metadata_snapshot); + NameSet key_columns = getKeyColumns(storage, metadata_snapshot); for (const String & column_name : updated_columns) { @@ -446,7 +364,7 @@ static void validateUpdateColumns( /// Allow to override value of lightweight delete filter virtual column if (!found && column_name == LightweightDeleteDescription::FILTER_COLUMN.name) { - if (!source.supportsLightweightDelete()) + if (!storage->supportsLightweightDelete()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Lightweight delete is not supported for table"); found = true; } @@ -509,7 +427,7 @@ static std::optional> getExpressionsOfUpdatedNestedSubcolumn return res; } -void MutationsInterpreter::prepare(bool dry_run) +ASTPtr MutationsInterpreter::prepare(bool dry_run) { if (is_prepared) throw Exception("MutationsInterpreter is already prepared. 
It is a bug.", ErrorCodes::LOGICAL_ERROR); @@ -530,7 +448,7 @@ void MutationsInterpreter::prepare(bool dry_run) } NameSet updated_columns; - bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly(); + bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage); for (const MutationCommand & command : commands) { @@ -563,7 +481,7 @@ void MutationsInterpreter::prepare(bool dry_run) } } - validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized); + validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized); } dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns); @@ -860,10 +778,15 @@ void MutationsInterpreter::prepare(bool dry_run) stages_copy.back().filters = stage.filters; } - prepareMutationStages(stages_copy, true); + const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true); + InterpreterSelectQuery interpreter{ + select_query, context, storage, metadata_snapshot, + SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits().ignoreProjections()}; + auto first_stage_header = interpreter.getSampleBlock(); QueryPlan plan; - initQueryPlan(stages_copy.front(), plan); + auto source = std::make_shared(first_stage_header); + plan.addStep(std::make_unique(Pipe(std::move(source)))); auto pipeline = addStreamsForLaterStages(stages_copy, plan); updated_header = std::make_unique(pipeline.getHeader()); } @@ -878,18 +801,21 @@ void MutationsInterpreter::prepare(bool dry_run) is_prepared = true; - prepareMutationStages(stages, dry_run); + return prepareInterpreterSelectQuery(stages, dry_run); } -void MutationsInterpreter::prepareMutationStages(std::vector & prepared_stages, bool dry_run) +ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & prepared_stages, bool dry_run) { - auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context); + auto storage_snapshot 
= storage->getStorageSnapshot(metadata_snapshot, context); auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects(); auto all_columns = storage_snapshot->getColumns(options); /// Add _row_exists column if it is present in the part - if (source.hasLightweightDeleteMask()) - all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); + if (auto part_storage = dynamic_pointer_cast(storage)) + { + if (part_storage->hasLightweightDeletedMask()) + all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); + } /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) @@ -913,7 +839,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s /// Now, calculate `expressions_chain` for each stage except the first. /// Do it backwards to propagate information about columns required as input for a stage to the previous stage. - for (int64_t i = prepared_stages.size() - 1; i >= 0; --i) + for (size_t i = prepared_stages.size() - 1; i > 0; --i) { auto & stage = prepared_stages[i]; @@ -933,7 +859,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s /// e.g. ALTER referencing the same table in scalar subquery bool execute_scalar_subqueries = !dry_run; auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, source.getStorage(), storage_snapshot, + all_asts, all_columns, storage, storage_snapshot, false, true, execute_scalar_subqueries); if (execute_scalar_subqueries && context->hasQueryContext()) @@ -971,9 +897,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } } - if (i == 0 && actions_chain.steps.empty()) - actions_chain.lastStep(syntax_result->required_source_columns); - /// Remove all intermediate columns. 
actions_chain.addStep(); actions_chain.getLastStep().required_output.clear(); @@ -985,198 +908,49 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s actions_chain.finalize(); - if (i) - { - /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) - prepared_stages[i - 1].output_columns.insert(column.name); - } - } -} - -/// This structure re-implements adding virtual columns while reading from MergeTree part. -/// It would be good to unify it with IMergeTreeSelectAlgorithm. -struct VirtualColumns -{ - struct ColumnAndPosition - { - ColumnWithTypeAndName column; - size_t position; - }; - - using Columns = std::vector; - - Columns virtuals; - Names columns_to_read; - - VirtualColumns(Names required_columns, const MergeTreeData::DataPartPtr & part) : columns_to_read(std::move(required_columns)) - { - for (size_t i = 0; i < columns_to_read.size(); ++i) - { - if (columns_to_read[i] == LightweightDeleteDescription::FILTER_COLUMN.name) - { - LoadedMergeTreeDataPartInfoForReader part_info_reader(part); - if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name)) - { - ColumnWithTypeAndName mask_column; - mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type; - mask_column.column = mask_column.type->createColumnConst(0, 1); - mask_column.name = std::move(columns_to_read[i]); - - virtuals.emplace_back(ColumnAndPosition{.column = std::move(mask_column), .position = i}); - } - } - else if (columns_to_read[i] == "_partition_id") - { - ColumnWithTypeAndName column; - column.type = std::make_shared(); - column.column = column.type->createColumnConst(0, part->info.partition_id); - column.name = std::move(columns_to_read[i]); - - virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i}); - } - } - - if (!virtuals.empty()) - { - Names columns_no_virtuals; - 
columns_no_virtuals.reserve(columns_to_read.size()); - size_t next_virtual = 0; - for (size_t i = 0; i < columns_to_read.size(); ++i) - { - if (next_virtual < virtuals.size() && i == virtuals[next_virtual].position) - ++next_virtual; - else - columns_no_virtuals.emplace_back(std::move(columns_to_read[i])); - } - - columns_to_read.swap(columns_no_virtuals); - } + /// Propagate information about columns needed as input. + for (const auto & column : actions_chain.steps.front()->getRequiredColumns()) + prepared_stages[i - 1].output_columns.insert(column.name); } - void addVirtuals(QueryPlan & plan) - { - auto dag = std::make_unique(plan.getCurrentDataStream().header.getColumnsWithTypeAndName()); + /// Execute first stage as a SELECT statement. - for (auto & column : virtuals) + auto select = std::make_shared(); + + select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared()); + for (const auto & column_name : prepared_stages[0].output_columns) + select->select()->children.push_back(std::make_shared(column_name)); + + /// Don't let select list be empty. 
+ if (select->select()->children.empty()) + select->select()->children.push_back(std::make_shared(Field(0))); + + if (!prepared_stages[0].filters.empty()) + { + ASTPtr where_expression; + if (prepared_stages[0].filters.size() == 1) + where_expression = prepared_stages[0].filters[0]; + else { - const auto & adding_const = dag->addColumn(std::move(column.column)); - auto & outputs = dag->getOutputs(); - outputs.insert(outputs.begin() + column.position, &adding_const); + auto coalesced_predicates = std::make_shared(); + coalesced_predicates->name = "and"; + coalesced_predicates->arguments = std::make_shared(); + coalesced_predicates->children.push_back(coalesced_predicates->arguments); + coalesced_predicates->arguments->children = prepared_stages[0].filters; + where_expression = std::move(coalesced_predicates); } - - auto step = std::make_unique(plan.getCurrentDataStream(), std::move(dag)); - plan.addStep(std::move(step)); - } -}; - -void MutationsInterpreter::Source::read( - Stage & first_stage, - QueryPlan & plan, - const StorageMetadataPtr & snapshot_, - const ContextPtr & context_, - bool apply_deleted_mask_, - bool can_execute_) const -{ - auto required_columns = first_stage.expressions_chain.steps.front()->getRequiredColumns().getNames(); - auto storage_snapshot = getStorageSnapshot(snapshot_, context_); - - if (!can_execute_) - { - auto header = storage_snapshot->getSampleBlockForColumns(required_columns); - auto callback = []() - { - return DB::Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute a mutation because can_execute flag set to false"); - }; - - Pipe pipe(std::make_shared(header, callback)); - - auto read_from_pipe = std::make_unique(std::move(pipe)); - plan.addStep(std::move(read_from_pipe)); - return; + select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); } - if (data) - { - const auto & steps = first_stage.expressions_chain.steps; - const auto & names = first_stage.filter_column_names; - size_t num_filters = 
names.size(); - - ActionsDAGPtr filter; - if (!first_stage.filter_column_names.empty()) - { - - ActionsDAG::NodeRawConstPtrs nodes(num_filters); - for (size_t i = 0; i < num_filters; ++i) - nodes[i] = &steps[i]->actions()->findInOutputs(names[i]); - - filter = ActionsDAG::buildFilterActionsDAG(nodes, {}, context_); - } - - VirtualColumns virtual_columns(std::move(required_columns), part); - - createMergeTreeSequentialSource( - plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_, - &Poco::Logger::get("MutationsInterpreter")); - - virtual_columns.addVirtuals(plan); - } - else - { - auto select = std::make_shared(); - - select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared()); - for (const auto & column_name : first_stage.output_columns) - select->select()->children.push_back(std::make_shared(column_name)); - - /// Don't let select list be empty. - if (select->select()->children.empty()) - select->select()->children.push_back(std::make_shared(Field(0))); - - if (!first_stage.filters.empty()) - { - ASTPtr where_expression; - if (first_stage.filters.size() == 1) - where_expression = first_stage.filters[0]; - else - { - auto coalesced_predicates = std::make_shared(); - coalesced_predicates->name = "and"; - coalesced_predicates->arguments = std::make_shared(); - coalesced_predicates->children.push_back(coalesced_predicates->arguments); - coalesced_predicates->arguments->children = first_stage.filters; - where_expression = std::move(coalesced_predicates); - } - select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); - } - - SelectQueryInfo query_info; - query_info.query = std::move(select); - - size_t max_block_size = context_->getSettingsRef().max_block_size; - size_t max_streams = 1; - storage->read(plan, required_columns, storage_snapshot, query_info, context_, QueryProcessingStage::FetchColumns, max_block_size, max_streams); - - if 
(!plan.isInitialized()) - { - /// It may be possible when there is nothing to read from storage. - auto header = storage_snapshot->getSampleBlockForColumns(required_columns); - auto read_from_pipe = std::make_unique(Pipe(std::make_shared(header))); - plan.addStep(std::move(read_from_pipe)); - } - } -} - -void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan) -{ - source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute); - addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context); + return select; } QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const { - for (const Stage & stage : prepared_stages) + for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage) { + const Stage & stage = prepared_stages[i_stage]; + for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i) { const auto & step = stage.expressions_chain.steps[i]; @@ -1214,11 +988,14 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v void MutationsInterpreter::validate() { + if (!select_interpreter) + select_interpreter = std::make_unique(mutation_ast, context, storage, metadata_snapshot, select_limits); + const Settings & settings = context->getSettingsRef(); /// For Replicated* storages mutations cannot employ non-deterministic functions /// because that produces inconsistencies between replicas - if (startsWith(source.getStorage()->getName(), "Replicated") && !settings.allow_nondeterministic_mutations) + if (startsWith(storage->getName(), "Replicated") && !settings.allow_nondeterministic_mutations) { for (const auto & command : commands) { @@ -1235,7 +1012,7 @@ void MutationsInterpreter::validate() } QueryPlan plan; - initQueryPlan(stages.front(), plan); + select_interpreter->buildQueryPlan(plan); auto pipeline = addStreamsForLaterStages(stages, plan); } @@ -1244,8 +1021,23 @@ QueryPipelineBuilder 
MutationsInterpreter::execute() if (!can_execute) throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); + if (!select_interpreter) + { + /// Skip applying the deleted mask for MutateSomePartColumn cases when the part has a lightweight delete. + if (!apply_deleted_mask) + { + auto context_for_reading = Context::createCopy(context); + context_for_reading->setApplyDeletedMask(apply_deleted_mask); + select_interpreter = std::make_unique(mutation_ast, context_for_reading, storage, metadata_snapshot, select_limits); + } + else + select_interpreter = std::make_unique(mutation_ast, context, storage, metadata_snapshot, select_limits); + } + + QueryPlan plan; - initQueryPlan(stages.front(), plan); + select_interpreter->buildQueryPlan(plan); + auto builder = addStreamsForLaterStages(stages, plan); /// Sometimes we update just part of columns (for example UPDATE mutation) @@ -1277,7 +1069,11 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const size_t MutationsInterpreter::evaluateCommandsSize() { - return prepareQueryAffectedAST(commands, source.getStorage(), context)->size(); + for (const MutationCommand & command : commands) + if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows. 
+ return mutation_ast->size(); + + return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size()); } std::optional MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const @@ -1300,7 +1096,7 @@ std::optional MutationsInterpreter::getStorageSortDescriptionIf ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const { - return DB::getPartitionAndPredicateExpressionForMutationCommand(command, source.getStorage(), context); + return DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context); } bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index fbcb56fac6f..336c5f11162 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -19,8 +19,7 @@ using QueryPipelineBuilderPtr = std::unique_ptr; /// Return false if the data isn't going to be changed by mutations. bool isStorageTouchedByMutations( - MergeTreeData & storage, - MergeTreeData::DataPartPtr source_part, + const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot, const std::vector & commands, ContextMutablePtr context_copy @@ -36,8 +35,6 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( /// to this data. class MutationsInterpreter { - struct Stage; - public: /// Storage to mutate, array of mutations commands and context. 
If you really want to execute mutation /// use can_execute = true, in other cases (validation, amount of commands) it can be false @@ -50,18 +47,8 @@ public: bool return_all_columns_ = false, bool return_deleted_rows_ = false); - /// Special case for MergeTree - MutationsInterpreter( - MergeTreeData & storage_, - MergeTreeData::DataPartPtr source_part_, - const StorageMetadataPtr & metadata_snapshot_, - MutationCommands commands_, - ContextPtr context_, - bool can_execute_, - bool return_all_columns_ = false, - bool return_deleted_rows_ = false); - void validate(); + size_t evaluateCommandsSize(); /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices. @@ -95,60 +82,19 @@ public: void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; } - /// Internal class which represents a data part for MergeTree - /// or just storage for other storages. - /// The main idea is to create a dedicated reading from MergeTree part. - /// Additionally we propagate some storage properties. - struct Source - { - StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const; - StoragePtr getStorage() const; - const MergeTreeData * getMergeTreeData() const; - - bool supportsLightweightDelete() const; - bool hasLightweightDeleteMask() const; - bool materializeTTLRecalculateOnly() const; - - void read( - Stage & first_stage, - QueryPlan & plan, - const StorageMetadataPtr & snapshot_, - const ContextPtr & context_, - bool apply_deleted_mask_, - bool can_execute_) const; - - explicit Source(StoragePtr storage_); - Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_); - - private: - StoragePtr storage; - - /// Special case for MergeTree. 
- MergeTreeData * data = nullptr; - MergeTreeData::DataPartPtr part; - }; - private: - MutationsInterpreter( - Source source_, - const StorageMetadataPtr & metadata_snapshot_, - MutationCommands commands_, - ContextPtr context_, - bool can_execute_, - bool return_all_columns_, - bool return_deleted_rows_); + ASTPtr prepare(bool dry_run); - void prepare(bool dry_run); + struct Stage; - void initQueryPlan(Stage & first_stage, QueryPlan & query_plan); - void prepareMutationStages(std::vector &prepared_stages, bool dry_run); + ASTPtr prepareInterpreterSelectQuery(std::vector &prepared_stages, bool dry_run); QueryPipelineBuilder addStreamsForLaterStages(const std::vector & prepared_stages, QueryPlan & plan) const; std::optional getStorageSortDescriptionIfPossible(const Block & header) const; ASTPtr getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const; - Source source; + StoragePtr storage; StorageMetadataPtr metadata_snapshot; MutationCommands commands; ContextPtr context; @@ -157,6 +103,12 @@ private: bool apply_deleted_mask = true; + ASTPtr mutation_ast; + + /// We have to store the interpreter because it uses its own copy of the context + /// and some streams from the execute method may use it. + std::unique_ptr select_interpreter; + /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several /// filters, followed by updating values of some columns. 
Commands can reuse expressions calculated by the /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 4765b2cbfbe..0d8fe84f9d3 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -64,6 +64,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings( .save_marks_in_cache = true, .checksum_on_read = settings.checksum_on_read, .read_in_order = query_info.input_order_info != nullptr, + .apply_deleted_mask = context->applyDeletedMask(), .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree && (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1), }; diff --git a/src/Processors/Sources/ThrowingExceptionSource.h b/src/Processors/Sources/ThrowingExceptionSource.h deleted file mode 100644 index 5abebd89d07..00000000000 --- a/src/Processors/Sources/ThrowingExceptionSource.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once -#include - - -namespace DB -{ - -/// This source is throwing exception at the first attempt to read from it. -/// Can be used as a additional check that pipeline (or its part) is never executed. -class ThrowingExceptionSource : public ISource -{ -public: - - using CallBack = std::function; - - explicit ThrowingExceptionSource(Block header, CallBack callback_) - : ISource(std::move(header)) - , callback(std::move(callback_)) - {} - - String getName() const override { return "ThrowingExceptionSource"; } - -protected: - Chunk generate() override - { - throw callback(); - } - - CallBack callback; -}; - -} diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 7d927b51e5f..cdf273b47df 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -110,8 +110,6 @@ public: /// The name of the table. 
StorageID getStorageID() const; - virtual bool isMergeTree() const { return false; } - /// Returns true if the storage receives data from a remote server or servers. virtual bool isRemote() const { return false; } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 19efd8f908a..670c755cf72 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -424,8 +424,6 @@ public: StoragePolicyPtr getStoragePolicy() const override; - bool isMergeTree() const override { return true; } - bool supportsPrewhere() const override { return true; } bool supportsFinal() const override; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 30d09312245..e302663597d 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -66,13 +66,6 @@ public: size_t num_streams, std::shared_ptr max_block_numbers_to_read = nullptr) const; - static MarkRanges markRangesFromPKRange( - const MergeTreeData::DataPartPtr & part, - const StorageMetadataPtr & metadata_snapshot, - const KeyCondition & key_condition, - const Settings & settings, - Poco::Logger * log); - private: const MergeTreeData & data; Poco::Logger * log; @@ -85,6 +78,13 @@ private: const Settings & settings, Poco::Logger * log); + static MarkRanges markRangesFromPKRange( + const MergeTreeData::DataPartPtr & part, + const StorageMetadataPtr & metadata_snapshot, + const KeyCondition & key_condition, + const Settings & settings, + Poco::Logger * log); + static MarkRanges filterMarksUsingIndex( MergeTreeIndexPtr index_helper, MergeTreeIndexConditionPtr condition, diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 4539e0b36c5..9e0c96fd88a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ 
b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -1,14 +1,9 @@ #include #include #include -#include #include -#include -#include #include #include -#include -#include namespace DB { @@ -30,8 +25,6 @@ public: const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, Names columns_to_read_, - std::optional mark_ranges_, - bool apply_deleted_mask, bool read_with_direct_io_, bool take_column_types_from_storage, bool quiet = false); @@ -63,8 +56,6 @@ private: Poco::Logger * log = &Poco::Logger::get("MergeTreeSequentialSource"); - std::optional mark_ranges; - std::shared_ptr mark_cache; using MergeTreeReaderPtr = std::unique_ptr; MergeTreeReaderPtr reader; @@ -85,8 +76,6 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( const StorageSnapshotPtr & storage_snapshot_, MergeTreeData::DataPartPtr data_part_, Names columns_to_read_, - std::optional mark_ranges_, - bool apply_deleted_mask, bool read_with_direct_io_, bool take_column_types_from_storage, bool quiet) @@ -96,7 +85,6 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( , data_part(std::move(data_part_)) , columns_to_read(std::move(columns_to_read_)) , read_with_direct_io(read_with_direct_io_) - , mark_ranges(std::move(mark_ranges_)) , mark_cache(storage.getContext()->getMarkCache()) { if (!quiet) @@ -138,15 +126,11 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( MergeTreeReaderSettings reader_settings = { .read_settings = read_settings, - .save_marks_in_cache = false, - .apply_deleted_mask = apply_deleted_mask, + .save_marks_in_cache = false }; - if (!mark_ranges) - mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())}); - reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata, - *mark_ranges, + MarkRanges{MarkRange(0, data_part->getMarksCount())}, /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {}); } @@ -240,10 +224,8 @@ Pipe createMergeTreeSequentialSource( if (need_to_filter_deleted_rows) 
columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name); - bool apply_deleted_mask = false; - auto column_part_source = std::make_shared( - storage, storage_snapshot, data_part, columns, std::optional{}, apply_deleted_mask, read_with_direct_io, take_column_types_from_storage, quiet); + storage, storage_snapshot, data_part, columns, read_with_direct_io, take_column_types_from_storage, quiet); Pipe pipe(std::move(column_part_source)); @@ -260,92 +242,4 @@ Pipe createMergeTreeSequentialSource( return pipe; } -/// A Query Plan step to read from a single Merge Tree part -/// using Merge Tree Sequential Source (which reads strictly sequentially in a single thread). -/// This step is used for mutations because the usual reading is too tricky. -/// Previously, sequential reading was achieved by changing some settings like max_threads, -/// however, this approach lead to data corruption after some new settings were introduced. -class ReadFromPart final : public ISourceStep -{ -public: - ReadFromPart( - const MergeTreeData & storage_, - const StorageSnapshotPtr & storage_snapshot_, - MergeTreeData::DataPartPtr data_part_, - Names columns_to_read_, - bool apply_deleted_mask_, - ActionsDAGPtr filter_, - ContextPtr context_, - Poco::Logger * log_) - : ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}) - , storage(storage_) - , storage_snapshot(storage_snapshot_) - , data_part(std::move(data_part_)) - , columns_to_read(std::move(columns_to_read_)) - , apply_deleted_mask(apply_deleted_mask_) - , filter(std::move(filter_)) - , context(std::move(context_)) - , log(log_) - { - } - - String getName() const override { return fmt::format("ReadFromPart({})", data_part->name); } - - void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override - { - std::optional mark_ranges; - - const auto & metadata_snapshot = storage_snapshot->metadata; - if (filter && 
metadata_snapshot->hasPrimaryKey()) - { - const auto & primary_key = storage_snapshot->metadata->getPrimaryKey(); - const Names & primary_key_column_names = primary_key.column_names; - KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{}); - LOG_DEBUG(log, "Key condition: {}", key_condition.toString()); - - if (!key_condition.alwaysFalse()) - mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( - data_part, metadata_snapshot, key_condition, context->getSettingsRef(), log); - - if (mark_ranges && mark_ranges->empty()) - { - pipeline.init(Pipe(std::make_unique(output_stream->header))); - return; - } - } - - auto source = std::make_unique( - storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), apply_deleted_mask, false, true); - - pipeline.init(Pipe(std::move(source))); - } - -private: - const MergeTreeData & storage; - StorageSnapshotPtr storage_snapshot; - MergeTreeData::DataPartPtr data_part; - Names columns_to_read; - bool apply_deleted_mask; - ActionsDAGPtr filter; - ContextPtr context; - Poco::Logger * log; -}; - -void createMergeTreeSequentialSource( - QueryPlan & plan, - const MergeTreeData & storage, - const StorageSnapshotPtr & storage_snapshot, - MergeTreeData::DataPartPtr data_part, - Names columns_to_read, - bool apply_deleted_mask, - ActionsDAGPtr filter, - ContextPtr context, - Poco::Logger * log) -{ - auto reading = std::make_unique( - storage, storage_snapshot, std::move(data_part), std::move(columns_to_read), apply_deleted_mask, filter, std::move(context), log); - - plan.addStep(std::move(reading)); -} - } diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index fb249568e8f..c6c29f9d49a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -20,17 +20,4 @@ Pipe createMergeTreeSequentialSource( bool quiet, std::shared_ptr> 
filtered_rows_count); -class QueryPlan; - -void createMergeTreeSequentialSource( - QueryPlan & plan, - const MergeTreeData & storage, - const StorageSnapshotPtr & storage_snapshot, - MergeTreeData::DataPartPtr data_part, - Names columns_to_read, - bool apply_deleted_mask, - ActionsDAGPtr filter, - ContextPtr context, - Poco::Logger * log); - } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 3ecb790243d..de68cb6f0ba 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -714,6 +714,8 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; + + StoragePtr storage_from_source_part; StorageMetadataPtr metadata_snapshot; MutationCommandsConstPtr commands; @@ -1476,9 +1478,10 @@ MutateTask::MutateTask( ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical(); ctx->txn = txn; ctx->source_part = ctx->future_part->parts[0]; + ctx->storage_from_source_part = std::make_shared(ctx->source_part); ctx->need_prefix = need_prefix_; - auto storage_snapshot = ctx->data->getStorageSnapshot(ctx->metadata_snapshot, context_); + auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_); extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false); } @@ -1551,7 +1554,7 @@ bool MutateTask::prepare() } if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations( - *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) + ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) { NameSet files_to_copy_instead_of_hardlinks; auto settings_ptr = ctx->data->getSettings(); @@ -1594,7 +1597,7 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { ctx->interpreter = std::make_unique( - *ctx->data, 
ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); + ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->mutation_kind = ctx->interpreter->getMutationKind();