A little bit faster merge of aggregating states.

This commit is contained in:
Nikolai Kochetov 2021-04-27 12:01:58 +03:00
parent 186b1128d0
commit 5570b56cc3
4 changed files with 51 additions and 9 deletions

View File

@ -113,6 +113,16 @@ public:
nested_func->merge(place, rhs, arena);
}
void mergeBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_func->mergeBatch(batch_size, places, place_offset, rhs, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
nested_func->serialize(place, buf);

View File

@ -196,6 +196,18 @@ public:
place[size_of_data] |= rhs[size_of_data];
}
void mergeBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_function->mergeBatch(batch_size, places, place_offset, rhs, arena);
for (size_t i = 0; i < batch_size; ++i)
(places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
}
void serialize(
ConstAggregateDataPtr place,
WriteBuffer & buf) const override

View File

@ -153,6 +153,13 @@ public:
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
virtual void mergeBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(
@ -279,6 +286,18 @@ public:
}
}
void mergeBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
if (places[i])
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
{

View File

@ -1769,6 +1769,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
/// For all rows.
size_t rows = block.rows();
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
@ -1797,18 +1799,17 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
/// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
if (!aggregate_data && !overflow_row)
continue;
AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
places[i] = value;
}
for (size_t j = 0; j < params.aggregates_size; ++j)
{
/// Merge state of aggregate functions.
for (size_t j = 0; j < params.aggregates_size; ++j)
aggregate_functions[j]->merge(
value + offsets_of_aggregate_states[j],
(*aggregate_columns[j])[i],
aggregates_pool);
aggregate_functions[j]->mergeBatch(
rows, places.get(), offsets_of_aggregate_states[j],
aggregate_columns[j]->data(),
aggregates_pool);
}
/// Early release memory.