This commit is contained in:
Nikolai Kochetov 2022-12-30 17:45:25 +00:00
parent 5f38e17941
commit 9ecb9195ac
2 changed files with 67 additions and 89 deletions

View File

@ -289,6 +289,15 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func;
}
MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_))
{
}
MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_)
: data(&storage_), part(std::move(source_part_))
{
}
StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const
{
if (data)
@ -297,14 +306,6 @@ StorageSnapshotPtr MutationsInterpreter::Source::getStorageSnapshot(const Storag
return storage->getStorageSnapshot(snapshot_, context_);
}
bool MutationsInterpreter::Source::supportsLightweightDelete() const
{
if (part)
return part->supportLightweightDeleteMutate();
return storage->supportsLightweightDelete();
}
StoragePtr MutationsInterpreter::Source::getStorage() const
{
if (data)
@ -313,8 +314,35 @@ StoragePtr MutationsInterpreter::Source::getStorage() const
return storage;
}
const MergeTreeData * MutationsInterpreter::Source::getMergeTreeData() const
{
if (data)
return data;
return dynamic_cast<const MergeTreeData *>(storage.get());
}
bool MutationsInterpreter::Source::supportsLightweightDelete() const
{
if (part)
return part->supportLightweightDeleteMutate();
return storage->supportsLightweightDelete();
}
bool MutationsInterpreter::Source::hasLightweightDeleteMask() const
{
return part && part->hasLightweightDelete();
}
bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
{
return data && data->getSettings()->materialize_ttl_recalculate_only;
}
MutationsInterpreter::MutationsInterpreter(
const StoragePtr & storage_,
StoragePtr storage_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
@ -322,17 +350,16 @@ MutationsInterpreter::MutationsInterpreter(
bool return_all_columns_,
bool return_deleted_rows_)
: MutationsInterpreter(
Source{.storage = storage_},
Source(std::move(storage_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
if (can_execute_ && (dynamic_cast<const MergeTreeData *>(storage_.get()) || storage_->getName() == "MergeTree"))
if (can_execute_ && dynamic_cast<const MergeTreeData *>(source.getStorage().get()))
{
const auto & t = *storage_;
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot execute mutation for {} {}. Mutation should be applied to every part separately.",
storage_->getName(), typeid(t).name());
"Cannot execute mutation for {}. Mutation should be applied to every part separately.",
source.getStorage()->getName());
}
}
@ -346,7 +373,7 @@ MutationsInterpreter::MutationsInterpreter(
bool return_all_columns_,
bool return_deleted_rows_)
: MutationsInterpreter(
Source{.data = &storage_, .part = std::move(source_part_)},
Source(storage_, std::move(source_part_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
{
@ -374,7 +401,7 @@ MutationsInterpreter::MutationsInterpreter(
static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
{
const MergeTreeData * merge_tree_data = source.data ? source.data : dynamic_cast<const MergeTreeData *>(source.storage.get());
const MergeTreeData * merge_tree_data = source.getMergeTreeData();
if (!merge_tree_data)
return {};
@ -495,7 +522,7 @@ void MutationsInterpreter::prepare(bool dry_run)
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.data && source.data->getSettings()->materialize_ttl_recalculate_only;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
for (const MutationCommand & command : commands)
{
@ -853,7 +880,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
auto all_columns = storage_snapshot->getColumns(options);
/// Add _row_exists column if it is present in the part
if (source.part && source.part->hasLightweightDelete())
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
/// Next, for each stage calculate columns changed by this and previous stages.
@ -959,6 +986,8 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
}
}
/// This structure re-implements adding virtual columns while reading from MergeTree part.
/// It would be good to unify it with IMergeTreeSelectAlgorithm.
struct VirtualColumns
{
struct ColumnAndPosition
@ -1059,18 +1088,12 @@ void MutationsInterpreter::Source::read(
return;
}
// for (const auto & name : required_columns)
// std::cerr << "====== Required column " + name << std::endl;
const auto & steps = first_stage.expressions_chain.steps;
const auto & names = first_stage.filter_column_names;
size_t num_filters = names.size();
// for (size_t i = 0; i < steps.size(); ++i)
// std::cerr << steps[i]->actions()->dumpDAG() << std::endl;
if (data)
{
const auto & steps = first_stage.expressions_chain.steps;
const auto & names = first_stage.filter_column_names;
size_t num_filters = names.size();
ActionsDAGPtr filter;
if (!first_stage.filter_column_names.empty())
{
@ -1089,23 +1112,6 @@ void MutationsInterpreter::Source::read(
&Poco::Logger::get("MutationsInterpreter"));
virtual_columns.addVirtuals(plan);
// std::cerr << "<<<<<<<<< " << plan.getCurrentDataStream().header.dumpStructure() << std::endl;
// for (size_t i = 0; i < steps.size(); ++i)
// {
// const auto & step = steps[i];
// if (i < names.size())
// {
// /// Execute DELETEs.
// plan.addStep(std::make_unique<FilterStep>(plan.getCurrentDataStream(), step->actions(), names[i], false));
// }
// else
// {
// /// Execute UPDATE or final projection.
// plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), step->actions()));
// }
// }
}
else
{
@ -1156,25 +1162,6 @@ void MutationsInterpreter::Source::read(
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
source.read(first_stage, plan, metadata_snapshot, context, apply_deleted_mask, can_execute);
// const auto & steps = first_stage.expressions_chain.steps;
// const auto & names = first_stage.filter_column_names;
// for (size_t i = 0; i < steps.size(); ++i)
// {
// const auto & step = steps[i];
// if (i < names.size())
// {
// /// Execute DELETEs.
// plan.addStep(std::make_unique<FilterStep>(plan.getCurrentDataStream(), step->actions(), names[i], false));
// }
// else
// {
// /// Execute UPDATE or final projection.
// plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), step->actions()));
// }
// }
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}
@ -1200,18 +1187,6 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v
addCreatingSetsStep(plan, stage.analyzer->getPreparedSets(), context);
}
{
if (can_execute && source.storage && source.storage->getName() == "MergeTree")
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot execute mutation for {}. Mutation should be applied to every part separately.",
source.storage->getName());
// WriteBufferFromOwnString buf;
// plan.explainPlan(buf, {.header = true, .actions = true});
// std::cerr << "Plan 2 " + (source.storage ? source.storage->getName() : "") + " \n" + buf.str() << std::endl;
}
QueryPlanOptimizationSettings do_not_optimize_plan;
do_not_optimize_plan.optimize_plan = false;
@ -1251,11 +1226,6 @@ void MutationsInterpreter::validate()
QueryPlan plan;
initQueryPlan(stages.front(), plan);
// {
// WriteBufferFromOwnString buf;
// plan.explainPlan(buf, {.header = true, .actions = true});
// std::cerr << "Plan 1\n" + buf.str() << std::endl;
// }
auto pipeline = addStreamsForLaterStages(stages, plan);
}

View File

@ -42,7 +42,7 @@ public:
/// Storage to mutate, array of mutations commands and context. If you really want to execute mutation
/// use can_execute = true, in other cases (validation, amount of commands) it can be false
MutationsInterpreter(
const StoragePtr & storage_,
StoragePtr storage_,
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
@ -97,15 +97,13 @@ public:
struct Source
{
StoragePtr storage;
/// Special case for MergeTree.
MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part;
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & snapshot_, const ContextPtr & context_) const;
bool supportsLightweightDelete() const;
StoragePtr getStorage() const;
const MergeTreeData * getMergeTreeData() const;
bool supportsLightweightDelete() const;
bool hasLightweightDeleteMask() const;
bool materializeTTLRecalculateOnly() const;
void read(
Stage & first_stage,
@ -114,6 +112,16 @@ public:
const ContextPtr & context_,
bool apply_deleted_mask_,
bool can_execute_) const;
explicit Source(StoragePtr storage_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_);
private:
StoragePtr storage;
/// Special case for MergeTree.
MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part;
};
private: