Merge pull request #45122 from ClickHouse/revert-45121-revert-44653-custom-reading-for-mutation

Revert "Revert "Custom reading for mutation""
Nikolai Kochetov 2023-01-11 12:37:32 +01:00 committed by GitHub
commit 5e7a6ac619
11 changed files with 541 additions and 147 deletions

View File

@@ -377,9 +377,6 @@ private:
     inline static ContextPtr global_context_instance;

-    /// A flag, used to mark if reader needs to apply deleted rows mask.
-    bool apply_deleted_mask = true;
-
     /// Temporary data for query execution accounting.
    TemporaryDataOnDiskScopePtr temp_data_on_disk;

 public:
@@ -973,9 +970,6 @@ public:
     bool isInternalQuery() const { return is_internal_query; }
     void setInternalQuery(bool internal) { is_internal_query = internal; }

-    bool applyDeletedMask() const { return apply_deleted_mask; }
-    void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; }
-
     ActionLocksManagerPtr getActionLocksManager() const;

     enum class ApplicationType

View File

@@ -30,6 +30,9 @@
 #include <DataTypes/NestedUtils.h>
 #include <Interpreters/PreparedSets.h>
 #include <Storages/LightweightDeleteDescription.h>
+#include <Storages/MergeTree/MergeTreeSequentialSource.h>
+#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
+#include <Processors/Sources/ThrowingExceptionSource.h>

 namespace DB
@@ -190,7 +193,8 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_

 bool isStorageTouchedByMutations(
-    const StoragePtr & storage,
+    MergeTreeData & storage,
+    MergeTreeData::DataPartPtr source_part,
     const StorageMetadataPtr & metadata_snapshot,
     const std::vector<MutationCommand> & commands,
     ContextMutablePtr context_copy)
@@ -199,19 +203,15 @@ bool isStorageTouchedByMutations(
         return false;

     bool all_commands_can_be_skipped = true;
-    auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
     for (const MutationCommand & command : commands)
     {
         if (!command.predicate) /// The command touches all rows.
             return true;

-        if (command.partition && !storage_from_merge_tree_data_part)
-            throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);
-
-        if (command.partition && storage_from_merge_tree_data_part)
+        if (command.partition)
         {
-            const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy);
-            if (partition_id == storage_from_merge_tree_data_part->getPartitionId())
+            const String partition_id = storage.getPartitionIDFromQuery(command.partition, context_copy);
+            if (partition_id == source_part->info.partition_id)
                 all_commands_can_be_skipped = false;
         }
         else
@@ -229,13 +229,15 @@ bool isStorageTouchedByMutations(
     context_copy->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
     context_copy->setSetting("max_streams_for_merge_tree_reading", Field(0));

-    ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
+    ASTPtr select_query = prepareQueryAffectedAST(commands, storage.shared_from_this(), context_copy);
+
+    auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part);

     /// Interpreter must be alive, when we use result of execute() method.
     /// For some reason it may copy context and give it into ExpressionTransform
     /// after that we will use context from destroyed stack frame in our stream.
     InterpreterSelectQuery interpreter(
-        select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
+        select_query, context_copy, storage_from_part, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());

     auto io = interpreter.execute();
     PullingPipelineExecutor executor(io.pipeline);
@@ -288,6 +290,57 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
     return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func;
 }

+MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_))
+{
+}
+
+MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_)
+    : data(&storage_), part(std::move(source_part_))
+{
+}
+
+StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const
+{
+    if (data)
+        return data->getStorageSnapshot(snapshot_, context_);
+
+    return storage->getStorageSnapshot(snapshot_, context_);
+}
+
+StoragePtr MutationsInterpreter::Source::getStorage() const
+{
+    if (data)
+        return data->shared_from_this();
+
+    return storage;
+}
+
+const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const
+{
+    if (data)
+        return data;
+
+    return dynamic_cast<const MergeTreeData *>(storage.get());
+}
+
+bool MutationsInterpreter::Source::supportsLightweightDelete() const
+{
+    if (part)
+        return part->supportLightweightDeleteMutate();
+
+    return storage->supportsLightweightDelete();
+}
+
+bool MutationsInterpreter::Source::hasLightweightDeleteMask() const
+{
+    return part && part->hasLightweightDelete();
+}
+
+bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
+{
+    return data && data->getSettings()->materialize_ttl_recalculate_only;
+}
+
 MutationsInterpreter::MutationsInterpreter(
     StoragePtr storage_,
@ -297,7 +350,45 @@ MutationsInterpreter::MutationsInterpreter(
bool can_execute_, bool can_execute_,
bool return_all_columns_, bool return_all_columns_,
bool return_deleted_rows_) bool return_deleted_rows_)
: storage(std::move(storage_)) : MutationsInterpreter(
Source(std::move(storage_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
if (can_execute_ && dynamic_cast<const MergeTreeData *>(source.getStorage().get()))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot execute mutation for {}. Mutation should be applied to every part separately.",
source.getStorage()->getName());
}
}
MutationsInterpreter::MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
}
MutationsInterpreter::MutationsInterpreter(
Source source_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
: source(std::move(source_))
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_)) , commands(std::move(commands_))
, context(Context::createCopy(context_)) , context(Context::createCopy(context_))
@@ -306,12 +397,12 @@ MutationsInterpreter::MutationsInterpreter(
     , return_all_columns(return_all_columns_)
     , return_deleted_rows(return_deleted_rows_)
 {
-    mutation_ast = prepare(!can_execute);
+    prepare(!can_execute);
 }

-static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot)
+static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
 {
-    const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get());
+    const MergeTreeData * merge_tree_data = source.getMergeTreeData();
     if (!merge_tree_data)
         return {};
@@ -333,21 +424,12 @@ static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPt
     return key_columns;
 }

-static bool materializeTTLRecalculateOnly(const StoragePtr & storage)
-{
-    auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
-    if (!storage_from_merge_tree_data_part)
-        return false;
-
-    return storage_from_merge_tree_data_part->materializeTTLRecalculateOnly();
-}
-
 static void validateUpdateColumns(
-    const StoragePtr & storage,
+    const MutationsInterpreter::Source & source,
     const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
     const std::unordered_map<String, Names> & column_to_affected_materialized)
 {
-    NameSet key_columns = getKeyColumns(storage, metadata_snapshot);
+    NameSet key_columns = getKeyColumns(source, metadata_snapshot);

     for (const String & column_name : updated_columns)
     {
@@ -364,7 +446,7 @@ static void validateUpdateColumns(
             /// Allow to override value of lightweight delete filter virtual column
             if (!found && column_name == LightweightDeleteDescription::FILTER_COLUMN.name)
             {
-                if (!storage->supportsLightweightDelete())
+                if (!source.supportsLightweightDelete())
                     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Lightweight delete is not supported for table");
                 found = true;
             }
@@ -427,7 +509,7 @@ static std::optional<std::vector<ASTPtr>> getExpressionsOfUpdatedNestedSubcolumn
     return res;
 }

-ASTPtr MutationsInterpreter::prepare(bool dry_run)
+void MutationsInterpreter::prepare(bool dry_run)
 {
     if (is_prepared)
         throw Exception("MutationsInterpreter is already prepared. It is a bug.", ErrorCodes::LOGICAL_ERROR);
@@ -441,14 +523,11 @@ void MutationsInterpreter::prepare(bool dry_run)
     NamesAndTypesList all_columns = columns_desc.getAllPhysical();

     /// Add _row_exists column if it is physically present in the part
-    if (auto part_storage = dynamic_pointer_cast<DB::StorageFromMergeTreeDataPart>(storage))
-    {
-        if (part_storage->hasLightweightDeletedMask())
-            all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
-    }
+    if (source.hasLightweightDeleteMask())
+        all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});

     NameSet updated_columns;
-    bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
+    bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();

     for (const MutationCommand & command : commands)
     {
@@ -481,7 +560,7 @@ void MutationsInterpreter::prepare(bool dry_run)
             }
         }

-        validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized);
+        validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized);
     }

     dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns);
@@ -778,15 +857,10 @@ void MutationsInterpreter::prepare(bool dry_run)
             stages_copy.back().filters = stage.filters;
         }

-        const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
-        InterpreterSelectQuery interpreter{
-            select_query, context, storage, metadata_snapshot,
-            SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits().ignoreProjections()};
-
-        auto first_stage_header = interpreter.getSampleBlock();
+        prepareMutationStages(stages_copy, true);
+
         QueryPlan plan;
-        auto source = std::make_shared<NullSource>(first_stage_header);
-        plan.addStep(std::make_unique<ReadFromPreparedSource>(Pipe(std::move(source))));
+        initQueryPlan(stages_copy.front(), plan);
         auto pipeline = addStreamsForLaterStages(stages_copy, plan);
         updated_header = std::make_unique<Block>(pipeline.getHeader());
     }
@@ -801,21 +875,18 @@ void MutationsInterpreter::prepare(bool dry_run)

     is_prepared = true;

-    return prepareInterpreterSelectQuery(stages, dry_run);
+    prepareMutationStages(stages, dry_run);
 }

-ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
+void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run)
 {
-    auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context);
+    auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
     auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
     auto all_columns = storage_snapshot->getColumns(options);

     /// Add _row_exists column if it is present in the part
-    if (auto part_storage = dynamic_pointer_cast<DB::StorageFromMergeTreeDataPart>(storage))
-    {
-        if (part_storage->hasLightweightDeletedMask())
-            all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
-    }
+    if (source.hasLightweightDeleteMask())
+        all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});

     /// Next, for each stage calculate columns changed by this and previous stages.
     for (size_t i = 0; i < prepared_stages.size(); ++i)
@@ -839,7 +910,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s

     /// Now, calculate `expressions_chain` for each stage except the first.
     /// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
-    for (size_t i = prepared_stages.size() - 1; i > 0; --i)
+    for (int64_t i = prepared_stages.size() - 1; i >= 0; --i)
     {
         auto & stage = prepared_stages[i];
@@ -859,7 +930,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
         /// e.g. ALTER referencing the same table in scalar subquery
         bool execute_scalar_subqueries = !dry_run;
         auto syntax_result = TreeRewriter(context).analyze(
-            all_asts, all_columns, storage, storage_snapshot,
+            all_asts, all_columns, source.getStorage(), storage_snapshot,
             false, true, execute_scalar_subqueries);

         if (execute_scalar_subqueries && context->hasQueryContext())
@@ -897,6 +968,9 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
             }
         }

+        if (i == 0 && actions_chain.steps.empty())
+            actions_chain.lastStep(syntax_result->required_source_columns);
+
         /// Remove all intermediate columns.
         actions_chain.addStep();
         actions_chain.getLastStep().required_output.clear();
@@ -908,49 +982,198 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s

         actions_chain.finalize();

-        /// Propagate information about columns needed as input.
-        for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
-            prepared_stages[i - 1].output_columns.insert(column.name);
+        if (i)
+        {
+            /// Propagate information about columns needed as input.
+            for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
+                prepared_stages[i - 1].output_columns.insert(column.name);
+        }
     }
-
-    /// Execute first stage as a SELECT statement.
-    auto select = std::make_shared<ASTSelectQuery>();
-
-    select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
-    for (const auto & column_name : prepared_stages[0].output_columns)
-        select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
-
-    /// Don't let select list be empty.
-    if (select->select()->children.empty())
-        select->select()->children.push_back(std::make_shared<ASTLiteral>(Field(0)));
-
-    if (!prepared_stages[0].filters.empty())
-    {
-        ASTPtr where_expression;
-        if (prepared_stages[0].filters.size() == 1)
-            where_expression = prepared_stages[0].filters[0];
-        else
-        {
-            auto coalesced_predicates = std::make_shared<ASTFunction>();
-            coalesced_predicates->name = "and";
-            coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
-            coalesced_predicates->children.push_back(coalesced_predicates->arguments);
-            coalesced_predicates->arguments->children = prepared_stages[0].filters;
-            where_expression = std::move(coalesced_predicates);
-        }
-        select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
-    }
-
-    return select;
 }

+/// This structure re-implements adding virtual columns while reading from MergeTree part.
+/// It would be good to unify it with IMergeTreeSelectAlgorithm.
+struct VirtualColumns
+{
+    struct ColumnAndPosition
+    {
+        ColumnWithTypeAndName column;
+        size_t position;
+    };
+
+    using Columns = std::vector<ColumnAndPosition>;
+
+    Columns virtuals;
+    Names columns_to_read;
+
+    VirtualColumns(Names required_columns, const MergeTreeData::DataPartPtr & part) : columns_to_read(std::move(required_columns))
+    {
+        for (size_t i = 0; i < columns_to_read.size(); ++i)
+        {
+            if (columns_to_read[i] == LightweightDeleteDescription::FILTER_COLUMN.name)
+            {
+                LoadedMergeTreeDataPartInfoForReader part_info_reader(part);
+                if (!part_info_reader.getColumns().contains(LightweightDeleteDescription::FILTER_COLUMN.name))
+                {
+                    ColumnWithTypeAndName mask_column;
+                    mask_column.type = LightweightDeleteDescription::FILTER_COLUMN.type;
+                    mask_column.column = mask_column.type->createColumnConst(0, 1);
+                    mask_column.name = std::move(columns_to_read[i]);
+
+                    virtuals.emplace_back(ColumnAndPosition{.column = std::move(mask_column), .position = i});
+                }
+            }
+            else if (columns_to_read[i] == "_partition_id")
+            {
+                ColumnWithTypeAndName column;
+                column.type = std::make_shared<DataTypeString>();
+                column.column = column.type->createColumnConst(0, part->info.partition_id);
+                column.name = std::move(columns_to_read[i]);
+
+                virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i});
+            }
+        }
+
+        if (!virtuals.empty())
+        {
+            Names columns_no_virtuals;
+            columns_no_virtuals.reserve(columns_to_read.size());
+            size_t next_virtual = 0;
+            for (size_t i = 0; i < columns_to_read.size(); ++i)
+            {
+                if (next_virtual < virtuals.size() && i == virtuals[next_virtual].position)
+                    ++next_virtual;
+                else
+                    columns_no_virtuals.emplace_back(std::move(columns_to_read[i]));
+            }
+
+            columns_to_read.swap(columns_no_virtuals);
+        }
+    }
+
+    void addVirtuals(QueryPlan & plan)
+    {
+        auto dag = std::make_unique<ActionsDAG>(plan.getCurrentDataStream().header.getColumnsWithTypeAndName());
+
+        for (auto & column : virtuals)
+        {
+            const auto & adding_const = dag->addColumn(std::move(column.column));
+            auto & outputs = dag->getOutputs();
+            outputs.insert(outputs.begin() + column.position, &adding_const);
+        }
+
+        auto step = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), std::move(dag));
+        plan.addStep(std::move(step));
+    }
+};
+
+void MutationsInterpreter::Source::read(
+    Stage & first_stage,
+    QueryPlan & plan,
+    const StorageMetadataPtr & snapshot_,
+    const ContextPtr & context_,
+    bool apply_deleted_mask_,
+    bool can_execute_) const
+{
+    auto required_columns = first_stage.expressions_chain.steps.front()->getRequiredColumns().getNames();
+    auto storage_snapshot = getStorageSnapshot(snapshot_, context_);
+
+    if (!can_execute_)
+    {
+        auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
+        auto callback = []()
+        {
+            return DB::Exception(ErrorCodes::LOGICAL_ERROR, "Cannot execute a mutation because can_execute flag set to false");
+        };
+
+        Pipe pipe(std::make_shared<ThrowingExceptionSource>(header, callback));
+
+        auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
+        plan.addStep(std::move(read_from_pipe));
+        return;
+    }
+
+    if (data)
+    {
+        const auto & steps = first_stage.expressions_chain.steps;
+        const auto & names = first_stage.filter_column_names;
+        size_t num_filters = names.size();
+
+        ActionsDAGPtr filter;
+        if (!first_stage.filter_column_names.empty())
+        {
+            ActionsDAG::NodeRawConstPtrs nodes(num_filters);
+            for (size_t i = 0; i < num_filters; ++i)
+                nodes[i] = &steps[i]->actions()->findInOutputs(names[i]);
+
+            filter = ActionsDAG::buildFilterActionsDAG(nodes, {}, context_);
+        }
+
+        VirtualColumns virtual_columns(std::move(required_columns), part);
+
+        createMergeTreeSequentialSource(
+            plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_,
+            &Poco::Logger::get("MutationsInterpreter"));
+
+        virtual_columns.addVirtuals(plan);
+    }
+    else
+    {
+        auto select = std::make_shared<ASTSelectQuery>();
+
+        select->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
+        for (const auto & column_name : first_stage.output_columns)
+            select->select()->children.push_back(std::make_shared<ASTIdentifier>(column_name));
+
+        /// Don't let select list be empty.
+        if (select->select()->children.empty())
+            select->select()->children.push_back(std::make_shared<ASTLiteral>(Field(0)));
+
+        if (!first_stage.filters.empty())
+        {
+            ASTPtr where_expression;
+            if (first_stage.filters.size() == 1)
+                where_expression = first_stage.filters[0];
+            else
+            {
+                auto coalesced_predicates = std::make_shared<ASTFunction>();
+                coalesced_predicates->name = "and";
+                coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
+                coalesced_predicates->children.push_back(coalesced_predicates->arguments);
+                coalesced_predicates->arguments->children = first_stage.filters;
+                where_expression = std::move(coalesced_predicates);
+            }
+            select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
+        }
+
+        SelectQueryInfo query_info;
+        query_info.query = std::move(select);
+
+        size_t max_block_size = context_->getSettingsRef().max_block_size;
+        size_t max_streams = 1;
+        storage->read(plan, required_columns, storage_snapshot, query_info, context_, QueryProcessingStage::FetchColumns, max_block_size, max_streams);
+
+        if (!plan.isInitialized())
+        {
+            /// It may be possible when there is nothing to read from storage.
+            auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
+            auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<NullSource>(header)));
+            plan.addStep(std::move(read_from_pipe));
+        }
+    }
+}
+
+void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
+{
+    source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute);
+    addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
+}

 QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const
 {
-    for (size_t i_stage = 1; i_stage < prepared_stages.size(); ++i_stage)
+    for (const Stage & stage : prepared_stages)
     {
-        const Stage & stage = prepared_stages[i_stage];
-
         for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i)
         {
             const auto & step = stage.expressions_chain.steps[i];
@@ -988,14 +1211,11 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v

 void MutationsInterpreter::validate()
 {
-    if (!select_interpreter)
-        select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
-
     const Settings & settings = context->getSettingsRef();

     /// For Replicated* storages mutations cannot employ non-deterministic functions
     /// because that produces inconsistencies between replicas
-    if (startsWith(storage->getName(), "Replicated") && !settings.allow_nondeterministic_mutations)
+    if (startsWith(source.getStorage()->getName(), "Replicated") && !settings.allow_nondeterministic_mutations)
     {
         for (const auto & command : commands)
         {
@@ -1012,7 +1232,7 @@ void MutationsInterpreter::validate()
     }

     QueryPlan plan;
-    select_interpreter->buildQueryPlan(plan);
+    initQueryPlan(stages.front(), plan);
     auto pipeline = addStreamsForLaterStages(stages, plan);
 }
@@ -1021,23 +1241,8 @@ QueryPipelineBuilder MutationsInterpreter::execute()
     if (!can_execute)
         throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);

-    if (!select_interpreter)
-    {
-        /// Skip to apply deleted mask for MutateSomePartColumn cases when part has lightweight delete.
-        if (!apply_deleted_mask)
-        {
-            auto context_for_reading = Context::createCopy(context);
-            context_for_reading->setApplyDeletedMask(apply_deleted_mask);
-            select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context_for_reading, storage, metadata_snapshot, select_limits);
-        }
-        else
-            select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, metadata_snapshot, select_limits);
-    }
-
     QueryPlan plan;
-    select_interpreter->buildQueryPlan(plan);
+    initQueryPlan(stages.front(), plan);

     auto builder = addStreamsForLaterStages(stages, plan);

     /// Sometimes we update just part of columns (for example UPDATE mutation)
@@ -1069,11 +1274,7 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const

 size_t MutationsInterpreter::evaluateCommandsSize()
 {
-    for (const MutationCommand & command : commands)
-        if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows.
-            return mutation_ast->size();
-
-    return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size());
+    return prepareQueryAffectedAST(commands, source.getStorage(), context)->size();
 }

 std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
@ -1096,7 +1297,7 @@ std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIf
ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const
{ {
return DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context); return DB::getPartitionAndPredicateExpressionForMutationCommand(command, source.getStorage(), context);
} }
bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const
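Taken together, mutations of MergeTree parts now construct the interpreter from a MergeTreeData reference plus a single part, instead of wrapping the part in StorageFromMergeTreeDataPart. A minimal sketch of a call site, mirroring the MutateTask::prepare() hunk later in this diff (the surrounding variables are assumptions, not part of this commit):

    // Sketch only: `data`, `source_part`, `metadata_snapshot`, `commands_for_part`
    // and `context_for_reading` stand in for the fields used in MutateTask below.
    auto interpreter = std::make_unique<MutationsInterpreter>(
        data,                      // MergeTreeData &, no longer a StoragePtr
        source_part,               // MergeTreeData::DataPartPtr
        metadata_snapshot,
        commands_for_part,
        context_for_reading,
        /* can_execute = */ true);
    QueryPipelineBuilder builder = interpreter->execute();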

View File

@@ -19,7 +19,8 @@ using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;

 /// Return false if the data isn't going to be changed by mutations.
 bool isStorageTouchedByMutations(
-    const StoragePtr & storage,
+    MergeTreeData & storage,
+    MergeTreeData::DataPartPtr source_part,
     const StorageMetadataPtr & metadata_snapshot,
     const std::vector<MutationCommand> & commands,
     ContextMutablePtr context_copy
@@ -35,6 +36,8 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
 /// to this data.
 class MutationsInterpreter
 {
+    struct Stage;
+
 public:
     /// Storage to mutate, array of mutations commands and context. If you really want to execute mutation
     /// use can_execute = true, in other cases (validation, amount of commands) it can be false
@@ -47,8 +50,18 @@ public:
         bool return_all_columns_ = false,
         bool return_deleted_rows_ = false);

-    void validate();
-
+    /// Special case for MergeTree
+    MutationsInterpreter(
+        MergeTreeData & storage_,
+        MergeTreeData::DataPartPtr source_part_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        MutationCommands commands_,
+        ContextPtr context_,
+        bool can_execute_,
+        bool return_all_columns_ = false,
+        bool return_deleted_rows_ = false);
+
+    void validate();
     size_t evaluateCommandsSize();

     /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
@@ -82,19 +95,60 @@ public:
     void setApplyDeletedMask(bool apply) { apply_deleted_mask = apply; }

+    /// Internal class which represents a data part for MergeTree
+    /// or just storage for other storages.
+    /// The main idea is to create a dedicated reading from MergeTree part.
+    /// Additionally we propagate some storage properties.
+    struct Source
+    {
+        StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const;
+        StoragePtr getStorage() const;
+        const MergeTreeData * getMergeTreeData() const;
+
+        bool supportsLightweightDelete() const;
+        bool hasLightweightDeleteMask() const;
+        bool materializeTTLRecalculateOnly() const;
+
+        void read(
+            Stage & first_stage,
+            QueryPlan & plan,
+            const StorageMetadataPtr & snapshot_,
+            const ContextPtr & context_,
+            bool apply_deleted_mask_,
+            bool can_execute_) const;
+
+        explicit Source(StoragePtr storage_);
+        Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_);
+
+    private:
+        StoragePtr storage;
+
+        /// Special case for MergeTree.
+        MergeTreeData * data = nullptr;
+        MergeTreeData::DataPartPtr part;
+    };
+
 private:
-    ASTPtr prepare(bool dry_run);
+    MutationsInterpreter(
+        Source source_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        MutationCommands commands_,
+        ContextPtr context_,
+        bool can_execute_,
+        bool return_all_columns_,
+        bool return_deleted_rows_);

-    struct Stage;
+    void prepare(bool dry_run);

-    ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run);
+    void initQueryPlan(Stage & first_stage, QueryPlan & query_plan);
+    void prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run);
     QueryPipelineBuilder addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;

     std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;

     ASTPtr getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const;

-    StoragePtr storage;
+    Source source;
     StorageMetadataPtr metadata_snapshot;
     MutationCommands commands;
     ContextPtr context;
@@ -103,12 +157,6 @@ private:

     bool apply_deleted_mask = true;

-    ASTPtr mutation_ast;
-
-    /// We have to store interpreter because it use own copy of context
-    /// and some streams from execute method may use it.
-    std::unique_ptr<InterpreterSelectQuery> select_interpreter;
-
     /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
     /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
     /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away

View File

@@ -64,7 +64,6 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
         .save_marks_in_cache = true,
         .checksum_on_read = settings.checksum_on_read,
         .read_in_order = query_info.input_order_info != nullptr,
-        .apply_deleted_mask = context->applyDeletedMask(),
         .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
             && (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
     };

View File

@@ -0,0 +1,32 @@
+#pragma once
+
+#include <Processors/ISource.h>
+
+namespace DB
+{
+
+/// This source is throwing exception at the first attempt to read from it.
+/// Can be used as a additional check that pipeline (or its part) is never executed.
+class ThrowingExceptionSource : public ISource
+{
+public:
+
+    using CallBack = std::function<Exception()>;
+
+    explicit ThrowingExceptionSource(Block header, CallBack callback_)
+        : ISource(std::move(header))
+        , callback(std::move(callback_))
+    {}
+
+    String getName() const override { return "ThrowingExceptionSource"; }
+
+protected:
+    Chunk generate() override
+    {
+        throw callback();
+    }
+
+    CallBack callback;
+};
+
+}

View File

@@ -110,6 +110,8 @@ public:
     /// The name of the table.
     StorageID getStorageID() const;

+    virtual bool isMergeTree() const { return false; }
+
     /// Returns true if the storage receives data from a remote server or servers.
     virtual bool isRemote() const { return false; }

View File

@@ -424,6 +424,8 @@ public:
     StoragePolicyPtr getStoragePolicy() const override;

+    bool isMergeTree() const override { return true; }
+
     bool supportsPrewhere() const override { return true; }

     bool supportsFinal() const override;

View File

@@ -66,6 +66,13 @@ public:
         size_t num_streams,
         std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;

+    static MarkRanges markRangesFromPKRange(
+        const MergeTreeData::DataPartPtr & part,
+        const StorageMetadataPtr & metadata_snapshot,
+        const KeyCondition & key_condition,
+        const Settings & settings,
+        Poco::Logger * log);
+
 private:
     const MergeTreeData & data;
     Poco::Logger * log;
@@ -78,13 +85,6 @@ private:
         const Settings & settings,
         Poco::Logger * log);

-    static MarkRanges markRangesFromPKRange(
-        const MergeTreeData::DataPartPtr & part,
-        const StorageMetadataPtr & metadata_snapshot,
-        const KeyCondition & key_condition,
-        const Settings & settings,
-        Poco::Logger * log);
-
     static MarkRanges filterMarksUsingIndex(
         MergeTreeIndexPtr index_helper,
         MergeTreeIndexConditionPtr condition,

View File

@@ -1,9 +1,14 @@
 #include <Storages/MergeTree/MergeTreeSequentialSource.h>
 #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
 #include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
+#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
 #include <Processors/Transforms/FilterTransform.h>
+#include <Processors/QueryPlan/ISourceStep.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
 #include <QueryPipeline/Pipe.h>
 #include <Interpreters/Context.h>
+#include <Processors/Sources/NullSource.h>
+#include <Processors/QueryPlan/FilterStep.h>

 namespace DB
 {
@@ -25,6 +30,8 @@ public:
         const StorageSnapshotPtr & storage_snapshot_,
         MergeTreeData::DataPartPtr data_part_,
         Names columns_to_read_,
+        std::optional<MarkRanges> mark_ranges_,
+        bool apply_deleted_mask,
         bool read_with_direct_io_,
         bool take_column_types_from_storage,
         bool quiet = false);
@@ -56,6 +63,8 @@ private:
     Poco::Logger * log = &Poco::Logger::get("MergeTreeSequentialSource");

+    std::optional<MarkRanges> mark_ranges;
+
     std::shared_ptr<MarkCache> mark_cache;
     using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
     MergeTreeReaderPtr reader;
@@ -76,6 +85,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
     const StorageSnapshotPtr & storage_snapshot_,
     MergeTreeData::DataPartPtr data_part_,
     Names columns_to_read_,
+    std::optional<MarkRanges> mark_ranges_,
+    bool apply_deleted_mask,
     bool read_with_direct_io_,
     bool take_column_types_from_storage,
     bool quiet)
@@ -85,6 +96,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
     , data_part(std::move(data_part_))
     , columns_to_read(std::move(columns_to_read_))
     , read_with_direct_io(read_with_direct_io_)
+    , mark_ranges(std::move(mark_ranges_))
     , mark_cache(storage.getContext()->getMarkCache())
 {
     if (!quiet)
@@ -126,11 +138,15 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
     MergeTreeReaderSettings reader_settings =
     {
         .read_settings = read_settings,
-        .save_marks_in_cache = false
+        .save_marks_in_cache = false,
+        .apply_deleted_mask = apply_deleted_mask,
     };

+    if (!mark_ranges)
+        mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())});
+
     reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata,
-        MarkRanges{MarkRange(0, data_part->getMarksCount())},
+        *mark_ranges,
         /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings, {}, {});
 }
@@ -224,8 +240,10 @@ Pipe createMergeTreeSequentialSource(
     if (need_to_filter_deleted_rows)
         columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name);

+    bool apply_deleted_mask = false;
+
     auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
-        storage, storage_snapshot, data_part, columns, read_with_direct_io, take_column_types_from_storage, quiet);
+        storage, storage_snapshot, data_part, columns, std::optional<MarkRanges>{}, apply_deleted_mask, read_with_direct_io, take_column_types_from_storage, quiet);

     Pipe pipe(std::move(column_part_source));
@@ -242,4 +260,92 @@ Pipe createMergeTreeSequentialSource(
     return pipe;
 }

+/// A Query Plan step to read from a single Merge Tree part
+/// using Merge Tree Sequential Source (which reads strictly sequentially in a single thread).
+/// This step is used for mutations because the usual reading is too tricky.
+/// Previously, sequential reading was achieved by changing some settings like max_threads,
+/// however, this approach lead to data corruption after some new settings were introduced.
+class ReadFromPart final : public ISourceStep
+{
+public:
+    ReadFromPart(
+        const MergeTreeData & storage_,
+        const StorageSnapshotPtr & storage_snapshot_,
+        MergeTreeData::DataPartPtr data_part_,
+        Names columns_to_read_,
+        bool apply_deleted_mask_,
+        ActionsDAGPtr filter_,
+        ContextPtr context_,
+        Poco::Logger * log_)
+        : ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
+        , storage(storage_)
+        , storage_snapshot(storage_snapshot_)
+        , data_part(std::move(data_part_))
+        , columns_to_read(std::move(columns_to_read_))
+        , apply_deleted_mask(apply_deleted_mask_)
+        , filter(std::move(filter_))
+        , context(std::move(context_))
+        , log(log_)
+    {
+    }
+
+    String getName() const override { return fmt::format("ReadFromPart({})", data_part->name); }
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
+    {
+        std::optional<MarkRanges> mark_ranges;
+
+        const auto & metadata_snapshot = storage_snapshot->metadata;
+        if (filter && metadata_snapshot->hasPrimaryKey())
+        {
+            const auto & primary_key = storage_snapshot->metadata->getPrimaryKey();
+            const Names & primary_key_column_names = primary_key.column_names;
+            KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{});
+            LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
+
+            if (!key_condition.alwaysFalse())
+                mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
+                    data_part, metadata_snapshot, key_condition, context->getSettingsRef(), log);
+
+            if (mark_ranges && mark_ranges->empty())
+            {
+                pipeline.init(Pipe(std::make_unique<NullSource>(output_stream->header)));
+                return;
+            }
+        }
+
+        auto source = std::make_unique<MergeTreeSequentialSource>(
+            storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), apply_deleted_mask, false, true);
+
+        pipeline.init(Pipe(std::move(source)));
+    }
+
+private:
+    const MergeTreeData & storage;
+    StorageSnapshotPtr storage_snapshot;
+    MergeTreeData::DataPartPtr data_part;
+    Names columns_to_read;
+    bool apply_deleted_mask;
+    ActionsDAGPtr filter;
+    ContextPtr context;
+    Poco::Logger * log;
+};
+
+void createMergeTreeSequentialSource(
+    QueryPlan & plan,
+    const MergeTreeData & storage,
+    const StorageSnapshotPtr & storage_snapshot,
+    MergeTreeData::DataPartPtr data_part,
+    Names columns_to_read,
+    bool apply_deleted_mask,
+    ActionsDAGPtr filter,
+    ContextPtr context,
+    Poco::Logger * log)
+{
+    auto reading = std::make_unique<ReadFromPart>(
+        storage, storage_snapshot, std::move(data_part), std::move(columns_to_read), apply_deleted_mask, filter, std::move(context), log);
+
+    plan.addStep(std::move(reading));
+}
+
 }

View File

@@ -20,4 +20,17 @@ Pipe createMergeTreeSequentialSource(
     bool quiet,
     std::shared_ptr<std::atomic<size_t>> filtered_rows_count);

+class QueryPlan;
+
+void createMergeTreeSequentialSource(
+    QueryPlan & plan,
+    const MergeTreeData & storage,
+    const StorageSnapshotPtr & storage_snapshot,
+    MergeTreeData::DataPartPtr data_part,
+    Names columns_to_read,
+    bool apply_deleted_mask,
+    ActionsDAGPtr filter,
+    ContextPtr context,
+    Poco::Logger * log);
+
 }
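A hedged sketch of calling the new QueryPlan-based overload, matching its use in MutationsInterpreter::Source::read earlier in this diff (the variables are placeholders; `filter` may be nullptr when no usable predicate exists):

    // Sketch only: adds a ReadFromPart step that reads one part sequentially.
    // A non-null `filter` lets the step prune mark ranges via the primary key.
    QueryPlan plan;
    createMergeTreeSequentialSource(
        plan, storage, storage_snapshot, data_part, columns_to_read,
        /* apply_deleted_mask = */ true, /* filter = */ nullptr, context,
        &Poco::Logger::get("Example"));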

View File

@@ -714,8 +714,6 @@ struct MutationContext
     FutureMergedMutatedPartPtr future_part;
     MergeTreeData::DataPartPtr source_part;

-    StoragePtr storage_from_source_part;
-
     StorageMetadataPtr metadata_snapshot;

     MutationCommandsConstPtr commands;
@@ -1478,10 +1476,9 @@ MutateTask::MutateTask(
     ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical();
     ctx->txn = txn;
     ctx->source_part = ctx->future_part->parts[0];
-    ctx->storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);
     ctx->need_prefix = need_prefix_;

-    auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_);
+    auto storage_snapshot = ctx->data->getStorageSnapshot(ctx->metadata_snapshot, context_);
     extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
 }
@@ -1554,7 +1551,7 @@ bool MutateTask::prepare()
     }

     if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
-        ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
+        *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
     {
         NameSet files_to_copy_instead_of_hardlinks;
         auto settings_ptr = ctx->data->getSettings();
@@ -1597,7 +1594,7 @@ bool MutateTask::prepare()
         if (!ctx->for_interpreter.empty())
         {
             ctx->interpreter = std::make_unique<MutationsInterpreter>(
-                ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
+                *ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
             ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
             ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
             ctx->mutation_kind = ctx->interpreter->getMutationKind();