From de2f1adf9167db66469efd8e8b5d2f828f993ec1 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 22 Aug 2024 18:02:03 +0000 Subject: [PATCH 01/16] Fix mergine of aggregated data for grouping sets. --- src/Interpreters/InterpreterSelectQuery.cpp | 2 - src/Planner/Planner.cpp | 2 - .../QueryPlan/MergingAggregatedStep.cpp | 2 +- .../Transforms/MergingAggregatedTransform.cpp | 118 ++++++++++++++++-- .../Transforms/MergingAggregatedTransform.h | 10 +- .../02165_replicated_grouping_sets.reference | 54 ++++++++ .../02165_replicated_grouping_sets.sql | 5 + 7 files changed, 179 insertions(+), 14 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 0c79f4310ce..9e5fffac6e4 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2010,8 +2010,6 @@ static void executeMergeAggregatedImpl( SortDescription group_by_sort_description) { auto keys = aggregation_keys.getNames(); - if (has_grouping_sets) - keys.insert(keys.begin(), "__grouping_set"); /** There are two modes of distributed aggregation. * diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index d3d20c6fba0..c0efed8550f 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan, */ auto keys = aggregation_analysis_result.aggregation_keys; - if (!aggregation_analysis_result.grouping_sets_parameters_list.empty()) - keys.insert(keys.begin(), "__grouping_set"); Aggregator::Params params(keys, aggregation_analysis_result.aggregate_descriptions, diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index a5062ac8216..50bd1a882ef 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -48,7 +48,7 @@ MergingAggregatedStep::MergingAggregatedStep( bool memory_bound_merging_of_aggregation_results_enabled_) : ITransformingStep( input_stream_, - params_.getHeader(input_stream_.header, final_), + MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)), getTraits(should_produce_results_in_order_of_bucket_number_)) , params(std::move(params_)) , final(final_) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 446e60a0b81..114a32b3d83 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -10,13 +11,106 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) +{ + if (in_header.has("__grouping_set")) + out_header.insert(0, in_header.getByName("__grouping_set")); + + return out_header; +} + MergingAggregatedTransform::MergingAggregatedTransform( Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_) - : IAccumulatingTransform(std::move(header_), params_->getHeader()) - , params(std::move(params_)), max_threads(max_threads_) + : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params_->getHeader())) + , params(std::move(params_)), max_threads(max_threads_), has_grouping_sets(header_.has("__grouping_set")) { } +void MergingAggregatedTransform::addBlock(Block block) +{ + if (!has_grouping_sets) + { + auto & bucket_to_blocks = grouping_sets[0]; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + return; + } + + auto grouping_position = block.getPositionByName("__grouping_set"); + auto grouping_column = block.getByPosition(grouping_position).column; + block.erase(grouping_position); + + const auto * grouping_column_typed = typeid_cast(grouping_column.get()); + if (!grouping_column_typed) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); + + const auto & grouping_data = grouping_column_typed->getData(); + std::map enumerated_groups; + IColumn::Selector selector; + + size_t num_rows = grouping_data.size(); + UInt64 last_group = grouping_data[0]; + for (size_t row = 1; row < num_rows; ++row) + { + auto group = grouping_data[row]; + if (last_group == group) + continue; + + if (enumerated_groups.empty()) + { + selector.reserve(num_rows); + enumerated_groups.emplace(last_group, enumerated_groups.size()); + } + + selector.resize_fill(row, enumerated_groups[last_group]); + enumerated_groups.emplace(last_group, enumerated_groups.size()); + } + + if (enumerated_groups.empty()) + { + auto & bucket_to_blocks = grouping_sets[last_group]; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + return; + } + + selector.resize_fill(num_rows, enumerated_groups[last_group]); + + const size_t num_groups = enumerated_groups.size(); + Blocks splitted_blocks(num_groups); + + for (size_t group_id = 0; group_id < num_groups; ++group_id) + splitted_blocks[group_id] = block.cloneEmpty(); + + size_t columns_in_block = block.columns(); + for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block) + { + MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector); + for (size_t group_id = 0; group_id < num_groups; ++group_id) + splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]); + } + + for (auto [group, group_id] : enumerated_groups) + { + auto & bucket_to_blocks = grouping_sets[group]; + auto & splitted_block = splitted_blocks[group_id]; + splitted_block.info = block.info; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); + } +} + +void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) +{ + auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); + for (auto & block : block_list) + { + auto num_rows = block.rows(); + ColumnWithTypeAndName col; + col.type = std::make_shared(); + col.name = "__grouping_set"; + col.column = ColumnUInt64::create(num_rows, group); + block.insert(grouping_position, std::move(col)); + } +} + void MergingAggregatedTransform::consume(Chunk chunk) { if (!consume_started) @@ -46,7 +140,7 @@ void MergingAggregatedTransform::consume(Chunk chunk) block.info.is_overflows = agg_info->is_overflows; block.info.bucket_num = agg_info->bucket_num; - bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block)); + addBlock(std::move(block)); } else if (chunk.getChunkInfos().get()) { @@ -54,7 +148,7 @@ void MergingAggregatedTransform::consume(Chunk chunk) block.info.is_overflows = false; block.info.bucket_num = -1; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + addBlock(std::move(block)); } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); @@ -70,9 +164,19 @@ Chunk MergingAggregatedTransform::generate() /// Exception safety. Make iterator valid in case any method below throws. next_block = blocks.begin(); - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. - params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); - blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); + for (auto & [group, group_blocks] : grouping_sets) + { + /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + AggregatedDataVariants data_variants; + params->aggregator.mergeBlocks(std::move(group_blocks), data_variants, max_threads, is_cancelled); + auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); + + if (has_grouping_sets) + appendGroupingColumn(group, merged_blocks); + + blocks.splice(blocks.end(), std::move(merged_blocks)); + } + next_block = blocks.begin(); } diff --git a/src/Processors/Transforms/MergingAggregatedTransform.h b/src/Processors/Transforms/MergingAggregatedTransform.h index ade76b2f304..1d801f7a94d 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.h +++ b/src/Processors/Transforms/MergingAggregatedTransform.h @@ -15,6 +15,8 @@ public: MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_); String getName() const override { return "MergingAggregatedTransform"; } + static Block appendGroupingIfNeeded(const Block & in_header, Block out_header); + protected: void consume(Chunk chunk) override; Chunk generate() override; @@ -24,8 +26,9 @@ private: LoggerPtr log = getLogger("MergingAggregatedTransform"); size_t max_threads; - AggregatedDataVariants data_variants; - Aggregator::BucketToBlocks bucket_to_blocks; + using GroupingSets = std::unordered_map; + GroupingSets grouping_sets; + const bool has_grouping_sets; UInt64 total_input_rows = 0; UInt64 total_input_blocks = 0; @@ -35,6 +38,9 @@ private: bool consume_started = false; bool generate_started = false; + + void addBlock(Block block); + void appendGroupingColumn(UInt64 group, BlocksList & block_list); }; } diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference index 659cd98368d..4589dc7d7a5 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference @@ -11,3 +11,57 @@ 0 6 4 1 10 4 2 14 4 +['.'] +['.','.'] +['.','.','.'] +['.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +['.'] +['.'] +['.','.'] +['.','.'] +['.','.','.'] +['.','.','.'] +['.','.','.','.'] +['.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +1 +2 +3 +4 +5 +6 +7 +8 +9 +1 +1 +2 +2 +3 +3 +4 +4 +5 +5 +6 +6 +7 +7 +8 +8 +9 +9 diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql index d92d92c3e72..333dab79575 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql @@ -43,3 +43,8 @@ GROUP BY ORDER BY sum_value ASC, count_value ASC; + +SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; From 0e136ded28dc1191dd344500d031f43d7a5750e2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 22 Aug 2024 19:06:04 +0000 Subject: [PATCH 02/16] Fixing header. --- src/Processors/QueryPlan/MergingAggregatedStep.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 50bd1a882ef..8332ad73df6 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -154,7 +154,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const void MergingAggregatedStep::updateOutputStream() { - output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits()); + const auto & in_header = input_streams.front().header; + output_stream = createOutputStream(input_streams.front(), + MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits()); if (is_order_overwritten) /// overwrite order again applyOrder(group_by_sort_description, overwritten_sort_scope); } From 77061db95595cea33c2e5f84804c1f9a799ec6d6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 14:37:02 +0000 Subject: [PATCH 03/16] Adding comments and checks. --- .../QueryPlan/MergingAggregatedStep.cpp | 13 +++++++++++++ .../Transforms/MergingAggregatedTransform.cpp | 16 +++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 8332ad73df6..d35c38a4e32 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -10,6 +10,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static bool memoryBoundMergingWillBeUsed( const DataStream & input_stream, bool memory_bound_merging_of_aggregation_results_enabled, @@ -93,6 +98,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (memoryBoundMergingWillBeUsed()) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory bound merging of aggregated results is not supported for grouping sets."); + auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), @@ -123,6 +132,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (!memory_efficient_aggregation) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory efficient merging of aggregated results is not supported for grouping sets."); + /// We union several sources into one, paralleling the work. pipeline.resize(1); diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 114a32b3d83..99fbf3bf4f0 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -13,6 +13,10 @@ namespace ErrorCodes Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) { + /// __grouping_set is neigher GROUP BY key nor an aggregate function. + /// It behaves like a GROUP BY key, but we cannot append it to keys + /// because it changes hashing method and buckets for two level aggregation. + /// Now, this column is processed "manually" by merging each group separately. if (in_header.has("__grouping_set")) out_header.insert(0, in_header.getByName("__grouping_set")); @@ -39,32 +43,41 @@ void MergingAggregatedTransform::addBlock(Block block) auto grouping_column = block.getByPosition(grouping_position).column; block.erase(grouping_position); + /// Split a block by __grouping_set values. + const auto * grouping_column_typed = typeid_cast(grouping_column.get()); if (!grouping_column_typed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); - const auto & grouping_data = grouping_column_typed->getData(); + /// Enumerate groups and fill the selector. std::map enumerated_groups; IColumn::Selector selector; + const auto & grouping_data = grouping_column_typed->getData(); size_t num_rows = grouping_data.size(); UInt64 last_group = grouping_data[0]; for (size_t row = 1; row < num_rows; ++row) { auto group = grouping_data[row]; + + /// Optimization for equal ranges. if (last_group == group) continue; + /// Optimization for single group. if (enumerated_groups.empty()) { selector.reserve(num_rows); enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Fill the last equal range. selector.resize_fill(row, enumerated_groups[last_group]); + /// Enumerate new group if did not see it before. enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Optimization for single group. if (enumerated_groups.empty()) { auto & bucket_to_blocks = grouping_sets[last_group]; @@ -72,6 +85,7 @@ void MergingAggregatedTransform::addBlock(Block block) return; } + /// Fill the last equal range. selector.resize_fill(num_rows, enumerated_groups[last_group]); const size_t num_groups = enumerated_groups.size(); From 42e7cc476e4e733839370681366ffde64185ba6c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 14:48:29 +0000 Subject: [PATCH 04/16] Fixing typos. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 99fbf3bf4f0..9b107b70075 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -13,7 +13,7 @@ namespace ErrorCodes Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) { - /// __grouping_set is neigher GROUP BY key nor an aggregate function. + /// __grouping_set is neither GROUP BY key nor an aggregate function. /// It behaves like a GROUP BY key, but we cannot append it to keys /// because it changes hashing method and buckets for two level aggregation. /// Now, this column is processed "manually" by merging each group separately. From 5f587af078eb6f9c962ee1ba0dccfefcab400f3a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 15:15:16 +0000 Subject: [PATCH 05/16] Review fix. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 9b107b70075..78fb2f340bf 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -74,7 +74,9 @@ void MergingAggregatedTransform::addBlock(Block block) /// Fill the last equal range. selector.resize_fill(row, enumerated_groups[last_group]); /// Enumerate new group if did not see it before. - enumerated_groups.emplace(last_group, enumerated_groups.size()); + enumerated_groups.emplace(group, enumerated_groups.size()); + + last_group = group; } /// Optimization for single group. From 9d9ef691968f4d93bc90bad9624af2b3390b98e2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 15:21:48 +0000 Subject: [PATCH 06/16] Fixing check. --- src/Processors/QueryPlan/MergingAggregatedStep.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index d35c38a4e32..7207b5e6c7f 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -132,10 +132,6 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (!memory_efficient_aggregation) { - if (input_streams.front().header.has("__grouping_set")) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Memory efficient merging of aggregated results is not supported for grouping sets."); - /// We union several sources into one, paralleling the work. pipeline.resize(1); @@ -145,6 +141,9 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c } else { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory efficient merging of aggregated results is not supported for grouping sets."); auto num_merge_threads = memory_efficient_merge_threads ? memory_efficient_merge_threads : max_threads; From c7d0d790e2b37bcd91f5e147d775e656bf3d22a7 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 27 Aug 2024 12:38:37 +0000 Subject: [PATCH 07/16] fix materialized views with optimize_functions_to_subcolumns --- .../Passes/FunctionToSubcolumnsPass.cpp | 10 +++-- .../0_stateless/03230_subcolumns_mv.reference | 1 + .../0_stateless/03230_subcolumns_mv.sql | 37 +++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 tests/queries/0_stateless/03230_subcolumns_mv.reference create mode 100644 tests/queries/0_stateless/03230_subcolumns_mv.sql diff --git a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp index 1fc3eec6833..6caf69e3a2c 100644 --- a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp +++ b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp @@ -209,7 +209,7 @@ std::map, NodeToSubcolumnTransformer> node_transfor }, }; -std::tuple getTypedNodesForOptimization(const QueryTreeNodePtr & node) +std::tuple getTypedNodesForOptimization(const QueryTreeNodePtr & node, const ContextPtr & context) { auto * function_node = node->as(); if (!function_node) @@ -232,6 +232,10 @@ std::tuple getTypedNodesForOptimizati const auto & storage_snapshot = table_node->getStorageSnapshot(); auto column = first_argument_column_node->getColumn(); + auto view_source = context->getViewSource(); + if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted()) + return {}; + if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata)) return {}; @@ -266,7 +270,7 @@ public: return; } - auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node); + auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node, getContext()); if (function_node && first_argument_node && table_node) { enterImpl(*function_node, *first_argument_node, *table_node); @@ -416,7 +420,7 @@ public: if (!getSettings().optimize_functions_to_subcolumns) return; - auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node); + auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node, getContext()); if (!function_node || !first_argument_column_node || !table_node) return; diff --git a/tests/queries/0_stateless/03230_subcolumns_mv.reference b/tests/queries/0_stateless/03230_subcolumns_mv.reference new file mode 100644 index 00000000000..03528148b49 --- /dev/null +++ b/tests/queries/0_stateless/03230_subcolumns_mv.reference @@ -0,0 +1 @@ +['key1','key2'] ['value1','value2'] diff --git a/tests/queries/0_stateless/03230_subcolumns_mv.sql b/tests/queries/0_stateless/03230_subcolumns_mv.sql new file mode 100644 index 00000000000..e2e577f54c1 --- /dev/null +++ b/tests/queries/0_stateless/03230_subcolumns_mv.sql @@ -0,0 +1,37 @@ +DROP TABLE IF EXISTS rawtable; +DROP TABLE IF EXISTS raw_to_attributes_mv; +DROP TABLE IF EXISTS attributes; + +SET optimize_functions_to_subcolumns = 1; + +CREATE TABLE rawtable +( + `Attributes` Map(String, String), +) +ENGINE = MergeTree +ORDER BY tuple(); + +CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes +( + `AttributeKeys` Array(String), + `AttributeValues` Array(String) +) +AS SELECT + mapKeys(Attributes) AS AttributeKeys, + mapValues(Attributes) AS AttributeValues +FROM rawtable; + +CREATE TABLE attributes +( + `AttributeKeys` Array(String), + `AttributeValues` Array(String) +) +ENGINE = ReplacingMergeTree +ORDER BY tuple(); + +INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'}); +SELECT * FROM raw_to_attributes_mv ORDER BY AttributeKeys; + +DROP TABLE IF EXISTS rawtable; +DROP TABLE IF EXISTS raw_to_attributes_mv; +DROP TABLE IF EXISTS attributes; From 90cc6199664705c0c0214f60b4cbb246480d372d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 15:06:43 +0000 Subject: [PATCH 08/16] Better care about grouping keys order for GROUPING SETS. --- src/Interpreters/Aggregator.h | 12 + src/Interpreters/InterpreterSelectQuery.cpp | 51 ++-- src/Planner/Planner.cpp | 1 + src/Processors/QueryPlan/AggregatingStep.cpp | 101 ++++--- src/Processors/QueryPlan/AggregatingStep.h | 19 +- .../QueryPlan/MergingAggregatedStep.cpp | 14 +- .../QueryPlan/MergingAggregatedStep.h | 2 + .../Transforms/MergingAggregatedTransform.cpp | 196 ++++++++++--- .../Transforms/MergingAggregatedTransform.h | 26 +- .../02165_replicated_grouping_sets.reference | 266 ++++++++++++++---- .../02165_replicated_grouping_sets.sql | 23 +- 11 files changed, 517 insertions(+), 194 deletions(-) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index f4f1e9a1df3..2cb04fc7c51 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder; class NativeWriter; struct OutputBlockColumns; +struct GroupingSetsParams +{ + GroupingSetsParams() = default; + + GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { } + + Names used_keys; + Names missing_keys; +}; + +using GroupingSetsParamsList = std::vector; + /** How are "total" values calculated with WITH TOTALS? * (For more details, see TotalsHavingTransform.) * diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9e5fffac6e4..ca0e84a5267 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id) return false; } +GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys) +{ + GroupingSetsParamsList result; + + for (const auto & aggregation_keys : aggregation_keys_list) + { + NameSet keys; + for (const auto & key : aggregation_keys) + keys.insert(key.name); + + Names missing_keys; + for (const auto & key : all_keys) + if (!keys.contains(key)) + missing_keys.push_back(key); + + result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys)); + } + + return result; +} + } InterpreterSelectQuery::InterpreterSelectQuery( @@ -2005,6 +2026,7 @@ static void executeMergeAggregatedImpl( bool has_grouping_sets, const Settings & settings, const NamesAndTypesList & aggregation_keys, + const NamesAndTypesLists & aggregation_keys_list, const AggregateDescriptions & aggregates, bool should_produce_results_in_order_of_bucket_number, SortDescription group_by_sort_description) @@ -2027,10 +2049,12 @@ static void executeMergeAggregatedImpl( */ Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization); + auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys); auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), params, + grouping_sets_params, final, /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets, @@ -2651,30 +2675,6 @@ static Aggregator::Params getAggregatorParams( }; } -static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys) -{ - GroupingSetsParamsList result; - if (query_analyzer.useGroupingSetKey()) - { - auto const & aggregation_keys_list = query_analyzer.aggregationKeysList(); - - for (const auto & aggregation_keys : aggregation_keys_list) - { - NameSet keys; - for (const auto & key : aggregation_keys) - keys.insert(key.name); - - Names missing_keys; - for (const auto & key : all_keys) - if (!keys.contains(key)) - missing_keys.push_back(key); - - result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys)); - } - } - return result; -} - void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info) { executeExpression(query_plan, expression, "Before GROUP BY"); @@ -2694,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes); - auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys); + auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys); SortDescription group_by_sort_description; SortDescription sort_description_for_merging; @@ -2762,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool has_grouping_sets, context->getSettingsRef(), query_analyzer->aggregationKeys(), + query_analyzer->aggregationKeysList(), query_analyzer->aggregates(), should_produce_results_in_order_of_bucket_number, std::move(group_by_sort_description)); diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index c0efed8550f..7b5101c5c7d 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -528,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan, auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), params, + aggregation_analysis_result.grouping_sets_parameters_list, query_analysis_result.aggregate_final, /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets, diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 8a5ed7fde65..a4d707704b1 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_, explicit_sorting_required_for_aggregation_in_order = false; } +ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( + const Block & in_header, + const Block & out_header, + const GroupingSetsParamsList & grouping_sets_params, + UInt64 group, + bool group_by_use_nulls) +{ + /// Here we create a DAG which fills missing keys and adds `__grouping_set` column + ActionsDAG dag(in_header.getColumnsWithTypeAndName()); + ActionsDAG::NodeRawConstPtrs outputs; + outputs.reserve(out_header.columns() + 1); + + auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0); + const auto * grouping_node = &dag.addColumn( + {ColumnPtr(std::move(grouping_col)), std::make_shared(), "__grouping_set"}); + + grouping_node = &dag.materializeNode(*grouping_node); + outputs.push_back(grouping_node); + + const auto & missing_columns = grouping_sets_params[group].missing_keys; + const auto & used_keys = grouping_sets_params[group].used_keys; + + auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr); + for (size_t i = 0; i < out_header.columns(); ++i) + { + const auto & col = out_header.getByPosition(i); + const auto missing_it = std::find_if( + missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; }); + const auto used_it = std::find_if( + used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; }); + if (missing_it != missing_columns.end()) + { + auto column_with_default = col.column->cloneEmpty(); + col.type->insertDefaultInto(*column_with_default); + column_with_default->finalize(); + + auto column = ColumnConst::create(std::move(column_with_default), 0); + const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name}); + node = &dag.materializeNode(*node); + outputs.push_back(node); + } + else + { + const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)]; + if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable()) + outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name)); + else + outputs.push_back(column_node); + } + } + + dag.getOutputs().swap(outputs); + return dag; +} + void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) { QueryPipelineProcessorsCollector collector(pipeline, this); @@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B { const auto & header = ports[set_counter]->getHeader(); - /// Here we create a DAG which fills missing keys and adds `__grouping_set` column - ActionsDAG dag(header.getColumnsWithTypeAndName()); - ActionsDAG::NodeRawConstPtrs outputs; - outputs.reserve(output_header.columns() + 1); - - auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0); - const auto * grouping_node = &dag.addColumn( - {ColumnPtr(std::move(grouping_col)), std::make_shared(), "__grouping_set"}); - - grouping_node = &dag.materializeNode(*grouping_node); - outputs.push_back(grouping_node); - - const auto & missing_columns = grouping_sets_params[set_counter].missing_keys; - const auto & used_keys = grouping_sets_params[set_counter].used_keys; - - auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr); - for (size_t i = 0; i < output_header.columns(); ++i) - { - auto & col = output_header.getByPosition(i); - const auto missing_it = std::find_if( - missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; }); - const auto used_it = std::find_if( - used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; }); - if (missing_it != missing_columns.end()) - { - auto column_with_default = col.column->cloneEmpty(); - col.type->insertDefaultInto(*column_with_default); - column_with_default->finalize(); - - auto column = ColumnConst::create(std::move(column_with_default), 0); - const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name}); - node = &dag.materializeNode(*node); - outputs.push_back(node); - } - else - { - const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)]; - if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable()) - outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name)); - else - outputs.push_back(column_node); - } - } - - dag.getOutputs().swap(outputs); + auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls); auto expression = std::make_shared(std::move(dag), settings.getActionsSettings()); auto transform = std::make_shared(header, expression); diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index ae43295024a..4e4078047f1 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -7,18 +7,6 @@ namespace DB { -struct GroupingSetsParams -{ - GroupingSetsParams() = default; - - GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { } - - Names used_keys; - Names missing_keys; -}; - -using GroupingSetsParamsList = std::vector; - Block appendGroupingSetColumn(Block header); Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls); @@ -77,6 +65,13 @@ public: /// Argument input_stream would be the second input (from projection). std::unique_ptr convertToAggregatingProjection(const DataStream & input_stream) const; + static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG( + const Block & in_header, + const Block & out_header, + const GroupingSetsParamsList & grouping_sets_params, + UInt64 group, + bool group_by_use_nulls); + private: void updateOutputStream() override; diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 7207b5e6c7f..f3eb352faac 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -42,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_ MergingAggregatedStep::MergingAggregatedStep( const DataStream & input_stream_, Aggregator::Params params_, + GroupingSetsParamsList grouping_sets_params_, bool final_, bool memory_efficient_aggregation_, size_t max_threads_, @@ -56,6 +57,7 @@ MergingAggregatedStep::MergingAggregatedStep( MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)), getTraits(should_produce_results_in_order_of_bucket_number_)) , params(std::move(params_)) + , grouping_sets_params(std::move(grouping_sets_params_)) , final(final_) , memory_efficient_aggregation(memory_efficient_aggregation_) , max_threads(max_threads_) @@ -94,14 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); - if (memoryBoundMergingWillBeUsed()) { - if (input_streams.front().header.has("__grouping_set")) + if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Memory bound merging of aggregated results is not supported for grouping sets."); + auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), @@ -136,18 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c pipeline.resize(1); /// Now merge the aggregated blocks - pipeline.addSimpleTransform([&](const Block & header) - { return std::make_shared(header, transform_params, max_threads); }); + auto transform = std::make_shared(pipeline.getHeader(), params, final, grouping_sets_params, max_threads); + pipeline.addTransform(std::move(transform)); } else { - if (input_streams.front().header.has("__grouping_set")) + if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Memory efficient merging of aggregated results is not supported for grouping sets."); auto num_merge_threads = memory_efficient_merge_threads ? memory_efficient_merge_threads : max_threads; + auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads); } diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.h b/src/Processors/QueryPlan/MergingAggregatedStep.h index 654f794d5f5..5c3842a6c33 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.h +++ b/src/Processors/QueryPlan/MergingAggregatedStep.h @@ -16,6 +16,7 @@ public: MergingAggregatedStep( const DataStream & input_stream_, Aggregator::Params params_, + GroupingSetsParamsList grouping_sets_params_, bool final_, bool memory_efficient_aggregation_, size_t max_threads_, @@ -43,6 +44,7 @@ private: Aggregator::Params params; + GroupingSetsParamsList grouping_sets_params; bool final; bool memory_efficient_aggregation; size_t max_threads; diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 78fb2f340bf..cf383cfcf9d 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -1,7 +1,9 @@ #include #include #include +#include #include +#include #include namespace DB @@ -23,19 +25,93 @@ Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header return out_header; } +MergingAggregatedTransform::~MergingAggregatedTransform() = default; + MergingAggregatedTransform::MergingAggregatedTransform( - Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_) - : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params_->getHeader())) - , params(std::move(params_)), max_threads(max_threads_), has_grouping_sets(header_.has("__grouping_set")) + Block header_, + Aggregator::Params params, + bool final, + GroupingSetsParamsList grouping_sets_params, + size_t max_threads_) + : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final))) + , max_threads(max_threads_) { + if (!grouping_sets_params.empty()) + { + if (!header_.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets." + "Header {}", header_.dumpStructure()); + + auto in_header = header_; + in_header.erase(header_.getPositionByName("__grouping_set")); + auto out_header = params.getHeader(header_, final); + + grouping_sets.reserve(grouping_sets_params.size()); + for (const auto & grouping_set_params : grouping_sets_params) + { + size_t group = grouping_sets.size(); + + ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); + auto & outputs = reordering.getOutputs(); + ActionsDAG::NodeRawConstPtrs new_outputs; + new_outputs.reserve(in_header.columns() + grouping_set_params.used_keys.size() - grouping_set_params.used_keys.size()); + + std::unordered_map index; + for (size_t pos = 0; pos < outputs.size(); ++pos) + index.emplace(outputs[pos]->result_name, pos); + + for (const auto & used_name : grouping_set_params.used_keys) + { + auto & idx = index[used_name]; + new_outputs.push_back(outputs[idx]); + } + + for (const auto & used_name : grouping_set_params.used_keys) + index[used_name] = outputs.size(); + for (const auto & missing_name : grouping_set_params.missing_keys) + index[missing_name] = outputs.size(); + + for (const auto * output : outputs) + { + if (index[output->result_name] != outputs.size()) + new_outputs.push_back(output); + } + + outputs.swap(new_outputs); + + Aggregator::Params set_params(grouping_set_params.used_keys, + params.aggregates, + params.overflow_row, + params.max_threads, + params.max_block_size, + params.min_hit_rate_to_use_consecutive_keys_optimization); + + auto transform_params = std::make_shared(reordering.updateHeader(in_header), std::move(set_params), final); + + auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( + transform_params->getHeader(), + out_header, + grouping_sets_params, group, false); + + auto & groupiung_set = grouping_sets.emplace_back(); + groupiung_set.reordering_key_columns_actions = std::make_shared(std::move(reordering)); + groupiung_set.creating_missing_keys_actions = std::make_shared(std::move(creating)); + groupiung_set.params = std::move(transform_params); + } + } + else + { + auto & groupiung_set = grouping_sets.emplace_back(); + groupiung_set.params = std::make_shared(header_, std::move(params), final); + } } void MergingAggregatedTransform::addBlock(Block block) { - if (!has_grouping_sets) + if (grouping_sets.size() == 1) { - auto & bucket_to_blocks = grouping_sets[0]; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + grouping_sets[0].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); return; } @@ -49,13 +125,12 @@ void MergingAggregatedTransform::addBlock(Block block) if (!grouping_column_typed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); - /// Enumerate groups and fill the selector. - std::map enumerated_groups; IColumn::Selector selector; const auto & grouping_data = grouping_column_typed->getData(); size_t num_rows = grouping_data.size(); UInt64 last_group = grouping_data[0]; + UInt64 max_group = last_group; for (size_t row = 1; row < num_rows; ++row) { auto group = grouping_data[row]; @@ -65,32 +140,32 @@ void MergingAggregatedTransform::addBlock(Block block) continue; /// Optimization for single group. - if (enumerated_groups.empty()) - { + if (selector.empty()) selector.reserve(num_rows); - enumerated_groups.emplace(last_group, enumerated_groups.size()); - } /// Fill the last equal range. - selector.resize_fill(row, enumerated_groups[last_group]); - /// Enumerate new group if did not see it before. - enumerated_groups.emplace(group, enumerated_groups.size()); - + selector.resize_fill(row, last_group); last_group = group; + max_group = std::max(last_group, max_group); } + if (max_group >= grouping_sets.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size()); + /// Optimization for single group. - if (enumerated_groups.empty()) + if (selector.empty()) { - auto & bucket_to_blocks = grouping_sets[last_group]; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + auto bucket = block.info.bucket_num; + grouping_sets[last_group].reordering_key_columns_actions->execute(block); + grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block)); return; } /// Fill the last equal range. - selector.resize_fill(num_rows, enumerated_groups[last_group]); + selector.resize_fill(num_rows, last_group); - const size_t num_groups = enumerated_groups.size(); + const size_t num_groups = max_group + 1; Blocks splitted_blocks(num_groups); for (size_t group_id = 0; group_id < num_groups; ++group_id) @@ -104,28 +179,28 @@ void MergingAggregatedTransform::addBlock(Block block) splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]); } - for (auto [group, group_id] : enumerated_groups) + for (size_t group = 0; group < num_groups; ++group) { - auto & bucket_to_blocks = grouping_sets[group]; - auto & splitted_block = splitted_blocks[group_id]; + auto & splitted_block = splitted_blocks[group]; splitted_block.info = block.info; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); + grouping_sets[group].reordering_key_columns_actions->execute(splitted_block); + grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); } } -void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) -{ - auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); - for (auto & block : block_list) - { - auto num_rows = block.rows(); - ColumnWithTypeAndName col; - col.type = std::make_shared(); - col.name = "__grouping_set"; - col.column = ColumnUInt64::create(num_rows, group); - block.insert(grouping_position, std::move(col)); - } -} +// void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) +// { +// auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); +// for (auto & block : block_list) +// { +// auto num_rows = block.rows(); +// ColumnWithTypeAndName col; +// col.type = std::make_shared(); +// col.name = "__grouping_set"; +// col.column = ColumnUInt64::create(num_rows, group); +// block.insert(grouping_position, std::move(col)); +// } +// } void MergingAggregatedTransform::consume(Chunk chunk) { @@ -170,6 +245,25 @@ void MergingAggregatedTransform::consume(Chunk chunk) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); } +// static void debugBlock(const Block & block) +// { +// std::cerr << block.dumpStructure() << std::endl; +// size_t rows = block.rows(); +// for (size_t row = 0; row < rows; ++row) +// { +// for (size_t col = 0; col < block.columns(); ++col) +// { +// const auto & c = block.getByPosition(col); +// if (c.column->isNumeric()) +// std::cerr << c.column->getUInt(row) << ' '; +// else +// std::cerr << c.column->getDataAt(row).toString() << ' '; + +// } +// std::cerr << std::endl; +// } +// } + Chunk MergingAggregatedTransform::generate() { if (!generate_started) @@ -180,15 +274,31 @@ Chunk MergingAggregatedTransform::generate() /// Exception safety. Make iterator valid in case any method below throws. next_block = blocks.begin(); - for (auto & [group, group_blocks] : grouping_sets) + for (auto & grouping_set : grouping_sets) { - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + auto & params = grouping_set.params; + auto & bucket_to_blocks = grouping_set.bucket_to_blocks; AggregatedDataVariants data_variants; - params->aggregator.mergeBlocks(std::move(group_blocks), data_variants, max_threads, is_cancelled); + + // std::cerr << "== Group " << group << std::endl; + // for (const auto & [buk, lst] : bucket_to_blocks) + // { + // std::cerr << ".. buk " << buk << std::endl; + // for (const auto & b : lst) + // debugBlock(b); + // } + + /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); - if (has_grouping_sets) - appendGroupingColumn(group, merged_blocks); + if (grouping_set.creating_missing_keys_actions) + for (auto & block : merged_blocks) + grouping_set.creating_missing_keys_actions->execute(block); + + // std::cerr << "== Merged " << group << std::endl; + // for (const auto & b : merged_blocks) + // debugBlock(b); blocks.splice(blocks.end(), std::move(merged_blocks)); } diff --git a/src/Processors/Transforms/MergingAggregatedTransform.h b/src/Processors/Transforms/MergingAggregatedTransform.h index 1d801f7a94d..3a043ad74b8 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.h +++ b/src/Processors/Transforms/MergingAggregatedTransform.h @@ -6,13 +6,24 @@ namespace DB { +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + /** A pre-aggregate stream of blocks in which each block is already aggregated. * Aggregate functions in blocks should not be finalized so that their states can be merged. */ class MergingAggregatedTransform : public IAccumulatingTransform { public: - MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_); + MergingAggregatedTransform( + Block header_, + Aggregator::Params params_, + bool final_, + GroupingSetsParamsList grouping_sets_params, + size_t max_threads_); + + ~MergingAggregatedTransform() override; + String getName() const override { return "MergingAggregatedTransform"; } static Block appendGroupingIfNeeded(const Block & in_header, Block out_header); @@ -22,13 +33,19 @@ protected: Chunk generate() override; private: - AggregatingTransformParamsPtr params; LoggerPtr log = getLogger("MergingAggregatedTransform"); size_t max_threads; - using GroupingSets = std::unordered_map; + struct GroupingSet + { + Aggregator::BucketToBlocks bucket_to_blocks; + ExpressionActionsPtr reordering_key_columns_actions; + ExpressionActionsPtr creating_missing_keys_actions; + AggregatingTransformParamsPtr params; + }; + + using GroupingSets = std::vector; GroupingSets grouping_sets; - const bool has_grouping_sets; UInt64 total_input_rows = 0; UInt64 total_input_blocks = 0; @@ -40,7 +57,6 @@ private: bool generate_started = false; void addBlock(Block block); - void appendGroupingColumn(UInt64 group, BlocksList & block_list); }; } diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference index 4589dc7d7a5..31cbf2ad670 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference @@ -11,57 +11,215 @@ 0 6 4 1 10 4 2 14 4 -['.'] -['.','.'] -['.','.','.'] -['.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -['.'] -['.'] -['.','.'] -['.','.'] -['.','.','.'] -['.','.','.'] -['.','.','.','.'] -['.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -1 -2 -3 -4 -5 -6 -7 -8 -9 -1 -1 -2 -2 -3 -3 -4 -4 -5 -5 -6 -6 -7 -7 -8 -8 -9 -9 +-- { echo On } + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 ['.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 ['.'] +1 ['.'] +2 ['.','.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +2 2 +2 3 +2 4 +2 5 +2 6 +2 7 +2 8 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +1 1 +2 2 +2 2 +2 2 +2 3 +2 3 +2 3 +2 4 +2 4 +2 4 +2 5 +2 5 +2 5 +2 6 +2 6 +2 6 +2 7 +2 7 +2 7 +2 8 +2 8 +2 8 +2 9 +2 9 +2 9 +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 ['.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 ['.'] +2 ['.'] +2 ['.','.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 2 +2 3 +2 4 +2 5 +2 6 +2 7 +2 8 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 1 +2 2 +2 2 +2 2 +2 3 +2 3 +2 3 +2 4 +2 4 +2 4 +2 5 +2 5 +2 5 +2 6 +2 6 +2 6 +2 7 +2 7 +2 7 +2 8 +2 8 +2 8 +2 9 +2 9 +2 9 diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql index 333dab79575..47d4446f348 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql @@ -44,7 +44,22 @@ ORDER BY sum_value ASC, count_value ASC; -SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +set prefer_localhost_replica = 1; + +-- { echo On } + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; From 2b495e22cdeea4c769c73b7e21f448be2c5ffcae Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 15:38:52 +0000 Subject: [PATCH 09/16] Fixing a test. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index cf383cfcf9d..dd97364f879 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -111,7 +111,10 @@ void MergingAggregatedTransform::addBlock(Block block) { if (grouping_sets.size() == 1) { - grouping_sets[0].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + auto bucket = block.info.bucket_num; + if (grouping_sets[0].reordering_key_columns_actions) + grouping_sets[0].reordering_key_columns_actions->execute(block); + grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block)); return; } From ac91471042ebac5fc5467aef9efe806124293f1a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 27 Aug 2024 16:06:12 +0000 Subject: [PATCH 10/16] add comment --- src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp index 6caf69e3a2c..6f1c3937880 100644 --- a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp +++ b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp @@ -232,6 +232,8 @@ std::tuple getTypedNodesForOptimizati const auto & storage_snapshot = table_node->getStorageSnapshot(); auto column = first_argument_column_node->getColumn(); + /// If view source is set we cannot optimize because it doesn't support moving functions to subcolumns. + /// The storage is replaced to the view source but it happens only after building a query tree and applying passes. auto view_source = context->getViewSource(); if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted()) return {}; From a7584bbb80c68917702a31492a5024faf7c2aaf4 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 16:07:31 +0000 Subject: [PATCH 11/16] Remove comments. --- .../Transforms/MergingAggregatedTransform.cpp | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index dd97364f879..edd544fb6af 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -191,20 +191,6 @@ void MergingAggregatedTransform::addBlock(Block block) } } -// void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) -// { -// auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); -// for (auto & block : block_list) -// { -// auto num_rows = block.rows(); -// ColumnWithTypeAndName col; -// col.type = std::make_shared(); -// col.name = "__grouping_set"; -// col.column = ColumnUInt64::create(num_rows, group); -// block.insert(grouping_position, std::move(col)); -// } -// } - void MergingAggregatedTransform::consume(Chunk chunk) { if (!consume_started) @@ -248,25 +234,6 @@ void MergingAggregatedTransform::consume(Chunk chunk) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); } -// static void debugBlock(const Block & block) -// { -// std::cerr << block.dumpStructure() << std::endl; -// size_t rows = block.rows(); -// for (size_t row = 0; row < rows; ++row) -// { -// for (size_t col = 0; col < block.columns(); ++col) -// { -// const auto & c = block.getByPosition(col); -// if (c.column->isNumeric()) -// std::cerr << c.column->getUInt(row) << ' '; -// else -// std::cerr << c.column->getDataAt(row).toString() << ' '; - -// } -// std::cerr << std::endl; -// } -// } - Chunk MergingAggregatedTransform::generate() { if (!generate_started) @@ -283,14 +250,6 @@ Chunk MergingAggregatedTransform::generate() auto & bucket_to_blocks = grouping_set.bucket_to_blocks; AggregatedDataVariants data_variants; - // std::cerr << "== Group " << group << std::endl; - // for (const auto & [buk, lst] : bucket_to_blocks) - // { - // std::cerr << ".. buk " << buk << std::endl; - // for (const auto & b : lst) - // debugBlock(b); - // } - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); @@ -299,10 +258,6 @@ Chunk MergingAggregatedTransform::generate() for (auto & block : merged_blocks) grouping_set.creating_missing_keys_actions->execute(block); - // std::cerr << "== Merged " << group << std::endl; - // for (const auto & b : merged_blocks) - // debugBlock(b); - blocks.splice(blocks.end(), std::move(merged_blocks)); } From a1517cb9d6598c6ae7cfef5d574702966ea244a9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 16:56:03 +0000 Subject: [PATCH 12/16] Refactor a bit and add a comment. --- .../Transforms/MergingAggregatedTransform.cpp | 64 +++++++++++-------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index edd544fb6af..9b76acb8081 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -25,6 +25,42 @@ Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header return out_header; } +/// We should keep the order for GROUPING SET keys. +/// Initiator creates a separate Aggregator for every group, so should we do here. +/// Otherwise, two-level aggregation will split the data into different buckets, +/// and the result may have duplicating rows. +static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params) +{ + ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); + auto & outputs = reordering.getOutputs(); + ActionsDAG::NodeRawConstPtrs new_outputs; + new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size()); + + std::unordered_map index; + for (size_t pos = 0; pos < outputs.size(); ++pos) + index.emplace(outputs[pos]->result_name, pos); + + for (const auto & used_name : params.used_keys) + { + auto & idx = index[used_name]; + new_outputs.push_back(outputs[idx]); + } + + for (const auto & used_name : params.used_keys) + index[used_name] = outputs.size(); + for (const auto & missing_name : params.missing_keys) + index[missing_name] = outputs.size(); + + for (const auto * output : outputs) + { + if (index[output->result_name] != outputs.size()) + new_outputs.push_back(output); + } + + outputs.swap(new_outputs); + return reordering; +} + MergingAggregatedTransform::~MergingAggregatedTransform() = default; MergingAggregatedTransform::MergingAggregatedTransform( @@ -52,33 +88,7 @@ MergingAggregatedTransform::MergingAggregatedTransform( { size_t group = grouping_sets.size(); - ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); - auto & outputs = reordering.getOutputs(); - ActionsDAG::NodeRawConstPtrs new_outputs; - new_outputs.reserve(in_header.columns() + grouping_set_params.used_keys.size() - grouping_set_params.used_keys.size()); - - std::unordered_map index; - for (size_t pos = 0; pos < outputs.size(); ++pos) - index.emplace(outputs[pos]->result_name, pos); - - for (const auto & used_name : grouping_set_params.used_keys) - { - auto & idx = index[used_name]; - new_outputs.push_back(outputs[idx]); - } - - for (const auto & used_name : grouping_set_params.used_keys) - index[used_name] = outputs.size(); - for (const auto & missing_name : grouping_set_params.missing_keys) - index[missing_name] = outputs.size(); - - for (const auto * output : outputs) - { - if (index[output->result_name] != outputs.size()) - new_outputs.push_back(output); - } - - outputs.swap(new_outputs); + auto reordering = makeReorderingActions(in_header, grouping_set_params); Aggregator::Params set_params(grouping_set_params.used_keys, params.aggregates, From a842994af121631a055186e7d628650bbe3a3521 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 28 Aug 2024 15:21:49 +0200 Subject: [PATCH 13/16] fix false leak detect in libfiu --- contrib/libfiu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/libfiu b/contrib/libfiu index b85edbde4cf..a1290d8cd3d 160000 --- a/contrib/libfiu +++ b/contrib/libfiu @@ -1 +1 @@ -Subproject commit b85edbde4cf974b1b40d27828a56f0505f4e2ee5 +Subproject commit a1290d8cd3d7b4541d6c976e0a54f572ac03f2a3 From d6127d5f4966b7fad22073e26ee466d654b7529b Mon Sep 17 00:00:00 2001 From: Konstantin Smirnov <46676677+konnectr@users.noreply.github.com> Date: Wed, 28 Aug 2024 22:30:12 +0500 Subject: [PATCH 14/16] add ON CLUSTER --- docs/ru/sql-reference/statements/system.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ru/sql-reference/statements/system.md b/docs/ru/sql-reference/statements/system.md index 3e7d67d90ff..d17e5acd693 100644 --- a/docs/ru/sql-reference/statements/system.md +++ b/docs/ru/sql-reference/statements/system.md @@ -280,7 +280,7 @@ SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, но не более `receive_timeout` секунд: ``` sql -SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL] +SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL] ``` После выполнения этого запроса таблица `[db.]replicated_merge_tree_family_table_name` загружает команды из общего реплицированного лога в свою собственную очередь репликации. Затем запрос ждет, пока реплика не обработает все загруженные команды. Поддерживаются следующие модификаторы: From cd9be01c65767ff88131c110b5a2daf0a663034e Mon Sep 17 00:00:00 2001 From: Aleksa Cukovic Date: Thu, 29 Aug 2024 14:40:38 +0200 Subject: [PATCH 15/16] Fix row policy documentation grammar --- .../statements/create/row-policy.md | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/en/sql-reference/statements/create/row-policy.md b/docs/en/sql-reference/statements/create/row-policy.md index cd7718793bd..8be766710fd 100644 --- a/docs/en/sql-reference/statements/create/row-policy.md +++ b/docs/en/sql-reference/statements/create/row-policy.md @@ -8,7 +8,7 @@ title: "CREATE ROW POLICY" Creates a [row policy](../../../guides/sre/user-management/index.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table. :::tip -Row policies makes sense only for users with readonly access. If user can modify table or copy partitions between tables, it defeats the restrictions of row policies. +Row policies make sense only for users with readonly access. If a user can modify a table or copy partitions between tables, it defeats the restrictions of row policies. ::: Syntax: @@ -24,40 +24,40 @@ CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluste ## USING Clause -Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row. +Allows specifying a condition to filter rows. A user will see a row if the condition is calculated to non-zero for the row. ## TO Clause -In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`. +In the `TO` section you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`. -Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost` +Keyword `ALL` means all the ClickHouse users, including current user. Keyword `ALL EXCEPT` allows excluding some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost` :::note -If there are no row policies defined for a table then any user can `SELECT` all the row from the table. Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if those row policies are defined for the current user or not. For example, the following policy +If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table. Defining one or more row policies for the table makes access to the table dependent on the row policies, no matter if those row policies are defined for the current user or not. For example, the following policy: `CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter` -forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all. +forbids the users `mira` and `peter` from seeing the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all. -If that's not desirable it can't be fixed by adding one more row policy, like the following: +If that's not desirable, it can be fixed by adding one more row policy, like the following: `CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter` ::: ## AS Clause -It's allowed to have more than one policy enabled on the same table for the same user at the one time. So we need a way to combine the conditions from multiple policies. +It's allowed to have more than one policy enabled on the same table for the same user at one time. So we need a way to combine the conditions from multiple policies. -By default policies are combined using the boolean `OR` operator. For example, the following policies +By default, policies are combined using the boolean `OR` operator. For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio ``` -enables the user `peter` to see rows with either `b=1` or `c=2`. +enable the user `peter` to see rows with either `b=1` or `c=2`. -The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default policies are permissive, which means they are combined using the boolean `OR` operator. +The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default, policies are permissive, which means they are combined using the boolean `OR` operator. A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator. @@ -68,25 +68,25 @@ row_is_visible = (one or more of the permissive policies' conditions are non-zer (all of the restrictive policies's conditions are non-zero) ``` -For example, the following policies +For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio ``` -enables the user `peter` to see rows only if both `b=1` AND `c=2`. +enable the user `peter` to see rows only if both `b=1` AND `c=2`. Database policies are combined with table policies. -For example, the following policies +For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio ``` -enables the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although +enable the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although any other table in mydb would have only `b=1` policy applied for the user. From 93143b61169e751b256acc1ef83bc71470c56704 Mon Sep 17 00:00:00 2001 From: Tyler Hannan Date: Thu, 29 Aug 2024 19:11:41 +0200 Subject: [PATCH 16/16] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index d12f1a6ff37..652f5e0751d 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ Other upcoming meetups * [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31 * [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19 * [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21 +* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26