diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index c0ea848fea8..468463a0504 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -369,6 +369,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B many_data, counter++); }); + if (skip_merging) + { + pipeline.addSimpleTransform([&](const Block & header) + { return std::make_shared(header, transform_params); }); + pipeline.resize(merge_threads); + aggregating_in_order = collector.detachProcessors(0); + return; + } + aggregating_in_order = collector.detachProcessors(0); auto transform = std::make_shared( diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index addfcb508ce..55aae54c427 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -353,12 +353,7 @@ Pipe ReadFromMergeTree::read( size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache) { if (read_type == ReadType::Default && max_streams > 1) - { - Pipe pipe = readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache); - if (output_each_partition_through_separate_port) - pipe.resize(1); - return pipe; - } + return readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache); auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0); @@ -468,9 +463,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsImpl( info.use_uncompressed_cache); } -Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( - RangesInDataParts && parts_with_ranges, - const Names & column_names) +template +static Pipe runForEachPartition( + RangesInDataParts && parts_with_ranges, size_t requested_num_streams, bool output_each_partition_through_separate_port, ReadFunc read) { if (parts_with_ranges.empty()) return {}; @@ -479,7 +474,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( if (!output_each_partition_through_separate_port) { - return spreadMarkRangesAmongStreamsImpl(std::move(parts_with_ranges), column_names, num_streams); + return read(std::move(parts_with_ranges), num_streams); } else { @@ -487,7 +482,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( LOG_DEBUG( &Poco::Logger::get("debug"), - "spreadMarkRangesAmongStreams {} {} {}", + "{} {} {} {}", + demangle(typeid(ReadFunc).name()).substr(0, 60), parts_with_ranges.size(), requested_num_streams, countPartitions(parts_with_ranges)); @@ -500,12 +496,19 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( parts_with_ranges.end(), [&begin](auto & part) { return begin->data_part->info.partition_id != part.data_part->info.partition_id; }); - LOG_DEBUG(&Poco::Logger::get("debug"), "spreadMarkRangesAmongStreams {} {}", begin->data_part->info.partition_id, end - begin); + LOG_DEBUG( + &Poco::Logger::get("debug"), + "{} {} {}", + demangle(typeid(ReadFunc).name()).substr(0, 60), + begin->data_part->info.partition_id, + end - begin); RangesInDataParts partition_parts; partition_parts.insert(partition_parts.end(), std::make_move_iterator(begin), std::make_move_iterator(end)); - pipes.emplace_back(spreadMarkRangesAmongStreamsImpl(std::move(partition_parts), column_names, num_streams)); + pipes.emplace_back(read(std::move(partition_parts), num_streams)); + if (!pipes.back().empty()) + pipes.back().resize(1); begin = end; } @@ -514,6 +517,15 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( } } +Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, const Names & column_names) +{ + auto read = [this, &column_names](RangesInDataParts && parts_with_ranges_, size_t num_streams_) + { + return spreadMarkRangesAmongStreamsImpl(std::move(parts_with_ranges_), column_names, num_streams_); + }; + return runForEachPartition(std::move(parts_with_ranges), requested_num_streams, output_each_partition_through_separate_port, read); +} + static ActionsDAGPtr createProjection(const Block & header) { auto projection = std::make_shared(header.getNamesAndTypesList()); @@ -522,41 +534,21 @@ static ActionsDAGPtr createProjection(const Block & header) return projection; } -Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( +Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrderImpl( RangesInDataParts && parts_with_ranges, const Names & column_names, - ActionsDAGPtr & out_projection, - const InputOrderInfoPtr & input_order_info) + const InputOrderInfoPtr & input_order_info, + size_t num_streams, + bool need_preliminary_merge) { const auto & settings = context->getSettingsRef(); const auto data_settings = data.getSettings(); PartRangesReadInfo info(parts_with_ranges, settings, *data_settings); - Pipes res; - if (info.sum_marks == 0) return {}; - /// PREWHERE actions can remove some input columns (which are needed only for prewhere condition). - /// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key. - /// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting. - /// See 02354_read_in_order_prewhere.sql as an example. - bool have_input_columns_removed_after_prewhere = false; - if (prewhere_info && prewhere_info->prewhere_actions) - { - auto & outputs = prewhere_info->prewhere_actions->getOutputs(); - std::unordered_set outputs_set(outputs.begin(), outputs.end()); - for (const auto * input : prewhere_info->prewhere_actions->getInputs()) - { - if (!outputs_set.contains(input)) - { - outputs.push_back(input); - have_input_columns_removed_after_prewhere = true; - } - } - } - /// Let's split ranges to avoid reading much data. auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size] (const auto & ranges, int direction) @@ -603,8 +595,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( return new_ranges; }; - const size_t min_marks_per_stream = (info.sum_marks - 1) / requested_num_streams + 1; - bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold); + const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1; Pipes pipes; @@ -686,7 +677,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( if (!pipes.empty()) pipe_header = pipes.front().getHeader(); - if (need_preliminary_merge) + if (need_preliminary_merge || output_each_partition_through_separate_port) { size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size; auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone(); @@ -705,32 +696,79 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( auto sorting_key_expr = std::make_shared(sorting_key_prefix_expr); - for (auto & pipe : pipes) + auto add_merge = [&](Pipe & pipe) { pipe.addSimpleTransform([sorting_key_expr](const Block & header) - { - return std::make_shared(header, sorting_key_expr); - }); + { return std::make_shared(header, sorting_key_expr); }); if (pipe.numOutputPorts() > 1) { auto transform = std::make_shared( - pipe.getHeader(), - pipe.numOutputPorts(), - sort_description, - max_block_size, - SortingQueueStrategy::Batch); + pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, SortingQueueStrategy::Batch); pipe.addTransform(std::move(transform)); } + }; + + if (need_preliminary_merge) + for (auto & pipe : pipes) + add_merge(pipe); + + Pipe pipe = Pipe::unitePipes(std::move(pipes)); + + if (output_each_partition_through_separate_port) + add_merge(pipe); + + return pipe; + } + + return Pipe::unitePipes(std::move(pipes)); +} + +Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( + RangesInDataParts && parts_with_ranges, + const Names & column_names, + ActionsDAGPtr & out_projection, + const InputOrderInfoPtr & input_order_info) +{ + if (parts_with_ranges.empty()) + return {}; + + /// PREWHERE actions can remove some input columns (which are needed only for prewhere condition). + /// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key. + /// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting. + /// See 02354_read_in_order_prewhere.sql as an example. + bool have_input_columns_removed_after_prewhere = false; + if (prewhere_info && prewhere_info->prewhere_actions) + { + auto & outputs = prewhere_info->prewhere_actions->getOutputs(); + std::unordered_set outputs_set(outputs.begin(), outputs.end()); + for (const auto * input : prewhere_info->prewhere_actions->getInputs()) + { + if (!outputs_set.contains(input)) + { + outputs.push_back(input); + have_input_columns_removed_after_prewhere = true; + } } } - if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere)) - /// Drop temporary columns, added by 'sorting_key_prefix_expr' - out_projection = createProjection(pipe_header); + const auto & settings = context->getSettingsRef(); + bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold); - return Pipe::unitePipes(std::move(pipes)); + auto read + = [this, &column_names, &input_order_info, need_preliminary_merge](RangesInDataParts && parts_with_ranges_, size_t num_streams_) + { + return spreadMarkRangesAmongStreamsWithOrderImpl( + std::move(parts_with_ranges_), column_names, input_order_info, num_streams_, need_preliminary_merge); + }; + Pipe pipe = runForEachPartition(std::move(parts_with_ranges), requested_num_streams, output_each_partition_through_separate_port, read); + + if (!pipe.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere)) + /// Drop temporary columns, added by 'sorting_key_prefix_expr' + out_projection = createProjection(pipe.getHeader()); + + return pipe; } static void addMergingFinal( @@ -1269,14 +1307,11 @@ void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, output_stream->sort_description = std::move(sort_description); output_stream->sort_scope = DataStream::SortScope::Stream; } - - /// Not supported currently. Disable optimisation. - output_each_partition_through_separate_port = false; } bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort() { - if (isQueryWithFinal() || query_info.getInputOrderInfo()) + if (isQueryWithFinal()) return false; output_each_partition_through_separate_port = true; @@ -1377,9 +1412,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons } else if (input_order_info) { - if (output_each_partition_through_separate_port) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used when reading in order is used"); - pipe = spreadMarkRangesAmongStreamsWithOrder( std::move(result.parts_with_ranges), column_names_to_read, diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 1599e36fc41..1e7a1696a30 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -238,6 +238,13 @@ private: ActionsDAGPtr & out_projection, const InputOrderInfoPtr & input_order_info); + Pipe spreadMarkRangesAmongStreamsWithOrderImpl( + RangesInDataParts && parts_with_ranges, + const Names & column_names, + const InputOrderInfoPtr & input_order_info, + size_t num_streams, + bool need_preliminary_merge); + Pipe spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, const Names & column_names, diff --git a/tests/performance/aggregation_by_partitions.xml b/tests/performance/aggregation_by_partitions.xml index db6bbb8f215..263c6d211bd 100644 --- a/tests/performance/aggregation_by_partitions.xml +++ b/tests/performance/aggregation_by_partitions.xml @@ -3,11 +3,13 @@ - create table t(a UInt32) engine=MergeTree order by tuple() partition by a % 16 + create table t(a UInt32) engine=MergeTree order by a partition by a % 16 insert into t select * from numbers_mt(5e7) select a from t group by a format Null + select a from t group by a format Null settings optimize_aggregation_in_order = 1 + drop table t diff --git a/tests/queries/0_stateless/02521_aggregation_by_partitions.reference b/tests/queries/0_stateless/02521_aggregation_by_partitions.reference index 2670ad4212b..475e1867d03 100644 --- a/tests/queries/0_stateless/02521_aggregation_by_partitions.reference +++ b/tests/queries/0_stateless/02521_aggregation_by_partitions.reference @@ -81,3 +81,116 @@ ExpressionTransform × 16 Concat 2 → 1 MergeTreeInOrder × 2 0 → 1 1000000 +(Expression) +ExpressionTransform × 16 + (Aggregating) + Resize 4 → 16 + FinalizeAggregatedTransform × 4 + AggregatingInOrderTransform × 4 + (Expression) + ExpressionTransform × 4 + (ReadFromMergeTree) + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 +1000000 +(Expression) +ExpressionTransform × 16 + (Aggregating) + Resize 8 → 16 + FinalizeAggregatedTransform × 8 + AggregatingInOrderTransform × 8 + (Expression) + ExpressionTransform × 8 + (ReadFromMergeTree) + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 +1000000 +(Expression) +ExpressionTransform × 16 + (Aggregating) + FinalizeAggregatedTransform × 16 + AggregatingInOrderTransform × 16 + (Expression) + ExpressionTransform × 16 + (ReadFromMergeTree) + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 +1000000 diff --git a/tests/queries/0_stateless/02521_aggregation_by_partitions.sql b/tests/queries/0_stateless/02521_aggregation_by_partitions.sql index ed880ef8631..33127a0233a 100644 --- a/tests/queries/0_stateless/02521_aggregation_by_partitions.sql +++ b/tests/queries/0_stateless/02521_aggregation_by_partitions.sql @@ -40,3 +40,46 @@ select count() from (select throwIf(count() != 2) from t3 group by a); select throwIf(count() != 4) from remote('127.0.0.{1,2}', currentDatabase(), t3) group by a format Null; drop table t3; + +-- aggregation in order -- + +set optimize_aggregation_in_order = 1; + +create table t4(a UInt32) engine=MergeTree order by a partition by a % 4; + +system stop merges t4; + +insert into t4 select number from numbers_mt(1e6); +insert into t4 select number from numbers_mt(1e6); + +explain pipeline select a from t4 group by a settings read_in_order_two_level_merge_threshold = 1e12; + +select count() from (select throwIf(count() != 2) from t4 group by a); + +drop table t4; + +create table t5(a UInt32) engine=MergeTree order by a partition by a % 8; + +system stop merges t5; + +insert into t5 select number from numbers_mt(1e6); +insert into t5 select number from numbers_mt(1e6); + +explain pipeline select a from t5 group by a settings read_in_order_two_level_merge_threshold = 1e12; + +select count() from (select throwIf(count() != 2) from t5 group by a); + +drop table t5; + +create table t6(a UInt32) engine=MergeTree order by a partition by a % 16; + +system stop merges t6; + +insert into t6 select number from numbers_mt(1e6); +insert into t6 select number from numbers_mt(1e6); + +explain pipeline select a from t6 group by a settings read_in_order_two_level_merge_threshold = 1e12; + +select count() from (select throwIf(count() != 2) from t6 group by a); + +drop table t6;