Special case for aggregation by an 8-bit field

This commit is contained in:
Alexey Milovidov 2020-07-29 21:35:52 +03:00
parent 542ec9f777
commit 1088bfffb1
4 changed files with 65 additions and 3 deletions

View File

@ -151,7 +151,8 @@ public:
/// Variant of addBatchSinglePlace that additionally receives a per-row null map.
/// NOTE(review): presumably rows whose null_map entry is non-zero are skipped;
/// the implementation is not visible here - confirm against the helper class.
virtual void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0;
virtual void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/// Adds the rows with indices in the half-open range [batch_begin, batch_end)
/// to the single aggregation state pointed to by "place".
virtual void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
@ -159,7 +160,24 @@ public:
* "places" contains a large number of same values consecutively.
*/
virtual void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena) const = 0;
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena) const = 0;
/** The case when the aggregation key is UInt8
 * and pointers to aggregation states are stored in an AggregateDataPtr[256] lookup table.
 *
 * For every row i in [0, batch_size) the state slot is places[key[i]].
 * If the slot is still null, "init" is called with a reference to that slot
 * and is expected to store a pointer to freshly created states into it.
 * The row is then added to the state at (slot pointer + place_offset).
 */
virtual void addBatchLookupTable8(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
const UInt8 * key,
const IColumn ** columns,
Arena * arena) const = 0;
/** By default all NULLs are skipped during aggregation.
* If it returns nullptr, the default one will be used.
@ -204,6 +222,24 @@ public:
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
void addBatchLookupTable8(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
const UInt8 * key,
const IColumn ** columns,
Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
{
AggregateDataPtr & place = places[key[i]];
if (unlikely(!place))
init(place);
static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena);
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
@ -218,7 +254,8 @@ public:
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);

View File

@ -64,6 +64,8 @@ struct HashMethodOneNumber
/// Used by the default implementation in HashMethodBase.
/// Loads the key of the given row from the raw data at "vec"; the load does not require alignment.
FieldType getKeyHolder(size_t row, Arena &) const
{
    const auto key_pos = vec + row * sizeof(FieldType);
    return unalignedLoad<FieldType>(key_pos);
}
/// The raw data at "vec" reinterpreted as a contiguous array of keys (row i is at index i).
/// NOTE(review): getKeyHolder reads the same data via unalignedLoad; dereferencing this pointer
/// directly presumably relies on "vec" being suitably aligned for FieldType - confirm for wide keys.
const FieldType * getKeyData() const
{
    return reinterpret_cast<const FieldType *>(vec);
}
};

View File

@ -468,6 +468,9 @@ public:
/// Size of the buffer measured in cells; always NUM_CELLS.
size_t getBufferSizeInCells() const
{
    return NUM_CELLS;
}
/// Read-only access to the cell buffer.
const Cell * data() const
{
    return buf;
}
/// Mutable access to the cell buffer.
Cell * data()
{
    return buf;
}
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
/// Always reports zero collisions.
size_t getCollisions() const
{
    return 0;
}
#endif

View File

@ -521,6 +521,26 @@ void NO_INLINE Aggregator::executeImplBatch(
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
if constexpr (std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
{
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
rows,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
{
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
},
state.getKeyData(),
inst->batch_arguments,
aggregates_pool);
}
return;
}
PODArray<AggregateDataPtr> places(rows);
/// For all rows.