From efa51a6cd9007ec853feae5fffe1e1610b26a6d6 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 01:36:55 +0300 Subject: [PATCH 1/7] Batch aggregation (experimental) --- .../AggregateFunctions/IAggregateFunction.cpp | 17 +++++++ .../AggregateFunctions/IAggregateFunction.h | 5 ++ dbms/src/Interpreters/Aggregator.cpp | 50 ++++++++++++++++--- dbms/src/Interpreters/Aggregator.h | 11 ++-- 4 files changed, 74 insertions(+), 9 deletions(-) create mode 100644 dbms/src/AggregateFunctions/IAggregateFunction.cpp diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.cpp b/dbms/src/AggregateFunctions/IAggregateFunction.cpp new file mode 100644 index 00000000000..135f6c2662f --- /dev/null +++ b/dbms/src/AggregateFunctions/IAggregateFunction.cpp @@ -0,0 +1,17 @@ +#include + +namespace DB +{ + +void IAggregateFunction::addBatch( + size_t batch_size, + AggregateDataPtr * places, + size_t place_offset, + const IColumn ** columns, + Arena * arena) const +{ + for (size_t i = 0; i < batch_size; ++i) + add(places[i] + place_offset, columns, i, arena); +} + +} diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.h b/dbms/src/AggregateFunctions/IAggregateFunction.h index 94feb2456cf..ae6937a04bd 100644 --- a/dbms/src/AggregateFunctions/IAggregateFunction.h +++ b/dbms/src/AggregateFunctions/IAggregateFunction.h @@ -128,6 +128,11 @@ public: using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *); virtual AddFunc getAddressOfAddFunction() const = 0; + /** Contains a loop with calls to "add" function. You can collect arguments into array "places" + * and do a single call to "addBatch" for devirtualization and inlining. + */ + void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const; + /** This is used for runtime code generation to determine, which header files to include in generated source. * Always implement it as * const char * getHeaderFilePath() const override { return __FILE__; } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 23a06ea58d6..f3b30d51d0d 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -583,16 +583,16 @@ void NO_INLINE Aggregator::executeImpl( size_t rows, ColumnRawPtrs & key_columns, AggregateFunctionInstruction * aggregate_instructions, - StringRefs & keys, bool no_more_keys, AggregateDataPtr overflow_row) const { typename Method::State state(key_columns, key_sizes, aggregation_state_cache); if (!no_more_keys) - executeImplCase(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row); + //executeImplCase(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); + executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions); else - executeImplCase(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row); + executeImplCase(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); } @@ -602,9 +602,7 @@ void NO_INLINE Aggregator::executeImplCase( typename Method::State & state, Arena * aggregates_pool, size_t rows, - ColumnRawPtrs & /*key_columns*/, AggregateFunctionInstruction * aggregate_instructions, - StringRefs & /*keys*/, AggregateDataPtr overflow_row) const { /// NOTE When editing this code, also pay attention to SpecializedAggregator.h. @@ -655,6 +653,46 @@ void NO_INLINE Aggregator::executeImplCase( } +template +void NO_INLINE Aggregator::executeImplBatch( + Method & method, + typename Method::State & state, + Arena * aggregates_pool, + size_t rows, + AggregateFunctionInstruction * aggregate_instructions) const +{ + PODArray places(rows); + + /// For all rows. + for (size_t i = 0; i < rows; ++i) + { + AggregateDataPtr aggregate_data = nullptr; + + auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); + + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.isInserted()) + { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.setMapped(nullptr); + + aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); + createAggregateStates(aggregate_data); + + emplace_result.setMapped(aggregate_data); + } + else + aggregate_data = emplace_result.getMapped(); + + places[i] = aggregate_data; + } + + /// Add values to the aggregate functions. + for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) + inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool); +} + + void NO_INLINE Aggregator::executeWithoutKeyImpl( AggregatedDataWithoutKey & res, size_t rows, @@ -826,7 +864,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re #define M(NAME, IS_TWO_LEVEL) \ else if (result.type == AggregatedDataVariants::Type::NAME) \ executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_functions_instructions.data(), \ - key, no_more_keys, overflow_row_ptr); + no_more_keys, overflow_row_ptr); if (false) {} APPLY_FOR_AGGREGATED_VARIANTS(M) diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index cf4b590258f..e089f4707d2 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1003,7 +1003,6 @@ protected: size_t rows, ColumnRawPtrs & key_columns, AggregateFunctionInstruction * aggregate_instructions, - StringRefs & keys, bool no_more_keys, AggregateDataPtr overflow_row) const; @@ -1014,11 +1013,17 @@ protected: typename Method::State & state, Arena * aggregates_pool, size_t rows, - ColumnRawPtrs & key_columns, AggregateFunctionInstruction * aggregate_instructions, - StringRefs & keys, AggregateDataPtr overflow_row) const; + template + void executeImplBatch( + Method & method, + typename Method::State & state, + Arena * aggregates_pool, + size_t rows, + AggregateFunctionInstruction * aggregate_instructions) const; + /// For case when there are no keys (all aggregate into one row). void executeWithoutKeyImpl( AggregatedDataWithoutKey & res, From c98d2fe6e13a6089f32f5d215ab8462844755cf6 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 01:54:33 +0300 Subject: [PATCH 2/7] Addition to prev. revision --- .../AggregateFunctions/IAggregateFunction.cpp | 17 ----------------- .../src/AggregateFunctions/IAggregateFunction.h | 9 ++++++++- 2 files changed, 8 insertions(+), 18 deletions(-) delete mode 100644 dbms/src/AggregateFunctions/IAggregateFunction.cpp diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.cpp b/dbms/src/AggregateFunctions/IAggregateFunction.cpp deleted file mode 100644 index 135f6c2662f..00000000000 --- a/dbms/src/AggregateFunctions/IAggregateFunction.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include - -namespace DB -{ - -void IAggregateFunction::addBatch( - size_t batch_size, - AggregateDataPtr * places, - size_t place_offset, - const IColumn ** columns, - Arena * arena) const -{ - for (size_t i = 0; i < batch_size; ++i) - add(places[i] + place_offset, columns, i, arena); -} - -} diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.h b/dbms/src/AggregateFunctions/IAggregateFunction.h index ae6937a04bd..101b194227b 100644 --- a/dbms/src/AggregateFunctions/IAggregateFunction.h +++ b/dbms/src/AggregateFunctions/IAggregateFunction.h @@ -131,7 +131,7 @@ public: /** Contains a loop with calls to "add" function. You can collect arguments into array "places" * and do a single call to "addBatch" for devirtualization and inlining. */ - void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const; + virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0; /** This is used for runtime code generation to determine, which header files to include in generated source. * Always implement it as @@ -161,7 +161,14 @@ private: public: IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_) : IAggregateFunction(argument_types_, parameters_) {} + AddFunc getAddressOfAddFunction() const override { return &addFree; } + + void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override + { + for (size_t i = 0; i < batch_size; ++i) + static_cast(this)->add(places[i] + place_offset, columns, i, arena); + } }; From 3885cc20b86215ddb7674047f55a439a6dd6a898 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 02:22:23 +0300 Subject: [PATCH 3/7] Improved performance of aggregation without key --- dbms/src/AggregateFunctions/IAggregateFunction.h | 10 ++++++++++ dbms/src/Interpreters/Aggregator.cpp | 9 +++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.h b/dbms/src/AggregateFunctions/IAggregateFunction.h index 101b194227b..ed535d91ece 100644 --- a/dbms/src/AggregateFunctions/IAggregateFunction.h +++ b/dbms/src/AggregateFunctions/IAggregateFunction.h @@ -133,6 +133,10 @@ public: */ virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0; + /** The same for single place. + */ + virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0; + /** This is used for runtime code generation to determine, which header files to include in generated source. * Always implement it as * const char * getHeaderFilePath() const override { return __FILE__; } @@ -169,6 +173,12 @@ public: for (size_t i = 0; i < batch_size; ++i) static_cast(this)->add(places[i] + place_offset, columns, i, arena); } + + void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override + { + for (size_t i = 0; i < batch_size; ++i) + static_cast(this)->add(place, columns, i, arena); + } }; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index f3b30d51d0d..2498bcec7fd 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -708,12 +708,9 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( agg_count->addDelta(res, rows); else { - for (size_t i = 0; i < rows; ++i) - { - /// Adding values - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) - (*inst->func)(inst->that, res + inst->state_offset, inst->arguments, i, arena); - } + /// Adding values + for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) + inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena); } } From 011e50cae6b15d6ae1bf51621c5fdc505a84ae27 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 02:27:25 +0300 Subject: [PATCH 4/7] Removed useless function arguments --- dbms/src/Interpreters/Aggregator.cpp | 5 ++--- dbms/src/Interpreters/Aggregator.h | 3 --- dbms/src/Interpreters/SpecializedAggregator.h | 7 ++----- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 2498bcec7fd..c19a54a07af 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -289,7 +289,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) "template void Aggregator::executeSpecialized<\n" " " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n" " " << method_typename << " &, Arena *, size_t, ColumnRawPtrs &,\n" - " AggregateColumns &, StringRefs &, bool, AggregateDataPtr) const;\n" + " AggregateColumns &, bool, AggregateDataPtr) const;\n" "\n" "static void wrapper" << suffix << "(\n" " const Aggregator & aggregator,\n" @@ -298,13 +298,12 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) " size_t rows,\n" " ColumnRawPtrs & key_columns,\n" " Aggregator::AggregateColumns & aggregate_columns,\n" - " StringRefs & keys,\n" " bool no_more_keys,\n" " AggregateDataPtr overflow_row)\n" "{\n" " aggregator.executeSpecialized<\n" " " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n" - " method, arena, rows, key_columns, aggregate_columns, keys, no_more_keys, overflow_row);\n" + " method, arena, rows, key_columns, aggregate_columns, no_more_keys, overflow_row);\n" "}\n" "\n" "void * getPtr" << suffix << "() __attribute__((__visibility__(\"default\")));\n" diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index e089f4707d2..4b6f7d16b90 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -1047,7 +1047,6 @@ public: size_t rows, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, - StringRefs & keys, bool no_more_keys, AggregateDataPtr overflow_row) const; @@ -1057,9 +1056,7 @@ public: typename Method::State & state, Arena * aggregates_pool, size_t rows, - ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, - StringRefs & keys, AggregateDataPtr overflow_row) const; template diff --git a/dbms/src/Interpreters/SpecializedAggregator.h b/dbms/src/Interpreters/SpecializedAggregator.h index 8ec6b297111..4136f2162ac 100644 --- a/dbms/src/Interpreters/SpecializedAggregator.h +++ b/dbms/src/Interpreters/SpecializedAggregator.h @@ -103,7 +103,6 @@ void NO_INLINE Aggregator::executeSpecialized( size_t rows, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, - StringRefs & keys, bool no_more_keys, AggregateDataPtr overflow_row) const { @@ -111,10 +110,10 @@ void NO_INLINE Aggregator::executeSpecialized( if (!no_more_keys) executeSpecializedCase( - method, state, aggregates_pool, rows, key_columns, aggregate_columns, keys, overflow_row); + method, state, aggregates_pool, rows, aggregate_columns, overflow_row); else executeSpecializedCase( - method, state, aggregates_pool, rows, key_columns, aggregate_columns, keys, overflow_row); + method, state, aggregates_pool, rows, aggregate_columns, overflow_row); } #pragma GCC diagnostic push @@ -126,9 +125,7 @@ void NO_INLINE Aggregator::executeSpecializedCase( typename Method::State & state, Arena * aggregates_pool, size_t rows, - ColumnRawPtrs & /*key_columns*/, AggregateColumns & aggregate_columns, - StringRefs & /*keys*/, AggregateDataPtr overflow_row) const { /// For all rows. From f00fa640ba4c7098e2ca3eaf4ee633ca4ccf82a0 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 02:40:15 +0300 Subject: [PATCH 5/7] Removed old optimization that is now dominated by the new optimization --- .../AggregateFunctions/AggregateFunctionCount.h | 6 ------ dbms/src/Interpreters/Aggregator.cpp | 17 +++-------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/dbms/src/AggregateFunctions/AggregateFunctionCount.h b/dbms/src/AggregateFunctions/AggregateFunctionCount.h index e0371a78644..c1b96a4fe4a 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionCount.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionCount.h @@ -62,12 +62,6 @@ public: static_cast(to).getData().push_back(data(place).count); } - /// May be used for optimization. - void addDelta(AggregateDataPtr place, UInt64 x) const - { - data(place).count += x; - } - const char * getHeaderFilePath() const override { return __FILE__; } }; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index c19a54a07af..3985ec94c66 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -698,19 +697,9 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( AggregateFunctionInstruction * aggregate_instructions, Arena * arena) const { - /// Optimization in the case of a single aggregate function `count`. - AggregateFunctionCount * agg_count = params.aggregates_size == 1 - ? typeid_cast(aggregate_functions[0]) - : nullptr; - - if (agg_count) - agg_count->addDelta(res, rows); - else - { - /// Adding values - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) - inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena); - } + /// Adding values + for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) + inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena); } From cc5b34d778b7bbd1cb9123f0c67a2bafedbfff56 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 11 Aug 2019 04:00:54 +0300 Subject: [PATCH 6/7] Addition to prev. revision --- dbms/src/Interpreters/SpecializedAggregator.h | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/dbms/src/Interpreters/SpecializedAggregator.h b/dbms/src/Interpreters/SpecializedAggregator.h index 4136f2162ac..9a238c77032 100644 --- a/dbms/src/Interpreters/SpecializedAggregator.h +++ b/dbms/src/Interpreters/SpecializedAggregator.h @@ -181,20 +181,10 @@ void NO_INLINE Aggregator::executeSpecializedWithoutKey( AggregateColumns & aggregate_columns, Arena * arena) const { - /// Optimization in the case of a single aggregate function `count`. - AggregateFunctionCount * agg_count = params.aggregates_size == 1 - ? typeid_cast(aggregate_functions[0]) - : nullptr; - - if (agg_count) - agg_count->addDelta(res, rows); - else + for (size_t i = 0; i < rows; ++i) { - for (size_t i = 0; i < rows; ++i) - { - AggregateFunctionsList::forEach(AggregateFunctionsUpdater( - aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i, arena)); - } + AggregateFunctionsList::forEach(AggregateFunctionsUpdater( + aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i, arena)); } } From 2570907f4402e2dda05c2cb002b11c052105cf8c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 12 Aug 2019 00:45:18 +0300 Subject: [PATCH 7/7] Removed useless parameter --- .../ParallelAggregatingBlockInputStream.cpp | 6 ++---- .../ParallelAggregatingBlockInputStream.h | 2 -- dbms/src/Interpreters/Aggregator.cpp | 20 ++++++++----------- dbms/src/Interpreters/Aggregator.h | 2 -- .../Transforms/AggregatingTransform.cpp | 5 ++--- .../Transforms/AggregatingTransform.h | 1 - 6 files changed, 12 insertions(+), 24 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 9fee2996a7b..6c75ec726d4 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -110,8 +110,7 @@ ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(co void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num) { parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num], - parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, - parent.threads_data[thread_num].key, parent.no_more_keys); + parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, parent.no_more_keys); parent.threads_data[thread_num].src_rows += block.rows(); parent.threads_data[thread_num].src_bytes += block.bytes(); @@ -205,8 +204,7 @@ void ParallelAggregatingBlockInputStream::execute() /// To do this, we pass a block with zero rows to aggregate. if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0], - threads_data[0].key_columns, threads_data[0].aggregate_columns, - threads_data[0].key, no_more_keys); + threads_data[0].key_columns, threads_data[0].aggregate_columns, no_more_keys); } } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 5342c03e68f..0c93f5d1161 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -80,13 +80,11 @@ private: size_t src_rows = 0; size_t src_bytes = 0; - StringRefs key; ColumnRawPtrs key_columns; Aggregator::AggregateColumns aggregate_columns; ThreadData(size_t keys_size_, size_t aggregates_size_) { - key.resize(keys_size_); key_columns.resize(keys_size_); aggregate_columns.resize(aggregates_size_); } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 3985ec94c66..f367908a0a5 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -704,8 +704,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, - ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key, - bool & no_more_keys) + ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) { if (isCancelled()) return true; @@ -819,9 +818,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re reinterpret_cast(compiled_data->compiled_method_ptr) \ + bool, AggregateDataPtr)>(compiled_data->compiled_method_ptr) \ (*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \ - key, no_more_keys, overflow_row_ptr); + no_more_keys, overflow_row_ptr); if (false) {} APPLY_FOR_AGGREGATED_VARIANTS(M) @@ -835,9 +834,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re reinterpret_cast(compiled_data->compiled_two_level_method_ptr) \ + bool, AggregateDataPtr)>(compiled_data->compiled_two_level_method_ptr) \ (*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \ - key, no_more_keys, overflow_row_ptr); + no_more_keys, overflow_row_ptr); if (false) {} APPLY_FOR_VARIANTS_TWO_LEVEL(M) @@ -1055,7 +1054,6 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria if (isCancelled()) return; - StringRefs key(params.keys_size); ColumnRawPtrs key_columns(params.keys_size); AggregateColumns aggregate_columns(params.aggregates_size); @@ -1082,14 +1080,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria src_rows += block.rows(); src_bytes += block.bytes(); - if (!executeOnBlock(block, result, key_columns, aggregate_columns, key, no_more_keys)) + if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys)) break; } /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) - executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, key, no_more_keys); + executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys); double elapsed_seconds = watch.elapsedSeconds(); size_t rows = result.sizeWithoutOverflowRow(); @@ -2344,7 +2342,6 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( Method & method, Arena * pool, ColumnRawPtrs & key_columns, - StringRefs & keys [[maybe_unused]], const Block & source, std::vector & destinations) const { @@ -2406,7 +2403,6 @@ std::vector Aggregator::convertBlockToTwoLevel(const Block & block) AggregatedDataVariants data; - StringRefs key(params.keys_size); ColumnRawPtrs key_columns(params.keys_size); /// Remember the columns we will work with @@ -2446,7 +2442,7 @@ std::vector Aggregator::convertBlockToTwoLevel(const Block & block) #define M(NAME) \ else if (data.type == AggregatedDataVariants::Type::NAME) \ convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \ - key_columns, key, block, splitted_blocks); + key_columns, block, splitted_blocks); if (false) {} APPLY_FOR_VARIANTS_TWO_LEVEL(M) diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 4b6f7d16b90..df7354d8294 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -842,7 +842,6 @@ public: /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). bool executeOnBlock(const Block & block, AggregatedDataVariants & result, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block - StringRefs & keys, /// - pass the corresponding objects that are initially empty. bool & no_more_keys); /** Convert the aggregation data structure into a block. @@ -1181,7 +1180,6 @@ protected: Method & method, Arena * pool, ColumnRawPtrs & key_columns, - StringRefs & keys, const Block & source, std::vector & destinations) const; diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index 55fe66b7d9f..5575b628344 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -101,7 +101,6 @@ AggregatingTransform::AggregatingTransform( Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_, size_t current_variant, size_t temporary_data_merge_threads_, size_t max_threads_) : IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_)) - , key(params->params.keys_size) , key_columns(params->params.keys_size) , aggregate_columns(params->params.aggregates_size) , many_data(std::move(many_data_)) @@ -212,7 +211,7 @@ void AggregatingTransform::consume(Chunk chunk) auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns()); - if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, key, no_more_keys)) + if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, no_more_keys)) is_consume_finished = true; } @@ -226,7 +225,7 @@ void AggregatingTransform::initGenerate() /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set) - params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys); + params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys); double elapsed_seconds = watch.elapsedSeconds(); size_t rows = variants.sizeWithoutOverflowRow(); diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.h b/dbms/src/Processors/Transforms/AggregatingTransform.h index 17786ccfa1a..3621a60517c 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.h +++ b/dbms/src/Processors/Transforms/AggregatingTransform.h @@ -71,7 +71,6 @@ private: AggregatingTransformParamsPtr params; Logger * log = &Logger::get("AggregatingTransform"); - StringRefs key; ColumnRawPtrs key_columns; Aggregator::AggregateColumns aggregate_columns; bool no_more_keys = false;