From 77061db95595cea33c2e5f84804c1f9a799ec6d6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 14:37:02 +0000 Subject: [PATCH] Adding comments and checks. --- .../QueryPlan/MergingAggregatedStep.cpp | 13 +++++++++++++ .../Transforms/MergingAggregatedTransform.cpp | 16 +++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 8332ad73df6..d35c38a4e32 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -10,6 +10,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static bool memoryBoundMergingWillBeUsed( const DataStream & input_stream, bool memory_bound_merging_of_aggregation_results_enabled, @@ -93,6 +98,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (memoryBoundMergingWillBeUsed()) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory bound merging of aggregated results is not supported for grouping sets."); + auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), @@ -123,6 +132,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (!memory_efficient_aggregation) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory efficient merging of aggregated results is not supported for grouping sets."); + /// We union several sources into one, paralleling the work. pipeline.resize(1); diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 114a32b3d83..99fbf3bf4f0 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -13,6 +13,10 @@ namespace ErrorCodes Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) { + /// __grouping_set is neigher GROUP BY key nor an aggregate function. + /// It behaves like a GROUP BY key, but we cannot append it to keys + /// because it changes hashing method and buckets for two level aggregation. + /// Now, this column is processed "manually" by merging each group separately. if (in_header.has("__grouping_set")) out_header.insert(0, in_header.getByName("__grouping_set")); @@ -39,32 +43,41 @@ void MergingAggregatedTransform::addBlock(Block block) auto grouping_column = block.getByPosition(grouping_position).column; block.erase(grouping_position); + /// Split a block by __grouping_set values. + const auto * grouping_column_typed = typeid_cast(grouping_column.get()); if (!grouping_column_typed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); - const auto & grouping_data = grouping_column_typed->getData(); + /// Enumerate groups and fill the selector. std::map enumerated_groups; IColumn::Selector selector; + const auto & grouping_data = grouping_column_typed->getData(); size_t num_rows = grouping_data.size(); UInt64 last_group = grouping_data[0]; for (size_t row = 1; row < num_rows; ++row) { auto group = grouping_data[row]; + + /// Optimization for equal ranges. if (last_group == group) continue; + /// Optimization for single group. if (enumerated_groups.empty()) { selector.reserve(num_rows); enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Fill the last equal range. selector.resize_fill(row, enumerated_groups[last_group]); + /// Enumerate new group if did not see it before. enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Optimization for single group. if (enumerated_groups.empty()) { auto & bucket_to_blocks = grouping_sets[last_group]; @@ -72,6 +85,7 @@ void MergingAggregatedTransform::addBlock(Block block) return; } + /// Fill the last equal range. selector.resize_fill(num_rows, enumerated_groups[last_group]); const size_t num_groups = enumerated_groups.size();