From 2d99f0ce13eab84b03bef5ceedb7247de20d6386 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 11 May 2022 12:16:15 +0000
Subject: [PATCH] Simplify code a little bit.

---
 src/Interpreters/ExpressionAnalyzer.cpp      |   6 -
 src/Interpreters/InterpreterSelectQuery.cpp  |  22 ++--
 src/Interpreters/InterpreterSelectQuery.h    |  13 ---
 src/Processors/QueryPlan/AggregatingStep.cpp | 116 ++++++++++---------
 src/QueryPipeline/Pipe.cpp                   |  19 ++-
 src/QueryPipeline/Pipe.h                     |   2 +-
 src/QueryPipeline/QueryPipelineBuilder.cpp   |   4 +-
 src/QueryPipeline/QueryPipelineBuilder.h     |   2 +-
 8 files changed, 86 insertions(+), 98 deletions(-)

diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp
index 25967919183..01769742071 100644
--- a/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/src/Interpreters/ExpressionAnalyzer.cpp
@@ -1560,12 +1560,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio
         }
     }
 
-    if (getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY && useGroupingSetKey())
-    {
-        result_columns.emplace_back("__grouping_set", "__grouping_set");
-        step.addRequiredOutput("__grouping_set");
-    }
-
     auto actions = chain.getLastActions();
     actions->project(result_columns);
     return actions;
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index e8c1be5330e..6bf90a4359a 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -2187,7 +2187,10 @@ void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsD
     query_plan.addStep(std::move(where_step));
 }
 
-Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
+static Aggregator::Params getAggregatorParams(
+    const ASTPtr & query_ptr,
+    const SelectQueryExpressionAnalyzer & query_analyzer,
+    const Context & context,
     const Block & current_data_stream_header,
     const ColumnNumbers & keys,
     const AggregateDescriptions & aggregates,
@@ -2212,8 +2215,8 @@ Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
         settings.max_bytes_before_external_group_by,
         settings.empty_result_for_aggregation_by_empty_set
             || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
-                && query_analyzer->hasConstAggregationKeys()),
-        context->getTemporaryVolume(),
+                && query_analyzer.hasConstAggregationKeys()),
+        context.getTemporaryVolume(),
         settings.max_threads,
         settings.min_free_disk_space_for_temporary_data,
         settings.compile_aggregate_expressions,
@@ -2223,15 +2226,16 @@ Aggregator::Params InterpreterSelectQuery::getAggregatorParams(
     };
 }
 
-GroupingSetsParamsList InterpreterSelectQuery::getAggregatorGroupingSetsParams(
+static GroupingSetsParamsList getAggregatorGroupingSetsParams(
+    const SelectQueryExpressionAnalyzer & query_analyzer,
     const Block & header_before_aggregation,
     const ColumnNumbers & all_keys
 )
 {
     GroupingSetsParamsList result;
-    if (query_analyzer->useGroupingSetKey())
+    if (query_analyzer.useGroupingSetKey())
     {
-        auto const & aggregation_keys_list = query_analyzer->aggregationKeysList();
+        auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
 
         ColumnNumbersList grouping_sets_with_keys;
         ColumnNumbersList missing_columns_per_set;
@@ -2281,10 +2285,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
     for (const auto & key : query_analyzer->aggregationKeys())
         keys.push_back(header_before_aggregation.getPositionByName(key.name));
 
-    auto aggregator_params = getAggregatorParams(header_before_aggregation, keys, aggregates, overflow_row, settings,
+    auto aggregator_params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_aggregation, keys, aggregates, overflow_row, settings,
         settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes);
 
-    auto grouping_sets_params = getAggregatorGroupingSetsParams(header_before_aggregation, keys);
+    auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, header_before_aggregation, keys);
 
     SortDescription group_by_sort_description;
 
@@ -2374,7 +2378,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
     for (const auto & key : query_analyzer->aggregationKeys())
         keys.push_back(header_before_transform.getPositionByName(key.name));
 
-    auto params = getAggregatorParams(header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
+    auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
     auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true);
 
     QueryPlanStepPtr step;
diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h
index ed6320963ed..3adbcad909c 100644
--- a/src/Interpreters/InterpreterSelectQuery.h
+++ b/src/Interpreters/InterpreterSelectQuery.h
@@ -3,7 +3,6 @@
 #include 
 #include 
-#include <Core/ColumnNumbers.h>
 #include 
 #include 
 #include 
@@ -15,7 +14,6 @@
 #include 
 #include 
-#include <Interpreters/Aggregator.h>
 
 namespace Poco
 {
@@ -34,7 +32,6 @@ using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
 
 struct TreeRewriterResult;
 using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
-using AggregatorParamsPtr = std::unique_ptr<Aggregator::Params>;
 
 /** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
@@ -146,16 +143,6 @@
     void executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe);
 
     /// Different stages of query execution.
-
-    Aggregator::Params getAggregatorParams(
-        const Block & current_data_stream_header,
-        const ColumnNumbers & keys,
-        const AggregateDescriptions & aggregates,
-        bool overflow_row, const Settings & settings,
-        size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes);
-    GroupingSetsParamsList getAggregatorGroupingSetsParams(
-        const Block & header_before_aggregation,
-        const ColumnNumbers & all_keys);
     void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
     void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
     void executeAggregation(
diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp
index 40316ae4adc..d7d62d07d92 100644
--- a/src/Processors/QueryPlan/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/AggregatingStep.cpp
@@ -133,7 +133,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
         pipeline.transform([&](OutputPortRawPtrs ports)
         {
             assert(streams * grouping_sets_size == ports.size());
-            Processors aggregators;
+            Processors processors;
             for (size_t i = 0; i < grouping_sets_size; ++i)
             {
                 Aggregator::Params params_for_set
@@ -167,79 +167,85 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
                         // For each input stream we have `grouping_sets_size` copies, so port index
                         // for transform #j should skip ports of first (j-1) streams.
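+                        // E.g. with streams = 2 and grouping_sets_size = 3 the ports are laid out as
+                        // [s0set0, s0set1, s0set2, s1set0, s1set1, s1set2], so the copy of stream #j
+                        // that feeds grouping set #i sits at index i + grouping_sets_size * j.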
                         connect(*ports[i + grouping_sets_size * j], aggregation_for_set->getInputs().front());
-                        aggregators.push_back(aggregation_for_set);
+                        ports[i + grouping_sets_size * j] = &aggregation_for_set->getOutputs().front();
+                        processors.push_back(aggregation_for_set);
                     }
                 }
                 else
                 {
                     auto aggregation_for_set = std::make_shared<AggregatingTransform>(input_header, transform_params_for_set);
                     connect(*ports[i], aggregation_for_set->getInputs().front());
-                    aggregators.push_back(aggregation_for_set);
+                    ports[i] = &aggregation_for_set->getOutputs().front();
+                    processors.push_back(aggregation_for_set);
                 }
             }
-            return aggregators;
-        }, false);
 
-        if (streams > 1)
-        {
-            pipeline.transform([&](OutputPortRawPtrs ports)
+            if (streams > 1)
             {
-                Processors resizes;
+                OutputPortRawPtrs new_ports;
+                new_ports.reserve(grouping_sets_size);
+
                 for (size_t i = 0; i < grouping_sets_size; ++i)
                 {
-                    auto output_it = ports.begin() + i * streams;
-                    auto resize = std::make_shared<ResizeProcessor>((*output_it)->getHeader(), streams, 1);
+                    size_t output_it = i;
+                    auto resize = std::make_shared<ResizeProcessor>(ports[output_it]->getHeader(), streams, 1);
                     auto & inputs = resize->getInputs();
 
-                    for (auto input_it = inputs.begin(); input_it != inputs.end(); ++output_it, ++input_it)
-                        connect(**output_it, *input_it);
-                    resizes.push_back(resize);
+                    for (auto input_it = inputs.begin(); input_it != inputs.end(); output_it += grouping_sets_size, ++input_it)
+                        connect(*ports[output_it], *input_it);
+                    new_ports.push_back(&resize->getOutputs().front());
+                    processors.push_back(resize);
                 }
 
-                return resizes;
-            }, false);
-        }
+                ports.swap(new_ports);
+            }
 
-        assert(pipeline.getNumStreams() == grouping_sets_size);
-
-        size_t set_counter = 0;
-        auto output_header = transform_params->getHeader();
-        pipeline.addSimpleTransform([&](const Block & header)
-        {
-            /// Here we create a DAG which fills missing keys and adds `__grouping_set` column
-            auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
-            ActionsDAG::NodeRawConstPtrs index;
-            index.reserve(output_header.columns() + 1);
-
-            auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
-            const auto * grouping_node = &dag->addColumn(
-                {ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
-
-            grouping_node = &dag->materializeNode(*grouping_node);
-            index.push_back(grouping_node);
-
-            size_t missign_column_index = 0;
-            const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
-
-            for (size_t i = 0; i < output_header.columns(); ++i)
-            {
-                auto & col = output_header.getByPosition(i);
-                if (missign_column_index < missing_columns.size() && missing_columns[missign_column_index] == i)
-                {
-                    ++missign_column_index;
-                    auto column = ColumnConst::create(col.column->cloneResized(1), 0);
-                    const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
-                    node = &dag->materializeNode(*node);
-                    index.push_back(node);
-                }
-                else
-                    index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
             }
 
-            dag->getIndex().swap(index);
-            auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
-            auto transform = std::make_shared<ExpressionTransform>(header, expression);
+            assert(ports.size() == grouping_sets_size);
+            auto output_header = transform_params->getHeader();
 
-            ++set_counter;
-            return transform;
+            for (size_t set_counter = 0; set_counter < grouping_sets_size; ++set_counter)
+            {
+                auto & header = ports[set_counter]->getHeader();
+
+                /// Here we create a DAG which fills missing keys and adds `__grouping_set` column
+                auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
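+                /// The DAG output index built below puts the constant `__grouping_set`
+                /// value for this set first; each column of the common output header is
+                /// then either filled with a default constant (a key that is missing
+                /// from this grouping set) or passed through from the input unchanged.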
+                ActionsDAG::NodeRawConstPtrs index;
+                index.reserve(output_header.columns() + 1);
+
+                auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
+                const auto * grouping_node = &dag->addColumn(
+                    {ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
+
+                grouping_node = &dag->materializeNode(*grouping_node);
+                index.push_back(grouping_node);
+
+                size_t missing_column_index = 0;
+                const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
+
+                for (size_t i = 0; i < output_header.columns(); ++i)
+                {
+                    auto & col = output_header.getByPosition(i);
+                    if (missing_column_index < missing_columns.size() && missing_columns[missing_column_index] == i)
+                    {
+                        ++missing_column_index;
+                        auto column = ColumnConst::create(col.column->cloneResized(1), 0);
+                        const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
+                        node = &dag->materializeNode(*node);
+                        index.push_back(node);
+                    }
+                    else
+                        index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
+                }
+
+                dag->getIndex().swap(index);
+                auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
+                auto transform = std::make_shared<ExpressionTransform>(header, expression);
+
+                connect(*ports[set_counter], transform->getInputPort());
+                processors.emplace_back(std::move(transform));
+            }
+
+            return processors;
         });
 
     aggregating = collector.detachProcessors(0);
diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp
index 480f2046e23..19009d9692a 100644
--- a/src/QueryPipeline/Pipe.cpp
+++ b/src/QueryPipeline/Pipe.cpp
@@ -786,7 +786,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
     header.clear();
 }
 
-void Pipe::transform(const Transformer & transformer, bool check_block_structure)
+void Pipe::transform(const Transformer & transformer)
 {
     if (output_ports.empty())
         throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@@ -852,18 +852,15 @@ void Pipe::transform(const Transformer & transformer, bool check_block_structure
         throw Exception(
             "Transformation of Pipe is not valid because processors don't have any disconnected output ports",
             ErrorCodes::LOGICAL_ERROR);
 
-    if (check_block_structure)
-    {
-        header = output_ports.front()->getHeader();
-        for (size_t i = 1; i < output_ports.size(); ++i)
-            assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
+    header = output_ports.front()->getHeader();
+    for (size_t i = 1; i < output_ports.size(); ++i)
+        assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
 
-        if (totals_port)
-            assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
+    if (totals_port)
+        assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
 
-        if (extremes_port)
-            assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
-    }
+    if (extremes_port)
+        assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
 
     if (collected_processors)
     {
diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h
index 0fce25a3fae..6f85b7a6a88 100644
--- a/src/QueryPipeline/Pipe.h
+++ b/src/QueryPipeline/Pipe.h
@@ -89,7 +89,7 @@ public:
     using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
 
     /// Transform Pipe in general way.
-    void transform(const Transformer & transformer, bool check_block_structure = true);
+    void transform(const Transformer & transformer);
 
     /// Unite several pipes together. They should have same header.
     static Pipe unitePipes(Pipes pipes);
diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp
index aaf8837a5d7..5e074861110 100644
--- a/src/QueryPipeline/QueryPipelineBuilder.cpp
+++ b/src/QueryPipeline/QueryPipelineBuilder.cpp
@@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
     pipe.addChains(std::move(chains));
 }
 
-void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_block_structure)
+void QueryPipelineBuilder::transform(const Transformer & transformer)
 {
     checkInitializedAndNotCompleted();
-    pipe.transform(transformer, check_block_structure);
+    pipe.transform(transformer);
 }
 
 void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h
index 86b8a1323f1..ad25985ab48 100644
--- a/src/QueryPipeline/QueryPipelineBuilder.h
+++ b/src/QueryPipeline/QueryPipelineBuilder.h
@@ -63,7 +63,7 @@ public:
     using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
 
     /// Transform pipeline in general way.
-    void transform(const Transformer & transformer, bool check_block_structure = true);
+    void transform(const Transformer & transformer);
 
     /// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
     void addTotalsHavingTransform(ProcessorPtr transform);
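
--
Reviewer note, not part of the patch: with `check_block_structure` gone, Pipe::transform
now always re-derives the pipe header and asserts that every output port left behind by
the transformer has the same block structure. A minimal sketch of the single-callback
usage under that contract (ResizeProcessor stands in as an arbitrary example processor;
the surrounding pipeline setup is assumed):

    pipeline.transform([&](OutputPortRawPtrs ports)
    {
        /// Collapse all current outputs into one stream, so the structure
        /// check passes trivially: one port, one header.
        auto resize = std::make_shared<ResizeProcessor>(ports.front()->getHeader(), ports.size(), 1);
        auto input_it = resize->getInputs().begin();
        for (auto * port : ports)
            connect(*port, *input_it++);
        /// Every processor created in the callback must be returned so the
        /// Pipe adopts it; its free output ports become the new pipe outputs.
        return Processors{resize};
    });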