diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 28147cec58e..995f25816bb 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -44,6 +44,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Processors/QueryPlan/GroupingSetsStep.h>
 #include <...>
 #include <...>
 #include <...>
@@ -967,7 +968,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
         bool aggregate_final =
             expressions.need_aggregate &&
             options.to_stage > QueryProcessingStage::WithMergeableState &&
-            !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
+            !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube && !query.group_by_with_grouping_sets;

         // if (query.group_by_with_grouping_sets && query.group_by_with_totals)
         //     throw Exception("WITH TOTALS and GROUPING SETS are not supported together", ErrorCodes::NOT_IMPLEMENTED);
@@ -1262,6 +1263,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
             if (query.group_by_with_rollup)
                 executeRollupOrCube(query_plan, Modificator::ROLLUP);
             else if (query.group_by_with_cube)
                 executeRollupOrCube(query_plan, Modificator::CUBE);
+            else if (query.group_by_with_grouping_sets)
+                executeRollupOrCube(query_plan, Modificator::GROUPING_SETS);
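A minimal standalone sketch of the key bookkeeping that the executeRollupOrCube hunk below introduces, using only standard-library types; the column positions (a = 0, b = 1, c = 2) and the local aliases are assumptions for illustration, while the real code reads query_analyzer->aggregationKeysList() and header positions:

    #include <iostream>
    #include <set>
    #include <vector>

    using ColumnNumbers = std::vector<size_t>;
    using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;

    int main()
    {
        // GROUP BY GROUPING SETS ((a, b), (a, c)) with assumed positions
        // a = 0, b = 1, c = 2: one key list per grouping set.
        ColumnNumbersTwoDimension keys_vector{{0, 1}, {0, 2}};

        // Deduplicate across sets, mirroring keys_set/all_keys below.
        std::set<size_t> keys_set;
        for (const auto & keys : keys_vector)
            keys_set.insert(keys.begin(), keys.end());

        ColumnNumbers all_keys(keys_set.begin(), keys_set.end());

        std::cout << keys_vector.size() << " sets, "
                  << all_keys.size() << " distinct keys\n"; // 2 sets, 3 distinct keys
    }

The union (all_keys) drives a single aggregation over every key; each per-set list in keys_vector then selects which key columns survive in that set's output.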
@@ ... @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
     const auto & header_before_transform = query_plan.getCurrentDataStream().header;

     ColumnNumbers keys;

-    for (const auto & key : query_analyzer->aggregationKeys())
-        keys.push_back(header_before_transform.getPositionByName(key.name));
+    ColumnNumbers all_keys;
+    ColumnNumbersTwoDimension keys_vector;
+    auto & query = getSelectQuery();
+    if (query.group_by_with_grouping_sets)
+    {
+        std::set<size_t> keys_set;
+        for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
+        {
+            keys.clear();
+            for (const auto & key : aggregation_keys)
+            {
+                size_t key_name_pos = header_before_transform.getPositionByName(key.name);
+                if (!keys_set.count(key_name_pos))
+                {
+                    keys_set.insert(key_name_pos);
+                }
+                keys.push_back(key_name_pos);
+            }
+            keys_vector.push_back(keys);
+            LOG_DEBUG(
+                log,
+                "execute aggregation with grouping sets add keys set of size {}",
+                keys.size());
+        }
+        all_keys.assign(keys_set.begin(), keys_set.end());
+        LOG_DEBUG(
+            log,
+            "execute aggregation with grouping sets add all keys of size {}",
+            all_keys.size());
+    }
+    else
+    {
+        for (const auto & key : query_analyzer->aggregationKeys())
+        {
+            keys.push_back(header_before_transform.getPositionByName(key.name));
+        }
+    }

     const Settings & settings = context->getSettingsRef();

-    Aggregator::Params params(
-        header_before_transform,
-        keys,
-        query_analyzer->aggregates(),
-        false,
-        settings.max_rows_to_group_by,
-        settings.group_by_overflow_mode,
-        0,
-        0,
-        settings.max_bytes_before_external_group_by,
-        settings.empty_result_for_aggregation_by_empty_set,
-        context->getTemporaryVolume(),
-        settings.max_threads,
-        settings.min_free_disk_space_for_temporary_data,
-        settings.compile_aggregate_expressions,
-        settings.min_count_to_compile_aggregate_expression);
+    std::shared_ptr<Aggregator::Params> params_ptr;
+    if (query.group_by_with_grouping_sets)
+    {
+        params_ptr = std::make_shared<Aggregator::Params>(
+            header_before_transform,
+            all_keys,
+            keys_vector,
+            query_analyzer->aggregates(),
+            false,
+            settings.max_rows_to_group_by,
+            settings.group_by_overflow_mode,
+            0,
+            0,
+            settings.max_bytes_before_external_group_by,
+            settings.empty_result_for_aggregation_by_empty_set,
+            context->getTemporaryVolume(),
+            settings.max_threads,
+            settings.min_free_disk_space_for_temporary_data,
+            settings.compile_aggregate_expressions,
+            settings.min_count_to_compile_aggregate_expression);
+    }
+    else
+    {
+        params_ptr = std::make_shared<Aggregator::Params>(
+            header_before_transform,
+            keys,
+            query_analyzer->aggregates(),
+            false,
+            settings.max_rows_to_group_by,
+            settings.group_by_overflow_mode,
+            0,
+            0,
+            settings.max_bytes_before_external_group_by,
+            settings.empty_result_for_aggregation_by_empty_set,
+            context->getTemporaryVolume(),
+            settings.max_threads,
+            settings.min_free_disk_space_for_temporary_data,
+            settings.compile_aggregate_expressions,
+            settings.min_count_to_compile_aggregate_expression);
+    }

-    auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
+    auto transform_params = std::make_shared<AggregatingTransformParams>(*params_ptr, true);

     QueryPlanStepPtr step;
     if (modificator == Modificator::ROLLUP)
         step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
-    else
+    else if (modificator == Modificator::CUBE)
         step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
+    else
+        step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));

     query_plan.addStep(std::move(step));
 }
diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h
index 34d56a7354b..71a8ac5e1d5 100644
--- a/src/Interpreters/InterpreterSelectQuery.h
+++ b/src/Interpreters/InterpreterSelectQuery.h
@@ -157,7 +157,8 @@ private:
     enum class Modificator
     {
         ROLLUP = 0,
-        CUBE = 1
+        CUBE = 1,
+        GROUPING_SETS = 2
     };

     void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
diff --git a/src/Processors/QueryPlan/GroupingSetsStep.cpp b/src/Processors/QueryPlan/GroupingSetsStep.cpp
new file mode 100644
index 00000000000..383ecf5ea1b
--- /dev/null
+++ b/src/Processors/QueryPlan/GroupingSetsStep.cpp
@@ -0,0 +1,46 @@
+#include <Processors/QueryPlan/GroupingSetsStep.h>
+#include <Processors/Transforms/GroupingSetsTransform.h>
+#include "QueryPipeline/QueryPipelineBuilder.h"
+
+namespace DB
+{
+
+static ITransformingStep::Traits getTraits()
+{
+    return ITransformingStep::Traits
+    {
+        {
+            .preserves_distinct_columns = false,
+            .returns_single_stream = true,
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }
+    };
+}
+
+GroupingSetsStep::GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
+    : ITransformingStep(input_stream_, params_->getHeader(), getTraits())
+    , params(std::move(params_))
+{
+    /// Aggregation keys are distinct
+    for (auto key : params->params.keys)
+        output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
+}
+
+void GroupingSetsStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
+{
+    pipeline.resize(1);
+
+    pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
+    {
+        if (stream_type == QueryPipelineBuilder::StreamType::Totals)
+            return nullptr;
+
+        return std::make_shared<GroupingSetsTransform>(header, std::move(params));
+    });
+}
+
+}
diff --git a/src/Processors/QueryPlan/GroupingSetsStep.h b/src/Processors/QueryPlan/GroupingSetsStep.h
new file mode 100644
index 00000000000..3dc7c3b5a99
--- /dev/null
+++ b/src/Processors/QueryPlan/GroupingSetsStep.h
@@ -0,0 +1,25 @@
+#pragma once
+#include <Processors/QueryPlan/ITransformingStep.h>
+#include <...>
+#include "QueryPipeline/QueryPipelineBuilder.h"
+
+namespace DB
+{
+
+struct AggregatingTransformParams;
+using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
+
+class GroupingSetsStep : public ITransformingStep
+{
+public:
+    GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
+
+    String getName() const override { return "GroupingSets"; }
+
+    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+
+private:
+    AggregatingTransformParamsPtr params;
+};
+
+}
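What the transform added below does, in miniature: the input arrives aggregated by the union of all keys; for each grouping set it copies the block, swaps key columns outside the set for default-valued columns of equal height, and merges again so rows that now collide on the remaining keys combine. A toy sketch under assumed data — plain vectors stand in for IColumn, and a hand-rolled fold stands in for Aggregator::mergeBlocks:

    #include <iostream>
    #include <map>
    #include <set>
    #include <utility>
    #include <vector>

    using Column = std::vector<int>;
    using Columns = std::vector<Column>;

    int main()
    {
        // Assumed block already aggregated by both keys {0, 1}: columns a, b, sum(s).
        Columns current{{1, 1, 2}, {10, 20, 10}, {5, 7, 9}};
        std::vector<std::vector<size_t>> keys_vector{{0, 1}, {0}};
        std::vector<size_t> keys{0, 1};

        for (const auto & set_keys : keys_vector)
        {
            Columns columns = current;
            std::set<size_t> key_set(set_keys.begin(), set_keys.end());

            // Keys outside the current grouping set get default-valued columns,
            // like current_zero_columns in the transform below.
            for (size_t key : keys)
                if (!key_set.count(key))
                    columns[key].assign(columns[key].size(), 0);

            // The real transform re-merges via the aggregator; here we fold
            // rows with equal (a, b) by hand.
            std::map<std::pair<int, int>, int> merged;
            for (size_t row = 0; row < columns[0].size(); ++row)
                merged[{columns[0][row], columns[1][row]}] += columns[2][row];

            for (const auto & [key, sum] : merged)
                std::cout << key.first << ' ' << key.second << ' ' << sum << '\n';
        }
        // Set {a, b}: rows unchanged. Set {a}: b zeroed, the two a=1 rows fold to "1 0 12".
    }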
diff --git a/src/Processors/Transforms/GroupingSetsTransform.cpp b/src/Processors/Transforms/GroupingSetsTransform.cpp
new file mode 100644
index 00000000000..9672b8e32c2
--- /dev/null
+++ b/src/Processors/Transforms/GroupingSetsTransform.cpp
@@ -0,0 +1,76 @@
+#include <Processors/Transforms/GroupingSetsTransform.h>
+#include <Processors/Transforms/TotalsHavingTransform.h>
+
+namespace DB
+{
+
+GroupingSetsTransform::GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params_)
+    : IAccumulatingTransform(std::move(header), params_->getHeader())
+    , params(std::move(params_))
+    , keys(params->params.keys)
+    , keys_vector(params->params.keys_vector)
+    , keys_vector_idx(0)
+{
+}
+
+void GroupingSetsTransform::consume(Chunk chunk)
+{
+    consumed_chunks.emplace_back(std::move(chunk));
+}
+
+Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
+{
+    BlocksList grouping_sets_blocks;
+    for (auto & chunk : chunks)
+        grouping_sets_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
+
+    auto grouping_sets_block = params->aggregator.mergeBlocks(grouping_sets_blocks, final);
+    auto num_rows = grouping_sets_block.rows();
+    return Chunk(grouping_sets_block.getColumns(), num_rows);
+}
+
+Chunk GroupingSetsTransform::generate()
+{
+    if (!consumed_chunks.empty())
+    {
+        if (consumed_chunks.size() > 1)
+            grouping_sets_chunk = merge(std::move(consumed_chunks), false);
+        else
+            grouping_sets_chunk = std::move(consumed_chunks.front());
+
+        consumed_chunks.clear();
+
+        auto num_rows = grouping_sets_chunk.getNumRows();
+
+        current_columns = grouping_sets_chunk.getColumns();
+        current_zero_columns.clear();
+
+        for (auto key : keys)
+            current_zero_columns.emplace(key, current_columns[key]->cloneEmpty()->cloneResized(num_rows));
+    }
+
+    Chunk gen_chunk;
+
+    if (keys_vector_idx < keys_vector.size())
+    {
+        auto columns = current_columns;
+        std::set<size_t> key_vector(keys_vector[keys_vector_idx].begin(), keys_vector[keys_vector_idx].end());
+
+        for (auto key : keys)
+        {
+            if (!key_vector.contains(key))
+                columns[key] = current_zero_columns[key];
+        }
+
+        Chunks chunks;
+        chunks.emplace_back(std::move(columns), current_columns.front()->size());
+        gen_chunk = merge(std::move(chunks), false);
+
+        ++keys_vector_idx;
+    }
+
+    finalizeChunk(gen_chunk);
+    return gen_chunk;
+}
+
+}
diff --git a/src/Processors/Transforms/GroupingSetsTransform.h b/src/Processors/Transforms/GroupingSetsTransform.h
new file mode 100644
index 00000000000..4019b2ffa52
--- /dev/null
+++ b/src/Processors/Transforms/GroupingSetsTransform.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <Processors/IAccumulatingTransform.h>
+#include <Processors/Transforms/AggregatingTransform.h>
+
+namespace DB
+{
+
+class GroupingSetsTransform : public IAccumulatingTransform
+{
+public:
+    GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params);
+    String getName() const override { return "GroupingSetsTransform"; }
+
+protected:
+    void consume(Chunk chunk) override;
+    Chunk generate() override;
+
+private:
+    AggregatingTransformParamsPtr params;
+    ColumnNumbers keys;
+    ColumnNumbersTwoDimension keys_vector;
+
+    Chunks consumed_chunks;
+    Chunk grouping_sets_chunk;
+    Columns current_columns;
+    std::unordered_map<size_t, ColumnPtr> current_zero_columns;
+
+    UInt64 keys_vector_idx = 0;
+
+    Poco::Logger * log = &Poco::Logger::get("GroupingSetsTransform");
+
+    Chunk merge(Chunks && chunks, bool final);
+};
+
+}
diff --git a/tests/queries/0_stateless/01883_with_grouping_sets.reference b/tests/queries/0_stateless/01883_with_grouping_sets.reference
index fd128444ebc..258fa6f5abd 100644
--- a/tests/queries/0_stateless/01883_with_grouping_sets.reference
+++ b/tests/queries/0_stateless/01883_with_grouping_sets.reference
@@ -1,20 +1,69 @@
- 1 40 4
- 2 80 4
-a 0 70 4
-b 0 50 4
- 1 40 4
- 2 80 4
-a 0 70 4
-b 0 50 4
+1 0 1 4500
+1 0 3 4700
+1 0 5 4900
+1 0 7 5100
+1 0 9 5300
+1 1 0 4500
+1 2 0 5100
+1 3 0 4700
+1 4 0 5300
+1 5 0 4900
+2 0 2 4600
+2 0 4 4800
+2 0 6 5000
+2 0 8 5200
+2 0 10 5400
+2 1 0 5000
+2 2 0 4600
+2 3 0 5200
+2 4 0 4800
+2 5 0 5400
 
- 0 240 16
- 1 40 4
- 2 80 4
-a 0 70 4
-b 0 50 4
- 1 40 4
- 2 80 4
-a 0 70 4
-b 0 50 4
+0 0 1 1 4500
+0 0 2 2 4600
+0 0 3 3 4700
+0 0 4 4 4800
+0 0 5 5 4900
+0 0 6 6 5000
+0 0 7 7 5100
+0 0 8 8 5200
+0 0 9 9 5300
+0 0 10 10 5400
+1 1 0 0 4500
+1 2 0 0 5100
+1 3 0 0 4700
+1 4 0 0 5300
+1 5 0 0 4900
+2 1 0 0 5000
+2 2 0 0 4600
+2 3 0 0 5200
+2 4 0 0 4800
+2 5 0 0 5400
 
- 0 240 16
+1 0 24500
+1 1 4500
+1 3 4700
+1 5 4900
+1 7 5100
+1 9 5300
+2 0 25000
+2 2 4600
+2 4 4800
+2 6 5000
+2 8 5200
+2 10 5400
+0 0 49500
+
+1 0 24500
+1 1 4500
+1 3 4700
+1 5 4900
+1 7 5100
+1 9 5300
+2 0 25000
+2 2 4600
+2 4 4800
+2 6 5000
+2 8 5200
+2 10 5400
+0 0 49500
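The zero-filled key columns in the updated reference above follow from GROUPING SETS semantics: the query behaves like a UNION ALL of one GROUP BY per set, with keys absent from a set taking their column type's default (0 for Int32). A sketch of the first test query's equivalent rewrite — this shows the meaning, not how the server executes it:

    SELECT * FROM
    (
        SELECT fact_1_id, fact_2_id, 0 AS fact_3_id, SUM(sales_value) AS sales_value
        FROM grouping_sets GROUP BY fact_1_id, fact_2_id
        UNION ALL
        SELECT fact_1_id, 0 AS fact_2_id, fact_3_id, SUM(sales_value) AS sales_value
        FROM grouping_sets GROUP BY fact_1_id, fact_3_id
    )
    ORDER BY fact_1_id, fact_2_id, fact_3_id;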
diff --git a/tests/queries/0_stateless/01883_with_grouping_sets.sql b/tests/queries/0_stateless/01883_with_grouping_sets.sql
index 6270b772328..274eebef5ed 100644
--- a/tests/queries/0_stateless/01883_with_grouping_sets.sql
+++ b/tests/queries/0_stateless/01883_with_grouping_sets.sql
@@ -1,20 +1,38 @@
 DROP TABLE IF EXISTS grouping_sets;
 
-CREATE TABLE grouping_sets(a String, b Int32, s Int32) ENGINE = Memory;
-INSERT INTO grouping_sets VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
-INSERT INTO grouping_sets VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
-INSERT INTO grouping_sets VALUES ('b', 2, 20), ('b', 2, 15);
+CREATE TABLE grouping_sets(fact_1_id Int32, fact_2_id Int32, fact_3_id Int32, fact_4_id Int32, sales_value Int32) ENGINE = Memory;
 
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY GROUPING SETS(a, b) ORDER BY a, b;
+INSERT INTO grouping_sets
+SELECT
+    number % 2 + 1 AS fact_1_id,
+    number % 5 + 1 AS fact_2_id,
+    number % 10 + 1 AS fact_3_id,
+    number % 10 + 1 AS fact_4_id,
+    number % 100 AS sales_value
+FROM system.numbers limit 1000;
 
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY GROUPING SETS(a, b) WITH TOTALS ORDER BY a, b;
+SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
+GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_1_id, fact_3_id))
+ORDER BY fact_1_id, fact_2_id, fact_3_id;
 
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS ORDER BY a, b;
+SELECT fact_1_id, fact_2_id, fact_3_id, fact_4_id, SUM(sales_value) AS sales_value from grouping_sets
+GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_3_id, fact_4_id))
+ORDER BY fact_1_id, fact_2_id, fact_3_id, fact_4_id;
 
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS WITH TOTALS ORDER BY a, b;
+SELECT
+    fact_1_id,
+    fact_3_id,
+    SUM(sales_value) AS sales_value
+FROM grouping_sets
+GROUP BY grouping sets((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
+ORDER BY fact_1_id, fact_3_id;
 
--- not sure that always works
--- SET group_by_two_level_threshold = 1;
--- SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS ORDER BY a, b;
+SELECT
+    fact_1_id,
+    fact_3_id,
+    SUM(sales_value) AS sales_value
+FROM grouping_sets
+GROUP BY grouping sets(fact_1_id, (fact_1_id, fact_3_id)) WITH TOTALS
+ORDER BY fact_1_id, fact_3_id;
 
 DROP TABLE grouping_sets;
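One property the WITH TOTALS cases pin down: the totals row aggregates each source row exactly once, even though every row belongs to both grouping sets, hence the single "0 0 49500" line per query in the reference. A quick check of that figure against the test's generator (numbers(1000) mirrors system.numbers limit 1000):

    -- Ten full runs of 0..99: 10 * (0 + 1 + ... + 99) = 10 * 4950 = 49500.
    SELECT SUM(number % 100) FROM numbers(1000);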