Refactor a bit.

This commit is contained in:
Nikolai Kochetov 2023-02-27 16:43:54 +00:00
parent 0400677483
commit 9bf828cc98

View File

@ -735,6 +735,68 @@ static std::shared_ptr<PartitionIdToMaxBlock> getMaxAddedBlocks(ReadFromMergeTre
return {};
}
static bool analyzeProjectionCandidate(
ProjectionCandidate & candidate,
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
const ActionDAGNodes & added_filter_nodes)
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & created_projections = part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
if (projection_parts.empty())
return false;
auto projection_result_ptr = reader.estimateNumMarksToRead(
std::move(projection_parts),
nullptr,
required_column_names,
metadata,
candidate.projection->metadata,
query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
added_filter_nodes,
context,
context->getSettingsRef().max_threads,
max_added_blocks);
if (projection_result_ptr->error())
return false;
candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();
if (!normal_parts.empty())
{
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->error())
return false;
if (normal_result_ptr->marks() != 0)
{
candidate.sum_marks += normal_result_ptr->marks();
candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
}
}
return true;
}
bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)
@ -777,57 +839,18 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
/// Selecting best candidate.
for (auto & candidate : candidates.real)
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & created_projections = part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
if (projection_parts.empty())
continue;
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
auto projection_result_ptr = reader.estimateNumMarksToRead(
std::move(projection_parts),
nullptr,
candidate.dag->getRequiredColumnsNames(),
metadata,
candidate.projection->metadata,
query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
added_filter_nodes,
context,
context->getSettingsRef().max_threads,
max_added_blocks);
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_column_names, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
if (projection_result_ptr->error())
if (!analyzed)
continue;
candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();
if (!normal_parts.empty())
{
auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->error())
continue;
if (normal_result_ptr->marks() != 0)
{
candidate.sum_marks += normal_result_ptr->marks();
candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
}
}
if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
best_candidate = &candidate;
}
@ -858,9 +881,6 @@ bool optimizeUseAggProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
storage_snapshot->storage, storage_snapshot->metadata, storage_snapshot->object_columns);
proj_snapshot->addProjection(best_candidate->projection);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj snapshot {}",
// proj_snapshot->getColumns(GetColumnsOptions::Kind::All).toString());
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
@ -1043,59 +1063,20 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!hasAllRequiredColumns(projection, required_columns))
continue;
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & created_projections = part->getProjectionParts();
auto it = created_projections.find(projection->name);
if (it != created_projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
if (projection_parts.empty())
continue;
auto & candidate = candidates.emplace_back();
candidate.projection = projection;
ActionDAGNodes added_filter_nodes;
if (query.filter_node)
added_filter_nodes.nodes.push_back(query.filter_node);
auto projection_result_ptr = reader.estimateNumMarksToRead(
std::move(projection_parts),
nullptr,
required_columns,
metadata,
projection->metadata,
query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
added_filter_nodes,
context,
context->getSettingsRef().max_threads,
max_added_blocks);
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_columns, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
if (projection_result_ptr->error())
if (!analyzed)
continue;
auto & candidate = candidates.emplace_back();
candidate.projection = projection;
candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks();
if (!normal_parts.empty())
{
auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->error())
continue;
if (normal_result_ptr->marks() != 0)
{
candidate.sum_marks += normal_result_ptr->marks();
candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
}
}
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"),
// "Marks for projection {} {}", projection->name ,candidate.sum_marks);