Aggregate Projections analysis using query plan [In progress]

This commit is contained in:
Nikolai Kochetov 2023-01-31 20:33:01 +00:00
parent f09f8f80af
commit e2c32ccbca
7 changed files with 179 additions and 18 deletions

View File

@ -406,6 +406,11 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
return;
}
if (input_streams.size() > 1)
{
}
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumStreams() > 1)
{
@ -465,6 +470,24 @@ void AggregatingStep::describePipeline(FormatSettings & settings) const
}
}
void AggregatingStep::requestOnlyMergeForAggregateProjection(const DataStream & input_stream)
{
auto cur_header = getOutputStream().header;
input_streams.front() = input_stream;
params.only_merge = true;
updateOutputStream();
assertBlocksHaveEqualStructure(cur_header, getOutputStream().header, "AggregatingStep");
}
void AggregatingStep::appendAggregateProjection(const DataStream & input_stream)
{
input_streams.emplace_back(input_stream);
params.only_merge = true;
auto added_header = appendGroupingColumn(params.getHeader(input_streams.front().header, final), params.keys, !grouping_sets_params.empty(), group_by_use_nulls);
assertBlocksHaveEqualStructure(getOutputStream().header, added_header, "AggregatingStep");
params.only_merge = false;
}
void AggregatingStep::updateOutputStream()
{
output_stream = createOutputStream(

View File

@ -60,6 +60,9 @@ public:
void applyOrder(SortDescription sort_description_for_merging_, SortDescription group_by_sort_description_);
bool memoryBoundMergingWillBeUsed() const;
void requestOnlyMergeForAggregateProjection(const DataStream & input_stream);
void appendAggregateProjection(const DataStream & input_stream);
private:
void updateOutputStream() override;

View File

@ -9,6 +9,8 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsLogical.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <stack>
namespace DB::QueryPlanOptimizations
@ -16,7 +18,7 @@ namespace DB::QueryPlanOptimizations
QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
{
IQueryPlanStep * step = node.step.get();\
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
/// Already read-in-order, skip.
@ -150,6 +152,9 @@ struct AggregateProjectionCandidate
AggregateProjectionInfo info;
const ProjectionDescription * projection;
ActionsDAGPtr dag;
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
};
ActionsDAGPtr analyzeAggregateProjection(
@ -360,7 +365,7 @@ ActionsDAGPtr analyzeAggregateProjection(
return query_dag.foldActionsByProjection(new_inputs);
}
void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)
void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
if (node.children.size() != 1)
return;
@ -432,6 +437,128 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)
if (candidates.empty())
return;
AggregateProjectionCandidate * best_candidate = nullptr;
size_t best_candidate_marks = 0;
const auto & parts = reading->getParts();
const auto & query_info = reading->getQueryInfo();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (context->getSettingsRef().select_sequential_consistency)
{
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData()))
max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
}
for (auto & candidate : candidates)
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & created_projections = part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
if (projection_parts.empty())
continue;
ActionDAGNodes added_filter_nodes;
if (filter_node)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
auto projection_result_ptr = reader.estimateNumMarksToRead(
projection_parts,
nullptr,
candidate.dag->getRequiredColumnsNames(),
metadata,
candidate.projection->metadata,
query_info, /// How it is actually used? I hope that for index we need only added_filter_nodes
added_filter_nodes,
context,
context->getSettingsRef().max_threads,
max_added_blocks);
if (projection_result_ptr->error())
continue;
size_t sum_marks = projection_result_ptr->marks();
if (!normal_parts.empty())
{
auto normal_result_ptr = reading->selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->error())
continue;
if (normal_result_ptr->marks() != 0)
{
sum_marks += normal_result_ptr->marks();
candidate.merge_tree_normal_select_result_ptr = std::move(normal_result_ptr);
}
}
candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr);
if (best_candidate == nullptr || best_candidate_marks > sum_marks)
{
best_candidate = &candidate;
best_candidate_marks = sum_marks;
}
}
if (!best_candidate)
return;
auto projection_reading = reader.readFromParts(
{},
best_candidate->dag->getRequiredColumnsNames(),
reading->getStorageSnapshot(),
query_info,
context,
reading->getMaxBlockSize(),
reading->getNumStreams(),
max_added_blocks,
best_candidate->merge_tree_projection_select_result_ptr,
reading->isParallelReadingEnabled());
projection_reading->setStepDescription(best_candidate->projection->name);
if (!best_candidate->merge_tree_normal_select_result_ptr)
{
/// All parts are taken from projection
auto & projection_reading_node = nodes.emplace_back(QueryPlan::Node{.step = std::move(projection_reading)});
auto & expr_or_filter_node = nodes.emplace_back();
if (filter_node)
{
expr_or_filter_node.step = std::make_unique<FilterStep>(
projection_reading_node.step->getOutputStream(),
best_candidate->dag,
best_candidate->dag->getOutputs().front()->result_name,
true);
}
else
expr_or_filter_node.step = std::make_unique<ExpressionStep>(
projection_reading_node.step->getOutputStream(),
best_candidate->dag);
expr_or_filter_node.children.push_back(&projection_reading_node);
aggregating->requestOnlyMergeForAggregateProjection(expr_or_filter_node.step->getOutputStream());
node.children.front() = &expr_or_filter_node;
optimizeAggregationInOrder(node, nodes);
return;
}
}

View File

@ -147,13 +147,23 @@ public:
bool sample_factor_column_queried,
Poco::Logger * log);
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ContextPtr getContext() const { return context; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
StorageSnapshotPtr getStorageSnapshot() const { return storage_snapshot; }
const PrewhereInfo * getPrewhereInfo() const { return prewhere_info.get(); }
void requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const MergeTreeData & getMergeTreeData() const { return data; }
const Names & getRealColumnNames() const { return real_column_names; }
size_t getMaxBlockSize() const { return max_block_size; }
size_t getNumStreams() const { return requested_num_streams; }
bool isParallelReadingEnabled() const { return read_task_callback != std::nullopt; }
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
@ -232,7 +242,6 @@ private:
const Names & column_names,
ActionsDAGPtr & out_projection);
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr;

View File

@ -156,7 +156,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!query_info.projection)
{
auto plan = readFromParts(
auto step = readFromParts(
query_info.merge_tree_select_result_ptr ? MergeTreeData::DataPartsVector{} : parts,
column_names_to_return,
storage_snapshot,
@ -168,12 +168,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
query_info.merge_tree_select_result_ptr,
enable_parallel_reading);
if (plan->isInitialized() && settings.allow_experimental_projection_optimization && settings.force_optimize_projection
if (!step && settings.allow_experimental_projection_optimization && settings.force_optimize_projection
&& !metadata_for_reading->projections.empty())
throw Exception(
"No projection is used when allow_experimental_projection_optimization = 1 and force_optimize_projection = 1",
ErrorCodes::PROJECTION_NOT_USED);
auto plan = std::make_unique<QueryPlan>();
plan->addStep(std::move(step));
return plan;
}
@ -197,7 +199,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
else if (query_info.projection->merge_tree_projection_select_result_ptr)
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
projection_plan = readFromParts(
projection_plan->addStep(readFromParts(
{},
query_info.projection->required_columns,
storage_snapshot,
@ -207,7 +209,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
num_streams,
max_block_numbers_to_read,
query_info.projection->merge_tree_projection_select_result_ptr,
enable_parallel_reading);
enable_parallel_reading));
}
if (projection_plan->isInitialized())
@ -1337,7 +1339,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
log);
}
QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageSnapshotPtr & storage_snapshot,
@ -1353,10 +1355,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (merge_tree_select_result_ptr)
{
if (merge_tree_select_result_ptr->marks() == 0)
return std::make_unique<QueryPlan>();
return {};
}
else if (parts.empty())
return std::make_unique<QueryPlan>();
return {};
Names real_column_names;
Names virt_column_names;
@ -1366,7 +1368,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
return std::make_unique<ReadFromMergeTree>(
std::move(parts),
real_column_names,
virt_column_names,
@ -1382,10 +1384,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
merge_tree_select_result_ptr,
enable_parallel_reading
);
QueryPlanPtr plan = std::make_unique<QueryPlan>();
plan->addStep(std::move(read_from_merge_tree));
return plan;
}

View File

@ -39,7 +39,7 @@ public:
bool enable_parallel_reading = false) const;
/// The same as read, but with specified set of parts.
QueryPlanPtr readFromParts(
QueryPlanStepPtr readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -322,12 +322,13 @@ public:
const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper);
bool canUseZeroCopyReplication() const;
private:
std::atomic_bool are_restoring_replica {false};
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
private:
std::atomic_bool are_restoring_replica {false};
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();