Merge pull request #23681 from ClickHouse/devirt-agg-merge
A little bit faster merge of aggregating states.
Commit: 5837fdabf2
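Summary of the change: merging pre-aggregated states previously cost one virtual merge call per row per aggregate function. This patch adds a batched mergeBatch method to IAggregateFunction, so virtual dispatch happens once per block per function; inside the batch loop the per-row merge goes through static_cast<const Derived *>(this) in the CRTP helper, a call the compiler can resolve statically and inline, hence the branch name devirt-agg-merge.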
@@ -113,6 +113,16 @@ public:
         nested_func->merge(place, rhs, arena);
     }

+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        nested_func->mergeBatch(batch_size, places, place_offset, rhs, arena);
+    }
+
     void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
     {
         nested_func->serialize(place, buf);
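In combinators that wrap a nested function (here nested_func), mergeBatch simply forwards to the nested function's own mergeBatch, so the batched path is preserved through the wrapper instead of degrading back to per-row virtual merge calls.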
@@ -196,6 +196,18 @@ public:
         place[size_of_data] |= rhs[size_of_data];
     }

+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        nested_function->mergeBatch(batch_size, places, place_offset, rhs, arena);
+        for (size_t i = 0; i < batch_size; ++i)
+            (places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
+    }
+
     void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
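The -Null combinator stores one extra flag byte at offset size_of_data in each state, recording whether any non-NULL value was seen; its batched merge first merges all nested states in one call, then ORs the flag bytes row by row. A minimal, self-contained illustration of that flag layout (merge_nullable and the fixed long-sum state are hypothetical stand-ins, not ClickHouse code):

    #include <cstddef>
    #include <cstring>
    #include <iostream>

    /// Illustrative layout: a nullable aggregate state is the nested state
    /// followed by one flag byte at offset size_of_data that records whether
    /// any non-NULL value was seen.
    constexpr size_t size_of_data = sizeof(long);  /// nested state: a running sum

    void merge_nullable(char * place, const char * rhs)
    {
        long lhs_sum, rhs_sum;
        std::memcpy(&lhs_sum, place, sizeof(long));
        std::memcpy(&rhs_sum, rhs, sizeof(long));
        lhs_sum += rhs_sum;
        std::memcpy(place, &lhs_sum, sizeof(long));
        place[size_of_data] |= rhs[size_of_data];  /// OR the "saw a value" flags
    }

    int main()
    {
        char a[size_of_data + 1] = {};   /// sum = 0, flag = 0 (all NULLs so far)
        char b[size_of_data + 1] = {};
        long v = 42;
        std::memcpy(b, &v, sizeof(long));
        b[size_of_data] = 1;             /// b saw a non-NULL value
        merge_nullable(a, b);
        long sum;
        std::memcpy(&sum, a, sizeof(long));
        std::cout << sum << ' ' << int(a[size_of_data]) << '\n';  /// prints: 42 1
    }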
@@ -153,6 +153,13 @@ public:
         Arena * arena,
         ssize_t if_argument_pos = -1) const = 0;

+    virtual void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const = 0;
+
     /** The same for single place.
       */
     virtual void addBatchSinglePlace(
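Contract of the new pure virtual: for each i in [0, batch_size), merge the state rhs[i] into places[i] + place_offset. place_offset locates this particular function's state inside the combined per-key aggregate data blob (the caller passes offsets_of_aggregate_states[j]).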
@@ -279,6 +286,18 @@ public:
             }
     }

+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        for (size_t i = 0; i < batch_size; ++i)
+            if (places[i])
+                static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
+    }
+
     void addBatchSinglePlace(
         size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
     {
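This default implementation in the CRTP helper is where the devirtualization pays off: the only virtual dispatch is mergeBatch itself, while the per-row merge is called through static_cast<const Derived *>(this) and can be inlined. A minimal sketch of the pattern, with illustrative names (IAgg, AggHelper, SumAgg) that are not ClickHouse's:

    #include <cstddef>
    #include <iostream>

    using DataPtr = char *;
    using ConstDataPtr = const char *;

    struct IAgg
    {
        virtual ~IAgg() = default;
        virtual void merge(DataPtr place, ConstDataPtr rhs) const = 0;
        /// One virtual call per batch instead of one per row.
        virtual void mergeBatch(size_t n, DataPtr * places, const ConstDataPtr * rhs) const = 0;
    };

    template <typename Derived>
    struct AggHelper : IAgg
    {
        void mergeBatch(size_t n, DataPtr * places, const ConstDataPtr * rhs) const override
        {
            for (size_t i = 0; i < n; ++i)
                if (places[i])  /// null destinations are skipped, as in the real helper
                    /// Static call: Derived is final here, so the compiler can inline merge.
                    static_cast<const Derived *>(this)->merge(places[i], rhs[i]);
        }
    };

    struct SumAgg final : AggHelper<SumAgg>
    {
        void merge(DataPtr place, ConstDataPtr rhs) const override
        {
            *reinterpret_cast<long *>(place) += *reinterpret_cast<const long *>(rhs);
        }
    };

    int main()
    {
        long a = 1, b = 2, r1 = 10, r2 = 20;
        DataPtr places[] = {reinterpret_cast<DataPtr>(&a), reinterpret_cast<DataPtr>(&b)};
        ConstDataPtr rhs[] = {reinterpret_cast<ConstDataPtr>(&r1), reinterpret_cast<ConstDataPtr>(&r2)};
        SumAgg agg;
        const IAgg & iagg = agg;
        iagg.mergeBatch(2, places, rhs);       /// one virtual dispatch for the whole batch
        std::cout << a << ' ' << b << '\n';    /// prints: 11 22
    }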
@@ -1769,6 +1769,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(

     /// For all rows.
     size_t rows = block.rows();
+    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
+
     for (size_t i = 0; i < rows; ++i)
     {
         AggregateDataPtr aggregate_data = nullptr;
@@ -1797,18 +1799,17 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(

         /// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.

         /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
         if (!aggregate_data && !overflow_row)
             continue;

         AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
-
-        /// Merge state of aggregate functions.
-        for (size_t j = 0; j < params.aggregates_size; ++j)
-            aggregate_functions[j]->merge(
-                value + offsets_of_aggregate_states[j],
-                (*aggregate_columns[j])[i],
-                aggregates_pool);
+        places[i] = value;
+    }
+
+    /// Merge state of aggregate functions.
+    for (size_t j = 0; j < params.aggregates_size; ++j)
+    {
+        aggregate_functions[j]->mergeBatch(
+            rows, places.get(), offsets_of_aggregate_states[j],
+            aggregate_columns[j]->data(),
+            aggregates_pool);
+    }

     /// Early release memory.
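Net effect in Aggregator::mergeStreamsImplCase: the row loop now only resolves each row's destination state into the places array, and the merging itself issues one mergeBatch per aggregate function per block. For a block of 65536 rows and 4 aggregate functions that is 4 virtual calls where there used to be 262144 (65536 * 4), with the per-row work done in a tight loop the compiler can optimize.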