Mostly support minmax projection.

This commit is contained in:
Nikolai Kochetov 2023-02-13 16:52:21 +00:00
parent e7dba2a85b
commit d6ea566b20

View File

@ -15,6 +15,7 @@
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sources/NullSource.h>
#include <Parsers/queryToString.h>
#include <stack>
namespace DB::QueryPlanOptimizations
@ -121,8 +122,11 @@ struct AggregateProjectionInfo
AggregateProjectionInfo getAggregatingProjectionInfo(
const ProjectionDescription & projection,
const ContextPtr & context,
const StorageMetadataPtr & metadata_snapshot)
const StorageMetadataPtr & metadata_snapshot,
const Block & key_virtual_columns)
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Proj query : {}", queryToString(projection.query_ast));
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Sample for keys : {}", projection.sample_block_for_keys.dumpStructure());
/// This is a bad approach.
/// We'd better have a separate interpreter for projections.
/// Now it's not obvious we didn't miss anything here.
@ -141,6 +145,13 @@ AggregateProjectionInfo getAggregatingProjectionInfo(
info.keys = query_analyzer->aggregationKeys();
info.aggregates = query_analyzer->aggregates();
/// Make every virtual column available to the projection:
///  - register it as an input of the `before_aggregation` DAG,
///  - push that same input node into the DAG outputs,
///  - append its name and type to `info.keys`, next to the aggregation keys collected above.
for (const auto & virt_column : key_virtual_columns)
{
const auto * input = &info.before_aggregation->addInput(virt_column);
info.before_aggregation->getOutputs().push_back(input);
info.keys.push_back(NameAndTypePair{virt_column.name, virt_column.type});
}
return info;
}
@ -487,9 +498,18 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
const auto & keys = aggregating->getParams().keys;
const auto & aggregates = aggregating->getParams().aggregates;
Block key_virtual_columns = reading->getMergeTreeData().getSampleBlockWithVirtualColumns();
std::vector<AggregateProjectionCandidate> candidates;
std::optional<AggregateProjectionCandidate> minmax_projection;
Block minmax_count_projection_block;
MergeTreeData::DataPartsVector minmax_projection_normal_parts;
const auto & parts = reading->getParts();
const auto & query_info = reading->getQueryInfo();
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
AggregateProjectionCandidate * best_candidate = nullptr;
@ -497,7 +517,7 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
const auto * projection = &*(metadata->minmax_count_projection);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name);
auto info = getAggregatingProjectionInfo(*projection, context, metadata);
auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG());
if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates))
{
@ -509,7 +529,20 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
});
}
best_candidate = &*minmax_projection;
minmax_count_projection_block = reading->getMergeTreeData().getMinMaxCountProjectionBlock(
metadata,
minmax_projection->dag->getRequiredColumnsNames(),
filter_node != nullptr,
query_info,
parts,
minmax_projection_normal_parts,
max_added_blocks.get(),
context);
if (!minmax_count_projection_block)
minmax_projection.reset();
else
best_candidate = &*minmax_projection;
}
if (!minmax_projection)
@ -519,7 +552,7 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
{
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Try projection {}", projection->name);
auto info = getAggregatingProjectionInfo(*projection, context, metadata);
auto info = getAggregatingProjectionInfo(*projection, context, metadata, key_virtual_columns);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection DAG {}", info.before_aggregation->dumpDAG());
if (auto proj_dag = analyzeAggregateProjection(info, *dag, filter_node, keys, aggregates))
{
@ -536,12 +569,8 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
return false;
}
const auto & parts = reading->getParts();
const auto & query_info = reading->getQueryInfo();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (context->getSettingsRef().select_sequential_consistency)
{
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(&reading->getMergeTreeData()))
@ -609,29 +638,18 @@ bool optimizeUseProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
return false;
QueryPlanStepPtr projection_reading;
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;
bool has_nornal_parts;
if (minmax_projection)
{
MergeTreeData::DataPartsVector normal_parts;
auto minmax_count_projection_block = reading->getMergeTreeData().getMinMaxCountProjectionBlock(
metadata,
minmax_projection->dag->getRequiredColumnsNames(),
filter_node != nullptr,
query_info,
parts,
normal_parts,
max_added_blocks.get(),
context);
LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Minmax proj block {}", minmax_count_projection_block.dumpStructure());
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(minmax_count_projection_block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
has_nornal_parts = !normal_parts.empty();
has_nornal_parts = !minmax_projection_normal_parts.empty();
if (has_nornal_parts)
reading->resetParts(std::move(normal_parts));
reading->resetParts(std::move(minmax_projection_normal_parts));
}
else
{