Simplify code a little bit.

This commit is contained in:
Nikolai Kochetov 2022-05-11 12:16:15 +00:00
parent 4b8a2e2d80
commit 2d99f0ce13
8 changed files with 86 additions and 98 deletions

View File

@ -1560,12 +1560,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio
}
}
if (getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY && useGroupingSetKey())
{
result_columns.emplace_back("__grouping_set", "__grouping_set");
step.addRequiredOutput("__grouping_set");
}
auto actions = chain.getLastActions();
actions->project(result_columns);
return actions;

View File

@ -2187,7 +2187,10 @@ void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsD
query_plan.addStep(std::move(where_step));
}
Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
static Aggregator::Params getAggregatorParams(
const ASTPtr & query_ptr,
const SelectQueryExpressionAnalyzer & query_analyzer,
const Context & context,
const Block & current_data_stream_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
@ -2212,8 +2215,8 @@ Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer->hasConstAggregationKeys()),
context->getTemporaryVolume(),
&& query_analyzer.hasConstAggregationKeys()),
context.getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
@ -2223,15 +2226,16 @@ Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
};
}
GroupingSetsParamsList InterpreterSelectQuery::getAggregatorGroupingSetsParams(
static GroupingSetsParamsList getAggregatorGroupingSetsParams(
const SelectQueryExpressionAnalyzer & query_analyzer,
const Block & header_before_aggregation,
const ColumnNumbers & all_keys
)
{
GroupingSetsParamsList result;
if (query_analyzer->useGroupingSetKey())
if (query_analyzer.useGroupingSetKey())
{
auto const & aggregation_keys_list = query_analyzer->aggregationKeysList();
auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
ColumnNumbersList grouping_sets_with_keys;
ColumnNumbersList missing_columns_per_set;
@ -2281,10 +2285,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_aggregation.getPositionByName(key.name));
auto aggregator_params = getAggregatorParams(header_before_aggregation, keys, aggregates, overflow_row, settings,
auto aggregator_params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_aggregation, keys, aggregates, overflow_row, settings,
settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes);
auto grouping_sets_params = getAggregatorGroupingSetsParams(header_before_aggregation, keys);
auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, header_before_aggregation, keys);
SortDescription group_by_sort_description;
@ -2374,7 +2378,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_transform.getPositionByName(key.name));
auto params = getAggregatorParams(header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true);
QueryPlanStepPtr step;

View File

@ -3,7 +3,6 @@
#include <memory>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
@ -15,7 +14,6 @@
#include <Storages/TableLockHolder.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/ActionsDAG.h>
namespace Poco
{
@ -34,7 +32,6 @@ using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
struct TreeRewriterResult;
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
using AggregatorParamsPtr = std::unique_ptr<Aggregator::Params>;
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
@ -146,16 +143,6 @@ private:
void executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
Aggregator::Params getAggregatorParams(
const Block & current_data_stream_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
bool overflow_row, const Settings & settings,
size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes);
GroupingSetsParamsList getAggregatorGroupingSetsParams(
const Block & header_before_aggregation,
const ColumnNumbers & all_keys);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
void executeAggregation(

View File

@ -133,7 +133,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
pipeline.transform([&](OutputPortRawPtrs ports)
{
assert(streams * grouping_sets_size == ports.size());
Processors aggregators;
Processors processors;
for (size_t i = 0; i < grouping_sets_size; ++i)
{
Aggregator::Params params_for_set
@ -167,79 +167,85 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
// For each input stream we have `grouping_sets_size` copies, so the port index
// for transform #j should skip the ports of the first (j-1) streams.
connect(*ports[i + grouping_sets_size * j], aggregation_for_set->getInputs().front());
aggregators.push_back(aggregation_for_set);
ports[i + grouping_sets_size * j] = &aggregation_for_set->getOutputs().front();
processors.push_back(aggregation_for_set);
}
}
else
{
auto aggregation_for_set = std::make_shared<AggregatingTransform>(input_header, transform_params_for_set);
connect(*ports[i], aggregation_for_set->getInputs().front());
aggregators.push_back(aggregation_for_set);
ports[i] = &aggregation_for_set->getOutputs().front();
processors.push_back(aggregation_for_set);
}
}
return aggregators;
}, false);
if (streams > 1)
{
pipeline.transform([&](OutputPortRawPtrs ports)
if (streams > 1)
{
Processors resizes;
OutputPortRawPtrs new_ports;
new_ports.reserve(grouping_sets_size);
for (size_t i = 0; i < grouping_sets_size; ++i)
{
auto output_it = ports.begin() + i * streams;
auto resize = std::make_shared<ResizeProcessor>((*output_it)->getHeader(), streams, 1);
size_t output_it = i;
auto resize = std::make_shared<ResizeProcessor>(ports[output_it]->getHeader(), streams, 1);
auto & inputs = resize->getInputs();
for (auto input_it = inputs.begin(); input_it != inputs.end(); ++output_it, ++input_it)
connect(**output_it, *input_it);
resizes.push_back(resize);
for (auto input_it = inputs.begin(); input_it != inputs.end(); output_it += grouping_sets_size, ++input_it)
connect(*ports[output_it], *input_it);
new_ports.push_back(&resize->getOutputs().front());
processors.push_back(resize);
}
return resizes;
}, false);
}
assert(pipeline.getNumStreams() == grouping_sets_size);
size_t set_counter = 0;
auto output_header = transform_params->getHeader();
pipeline.addSimpleTransform([&](const Block & header)
{
/// Here we create a DAG which fills the missing keys and adds the `__grouping_set` column.
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs index;
index.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.push_back(grouping_node);
size_t missign_column_index = 0;
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i)
{
++missign_column_index;
auto column = ColumnConst::create(col.column->cloneResized(1), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node);
index.push_back(node);
}
else
index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
ports.swap(new_ports);
}
dag->getIndex().swap(index);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);
assert(ports.size() == grouping_sets_size);
auto output_header = transform_params->getHeader();
++set_counter;
return transform;
for (size_t set_counter = 0; set_counter < grouping_sets_size; ++set_counter)
{
auto & header = ports[set_counter]->getHeader();
/// Here we create a DAG which fills the missing keys and adds the `__grouping_set` column.
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs index;
index.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.push_back(grouping_node);
size_t missign_column_index = 0;
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i)
{
++missign_column_index;
auto column = ColumnConst::create(col.column->cloneResized(1), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node);
index.push_back(node);
}
else
index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
}
dag->getIndex().swap(index);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);
connect(*ports[set_counter], transform->getInputPort());
processors.emplace_back(std::move(transform));
}
return processors;
});
aggregating = collector.detachProcessors(0);

View File

@ -786,7 +786,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear();
}
void Pipe::transform(const Transformer & transformer, bool check_block_structure)
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@ -852,18 +852,15 @@ void Pipe::transform(const Transformer & transformer, bool check_block_structure
throw Exception(
"Transformation of Pipe is not valid because processors don't have any disconnected output ports", ErrorCodes::LOGICAL_ERROR);
if (check_block_structure)
{
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
}
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
if (collected_processors)
{

View File

@ -89,7 +89,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer, bool check_block_structure = true);
void transform(const Transformer & transformer);
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);

View File

@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
pipe.addChains(std::move(chains));
}
void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_block_structure)
void QueryPipelineBuilder::transform(const Transformer & transformer)
{
checkInitializedAndNotCompleted();
pipe.transform(transformer, check_block_structure);
pipe.transform(transformer);
}
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)

View File

@ -63,7 +63,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer, bool check_block_structure = true);
void transform(const Transformer & transformer);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);