better performace with sparse columns in aggregate functions

This commit is contained in:
Anton Popov 2022-05-28 02:17:02 +00:00
parent 4dd447b232
commit b2cff26ecf
7 changed files with 105 additions and 22 deletions

View File

@ -224,8 +224,16 @@ public:
++this->data(place).denominator;
}
void
addBatchSinglePlace(
void addManyDefaults(
AggregateDataPtr __restrict place,
const IColumn ** /*columns*/,
size_t length,
Arena * /*arena*/) const override
{
this->data(place).denominator += length;
}
void addBatchSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr place,

View File

@ -880,8 +880,9 @@ struct AggregateFunctionMinData : Data
{
using Self = AggregateFunctionMinData;
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); }
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); }
void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfLess(column, 0, arena); }
static const char * name() { return "min"; }
@ -907,8 +908,9 @@ struct AggregateFunctionMaxData : Data
{
using Self = AggregateFunctionMaxData;
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); }
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); }
void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfGreater(column, 0, arena); }
static const char * name() { return "max"; }
@ -935,8 +937,9 @@ struct AggregateFunctionAnyData : Data
using Self = AggregateFunctionAnyData;
static constexpr bool is_any = true;
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); }
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); }
void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeFirstTime(column, 0, arena); }
static const char * name() { return "any"; }
@ -962,8 +965,9 @@ struct AggregateFunctionAnyLastData : Data
{
using Self = AggregateFunctionAnyLastData;
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); }
bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); }
bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); }
void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeEveryTime(column, 0, arena); }
static const char * name() { return "anyLast"; }
@ -1024,6 +1028,8 @@ struct AggregateFunctionSingleValueOrNullData : Data
return false;
}
void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeIfBetter(column, 0, arena); }
void insertResultInto(IColumn & to) const
{
if (is_null || first_value)
@ -1098,6 +1104,12 @@ struct AggregateFunctionAnyHeavyData : Data
return false;
}
void addManyDefaults(const IColumn & column, size_t length, Arena * arena)
{
for (size_t i = 0; i < length; ++i)
changeIfBetter(column, 0, arena);
}
void write(WriteBuffer & buf, const ISerialization & serialization) const
{
Data::write(buf, serialization);
@ -1158,6 +1170,15 @@ public:
this->data(place).changeIfBetter(*columns[0], row_num, arena);
}
void addManyDefaults(
AggregateDataPtr __restrict place,
const IColumn ** columns,
size_t length,
Arena * arena) const override
{
this->data(place).addManyDefaults(*columns[0], length, arena);
}
void addBatchSinglePlace(
size_t row_begin,
size_t row_end,

View File

@ -489,6 +489,33 @@ public:
}
}
void addManyDefaults(
AggregateDataPtr __restrict /*place*/,
const IColumn ** /*columns*/,
size_t /*length*/,
Arena * /*arena*/) const override
{
}
void addBatchSparse(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const override
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
const auto & offsets = column_sparse.getOffsetsData();
size_t from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin();
size_t to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin();
for (size_t i = from; i < to; ++i)
add(places[offsets[i]] + place_offset, &values, i + 1, arena);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));

View File

@ -237,6 +237,15 @@ public:
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
}
void addManyDefaults(
AggregateDataPtr __restrict place,
const IColumn ** columns,
size_t /*length*/,
Arena * /*arena*/) const override
{
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], 0);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).set.merge(this->data(rhs).set);

View File

@ -123,6 +123,10 @@ public:
*/
virtual void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;
/// Adds several default values of arguments into aggregation data on which place points to.
/// Default values must be a the 0-th positions in columns.
virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;
/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
@ -377,6 +381,16 @@ public:
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addManyDefaults(
AggregateDataPtr __restrict place,
const IColumn ** columns,
size_t length,
Arena * arena) const override
{
for (size_t i = 0; i < length; ++i)
static_cast<const Derived *>(this)->add(place, columns, 0, arena);
}
void addBatch( /// NOLINT
size_t row_begin,
size_t row_end,
@ -413,11 +427,7 @@ public:
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
auto offset_it = column_sparse.begin();
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
auto offset_it = column_sparse.getIterator(row_begin);
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
@ -468,17 +478,16 @@ public:
const IColumn ** columns,
Arena * arena) const override
{
/// TODO: add values and defaults separately if order of adding isn't important.
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
auto offset_it = column_sparse.begin();
const auto & offsets = column_sparse.getOffsetsData();
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
auto from = std::lower_bound(offsets.begin(), offsets.end(), row_begin) - offsets.begin() + 1;
auto to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin() + 1;
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(place, &values, offset_it.getValueIndex(), arena);
size_t num_defaults = (row_end - row_begin) - (to - from);
static_cast<const Derived *>(this)->addBatchSinglePlace(from, to, place, &values, arena, -1);
static_cast<const Derived *>(this)->addManyDefaults(place, &values, num_defaults, arena);
}
void addBatchSinglePlaceNotNull( /// NOLINT

View File

@ -772,6 +772,14 @@ size_t ColumnSparse::getValueIndex(size_t n) const
return it - offsets_data.begin() + 1;
}
ColumnSparse::Iterator ColumnSparse::getIterator(size_t n) const
{
const auto & offsets_data = getOffsetsData();
const auto * it = std::lower_bound(offsets_data.begin(), offsets_data.end(), n);
size_t current_offset = it - offsets_data.begin();
return Iterator(offsets_data, _size, current_offset, n);
}
ColumnPtr recursiveRemoveSparse(const ColumnPtr & column)
{
if (!column)

View File

@ -215,6 +215,7 @@ public:
Iterator begin() const { return Iterator(getOffsetsData(), _size, 0, 0); }
Iterator end() const { return Iterator(getOffsetsData(), _size, getOffsetsData().size(), _size); }
Iterator getIterator(size_t n) const;
private:
using Inserter = std::function<void(IColumn &)>;