Aggregate Projections analysis using query plan [In progress]

This commit is contained in:
Nikolai Kochetov 2023-01-30 20:09:58 +00:00
parent 286a58801e
commit f09f8f80af

View File

@ -6,6 +6,9 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsLogical.h>
#include <stack>
namespace DB::QueryPlanOptimizations
@ -119,7 +122,6 @@ struct AggregateProjectionInfo
AggregateProjectionInfo getAggregatingProjectionInfo(
const ProjectionDescription & projection,
const ContextPtr & context,
StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot)
{
/// This is a bad approach.
@ -128,8 +130,7 @@ AggregateProjectionInfo getAggregatingProjectionInfo(
InterpreterSelectQuery interpreter(
projection.query_ast,
context,
storage,
metadata_snapshot,
Pipe(std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock())),
SelectQueryOptions{QueryProcessingStage::WithMergeableState});
const auto & analysis_result = interpreter.getAnalysisResult();
@ -147,13 +148,12 @@ AggregateProjectionInfo getAggregatingProjectionInfo(
struct AggregateProjectionCandidate
{
AggregateProjectionInfo info;
ProjectionDescription * projection;
const ProjectionDescription * projection;
ActionsDAGPtr dag;
};
ActionsDAGPtr analyzeAggregateProjection(
//ProjectionDescription & projection,
AggregateProjectionInfo info,
const AggregateProjectionInfo & info,
ActionsDAG & query_dag,
const Names & keys,
const AggregateDescriptions & aggregates)
@ -373,22 +373,66 @@ void optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes &)
if (!reading_node)
return;
ActionsDAGPtr dag;
ActionsDAG::NodeRawConstPtrs filter_nodes;
if (!buildAggregatingDAG(node, dag, filter_nodes))
return;
// const auto & keys = aggregating->getParams().keys;
// const auto & aggregates = aggregating->getParams().aggregates;
auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
if (!reading)
return;
// const auto metadata = reading->getStorageMetadata();
// const auto & projections = metadata->projections;
const auto metadata = reading->getStorageMetadata();
const auto & projections = metadata->projections;
std::vector<const ProjectionDescription *> agg_projections;
for (const auto & projection : projections)
if (projection.type == ProjectionDescription::Type::Aggregate)
agg_projections.push_back(&projection);
if (agg_projections.empty())
return;
ActionsDAGPtr dag;
ActionsDAG::NodeRawConstPtrs filter_nodes;
if (!buildAggregatingDAG(*node.children.front(), dag, filter_nodes))
return;
const ActionsDAG::Node * filter_node = nullptr;
if (!filter_nodes.empty())
{
filter_node = filter_nodes.front();
if (filter_nodes.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and =
std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionAnd>());
filter_node = &dag->addFunction(func_builder_and, std::move(filter_nodes), {});
}
dag->getOutputs().insert(dag->getOutputs().begin(), filter_node);
}
ContextPtr context = reading->getContext();
const auto & keys = aggregating->getParams().keys;
const auto & aggregates = aggregating->getParams().aggregates;
std::vector<AggregateProjectionCandidate> candidates;
candidates.reserve(agg_projections.size());
for (const auto * projection : agg_projections)
{
auto info = getAggregatingProjectionInfo(*projection, context, metadata);
if (auto proj_dag = analyzeAggregateProjection(info, *dag, keys, aggregates))
{
candidates.emplace_back(AggregateProjectionCandidate{
.info = std::move(info),
.projection = projection,
.dag = std::move(proj_dag),
});
}
}
if (candidates.empty())
return;
}
}