Refactor a bit more.

This commit is contained in:
Nikolai Kochetov 2023-02-24 19:59:13 +00:00
parent 030a0ba7fb
commit 53b006dd5c

View File

@ -124,7 +124,7 @@ bool QueryDAG::build(QueryPlan::Node & node)
return false;
}
bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
static bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
{
/// Probably some projection was already applied.
if (reading->hasAnalyzedResult())
@ -615,8 +615,8 @@ ActionsDAGPtr analyzeAggregateProjection(
struct MinMaxProjectionCandidate
{
AggregateProjectionCandidate candidate;
Block minmax_count_projection_block;
MergeTreeData::DataPartsVector minmax_projection_normal_parts;
Block block;
MergeTreeData::DataPartsVector normal_parts;
};
struct AggregateProjectionCandidates
@ -693,8 +693,8 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
{
MinMaxProjectionCandidate minmax;
minmax.candidate = std::move(candidate);
minmax.minmax_count_projection_block = std::move(block);
minmax.minmax_projection_normal_parts = std::move(minmax_projection_normal_parts);
minmax.block = std::move(block);
minmax.normal_parts = std::move(minmax_projection_normal_parts);
minmax.candidate.projection = projection;
candidates.minmax_projection.emplace(std::move(minmax));
}
@ -722,6 +722,19 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
return candidates;
}
/// Returns the max added blocks of the underlying replicated storage, or an empty pointer.
/// A non-empty result is produced only when the `select_sequential_consistency` setting is
/// enabled in the reading step's context AND the step reads from a StorageReplicatedMergeTree.
static std::shared_ptr<PartitionIdToMaxBlock> getMaxAddedBlocks(ReadFromMergeTree * reading)
{
    ContextPtr query_context = reading->getContext();

    /// Setting is off: nothing to collect.
    if (!query_context->getSettingsRef().select_sequential_consistency)
        return {};

    /// Only replicated tables expose max added blocks; any other storage yields an empty pointer.
    const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData());
    if (!replicated_storage)
        return {};

    return std::make_shared<PartitionIdToMaxBlock>(replicated_storage->getMaxAddedBlocks());
}
bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)
@ -745,15 +758,7 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
if (!canUseProjectionForReadingStep(reading))
return false;
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (context->getSettingsRef().select_sequential_consistency)
{
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData()))
max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
}
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks);
@ -763,10 +768,13 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
else if (candidates.real.empty())
return false;
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
const auto & parts = reading->getParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
/// Selecting best candidate.
for (auto & candidate : candidates.real)
{
MergeTreeData::DataPartsVector projection_parts;
@ -830,25 +838,28 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
QueryPlanStepPtr projection_reading;
bool has_nornal_parts;
/// Add reading from projection step.
if (candidates.minmax_projection)
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", candidates.minmax_projection->minmax_count_projection_block.dumpStructure());
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}",
// candidates.minmax_projection->block.dumpStructure());
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->minmax_count_projection_block)));
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
has_nornal_parts = !candidates.minmax_projection->minmax_projection_normal_parts.empty();
has_nornal_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_nornal_parts)
reading->resetParts(std::move(candidates.minmax_projection->minmax_projection_normal_parts));
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
}
else
{
auto storage_snapshot = reading->getStorageSnapshot();
auto proj_snapshot = std::make_shared<StorageSnapshot>(
storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data);
storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns);
proj_snapshot->addProjection(best_candidate->projection);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}",
// proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
@ -867,7 +878,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
if (!projection_reading)
{
Pipe pipe(std::make_shared<NullSource>(proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames())));
auto header = proj_snapshot->getSampleBlockForColumns(best_candidate->dag->getRequiredColumnsNames());
Pipe pipe(std::make_shared<NullSource>(std::move(header)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
}
@ -876,7 +888,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr));
}
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure());
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}",
// projection_reading->getOutputStream().header.dumpStructure());
projection_reading->setStepDescription(best_candidate->projection->name);
@ -901,12 +914,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
if (!has_nornal_parts)
{
/// All parts are taken from projection
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Expr stream {}", expr_or_filter_node.step->getOutputStream().header.dumpStructure());
aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream());
node.children.front() = &expr_or_filter_node;
//optimizeAggregationInOrder(node, nodes);
}
else
{
@ -917,7 +926,8 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
return true;
}
ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header)
static ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_header)
{
/// Materialize constants in case we don't have them in the output header.
/// This may happen e.g. if we have PREWHERE.
@ -951,6 +961,16 @@ ActionsDAGPtr makeMaterializingDAG(const Block & proj_header, const Block main_h
return dag;
}
static bool hasAllRequiredColumns(const ProjectionDescription * projection, const Names & required_columns)
{
for (const auto & col : required_columns)
{
if (!projection->sample_block.has(col))
return false;
}
return true;
}
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
@ -1009,28 +1029,14 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
auto ordinary_reading_select_result = reading->selectRangesToRead(parts);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Marks for ordinary reading {}", ordinary_reading_marks);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"),
// "Marks for ordinary reading {}", ordinary_reading_marks);
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (context->getSettingsRef().select_sequential_consistency)
{
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData()))
max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
}
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
for (const auto * projection : normal_projections)
{
bool has_all_columns = true;
for (const auto & col : required_columns)
{
if (!projection->sample_block.has(col))
{
has_all_columns = false;
break;
}
}
if (!has_all_columns)
if (!hasAllRequiredColumns(projection, required_columns))
continue;
MergeTreeData::DataPartsVector projection_parts;
@ -1086,7 +1092,15 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
}
}
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Marks for projection {} {}", projection->name ,candidate.sum_marks);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"),
// "Marks for projection {} {}", projection->name ,candidate.sum_marks);
// if (candidate.sum_marks > ordinary_reading_marks)
// continue;
// if (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks)
// best_candidate = &candidate;
if (candidate.sum_marks < ordinary_reading_marks && (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks))
best_candidate = &candidate;
}
@ -1102,7 +1116,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns); //, storage_snapshot->data);
proj_snapshot->addProjection(best_candidate->projection);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}", proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}",
// proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
@ -1129,7 +1144,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (has_nornal_parts)
reading->setAnalyzedResult(std::move(best_candidate->merge_tree_normal_select_result_ptr));
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}", projection_reading->getOutputStream().header.dumpStructure());
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection reading header {}",
// projection_reading->getOutputStream().header.dumpStructure());
projection_reading->setStepDescription(best_candidate->projection->name);
@ -1169,12 +1185,6 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (auto materializing = makeMaterializingDAG(proj_stream->header, main_stream.header))
{
// auto convert_actions_dag = ActionsDAG::makeConvertingActions(
// proj_stream->header.getColumnsWithTypeAndName(),
// main_stream.header.getColumnsWithTypeAndName(),
// ActionsDAG::MatchColumnsMode::Name,
// true);
auto converting = std::make_unique<ExpressionStep>(*proj_stream, materializing);
proj_stream = &converting->getOutputStream();
auto & expr_node = nodes.emplace_back();
@ -1189,6 +1199,8 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
union_node.children = {iter->node->children.front(), next_node};
iter->node->children.front() = &union_node;
/// Here we remove last steps from stack to be able to optimize again.
/// In theory, read-in-order can be applied to projection.
iter->next_child = 0;
stack.resize(iter.base() - stack.begin() + 1);
}