diff --git a/src/Processors/Chunk.h b/src/Processors/Chunk.h index bb8266d6e27..011d719d094 100644 --- a/src/Processors/Chunk.h +++ b/src/Processors/Chunk.h @@ -82,6 +82,7 @@ public: MutableColumns cloneEmptyColumns() const; const ChunkInfoPtr & getChunkInfo() const { return chunk_info; } + bool hasChunkInfo() const { return chunk_info != nullptr; } void setChunkInfo(ChunkInfoPtr chunk_info_) { chunk_info = std::move(chunk_info_); } UInt64 getNumRows() const { return num_rows; } diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp index 99e3adf893c..f54331d5550 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -25,14 +26,12 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm( size_t num_inputs_, AggregatingTransformParamsPtr params_, SortDescription description_, - size_t max_block_size_, - size_t merge_threads_) + size_t max_block_size_) : header(header_) , num_inputs(num_inputs_) , params(params_) , description(std::move(description_)) , max_block_size(max_block_size_) - , pool(merge_threads_) { /// Replace column names in description to positions. for (auto & column_description : description) @@ -60,12 +59,6 @@ void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() { - if (finished) - { - auto res = popResult(); - return Status(std::move(res), results.empty()); - } - if (!inputs_to_update.empty()) { Status status(inputs_to_update.back()); @@ -89,13 +82,7 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() } if (!best_input) - { - aggregate(); - pool.wait(); - finished = true; - auto res = popResult(); - return Status(std::move(res), results.empty()); - } + return Status(prepareToMerge(), true); /// Chunk at best_input will be aggregated entirely. auto & best_state = states[*best_input]; @@ -126,34 +113,20 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() /// Do not merge blocks, if there are too few rows. if (accumulated_rows >= max_block_size) - aggregate(); + status.chunk = prepareToMerge(); - status.chunk = popResult(); return status; } -Chunk FinishAggregatingInOrderAlgorithm::popResult() -{ - std::lock_guard lock(results_mutex); - - if (results.empty()) - return {}; - - auto res = std::move(results.back()); - results.pop_back(); - return res; -} - -void FinishAggregatingInOrderAlgorithm::aggregate() +Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge() { accumulated_rows = 0; - pool.scheduleOrThrowOnError([this, blocks_list = std::move(blocks)]() mutable - { - auto aggregated = params->aggregator.mergeBlocks(blocks_list, false); + auto info = std::make_shared(); + info->chunks = std::make_unique(std::move(chunks)); - std::lock_guard lock(results_mutex); - results.emplace_back(aggregated.getColumns(), aggregated.rows()); - }); + Chunk chunk; + chunk.setChunkInfo(std::move(info)); + return chunk; } void FinishAggregatingInOrderAlgorithm::addToAggregation() @@ -164,22 +137,25 @@ void FinishAggregatingInOrderAlgorithm::addToAggregation() if (!state.isValid() || state.current_row == state.to_row) continue; - if (state.to_row - state.current_row == state.num_rows) + size_t current_rows = state.to_row - state.current_row; + if (current_rows == state.num_rows) { - blocks.emplace_back(header.cloneWithColumns(state.all_columns)); + chunks.emplace_back(state.all_columns, current_rows); } else { Columns new_columns; new_columns.reserve(state.all_columns.size()); for (const auto & column : state.all_columns) - new_columns.emplace_back(column->cut(state.current_row, state.to_row - state.current_row)); + new_columns.emplace_back(column->cut(state.current_row, current_rows)); - blocks.emplace_back(header.cloneWithColumns(new_columns)); + chunks.emplace_back(std::move(new_columns), current_rows); } + chunks.back().setChunkInfo(std::make_shared()); + states[i].current_row = states[i].to_row; - accumulated_rows += blocks.back().rows(); + accumulated_rows += current_rows; if (!states[i].isValid()) inputs_to_update.push_back(i); diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h index 8b83e4e215b..2591e11a211 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h @@ -39,17 +39,15 @@ public: size_t num_inputs_, AggregatingTransformParamsPtr params_, SortDescription description_, - size_t max_block_size_, - size_t merge_threads_); + size_t max_block_size_); void initialize(Inputs inputs) override; void consume(Input & input, size_t source_num) override; Status merge() override; private: - void aggregate(); + Chunk prepareToMerge(); void addToAggregation(); - Chunk popResult(); struct State { @@ -72,17 +70,14 @@ private: AggregatingTransformParamsPtr params; SortDescription description; size_t max_block_size; - ThreadPool pool; - - std::mutex results_mutex; - std::vector results; Inputs current_inputs; + std::vector states; std::vector inputs_to_update; - BlocksList blocks; + + std::vector chunks; size_t accumulated_rows = 0; - bool finished = false; }; } diff --git a/src/Processors/Merges/FinishAggregatingInOrderTransform.h b/src/Processors/Merges/FinishAggregatingInOrderTransform.h index 3a388a36727..5605cea8e39 100644 --- a/src/Processors/Merges/FinishAggregatingInOrderTransform.h +++ b/src/Processors/Merges/FinishAggregatingInOrderTransform.h @@ -17,16 +17,14 @@ public: size_t num_inputs, AggregatingTransformParamsPtr params, SortDescription description, - size_t max_block_size, - size_t merge_threads) + size_t max_block_size) : IMergingTransform( - num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, + num_inputs, header, {}, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, params, std::move(description), - max_block_size, - merge_threads) + max_block_size) { } diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index cba78390c97..85416673d4f 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -132,7 +132,7 @@ IProcessor::Status IMergingTransformBase::prepare() bool is_port_full = !output.canPush(); /// Push if has data. - if (state.output_chunk && !is_port_full) + if ((state.output_chunk || state.output_chunk.hasChunkInfo()) && !is_port_full) output.push(std::move(state.output_chunk)); if (!is_initialized) diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index 8b0a44ae025..1acd4916260 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -107,7 +107,7 @@ public: IMergingAlgorithm::Status status = algorithm.merge(); - if (status.chunk && status.chunk.hasRows()) + if ((status.chunk && status.chunk.hasRows()) || status.chunk.hasChunkInfo()) { // std::cerr << "Got chunk with " << status.chunk.getNumRows() << " rows" << std::endl; state.output_chunk = std::move(status.chunk); diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 4942757cbd8..0a362c4750a 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -87,24 +88,21 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B aggregating_in_order = collector.detachProcessors(0); - for (auto & column_description : group_by_sort_description) - { - if (!column_description.column_name.empty()) - { - column_description.column_number = pipeline.getHeader().getPositionByName(column_description.column_name); - column_description.column_name.clear(); - } - } - auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), transform_params, group_by_sort_description, - max_block_size, - merge_threads); + max_block_size); pipeline.addTransform(std::move(transform)); + pipeline.resize(merge_threads); + + pipeline.addSimpleTransform([&](const Block &) + { + return std::make_shared(transform_params); + }); + aggregating_sorted = collector.detachProcessors(1); } else @@ -114,14 +112,14 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B return std::make_shared(header, transform_params, group_by_sort_description, max_block_size); }); + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, transform_params); + }); + aggregating_in_order = collector.detachProcessors(0); } - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, transform_params); - }); - finalizing = collector.detachProcessors(2); return; } diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index 33bc5280422..d6c5491f45c 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -133,7 +133,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) variants.without_key = nullptr; - /// Arenas cannot be destroyed here, since later, in FinalizingSimpleTransform + /// Arenas cannot be destroyed here, since later, in FinalizeAggregatedTransform /// there will be finalizeChunk(), but even after /// finalizeChunk() we cannot destroy arena, since some memory /// from Arena still in use, so we attach it to the Chunk to diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.h b/src/Processors/Transforms/AggregatingInOrderTransform.h index 10793e885ce..e0cb9b80967 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.h +++ b/src/Processors/Transforms/AggregatingInOrderTransform.h @@ -64,10 +64,10 @@ private: }; -class FinalizingSimpleTransform : public ISimpleTransform +class FinalizeAggregatedTransform : public ISimpleTransform { public: - FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params_) + FinalizeAggregatedTransform(Block header, AggregatingTransformParamsPtr params_) : ISimpleTransform({std::move(header)}, {params_->getHeader()}, true) , params(params_) {} @@ -82,7 +82,7 @@ public: } } - String getName() const override { return "FinalizingSimpleTransform"; } + String getName() const override { return "FinalizeAggregatedTransform"; } private: AggregatingTransformParamsPtr params; diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp index df2ea4b03f0..0eec02809e4 100644 --- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp @@ -12,13 +12,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -struct ChunksToMerge : public ChunkInfo -{ - std::unique_ptr chunks; - Int32 bucket_num = -1; - bool is_overflows = false; -}; - GroupingAggregatedTransform::GroupingAggregatedTransform( const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_) : IProcessor(InputPorts(num_inputs_, header_), { Block() }) diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h index 4367f6fec32..b4a62f8a13e 100644 --- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h +++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h @@ -137,6 +137,13 @@ private: void addChunk(Chunk chunk, size_t from_input); }; +struct ChunksToMerge : public ChunkInfo +{ + std::unique_ptr chunks; + Int32 bucket_num = -1; + bool is_overflows = false; +}; + class Pipe; /// Adds processors to pipe which performs memory efficient merging of partially aggregated data from several sources. diff --git a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference index cdc595a3c57..b6b8b04907c 100644 --- a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference +++ b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference @@ -1,17 +1,18 @@ (Expression) -ExpressionTransform +ExpressionTransform × 3 (Aggregating) - FinalizingSimpleTransform - FinishAggregatingInOrderTransform 3 → 1 - AggregatingInOrderTransform × 3 - (Expression) - ExpressionTransform × 3 - (SettingQuotaAndLimits) - (ReadFromMergeTree) - ExpressionTransform × 4 - MergeTreeInOrder 0 → 1 - MergingSortedTransform 2 → 1 - ExpressionTransform × 2 - MergeTreeInOrder × 2 0 → 1 - ExpressionTransform - MergeTreeInOrder 0 → 1 + MergingAggregatedBucketTransform × 3 + Resize 1 → 3 + FinishAggregatingInOrderTransform 3 → 1 + AggregatingInOrderTransform × 3 + (Expression) + ExpressionTransform × 3 + (SettingQuotaAndLimits) + (ReadFromMergeTree) + ExpressionTransform × 4 + MergeTreeInOrder 0 → 1 + MergingSortedTransform 2 → 1 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 + ExpressionTransform + MergeTreeInOrder 0 → 1