prepare to add meta for proj part

jsc0218 2024-11-18 02:09:50 +00:00
parent 130a1151fe
commit 445879a2ac
4 changed files with 144 additions and 12 deletions

View File

@@ -450,7 +450,7 @@ MutationsInterpreter::MutationsInterpreter(
}
context = std::move(new_context);
prepare(!settings.can_execute);
metadata_snapshot ? prepare(!settings.can_execute) : prepareForProjection(!settings.can_execute);
}
static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
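A null metadata snapshot is what now marks the projection-part case, so a single constructor serves both kinds of parts. A minimal standalone sketch of that dispatch (toy types only, not the real MutationsInterpreter signature):

#include <iostream>
#include <memory>

/// Toy stand-in for StorageMetadataPtr.
struct Metadata {};
using MetadataPtr = std::shared_ptr<const Metadata>;

struct ToyInterpreter
{
    ToyInterpreter(MetadataPtr metadata_snapshot, bool can_execute)
    {
        /// Same idea as the changed constructor line above: a missing snapshot
        /// routes preparation to the projection-specific path.
        metadata_snapshot ? prepare(!can_execute) : prepareForProjection(!can_execute);
    }

    void prepare(bool dry_run) { std::cout << "table part, dry_run=" << dry_run << '\n'; }
    void prepareForProjection(bool dry_run) { std::cout << "projection part, dry_run=" << dry_run << '\n'; }
};

int main()
{
    ToyInterpreter for_table(std::make_shared<Metadata>(), /*can_execute=*/ true);
    ToyInterpreter for_projection(nullptr, /*can_execute=*/ true);
}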
@@ -1177,7 +1177,7 @@ void MutationsInterpreter::prepareForProjection(bool dry_run)
}
is_prepared = true;
prepareMutationStages(stages, dry_run);
prepareMutationStagesForProjection(stages, dry_run);
}
void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run)
@@ -1309,6 +1309,132 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
}
}
void MutationsInterpreter::prepareMutationStagesForProjection(std::vector<Stage> & prepared_stages, bool dry_run)
{
auto all_columns = source.getColumns();
/// Add _row_exists column if it is present in the part
if (source.hasLightweightDeleteMask() || deleted_mask_updated)
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
bool has_filters = false;
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
{
if (settings.return_all_columns || !prepared_stages[i].filters.empty())
{
for (const auto & column : all_columns)
{
if (column.name == RowExistsColumn::name && !deleted_mask_updated)
continue;
prepared_stages[i].output_columns.insert(column.name);
}
has_filters = true;
settings.apply_deleted_mask = true;
}
else
{
if (i > 0)
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
/// Make sure that all updated columns are included in the output_columns set.
/// This matters for a "hidden" column like _row_exists: it is a virtual column,
/// so it is not in the list of AllPhysical columns.
for (const auto & [column_name, _] : prepared_stages[i].column_to_updated)
{
if (column_name == RowExistsColumn::name && has_filters && !deleted_mask_updated)
continue;
prepared_stages[i].output_columns.insert(column_name);
}
}
}
/// Now, calculate `expressions_chain` for each stage except the first.
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
for (int64_t i = prepared_stages.size() - 1; i >= 0; --i)
{
auto & stage = prepared_stages[i];
ASTPtr all_asts = std::make_shared<ASTExpressionList>();
for (const auto & ast : stage.filters)
all_asts->children.push_back(ast);
for (const auto & kv : stage.column_to_updated)
all_asts->children.push_back(kv.second);
/// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
for (const auto & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
/// Executing a scalar subquery at this stage can lead to a deadlock,
/// e.g. an ALTER referencing the same table in a scalar subquery.
bool execute_scalar_subqueries = !dry_run;
auto syntax_result = TreeRewriter(context).analyze(
all_asts, all_columns, source.getStorage(), nullptr,
false, true, execute_scalar_subqueries);
stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);
ExpressionActionsChain & actions_chain = stage.expressions_chain;
if (!stage.filters.empty())
{
auto ast = stage.filters.front();
if (stage.filters.size() > 1)
ast = makeASTForLogicalAnd(std::move(stage.filters));
if (!actions_chain.steps.empty())
actions_chain.addStep();
stage.analyzer->appendExpression(actions_chain, ast, dry_run);
stage.filter_column_names.push_back(ast->getColumnName());
}
if (!stage.column_to_updated.empty())
{
if (!actions_chain.steps.empty())
actions_chain.addStep();
for (const auto & kv : stage.column_to_updated)
stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);
auto & actions = actions_chain.getLastStep().actions();
for (const auto & kv : stage.column_to_updated)
{
auto column_name = kv.second->getColumnName();
const auto & dag_node = actions->dag.findInOutputs(column_name);
const auto & alias = actions->dag.addAlias(dag_node, kv.first);
actions->dag.addOrReplaceInOutputs(alias);
}
}
if (i == 0 && actions_chain.steps.empty())
actions_chain.lastStep(syntax_result->required_source_columns);
/// Remove all intermediate columns.
actions_chain.addStep();
actions_chain.getLastStep().required_output.clear();
ActionsDAG::NodeRawConstPtrs new_index;
for (const auto & name : stage.output_columns)
actions_chain.getLastStep().addRequiredOutput(name);
actions_chain.getLastActions();
actions_chain.finalize();
if (i)
{
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
prepared_stages[i - 1].output_columns.insert(column.name);
}
}
}
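The backward loop above mirrors the existing prepareMutationStages: whatever a stage needs as input, the previous stage is made to produce. A self-contained toy illustration of that propagation (hypothetical column names; the real code derives the required inputs from the expression chain rather than declaring them up front):

#include <iostream>
#include <set>
#include <string>
#include <vector>

/// Toy stage: the columns it reads and the columns it must output.
struct ToyStage
{
    std::set<std::string> required_inputs;
    std::set<std::string> output_columns;
};

int main()
{
    std::vector<ToyStage> stages(3);
    stages[1].required_inputs = {"b", "c"};
    stages[2].required_inputs = {"a", "b"};
    stages[2].output_columns = {"a"};

    /// Walk the stages backwards, as the loop over prepared_stages does:
    /// whatever stage i needs as input, stage i - 1 has to produce.
    for (int64_t i = static_cast<int64_t>(stages.size()) - 1; i > 0; --i)
        for (const auto & column : stages[i].required_inputs)
            stages[i - 1].output_columns.insert(column);

    for (size_t i = 0; i < stages.size(); ++i)
    {
        std::cout << "stage " << i << " outputs:";
        for (const auto & column : stages[i].output_columns)
            std::cout << ' ' << column;
        std::cout << '\n';
    }
}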
void MutationsInterpreter::Source::read(
Stage & first_stage,
QueryPlan & plan,

View File

@@ -166,6 +166,7 @@ private:
void initQueryPlan(Stage & first_stage, QueryPlan & query_plan);
void prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run);
void prepareMutationStagesForProjection(std::vector<Stage> & prepared_stages, bool dry_run);
QueryPipelineBuilder addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;
std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;

View File

@@ -1008,7 +1008,7 @@ TreeRewriterResult::TreeRewriterResult(
/// Special columns are non-physical columns, for example ALIAS
void TreeRewriterResult::collectSourceColumns(bool add_special)
{
if (storage)
if (storage && storage_snapshot)
{
auto options = GetColumnsOptions(add_special ? GetColumnsOptions::All : GetColumnsOptions::AllPhysical);
options.withExtendedObjects();
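This guard is what the new projection path relies on: prepareMutationStagesForProjection above calls TreeRewriter::analyze with nullptr in place of the storage snapshot, so collectSourceColumns must now tolerate a storage without a snapshot. A toy illustration of why checking storage alone would no longer be enough (hypothetical simplified types):

#include <iostream>
#include <memory>

/// Toy stand-ins: a "storage" that is always present and a snapshot that may be missing.
struct ToyStorage {};
struct ToySnapshot { int column_count = 3; };

int collectColumns(const ToyStorage * storage, const std::shared_ptr<const ToySnapshot> & snapshot)
{
    /// The added check: with only "if (storage)", the projection path
    /// (storage set, snapshot null) would dereference a null pointer below.
    if (storage && snapshot)
        return snapshot->column_count;
    return 0;
}

int main()
{
    ToyStorage storage;
    std::cout << collectColumns(&storage, std::make_shared<const ToySnapshot>()) << '\n';  /// 3
    std::cout << collectColumns(&storage, nullptr) << '\n';                                /// 0, no crash
}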

View File

@@ -1017,6 +1017,8 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part;
StorageMetadataPtr metadata_snapshot;
/// For the projection part only; derived from the table part's metadata snapshot.
StorageMetadataPtr projection_metadata_snapshot;
MutationCommandsConstPtr commands;
time_t time_of_mutation;
@@ -2270,10 +2272,6 @@ bool MutateTask::prepare()
if (!ctx->for_interpreter.empty())
{
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second;
/// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the
/// columns and preserve the columns that are not affected, but after the update all columns must have the same number of rows.
MutationsInterpreter::Settings settings(true);
@@ -2284,11 +2282,6 @@ bool MutateTask::prepare()
ctx->metadata_snapshot, ctx->for_interpreter,
ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings);
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, projection_part, alter_conversions,
ctx->metadata_snapshot, ctx->for_interpreter,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
@@ -2296,6 +2289,18 @@ bool MutateTask::prepare()
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
/// It might be better to create a metadata_snapshot for the projection part.
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second;
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, projection_part, alter_conversions,
nullptr, ctx->for_interpreter,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();
lightweight_delete_mode = ctx->updated_header.has(RowExistsColumn::name);
/// In lightweight delete mode with the rebuild option, add projections again here, since this is the earliest
/// point at which that condition can be known.