add metadata for projpart and remove duplicated code

This commit is contained in:
jsc0218 2024-11-19 00:08:14 +00:00
parent 445879a2ac
commit 8fa8d5ac0f
4 changed files with 15 additions and 243 deletions

View File

@ -450,7 +450,7 @@ MutationsInterpreter::MutationsInterpreter(
}
context = std::move(new_context);
metadata_snapshot ? prepare(!settings.can_execute) : prepareForProjection(!settings.can_execute);
prepare(!settings.can_execute);
}
static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
@ -1070,116 +1070,6 @@ void MutationsInterpreter::prepare(bool dry_run)
prepareMutationStages(stages, dry_run);
}
void MutationsInterpreter::prepareForProjection(bool dry_run)
{
if (is_prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "MutationsInterpreter is already prepared. It is a bug.");
if (commands.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty mutation commands list");
auto all_columns = source.getColumns();
NameSet available_columns_set(available_columns.begin(), available_columns.end());
/// Add _row_exists column if it is physically present in the part
if (source.hasLightweightDeleteMask())
{
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
available_columns_set.insert(RowExistsColumn::name);
}
NameSet updated_columns;
for (auto & command : commands)
{
for (const auto & [name, _] : command.column_to_update_expression)
{
if (!available_columns_set.contains(name) && name != RowExistsColumn::name)
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
"Column {} is updated but not requested to read", name);
updated_columns.insert(name);
}
}
std::vector<String> read_columns;
/// First, break a sequence of commands into stages.
for (const auto & command : commands)
{
if (command.type == MutationCommand::UPDATE)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (const auto & [column_name, update_expr] : command.column_to_update_expression)
{
/// When doing UPDATE column = expression WHERE condition
/// we will replace column to the result of the following expression:
///
/// CAST(if(condition, CAST(expression, type), column), type)
///
/// Inner CAST is needed to make 'if' work when branches have no common type,
/// example: type is UInt64, UPDATE x = -1 or UPDATE x = x - 1.
///
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
DataTypePtr type;
if (column_name == RowExistsColumn::name)
{
type = RowExistsColumn::type;
deleted_mask_updated = true;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name);
}
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command);
auto updated_column = makeASTFunction("_CAST",
makeASTFunction("if",
condition,
makeASTFunction("_CAST",
update_expr->clone(),
type_literal),
std::make_shared<ASTIdentifier>(column_name)),
type_literal);
stages.back().column_to_updated.emplace(column_name, updated_column);
if (condition && settings.return_mutated_rows)
stages.back().filters.push_back(condition);
}
}
else if (command.type == MutationCommand::READ_COLUMN)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
read_columns.emplace_back(command.column_name);
}
else
throw Exception(ErrorCodes::UNKNOWN_MUTATION_COMMAND, "Unknown mutation command type: {}", DB::toString<int>(command.type));
}
if (!read_columns.empty())
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (auto & column_name : read_columns)
stages.back().column_to_updated.emplace(column_name, std::make_shared<ASTIdentifier>(column_name));
}
is_prepared = true;
prepareMutationStagesForProjection(stages, dry_run);
}
void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_stages, bool dry_run)
{
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
@ -1309,132 +1199,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
}
}
void MutationsInterpreter::prepareMutationStagesForProjection(std::vector<Stage> & prepared_stages, bool dry_run)
{
auto all_columns = source.getColumns();
/// Add _row_exists column if it is present in the part
if (source.hasLightweightDeleteMask() || deleted_mask_updated)
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
bool has_filters = false;
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
{
if (settings.return_all_columns || !prepared_stages[i].filters.empty())
{
for (const auto & column : all_columns)
{
if (column.name == RowExistsColumn::name && !deleted_mask_updated)
continue;
prepared_stages[i].output_columns.insert(column.name);
}
has_filters = true;
settings.apply_deleted_mask = true;
}
else
{
if (i > 0)
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
/// Make sure that all updated columns are included into output_columns set.
/// This is important for a "hidden" column like _row_exists gets because it is a virtual column
/// and so it is not in the list of AllPhysical columns.
for (const auto & [column_name, _] : prepared_stages[i].column_to_updated)
{
if (column_name == RowExistsColumn::name && has_filters && !deleted_mask_updated)
continue;
prepared_stages[i].output_columns.insert(column_name);
}
}
}
/// Now, calculate `expressions_chain` for each stage except the first.
/// Do it backwards to propagate information about columns required as input for a stage to the previous stage.
for (int64_t i = prepared_stages.size() - 1; i >= 0; --i)
{
auto & stage = prepared_stages[i];
ASTPtr all_asts = std::make_shared<ASTExpressionList>();
for (const auto & ast : stage.filters)
all_asts->children.push_back(ast);
for (const auto & kv : stage.column_to_updated)
all_asts->children.push_back(kv.second);
/// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns.
for (const auto & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
/// Executing scalar subquery on that stage can lead to deadlock
/// e.g. ALTER referencing the same table in scalar subquery
bool execute_scalar_subqueries = !dry_run;
auto syntax_result = TreeRewriter(context).analyze(
all_asts, all_columns, source.getStorage(), nullptr,
false, true, execute_scalar_subqueries);
stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);
ExpressionActionsChain & actions_chain = stage.expressions_chain;
if (!stage.filters.empty())
{
auto ast = stage.filters.front();
if (stage.filters.size() > 1)
ast = makeASTForLogicalAnd(std::move(stage.filters));
if (!actions_chain.steps.empty())
actions_chain.addStep();
stage.analyzer->appendExpression(actions_chain, ast, dry_run);
stage.filter_column_names.push_back(ast->getColumnName());
}
if (!stage.column_to_updated.empty())
{
if (!actions_chain.steps.empty())
actions_chain.addStep();
for (const auto & kv : stage.column_to_updated)
stage.analyzer->appendExpression(actions_chain, kv.second, dry_run);
auto & actions = actions_chain.getLastStep().actions();
for (const auto & kv : stage.column_to_updated)
{
auto column_name = kv.second->getColumnName();
const auto & dag_node = actions->dag.findInOutputs(column_name);
const auto & alias = actions->dag.addAlias(dag_node, kv.first);
actions->dag.addOrReplaceInOutputs(alias);
}
}
if (i == 0 && actions_chain.steps.empty())
actions_chain.lastStep(syntax_result->required_source_columns);
/// Remove all intermediate columns.
actions_chain.addStep();
actions_chain.getLastStep().required_output.clear();
ActionsDAG::NodeRawConstPtrs new_index;
for (const auto & name : stage.output_columns)
actions_chain.getLastStep().addRequiredOutput(name);
actions_chain.getLastActions();
actions_chain.finalize();
if (i)
{
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front()->getRequiredColumns())
prepared_stages[i - 1].output_columns.insert(column.name);
}
}
}
void MutationsInterpreter::Source::read(
Stage & first_stage,
QueryPlan & plan,

View File

@ -162,11 +162,8 @@ private:
void prepare(bool dry_run);
void prepareForProjection(bool dry_run);
void initQueryPlan(Stage & first_stage, QueryPlan & query_plan);
void prepareMutationStages(std::vector<Stage> &prepared_stages, bool dry_run);
void prepareMutationStagesForProjection(std::vector<Stage> &prepared_stages, bool dry_run);
QueryPipelineBuilder addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, QueryPlan & plan) const;
std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;

View File

@ -1008,7 +1008,7 @@ TreeRewriterResult::TreeRewriterResult(
/// Special columns are non physical columns, for example ALIAS
void TreeRewriterResult::collectSourceColumns(bool add_special)
{
if (storage && storage_snapshot)
if (storage)
{
auto options = GetColumnsOptions(add_special ? GetColumnsOptions::All : GetColumnsOptions::AllPhysical);
options.withExtendedObjects();

View File

@ -1002,6 +1002,16 @@ void finalizeMutatedPart(
new_data_part->default_codec = codec;
}
static StorageMetadataPtr generateProjectionPartMetadata(
MergeTreeData::DataPartPtr projection_part,
StorageMetadataPtr table_metadata[[maybe_unused]])
{
auto res = std::make_shared<StorageInMemoryMetadata>();
/// Currently only ColumnsDescription, later add as needed.
res->columns = projection_part->getColumnsDescription();
return res;
}
}
struct MutationContext
@ -2289,14 +2299,15 @@ bool MutateTask::prepare()
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
/// might be better to create metadata_snapshot for projection part.
const auto & proj_desc = *(ctx->metadata_snapshot->getProjections().begin());
const auto & projections_name_and_part = ctx->source_part->getProjectionParts();
MergeTreeData::DataPartPtr projection_part = projections_name_and_part.begin()->second;
ctx->projection_metadata_snapshot = MutationHelpers::generateProjectionPartMetadata(projection_part, ctx->metadata_snapshot);
auto projection_interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, projection_part, alter_conversions,
nullptr, ctx->for_interpreter,
ctx->projection_metadata_snapshot, ctx->for_interpreter,
proj_desc.metadata->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->projection_mutating_pipeline_builder = projection_interpreter->execute();