From 6f7c8894e95d78ff80fe7cd64c490db01fce8c2c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 18 Mar 2021 23:17:09 +0300 Subject: [PATCH] fix bugs in aggregation by primary key --- src/Interpreters/Aggregator.cpp | 24 +++++++---- src/Interpreters/Aggregator.h | 6 ++- .../FinishAggregatingInOrderAlgorithm.cpp | 40 +++++++++++++++---- .../FinishAggregatingInOrderAlgorithm.h | 9 ++++- .../FinishAggregatingInOrderTransform.h | 6 ++- src/Processors/QueryPlan/AggregatingStep.cpp | 3 +- .../AggregatingInOrderTransform.cpp | 37 +++++++++++------ 7 files changed, 91 insertions(+), 34 deletions(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index ee8132cd40c..b28ffc832ac 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1260,22 +1260,30 @@ Block Aggregator::prepareBlockAndFill( return res; } -void Aggregator::fillAggregateColumnsWithSingleKey( +void Aggregator::addToAggregateColumnsWithSingleKey( + const AggregatedDataVariants & data_variants, + MutableColumns & final_aggregate_columns) +{ + const auto & data = data_variants.without_key; + for (size_t i = 0; i < params.aggregates_size; ++i) + { + auto & column_aggregate_func = assert_cast(*final_aggregate_columns[i]); + column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]); + } +} + +void Aggregator::finalizeAggregateColumnsWithSingleKey( AggregatedDataVariants & data_variants, MutableColumns & final_aggregate_columns) { - AggregatedDataWithoutKey & data = data_variants.without_key; - for (size_t i = 0; i < params.aggregates_size; ++i) { - ColumnAggregateFunction & column_aggregate_func = assert_cast(*final_aggregate_columns[i]); + auto & column_aggregate_func = assert_cast(*final_aggregate_columns[i]); for (auto & pool : data_variants.aggregates_pools) - { column_aggregate_func.addArena(pool); - } - column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]); } - data = nullptr; + + data_variants.without_key = nullptr; } void Aggregator::createStatesAndFillKeyColumnsWithSingleKey( diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index d24e5478372..d3eb2fcc3ba 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1303,7 +1303,11 @@ protected: AggregateFunctionInstructions & instructions, NestedColumnsHolder & nested_columns_holder); - void fillAggregateColumnsWithSingleKey( + void addToAggregateColumnsWithSingleKey( + const AggregatedDataVariants & data_variants, + MutableColumns & final_aggregate_columns); + + void finalizeAggregateColumnsWithSingleKey( AggregatedDataVariants & data_variants, MutableColumns & final_aggregate_columns); diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp index 3ef0caefd8f..3eb94ba78b7 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp @@ -24,11 +24,13 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm( const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_, - SortDescription description_) + SortDescription description_, + size_t max_block_size_) : header(header_) , num_inputs(num_inputs_) , params(params_) , description(std::move(description_)) + , max_block_size(max_block_size_) { /// Replace column names in description to positions. for (auto & column_description : description) @@ -56,6 +58,13 @@ void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() { + if (!inputs_to_update.empty()) + { + Status status(inputs_to_update.back()); + inputs_to_update.pop_back(); + return status; + } + /// Find the input with smallest last row. std::optional best_input; for (size_t i = 0; i < num_inputs; ++i) @@ -94,16 +103,30 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() states[i].to_row = (it == indices.end() ? states[i].num_rows : *it); } - Status status(*best_input); - status.chunk = aggregate(); + addToAggregation(); + + /// At least one chunk should be fully aggregated. + assert(!inputs_to_update.empty()); + Status status(inputs_to_update.back()); + inputs_to_update.pop_back(); + + /// Do not merge blocks, if there are too few rows. + if (accumulated_rows >= max_block_size) + status.chunk = aggregate(); return status; } Chunk FinishAggregatingInOrderAlgorithm::aggregate() { - BlocksList blocks; + auto aggregated = params->aggregator.mergeBlocks(blocks, false); + blocks.clear(); + accumulated_rows = 0; + return {aggregated.getColumns(), aggregated.rows()}; +} +void FinishAggregatingInOrderAlgorithm::addToAggregation() +{ for (size_t i = 0; i < num_inputs; ++i) { const auto & state = states[i]; @@ -112,7 +135,7 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate() if (state.to_row - state.current_row == state.num_rows) { - blocks.emplace_back(header.cloneWithColumns(states[i].all_columns)); + blocks.emplace_back(header.cloneWithColumns(state.all_columns)); } else { @@ -125,10 +148,11 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate() } states[i].current_row = states[i].to_row; + accumulated_rows += blocks.back().rows(); + + if (!states[i].isValid()) + inputs_to_update.push_back(i); } - - auto aggregated = params->aggregator.mergeBlocks(blocks, false); - return {aggregated.getColumns(), aggregated.rows()}; } } diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h index 2f9cd5d71a2..119aefb0ab0 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h @@ -37,7 +37,8 @@ public: const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_, - SortDescription description_); + SortDescription description_, + size_t max_block_size_); void initialize(Inputs inputs) override; void consume(Input & input, size_t source_num) override; @@ -45,6 +46,7 @@ public: private: Chunk aggregate(); + void addToAggregation(); struct State { @@ -66,8 +68,13 @@ private: size_t num_inputs; AggregatingTransformParamsPtr params; SortDescription description; + size_t max_block_size; + Inputs current_inputs; std::vector states; + std::vector inputs_to_update; + BlocksList blocks; + size_t accumulated_rows = 0; }; } diff --git a/src/Processors/Merges/FinishAggregatingInOrderTransform.h b/src/Processors/Merges/FinishAggregatingInOrderTransform.h index e067b9472d9..4f9e53bd7d5 100644 --- a/src/Processors/Merges/FinishAggregatingInOrderTransform.h +++ b/src/Processors/Merges/FinishAggregatingInOrderTransform.h @@ -16,13 +16,15 @@ public: const Block & header, size_t num_inputs, AggregatingTransformParamsPtr params, - SortDescription description) + SortDescription description, + size_t max_block_size) : IMergingTransform( num_inputs, header, header, true, header, num_inputs, params, - std::move(description)) + std::move(description), + max_block_size) { } diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 813d86b50c0..0474a15961e 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -100,7 +100,8 @@ void AggregatingStep::transformPipeline(QueryPipeline & pipeline) pipeline.getHeader(), pipeline.getNumStreams(), transform_params, - group_by_sort_description); + group_by_sort_description, + max_block_size); pipeline.addTransform(std::move(transform)); aggregating_sorted = collector.detachProcessors(1); diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index d448d31611d..cccd37f72e7 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -58,6 +58,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) LOG_TRACE(log, "Aggregating in order"); is_consume_started = true; } + src_rows += rows; src_bytes += chunk.bytes(); @@ -82,23 +83,24 @@ void AggregatingInOrderTransform::consume(Chunk chunk) res_aggregate_columns.resize(params->params.aggregates_size); for (size_t i = 0; i < params->params.keys_size; ++i) - { res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn(); - } + for (size_t i = 0; i < params->params.aggregates_size; ++i) - { res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn(); - } + params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns); ++cur_block_size; } + ssize_t mid = 0; ssize_t high = 0; ssize_t low = -1; + /// Will split block into segments with the same key while (key_end != rows) { high = rows; + /// Find the first position of new (not current) key in current chunk while (high - low > 1) { @@ -108,32 +110,34 @@ void AggregatingInOrderTransform::consume(Chunk chunk) else high = mid; } + key_end = high; + /// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block. if (key_begin != key_end) - { params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool); - } - low = key_begin = key_end; /// We finalize last key aggregation state if a new key found. - if (key_begin != rows) + if (key_end != rows) { - params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + params->aggregator.addToAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + /// If res_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk. if (cur_block_size == res_block_size) { Columns source_columns = chunk.detachColumns(); for (auto & source_column : source_columns) - source_column = source_column->cut(key_begin, rows - key_begin); + source_column = source_column->cut(key_end, rows - key_end); - current_chunk = Chunk(source_columns, rows - key_begin); + current_chunk = Chunk(source_columns, rows - key_end); src_rows -= current_chunk.getNumRows(); block_end_reached = true; need_generate = true; cur_block_size = 0; + params->aggregator.finalizeAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + /// Arenas cannot be destroyed here, since later, in FinalizingSimpleTransform /// there will be finalizeChunk(), but even after /// finalizeChunk() we cannot destroy arena, since some memory @@ -155,10 +159,14 @@ void AggregatingInOrderTransform::consume(Chunk chunk) } /// We create a new state for the new key and update res_key_columns - params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns); + params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_end, res_key_columns); ++cur_block_size; } + + key_begin = key_end; + low = key_end; } + block_end_reached = false; } @@ -234,7 +242,10 @@ IProcessor::Status AggregatingInOrderTransform::prepare() void AggregatingInOrderTransform::generate() { if (cur_block_size && is_consume_finished) - params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + { + params->aggregator.addToAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + params->aggregator.finalizeAggregateColumnsWithSingleKey(variants, res_aggregate_columns); + } Block res = res_header.cloneEmpty();