diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index e963a453d50..c0e775c27d4 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -289,6 +289,15 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func; } +MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_)) +{ +} + +MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_) + : data(&storage_), part(std::move(source_part_)) +{ +} + StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const { if (data) @@ -297,14 +306,6 @@ StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const Storag return storage->getStorageSnapshot(snapshot_, context_); } -bool MutationsInterpreter::Source::supportsLightweightDelete() const -{ - if (part) - return part->supportLightweightDeleteMutate(); - - return storage->supportsLightweightDelete(); -} - StoragePtr MutationsInterpreter::Source::getStorage() const { if (data) @@ -313,8 +314,35 @@ StoragePtr MutationsInterpreter::Source::getStorage() const return storage; } +const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const +{ + if (data) + return data; + + return dynamic_cast(storage.get()); +} + +bool MutationsInterpreter::Source::supportsLightweightDelete() const +{ + if (part) + return part->supportLightweightDeleteMutate(); + + return storage->supportsLightweightDelete(); +} + + +bool MutationsInterpreter::Source::hasLightweightDeleteMask() const +{ + return part && part->hasLightweightDelete(); +} + +bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const +{ + return data && data->getSettings()->materialize_ttl_recalculate_only; +} + MutationsInterpreter::MutationsInterpreter( - const StoragePtr & storage_, + StoragePtr storage_, const StorageMetadataPtr & metadata_snapshot_, MutationCommands commands_, ContextPtr context_, @@ -322,17 +350,16 @@ MutationsInterpreter::MutationsInterpreter( bool return_all_columns_, bool return_deleted_rows_) : MutationsInterpreter( - Source{.storage = storage_}, + Source(std::move(storage_)), metadata_snapshot_, std::move(commands_), std::move(context_), can_execute_, return_all_columns_, return_deleted_rows_) { - if (can_execute_ && (dynamic_cast(storage_.get()) || storage_->getName() == "MergeTree")) + if (can_execute_ && dynamic_cast(source.getStorage().get())) { - const auto & t = *storage_; throw Exception( ErrorCodes::LOGICAL_ERROR, - "Cannot execute mutation for {} {}. Mutation should be applied to every part separately.", - storage_->getName(), typeid(t).name()); + "Cannot execute mutation for {}. Mutation should be applied to every part separately.", + source.getStorage()->getName()); } } @@ -346,7 +373,7 @@ MutationsInterpreter::MutationsInterpreter( bool return_all_columns_, bool return_deleted_rows_) : MutationsInterpreter( - Source{.data = &storage_, .part = std::move(source_part_)}, + Source(storage_, std::move(source_part_)), metadata_snapshot_, std::move(commands_), std::move(context_), can_execute_, return_all_columns_, return_deleted_rows_) { @@ -374,7 +401,7 @@ MutationsInterpreter::MutationsInterpreter( static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) { - const MergeTreeData * merge_tree_data = source.data ? source.data : dynamic_cast(source.storage.get()); + const MergeTreeData * merge_tree_data = source.getMergeTreeData(); if (!merge_tree_data) return {}; @@ -495,7 +522,7 @@ void MutationsInterpreter::prepare(bool dry_run) NamesAndTypesList all_columns = columns_desc.getAllPhysical(); NameSet updated_columns; - bool materialize_ttl_recalculate_only = source.data && source.data->getSettings()->materialize_ttl_recalculate_only; + bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly(); for (const MutationCommand & command : commands) { @@ -853,7 +880,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s auto all_columns = storage_snapshot->getColumns(options); /// Add _row_exists column if it is present in the part - if (source.part && source.part->hasLightweightDelete()) + if (source.hasLightweightDeleteMask()) all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); /// Next, for each stage calculate columns changed by this and previous stages. @@ -959,6 +986,8 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } } +/// This structure re-implements adding virtual columns while reading from MergeTree part. +/// It would be good to unify it with IMergeTreeSelectAlgorithm. struct VirtualColumns { struct ColumnAndPosition @@ -1059,18 +1088,12 @@ void MutationsInterpreter::Source::read( return; } - // for (const auto & name : required_columns) - // std::cerr << "====== Required column " + name << std::endl; - - const auto & steps = first_stage.expressions_chain.steps; - const auto & names = first_stage.filter_column_names; - size_t num_filters = names.size(); - - // for (size_t i = 0; i < steps.size(); ++i) - // std::cerr << steps[i]->actions()->dumpDAG() << std::endl; - if (data) { + const auto & steps = first_stage.expressions_chain.steps; + const auto & names = first_stage.filter_column_names; + size_t num_filters = names.size(); + ActionsDAGPtr filter; if (!first_stage.filter_column_names.empty()) { @@ -1089,23 +1112,6 @@ void MutationsInterpreter::Source::read( &Poco::Logger::get("MutationsInterpreter")); virtual_columns.addVirtuals(plan); - - // std::cerr << "<<<<<<<<< " << plan.getCurrentDataStream().header.dumpStructure() << std::endl; - - // for (size_t i = 0; i < steps.size(); ++i) - // { - // const auto & step = steps[i]; - // if (i < names.size()) - // { - // /// Execute DELETEs. - // plan.addStep(std::make_unique(plan.getCurrentDataStream(), step->actions(), names[i], false)); - // } - // else - // { - // /// Execute UPDATE or final projection. - // plan.addStep(std::make_unique(plan.getCurrentDataStream(), step->actions())); - // } - // } } else { @@ -1156,25 +1162,6 @@ void MutationsInterpreter::Source::read( void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan) { source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute); - - // const auto & steps = first_stage.expressions_chain.steps; - // const auto & names = first_stage.filter_column_names; - - // for (size_t i = 0; i < steps.size(); ++i) - // { - // const auto & step = steps[i]; - // if (i < names.size()) - // { - // /// Execute DELETEs. - // plan.addStep(std::make_unique(plan.getCurrentDataStream(), step->actions(), names[i], false)); - // } - // else - // { - // /// Execute UPDATE or final projection. - // plan.addStep(std::make_unique(plan.getCurrentDataStream(), step->actions())); - // } - // } - addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context); } @@ -1200,18 +1187,6 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v addCreatingSetsStep(plan, stage.analyzer->getPreparedSets(), context); } - { - if (can_execute && source.storage && source.storage->getName() == "MergeTree") - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cannot execute mutation for {}. Mutation should be applied to every part separately.", - source.storage->getName()); - - // WriteBufferFromOwnString buf; - // plan.explainPlan(buf, {.header = true, .actions = true}); - // std::cerr << "Plan 2 " + (source.storage ? source.storage->getName() : "") + " \n" + buf.str() << std::endl; - } - QueryPlanOptimizationSettings do_not_optimize_plan; do_not_optimize_plan.optimize_plan = false; @@ -1251,11 +1226,6 @@ void MutationsInterpreter::validate() QueryPlan plan; initQueryPlan(stages.front(), plan); - // { - // WriteBufferFromOwnString buf; - // plan.explainPlan(buf, {.header = true, .actions = true}); - // std::cerr << "Plan 1\n" + buf.str() << std::endl; - // } auto pipeline = addStreamsForLaterStages(stages, plan); } diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 53a85c02ec5..d7df8d22eda 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -42,7 +42,7 @@ public: /// Storage to mutate, array of mutations commands and context. If you really want to execute mutation /// use can_execute = true, in other cases (validation, amount of commands) it can be false MutationsInterpreter( - const StoragePtr & storage_, + StoragePtr storage_, const StorageMetadataPtr & metadata_snapshot_, MutationCommands commands_, ContextPtr context_, @@ -97,15 +97,13 @@ public: struct Source { - StoragePtr storage; - - /// Special case for MergeTree. - MergeTreeData * data = nullptr; - MergeTreeData::DataPartPtr part; - StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const; - bool supportsLightweightDelete() const; StoragePtr getStorage() const; + const MergeTreeData * getMergeTreeData() const; + + bool supportsLightweightDelete() const; + bool hasLightweightDeleteMask() const; + bool materializeTTLRecalculateOnly() const; void read( Stage & first_stage, @@ -114,6 +112,16 @@ public: const ContextPtr & context_, bool apply_deleted_mask_, bool can_execute_) const; + + explicit Source(StoragePtr storage_); + Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_); + + private: + StoragePtr storage; + + /// Special case for MergeTree. + MergeTreeData * data = nullptr; + MergeTreeData::DataPartPtr part; }; private: