From ddd1799743bd7b2f8cc7824706423868a48782ab Mon Sep 17 00:00:00 2001
From: MaxTheHuman
Date: Sun, 16 May 2021 22:44:20 +0300
Subject: [PATCH] grouping sets development

---
 src/Interpreters/Aggregator.h               |  4 +-
 src/Interpreters/ExpressionAnalyzer.cpp     |  1 +
 src/Interpreters/InterpreterSelectQuery.cpp | 45 +++++++++----------
 .../Transforms/AggregatingTransform.cpp     |  1 +
 src/QueryPipeline/Pipe.cpp                  | 22 ++++++++-
 src/QueryPipeline/Pipe.h                    |  3 ++
 src/QueryPipeline/QueryPipelineBuilder.h    |  2 +-
 7 files changed, 52 insertions(+), 26 deletions(-)

diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index ae4986f5135..87da0d7492d 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -881,7 +881,7 @@ public:
         ColumnNumbers keys;
         const ColumnNumbersTwoDimension keys_vector;
         const AggregateDescriptions aggregates;
-        const size_t keys_size;
+        size_t keys_size;
         const size_t aggregates_size;
 
         /// The settings of approximate calculation of GROUP BY.
@@ -942,6 +942,7 @@ public:
         /// two dimensional vector of aggregating keys in params
         Params(
             const Block & src_header_,
+            const ColumnNumbers & keys_,
             const ColumnNumbersTwoDimension & keys_vector_,
             const AggregateDescriptions & aggregates_,
             bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
             size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
@@ -954,6 +955,7 @@ public:
             const Block & intermediate_header_ = {})
             : src_header(src_header_),
             intermediate_header(intermediate_header_),
+            keys(keys_),
             keys_vector(keys_vector_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
             overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
             group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp
index eb1bd592673..8ac892d8834 100644
--- a/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/src/Interpreters/ExpressionAnalyzer.cpp
@@ -390,6 +390,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
         if (select_query->group_by_with_grouping_sets)
         {
             aggregation_keys_list.push_back({key});
+            aggregation_keys.push_back(key);
             LOG_DEBUG(poco_log, "pushed grouping set of 1 column: " + key.name);
         }
         else
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index edf93c20fd9..f69e52364fc 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1049,11 +1049,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

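For orientation, here is a minimal standalone sketch of the key layout the patched Aggregator::Params works with: a flat list of key column positions plus a two-dimensional keys_vector holding one list per grouping set. This is illustrative only, not ClickHouse code; the aliases below are redefined locally and the column positions are made up for a hypothetical GROUP BY GROUPING SETS ((a), (b)).

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Local stand-ins for the ColumnNumbers / ColumnNumbersTwoDimension names used in the patch.
using ColumnNumbers = std::vector<size_t>;
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;

int main()
{
    // Hypothetical query: GROUP BY GROUPING SETS ((a), (b)),
    // with column a at position 0 and column b at position 1.
    ColumnNumbersTwoDimension keys_vector = {{0}, {1}};

    // The flat key list is the union of all grouping sets; in the patch the
    // Params constructor now receives it alongside keys_vector and derives keys_size from it.
    ColumnNumbers keys;
    for (const auto & set : keys_vector)
        for (size_t column : set)
            if (std::find(keys.begin(), keys.end(), column) == keys.end())
                keys.push_back(column);

    std::cout << "grouping sets: " << keys_vector.size()
              << ", distinct key columns: " << keys.size() << '\n';
}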
aggregates();
     for (auto & descr : aggregates)
         if (descr.arguments.empty())
             for (const auto & name : descr.argument_names)
                 descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
-
+    LOG_DEBUG(log, "GroupingSets debug 2");
     const Settings & settings = context->getSettingsRef();
-
-    // Aggregator::Params params(
-    //     header_before_aggregation,
-    //     keys,
-    //     aggregates,
-    //     overflow_row,
-    //     settings.max_rows_to_group_by,
-    //     settings.group_by_overflow_mode,
-    //     settings.group_by_two_level_threshold,
-    //     settings.group_by_two_level_threshold_bytes,
-    //     settings.max_bytes_before_external_group_by,
-    //     settings.empty_result_for_aggregation_by_empty_set
-    //         || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
-    //             && query_analyzer->hasConstAggregationKeys()),
-    //     context->getTemporaryVolume(),
-    //     settings.max_threads,
-    //     settings.min_free_disk_space_for_temporary_data,
-    //     settings.compile_aggregate_expressions,
-    //     settings.min_count_to_compile_aggregate_expression);
+    LOG_DEBUG(log, "GroupingSets debug 3");
     std::shared_ptr<Aggregator::Params> params_ptr;
     if (query.group_by_with_grouping_sets)
     {
+        LOG_DEBUG(log, "GroupingSets debug 4");
         params_ptr = std::make_shared<Aggregator::Params>(
             header_before_aggregation,
+            all_keys,
             keys_vector,
             aggregates,
             overflow_row,
@@ -2157,7 +2153,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
             settings.compile_aggregate_expressions,
             settings.min_count_to_compile_aggregate_expression);
     }
-
+    LOG_DEBUG(log, "GroupingSets debug 5");
     SortDescription group_by_sort_description;
 
     if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
@@ -2171,7 +2167,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
         : static_cast<size_t>(settings.max_threads);
 
     bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
-
+    LOG_DEBUG(log, "GroupingSets debug 6");
+    LOG_DEBUG(log, "GroupingSets step header structure: {}", query_plan.getCurrentDataStream().header.dumpStructure());
     auto aggregating_step = std::make_unique<AggregatingStep>(
         query_plan.getCurrentDataStream(),
         *params_ptr,
@@ -2183,8 +2180,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
         storage_has_evenly_distributed_read,
         std::move(group_by_info),
         std::move(group_by_sort_description));
-
+    LOG_DEBUG(log, "GroupingSets step header structure: {}", aggregating_step->getOutputStream().header.dumpStructure());
+    LOG_DEBUG(log, "GroupingSets debug 7");
     query_plan.addStep(std::move(aggregating_step));
+    LOG_DEBUG(log, "GroupingSets debug 8");
 }
 
 void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final)
diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp
index 8357a997960..47f5f80857e 100644
--- a/src/Processors/Transforms/AggregatingTransform.cpp
+++ b/src/Processors/Transforms/AggregatingTransform.cpp
@@ -501,6 +501,7 @@ void AggregatingTransform::work()
 
 Processors AggregatingTransform::expandPipeline()
 {
+    LOG_DEBUG(log, "in AggregatingTransform::expandPipeline");
     auto & out = processors.back()->getOutputs().front();
     inputs.emplace_back(out.getHeader(), this);
     connect(out, inputs.back());
diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp
index 093491eed2a..d9917b8636d 100644
--- a/src/QueryPipeline/Pipe.cpp
+++ b/src/QueryPipeline/Pipe.cpp
@@ -434,6 +434,8 @@ void Pipe::addTransform(ProcessorPtr transform)
 void Pipe::addParallelTransforms(Processors transforms)
 {
+    LOG_DEBUG(log, "Begin addParallelTransforms, have {} transforms", transforms.size());
+
     if (output_ports.empty())
         throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
@@ -452,6 +454,7 @@ void Pipe::addParallelTransforms(Processors transforms)
             throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR);
 
         outputs.push_back(current_transform_outputs.front());
+        LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName());
     }
 
     if (inputs.size() != output_ports.size())
@@ -464,6 +467,7 @@ void Pipe::addParallelTransforms(Processors transforms)
     {
         connect(*output_ports[next_output], input);
         ++next_output;
+        LOG_DEBUG(log, "addParallelTransforms connect current output to new input {}", next_output);
     }
 
     output_ports.clear();
@@ -471,15 +475,22 @@ void Pipe::addParallelTransforms(Processors transforms)
 
     for (auto & output : outputs)
     {
+        LOG_DEBUG(log, "addParallelTransforms change outputs to new output");
+        LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output.isConnected());
         output_ports.emplace_back(&output);
     }
 
     /// do not check output formats because they are different in case of parallel aggregations
+    LOG_DEBUG(log, "addParallelTransforms do not check format");
 
     if (collected_processors)
         collected_processors->insert(collected_processors->end(), transforms.begin(), transforms.end());
 
-    processors.insert(processors.end(), transforms.begin(), transforms.end());
+    for (auto & transform_ptr : transforms)
+    {
+        processors.emplace_back(std::move(transform_ptr));
+    }
+    LOG_DEBUG(log, "addParallelTransforms inserted processors, now processors is of size {}", processors.size());
 
     /// Should not change streams number, so maybe not need max_parallel_streams update
     max_parallel_streams = std::max(max_parallel_streams, output_ports.size());
@@ -512,6 +523,15 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
     size_t next_output = 0;
     for (auto & input : inputs)
     {
+        LOG_DEBUG(log, "Pipe: is input connected {}", input.isConnected());
+        LOG_DEBUG(log, "Pipe: is output connected {}", output_ports[next_output]->isConnected());
+        if (output_ports[next_output]->isConnected())
+        {
+            if (output_ports[next_output]->getHeader())
+                LOG_DEBUG(log, "output header structure is: {}", output_ports[next_output]->getHeader().dumpStructure());
+            else
+                LOG_DEBUG(log, "could not retrieve info about output");
+        }
         connect(*output_ports[next_output], input);
         ++next_output;
     }
diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h
index e655f807ec8..21e8fbfe039 100644
--- a/src/QueryPipeline/Pipe.h
+++ b/src/QueryPipeline/Pipe.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -137,6 +138,8 @@ private:
     /// It is needed for debug. See QueryPipelineProcessorsCollector.
     Processors * collected_processors = nullptr;
 
+    Poco::Logger * log = &Poco::Logger::get("Pipe");
+
     /// This methods are for QueryPipeline. It is allowed to complete graph only there.
     /// So, we may be sure that Pipe always has output port if not empty.
     bool isCompleted() const { return !empty() && output_ports.empty(); }
diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h
index 9428bbc5d0a..d920f8a7e81 100644
--- a/src/QueryPipeline/QueryPipelineBuilder.h
+++ b/src/QueryPipeline/QueryPipelineBuilder.h
@@ -67,7 +67,7 @@ public:
     /// Transform pipeline in general way.
     void transform(const Transformer & transformer);
 
-    /// Add transform and connects it to outputs[stream_index] stream
+    /// Add transforms and connect them to the output streams
     void addParallelTransforms(Processors transform);
     /// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
     void addTotalsHavingTransform(ProcessorPtr transform);
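To make the intended execution model behind addParallelTransforms concrete, here is a small self-contained sketch: for GROUPING SETS, the same input is aggregated once per grouping set and the per-set results are concatenated, which is conceptually what running several aggregating transforms in parallel over one pipe computes. This is an assumption-level illustration, not the Pipe/Processors API; every type and name below is invented for the example.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy row standing in for a block with columns (a, b, value).
struct Row { std::string a; std::string b; int value; };

// Aggregate rows for one grouping set; columns outside the set are emitted
// as empty strings here (a real engine would substitute default values).
std::map<std::pair<std::string, std::string>, int>
aggregateOneSet(const std::vector<Row> & rows, bool use_a, bool use_b)
{
    std::map<std::pair<std::string, std::string>, int> sums;
    for (const auto & row : rows)
        sums[{use_a ? row.a : "", use_b ? row.b : ""}] += row.value;
    return sums;
}

int main()
{
    const std::vector<Row> rows = {{"x", "p", 1}, {"x", "q", 2}, {"y", "p", 3}};

    // GROUP BY GROUPING SETS ((a), (b)): one aggregation per set over the
    // same input, results concatenated.
    const std::vector<std::pair<bool, bool>> grouping_sets = {{true, false}, {false, true}};
    for (const auto & [use_a, use_b] : grouping_sets)
        for (const auto & [key, sum] : aggregateOneSet(rows, use_a, use_b))
            std::cout << "(" << key.first << ", " << key.second << ") -> " << sum << "\n";
}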