diff --git a/src/Core/ColumnNumbers.h b/src/Core/ColumnNumbers.h
index 9441f6485a7..82868ef3cd3 100644
--- a/src/Core/ColumnNumbers.h
+++ b/src/Core/ColumnNumbers.h
@@ -8,5 +8,6 @@ namespace DB
 {
 
 using ColumnNumbers = std::vector<size_t>;
+using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;
 
 }
diff --git a/src/Core/NamesAndTypes.h b/src/Core/NamesAndTypes.h
index 58b5189db63..787dd629190 100644
--- a/src/Core/NamesAndTypes.h
+++ b/src/Core/NamesAndTypes.h
@@ -107,6 +107,8 @@ public:
     std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
 };
 
+using TwoDimensionNamesAndTypesList = std::list<NamesAndTypesList>;
+
 }
 
 namespace std
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index 3c53769e128..91eca14d483 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -879,6 +879,7 @@ public:
         /// What to count.
         const ColumnNumbers keys;
+        const ColumnNumbersTwoDimension keys_vector;
         const AggregateDescriptions aggregates;
         const size_t keys_size;
         const size_t aggregates_size;
 
@@ -938,6 +939,27 @@ public:
         {
         }
 
+        /// two dimensional vector of aggregating keys in params
+        Params(
+            const Block & src_header_,
+            const ColumnNumbersTwoDimension & keys_vector_, const AggregateDescriptions & aggregates_,
+            bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
+            size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
+            size_t max_bytes_before_external_group_by_,
+            bool empty_result_for_aggregation_by_empty_set_,
+            VolumePtr tmp_volume_, size_t max_threads_,
+            size_t min_free_disk_space_)
+            : src_header(src_header_),
+            keys_vector(keys_vector_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
+            overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
+            group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
+            max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
+            empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
+            tmp_volume(tmp_volume_), max_threads(max_threads_),
+            min_free_disk_space(min_free_disk_space_)
+        {
+        }
+
         /// Only parameters that matter during merge.
         Params(const Block & intermediate_header_,
             const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
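
A minimal, self-contained sketch (not part of the patch) of the data shape the new aliases and the keys_vector-based Aggregator::Params overload are meant to carry. The column layout and the GROUPING SETS example are assumptions for illustration only:

#include <cstddef>
#include <iostream>
#include <vector>

// Mirrors the aliases introduced above.
using ColumnNumbers = std::vector<size_t>;
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;

int main()
{
    // Hypothetical header layout: column 0 = a, 1 = b, 2 = c.
    // GROUP BY GROUPING SETS ((a, b), (c)) would then produce one inner
    // vector of key positions per grouping set:
    ColumnNumbersTwoDimension keys_vector = {{0, 1}, {2}};

    for (const auto & keys : keys_vector)
    {
        std::cout << "grouping set:";
        for (size_t key : keys)
            std::cout << ' ' << key;
        std::cout << '\n';
    }
}
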
diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp
index d40a92972b2..c09bf92a7da 100644
--- a/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/src/Interpreters/ExpressionAnalyzer.cpp
@@ -345,6 +345,12 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
                 if (getContext()->getSettingsRef().enable_positional_arguments)
                     replaceForPositionalArguments(group_asts[i], select_query, ASTSelectQuery::Expression::GROUP_BY);
 
+                if (select_query->group_by_with_grouping_sets)
+                {
+                    LOG_DEBUG(poco_log, "analyzeAggregation: detect group by with grouping sets");
+                    /// TODO
+                }
+
                 getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
 
                 const auto & column_name = group_asts[i]->getColumnName();
@@ -380,6 +386,11 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
                 /// Aggregation keys are uniqued.
                 if (!unique_keys.count(key.name))
                 {
+                    if (select_query->group_by_with_grouping_sets)
+                    {
+                        aggregation_keys_list.push_back({key});
+                    }
+
                     unique_keys.insert(key.name);
                     aggregation_keys.push_back(key);
 
diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h
index b6bb3c5fad5..7598efa40c6 100644
--- a/src/Interpreters/ExpressionAnalyzer.h
+++ b/src/Interpreters/ExpressionAnalyzer.h
@@ -65,6 +65,7 @@ struct ExpressionAnalyzerData
     bool has_aggregation = false;
     NamesAndTypesList aggregation_keys;
     bool has_const_aggregation_keys = false;
+    TwoDimensionNamesAndTypesList aggregation_keys_list;
     AggregateDescriptions aggregate_descriptions;
 
     WindowDescriptions window_descriptions;
@@ -94,6 +95,8 @@ private:
         explicit ExtractedSettings(const Settings & settings_);
     };
 
+    Poco::Logger * poco_log = &Poco::Logger::get("ExpressionAnalyzer");
+
 public:
     /// Ctor for non-select queries. Generally its usage is:
     /// auto actions = ExpressionAnalyzer(query, syntax, context).getActions();
@@ -321,6 +324,7 @@ public:
     const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
     bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
+    const TwoDimensionNamesAndTypesList & aggregationKeysList() const { return aggregation_keys_list; }
     const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
 
     const PreparedSets & getPreparedSets() const { return prepared_sets; }

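
A rough standalone sketch (not part of the patch) of how analyzeAggregation fills aggregation_keys_list under group_by_with_grouping_sets in this change: each distinct key is recorded as its own single-element list. The NameAndTypePair stand-in below is simplified; the real type also carries a data type:

#include <iostream>
#include <list>
#include <set>
#include <string>
#include <vector>

// Simplified stand-ins for the ClickHouse types.
struct NameAndTypePair { std::string name; };
using NamesAndTypesList = std::list<NameAndTypePair>;
using TwoDimensionNamesAndTypesList = std::list<NamesAndTypesList>;

int main()
{
    // Keys as they appear in GROUP BY, possibly with duplicates.
    std::vector<NameAndTypePair> group_by_keys = {{"a"}, {"b"}, {"a"}};

    std::set<std::string> unique_keys;
    NamesAndTypesList aggregation_keys;
    TwoDimensionNamesAndTypesList aggregation_keys_list;

    bool group_by_with_grouping_sets = true;

    for (const auto & key : group_by_keys)
    {
        if (!unique_keys.count(key.name))
        {
            // Under GROUPING SETS the patch records each key as its own
            // single-element list; later code walks these lists to build one
            // ColumnNumbers entry per set.
            if (group_by_with_grouping_sets)
                aggregation_keys_list.push_back({key});

            unique_keys.insert(key.name);
            aggregation_keys.push_back(key);
        }
    }

    std::cout << "distinct keys: " << aggregation_keys.size()
              << ", key lists: " << aggregation_keys_list.size() << '\n';
}
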
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 8d824d85d37..705191e3ef8 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1182,8 +1182,18 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
     for (const auto & key : query_analyzer->aggregationKeys())
+    {
         keys.push_back(header_before_aggregation.getPositionByName(key.name));
+        LOG_DEBUG(log, "executeAggregation pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
+    }
 
     AggregateDescriptions aggregates = query_analyzer->aggregates();
     for (auto & descr : aggregates)
@@ -2203,14 +2214,18 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
     query_plan.addStep(std::move(step));
 }
 
-void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan)
+void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
 {
+    /*
     const auto & header_before_transform = query_plan.getCurrentDataStream().header;
 
     ColumnNumbers keys;
 
     for (const auto & key : query_analyzer->aggregationKeys())
+    {
         keys.push_back(header_before_transform.getPositionByName(key.name));
+        LOG_DEBUG(log, "executeGroupingSets pushed back key with name {} and number {}", key.name, header_before_transform.getPositionByName(key.name));
+    }
 
     const Settings & settings = context->getSettingsRef();
 
@@ -2226,6 +2241,87 @@ void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan)
     step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
 
     query_plan.addStep(std::move(step));
+    */
+
+    auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
+    expression_before_aggregation->setStepDescription("Before GROUP BY");
+    query_plan.addStep(std::move(expression_before_aggregation));
+
+    const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
+    ColumnNumbers keys;
+    ColumnNumbersTwoDimension keys_vector;
+    for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
+    {
+        keys.clear();
+        for (const auto & key : aggregation_keys)
+        {
+            keys.push_back(header_before_aggregation.getPositionByName(key.name));
+            LOG_DEBUG(
+                log,
+                "executeGroupingSets add key with name {} and number {}",
+                key.name,
+                header_before_aggregation.getPositionByName(key.name));
+        }
+        keys_vector.push_back(keys);
+    }
+
+    AggregateDescriptions aggregates = query_analyzer->aggregates();
+    for (auto & descr : aggregates)
+        if (descr.arguments.empty())
+            for (const auto & name : descr.argument_names)
+            {
+                descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
+                LOG_DEBUG(
+                    log,
+                    "executeGroupingSets add descr.arguments with name {} and number {}",
+                    name,
+                    header_before_aggregation.getPositionByName(name));
+            }
+
+    const Settings & settings = context->getSettingsRef();
+
+    Aggregator::Params params(
+        header_before_aggregation,
+        keys_vector,
+        aggregates,
+        overflow_row,
+        settings.max_rows_to_group_by,
+        settings.group_by_overflow_mode,
+        settings.group_by_two_level_threshold,
+        settings.group_by_two_level_threshold_bytes,
+        settings.max_bytes_before_external_group_by,
+        settings.empty_result_for_aggregation_by_empty_set,
+        context->getTemporaryVolume(),
+        settings.max_threads,
+        settings.min_free_disk_space_for_temporary_data);
+
+    SortDescription group_by_sort_description;
+
+    if (group_by_info && settings.optimize_aggregation_in_order)
+        group_by_sort_description = getSortDescriptionFromGroupBy(getSelectQuery());
+    else
+        group_by_info = nullptr;
+
+    auto merge_threads = max_streams;
+    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
+        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
+        : static_cast<size_t>(settings.max_threads);
+
+    bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
+
+    auto aggregating_step = std::make_unique<AggregatingStep>(
+        query_plan.getCurrentDataStream(),
+        params,
+        final,
+        settings.max_block_size,
+        settings.aggregation_in_order_max_block_bytes,
+        merge_threads,
+        temporary_data_merge_threads,
+        storage_has_evenly_distributed_read,
+        std::move(group_by_info),
+        std::move(group_by_sort_description));
+
+    query_plan.addStep(std::move(aggregating_step));
 }
 
 void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)
diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h
index 8ea04cf7b27..db77a8bd8cb 100644
--- a/src/Interpreters/InterpreterSelectQuery.h
+++ b/src/Interpreters/InterpreterSelectQuery.h
@@ -14,6 +14,7 @@
 #include
 #include
+#include "Interpreters/ActionsDAG.h"
 
 namespace Poco
 {
@@ -160,7 +161,7 @@ private:
     };
 
     void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
-    void executeGroupingSets(QueryPlan & query_plan);
+    void executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
 
     /** If there is a SETTINGS section in the SELECT query, then apply settings from it.
      *
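
A standalone sketch (not part of the patch) of the keys_vector construction performed by the new executeGroupingSets: each NamesAndTypesList returned by aggregationKeysList() is mapped to column positions in the header. FakeHeader and the column layout below are illustrative stand-ins for Block::getPositionByName():

#include <cstddef>
#include <iostream>
#include <list>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

using ColumnNumbers = std::vector<size_t>;
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;

// Minimal stand-in for Block::getPositionByName().
struct FakeHeader
{
    std::map<std::string, size_t> positions;
    size_t getPositionByName(const std::string & name) const
    {
        auto it = positions.find(name);
        if (it == positions.end())
            throw std::runtime_error("no column " + name);
        return it->second;
    }
};

int main()
{
    FakeHeader header{{{"a", 0}, {"b", 1}, {"c", 2}}};

    // What aggregationKeysList() would hold for GROUPING SETS ((a), (b), (c))
    // under the current patch: one single-key list per key.
    std::list<std::list<std::string>> aggregation_keys_list = {{"a"}, {"b"}, {"c"}};

    ColumnNumbers keys;
    ColumnNumbersTwoDimension keys_vector;
    for (const auto & aggregation_keys : aggregation_keys_list)
    {
        keys.clear();
        for (const auto & name : aggregation_keys)
            keys.push_back(header.getPositionByName(name));
        keys_vector.push_back(keys);
    }

    std::cout << "built " << keys_vector.size() << " key sets\n";
}
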
diff --git a/src/Processors/Transforms/GroupingSetsTransform.cpp b/src/Processors/Transforms/GroupingSetsTransform.cpp
index 637f244ff96..ccbafac8e3e 100644
--- a/src/Processors/Transforms/GroupingSetsTransform.cpp
+++ b/src/Processors/Transforms/GroupingSetsTransform.cpp
@@ -19,6 +19,7 @@ GroupingSetsTransform::GroupingSetsTransform(Block header, AggregatingTransformP
 
 Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
 {
+    LOG_DEBUG(log, "merge {} blocks", chunks.size());
     BlocksList rollup_blocks;
     for (auto & chunk : chunks)
         rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
@@ -31,12 +32,15 @@ Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
 void GroupingSetsTransform::consume(Chunk chunk)
 {
     consumed_chunks.emplace_back(std::move(chunk));
+    LOG_DEBUG(log, "consumed block, now consumed_chunks size is {}", consumed_chunks.size());
 }
 
 Chunk GroupingSetsTransform::generate()
 {
+    LOG_DEBUG(log, "generate start, mask = {}", mask);
     if (!consumed_chunks.empty())
     {
+        LOG_DEBUG(log, "consumed_chunks not empty, size is {}", consumed_chunks.size());
         if (consumed_chunks.size() > 1)
             grouping_sets_chunk = merge(std::move(consumed_chunks), false);
         else
@@ -46,6 +50,7 @@ Chunk GroupingSetsTransform::generate()
 
         auto num_rows = grouping_sets_chunk.getNumRows();
         mask = (UInt64(1) << keys.size());
+        LOG_DEBUG(log, "changed mask, mask = {}", mask);
 
         current_columns = grouping_sets_chunk.getColumns();
         current_zero_columns.clear();
@@ -56,9 +61,10 @@ Chunk GroupingSetsTransform::generate()
     }
 
     // auto gen_chunk = std::move(cube_chunk);
-
+    LOG_DEBUG(log, "before if mask");
     if (mask > 1)
     {
+        LOG_DEBUG(log, "in if mask > 1");
         mask = mask >> 1;
 
         auto columns = current_columns;
@@ -72,6 +78,7 @@ Chunk GroupingSetsTransform::generate()
         chunks.emplace_back(std::move(columns), current_columns.front()->size());
         grouping_sets_chunk = merge(std::move(chunks), false);
     }
+    LOG_DEBUG(log, "before gen_chunk");
 
     auto gen_chunk = std::move(grouping_sets_chunk);
     finalizeChunk(gen_chunk);
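
A standalone sketch (not part of the patch) of the mask walk in GroupingSetsTransform::generate() as it stands here: mask starts at 1 << keys.size() and is shifted right once per generated chunk, and key columns whose bit is cleared are, presumably as in CubeTransform, swapped for the prepared zero columns before re-aggregation. The example only prints which key positions stay set on each pass:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    const std::vector<std::string> keys = {"a", "b"};
    std::uint64_t mask = std::uint64_t(1) << keys.size(); // 4 when there are two keys

    while (mask > 1)
    {
        // One pass per generate() call: halve the mask, keep the keys whose bit is set.
        mask >>= 1;
        std::cout << "pass with mask " << mask << ", kept keys:";
        for (std::size_t i = 0; i < keys.size(); ++i)
            if (mask & (std::uint64_t(1) << i))
                std::cout << ' ' << keys[i];
        std::cout << '\n';
    }
}
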
diff --git a/src/Processors/Transforms/GroupingSetsTransform.h b/src/Processors/Transforms/GroupingSetsTransform.h
index 0e300453215..10108d3c1a7 100644
--- a/src/Processors/Transforms/GroupingSetsTransform.h
+++ b/src/Processors/Transforms/GroupingSetsTransform.h
@@ -29,6 +29,8 @@ private:
 
     UInt64 mask = 0;
 
+    Poco::Logger * log = &Poco::Logger::get("GroupingSetsTransform");
+
     Chunk merge(Chunks && chunks, bool final);
 };
 
diff --git a/src/Processors/Transforms/TotalsHavingTransform.cpp b/src/Processors/Transforms/TotalsHavingTransform.cpp
index 0b7797da24f..a636ba70ef9 100644
--- a/src/Processors/Transforms/TotalsHavingTransform.cpp
+++ b/src/Processors/Transforms/TotalsHavingTransform.cpp
@@ -119,6 +119,7 @@ TotalsHavingTransform::TotalsHavingTransform(
 
 IProcessor::Status TotalsHavingTransform::prepare()
 {
+    LOG_DEBUG(log, "TotalsHavingTransform::prepare()");
     if (!finished_transform)
     {
         auto status = ISimpleTransform::prepare();
@@ -143,20 +144,24 @@ IProcessor::Status TotalsHavingTransform::prepare()
     totals_output.push(std::move(totals));
     totals_output.finish();
 
+    LOG_DEBUG(log, "exit TotalsHavingTransform::prepare()");
     return Status::Finished;
 }
 
 void TotalsHavingTransform::work()
 {
+    LOG_DEBUG(log, "TotalsHavingTransform::work()");
     if (finished_transform)
         prepareTotals();
     else
         ISimpleTransform::work();
+    LOG_DEBUG(log, "exit TotalsHavingTransform::work()");
 }
 
 void TotalsHavingTransform::transform(Chunk & chunk)
 {
     /// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
+    LOG_DEBUG(log, "TotalsHavingTransform::transform()");
     if (overflow_row)
     {
         const auto & info = chunk.getChunkInfo();
@@ -249,10 +254,12 @@ void TotalsHavingTransform::transform(Chunk & chunk)
     }
 
     passed_keys += chunk.getNumRows();
+    LOG_DEBUG(log, "exit TotalsHavingTransform::transform()");
 }
 
 void TotalsHavingTransform::addToTotals(const Chunk & chunk, const IColumn::Filter * filter)
 {
+    LOG_DEBUG(log, "TotalsHavingTransform::addToTotals()");
     auto num_columns = chunk.getNumColumns();
     for (size_t col = 0; col < num_columns; ++col)
     {
@@ -284,10 +291,12 @@ void TotalsHavingTransform::addToTotals(const Chunk & chunk, const IColumn::Filt
             }
         }
     }
+    LOG_DEBUG(log, "exit TotalsHavingTransform::addToTotals()");
 }
 
 void TotalsHavingTransform::prepareTotals()
 {
+    LOG_DEBUG(log, "TotalsHavingTransform::prepareTotals()");
     /// If totals_mode == AFTER_HAVING_AUTO, you need to decide whether to add aggregates to TOTALS for strings,
     /// not passed max_rows_to_group_by.
     if (overflow_aggregates)
@@ -312,6 +321,7 @@ void TotalsHavingTransform::prepareTotals()
         /// Note: after expression totals may have several rows if `arrayJoin` was used in expression.
         totals = Chunk(block.getColumns(), num_rows);
     }
+    LOG_DEBUG(log, "exit TotalsHavingTransform::prepareTotals()");
 }
 
 }
diff --git a/src/Processors/Transforms/TotalsHavingTransform.h b/src/Processors/Transforms/TotalsHavingTransform.h
index 03635054c65..59fa58edfbc 100644
--- a/src/Processors/Transforms/TotalsHavingTransform.h
+++ b/src/Processors/Transforms/TotalsHavingTransform.h
@@ -2,6 +2,8 @@
 
 #include
 #include
+#include // to be removed
+// #include
 
 namespace DB
 {
@@ -74,6 +76,8 @@ private:
 
     /// Here, total values are accumulated. After the work is finished, they will be placed in totals.
     MutableColumns current_totals;
+
+    Poco::Logger * log = &Poco::Logger::get("TotalsHavingTransform");
 };
 
 void finalizeChunk(Chunk & chunk);
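
All of the temporary tracing added in this patch follows the same pattern: a named Poco logger held as a class member, plus LOG_DEBUG calls. A bare-Poco sketch of that pattern (without ClickHouse's LOG_DEBUG macro); the console channel and level setup are only needed in a standalone program:

#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>

int main()
{
    // Route messages for this named logger to the console and enable debug level.
    Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel);
    Poco::Logger & log = Poco::Logger::get("TotalsHavingTransform");
    log.setChannel(channel);
    log.setLevel("debug");

    // Equivalent in spirit to the LOG_DEBUG(log, "...") calls above.
    log.debug("TotalsHavingTransform::prepare()");
    log.debug("exit TotalsHavingTransform::prepare()");
}
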