Devirtualize -If and vectorize count

This commit is contained in:
Amos Bird 2020-11-15 18:05:52 +08:00
parent 9c1516bd74
commit 9348526078
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
16 changed files with 336 additions and 56 deletions

View File

@ -129,7 +129,7 @@ public:
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -6,6 +6,7 @@
#include <array>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/assert_cast.h>
@ -42,6 +43,39 @@ public:
++data(place).count;
}
/// Batch update of a single aggregation state.
/// With num_arguments == 0 every row of the batch is counted. Otherwise the column at index
/// num_arguments - 1 is read as a UInt8 flag column and only rows whose flag is greater than zero are counted.
void addBatchSinglePlace(
    size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, size_t num_arguments) const override
{
    /// No flag column supplied: the whole batch contributes.
    if (num_arguments == 0)
    {
        data(place).count += batch_size;
        return;
    }

    const auto & condition_column = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]);
    data(place).count += countBytesInFilter(condition_column.getData());
}
/// Batch update of a single aggregation state that skips rows marked in `null_map`.
/// With num_arguments == 0 the result grows by the number of rows whose null_map byte is not
/// greater than zero. Otherwise the column at index num_arguments - 1 is read as a UInt8 flag
/// column and the counting is delegated to countBytesInFilterWithNull.
void addBatchSinglePlaceNotNull(
    size_t batch_size,
    AggregateDataPtr place,
    const IColumn ** columns,
    const UInt8 * null_map,
    Arena *,
    size_t num_arguments) const override
{
    /// No flag column supplied: count the batch minus the rows marked in the null map.
    if (num_arguments == 0)
    {
        data(place).count += batch_size - countBytesInFilter(null_map, batch_size);
        return;
    }

    const auto & condition_column = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]);
    data(place).count += countBytesInFilterWithNull(condition_column.getData(), null_map);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
data(place).count += data(rhs).count;

View File

@ -235,6 +235,8 @@ public:
{
return true;
}
/// Returns the nested aggregate function held in `nested_func`.
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -252,6 +252,8 @@ public:
{
return nested_func->isState();
}
/// Returns the nested aggregate function held in `nested_func`.
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};

View File

@ -80,6 +80,34 @@ public:
nested_func->add(place, columns, row_num, arena);
}
/// Forwards the whole batch to the nested function's addBatch.
/// The trailing size_t parameter is deliberately unnamed and its value is ignored; the
/// `num_arguments` passed on below is resolved from the enclosing scope
/// (NOTE(review): presumably a member holding this combinator's own argument count — confirm in the class definition).
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
size_t) const override
{
nested_func->addBatch(batch_size, places, place_offset, columns, arena, num_arguments);
}
/// Forwards the whole batch to the nested function's addBatchSinglePlace.
/// The trailing size_t parameter is unnamed and ignored; `num_arguments` below comes from the enclosing scope
/// (NOTE(review): presumably this combinator's own argument count — confirm in the class definition).
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t) const override
{
nested_func->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments);
}
/// Forwards the whole batch, together with `null_map`, to the nested function's addBatchSinglePlaceNotNull.
/// The trailing size_t parameter is unnamed and ignored; `num_arguments` below comes from the enclosing scope
/// (NOTE(review): presumably this combinator's own argument count — confirm in the class definition).
void addBatchSinglePlaceNotNull(
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
size_t) const override
{
nested_func->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
nested_func->merge(place, rhs, arena);
@ -113,6 +141,8 @@ public:
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function, const DataTypes & arguments,
const Array & params, const AggregateFunctionProperties & properties) const override;
/// Returns the nested aggregate function held in `nested_func`.
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -102,6 +102,8 @@ public:
{
return nested_func->allocatesMemoryInArena();
}
/// Returns the nested aggregate function held in `nested_func`.
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -180,6 +180,8 @@ public:
{
return nested_function->isState();
}
/// Returns the nested aggregate function held in `nested_function`.
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
@ -209,13 +211,15 @@ public:
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
const UInt8 * null_map = column->getNullMapData().data();
this->nested_function->addBatchSinglePlaceNotNull(batch_size, this->nestedPlace(place), &nested_column, null_map, arena);
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), &nested_column, null_map, arena, num_arguments);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))

View File

@ -2,6 +2,7 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadHelpers.h>
@ -96,37 +97,94 @@ public:
place[size_of_data] = 1;
}
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
size_t num_arguments = 0) const override
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena);
for (size_t i = 0; i < batch_size; ++i)
(places[i] + place_offset)[size_of_data] = 1;
// TODO we can devirtualize this too
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(places[i] + place_offset, columns, i, arena);
}
}
else
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena, num_arguments);
for (size_t i = 0; i < batch_size; ++i)
(places[i] + place_offset)[size_of_data] = 1;
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0) const override
{
if (batch_size)
if (num_arguments > 0)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena);
place[size_of_data] = 1;
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments);
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
{
place[size_of_data] = 1;
break;
}
}
}
else
{
if (batch_size)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments);
place[size_of_data] = 1;
}
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const override
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
size_t num_arguments = 0) const override
{
if (batch_size)
if (num_arguments > 0)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena);
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments);
for (size_t i = 0; i < batch_size; ++i)
{
if (!null_map[i])
if (flags[i] && !null_map[i])
{
place[size_of_data] = 1;
break;
}
}
}
else
{
if (batch_size)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments);
for (size_t i = 0; i < batch_size; ++i)
{
if (!null_map[i])
{
place[size_of_data] = 1;
break;
}
}
}
}
}
void merge(
@ -207,6 +265,8 @@ public:
else
to.insertDefault();
}
/// Returns the nested aggregate function held in `nested_function`.
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
}

View File

@ -198,6 +198,8 @@ public:
col_offsets.getData().push_back(col.getData().size());
}
/// Returns the nested aggregate function held in `nested_function`.
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
};
}

View File

@ -92,7 +92,7 @@ public:
return nested_func->allocatesMemoryInArena();
}
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};
}

View File

@ -282,17 +282,41 @@ public:
}
/// Vectorized version when there is no GROUP BY keys.
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(place, columns, i, arena);
}
}
else
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *) const override
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena, size_t num_arguments)
const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i] && flags[i])
add(place, columns, i, arena);
}
else
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override

View File

@ -10,6 +10,7 @@
#include <Core/Block.h>
#include <Common/Exception.h>
#include <Core/Field.h>
#include <Columns/ColumnsNumber.h>
namespace DB
@ -140,19 +141,32 @@ public:
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
virtual void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
size_t num_arguments = 0) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
virtual void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0) const = 0;
/** The same for single place when need to aggregate only filtered data.
*/
virtual void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0;
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
size_t num_arguments = 0) const = 0;
virtual void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0)
const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
@ -192,6 +206,11 @@ public:
return nullptr;
}
/** Returns the nested function if this is an aggregate function combinator.
* Combinators override this method; the default implementation returns an
* empty (value-initialized) pointer, i.e. nullptr.
*/
virtual AggregateFunctionPtr getNestedFunction() const { return {}; }
const DataTypes & getArgumentTypes() const { return argument_types; }
const Array & getParameters() const { return parameters; }
@ -217,31 +236,90 @@ public:
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
void addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
size_t num_arguments = 0) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
else
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const override
size_t batch_size,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
size_t num_arguments = 0) const override
{
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i] && flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
else
{
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceFromInterval(
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, size_t num_arguments = 0)
const override
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
if (num_arguments > 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[num_arguments - 1]).getData();
for (size_t i = batch_begin; i < batch_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
else
{
for (size_t i = batch_begin; i < batch_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchArray(

View File

@ -12,7 +12,53 @@
namespace DB
{
#if defined(__SSE2__) && defined(__POPCNT__)
/// Packs 64 consecutive signed bytes into a 64-bit mask: bit i is set iff bytes64[i] > 0.
/// Reads exactly 64 bytes starting at `bytes64`, using unaligned loads.
/// File-local helper: internal linkage and an explicit return type instead of a deduced one.
static UInt64 toBits64(const Int8 * bytes64)
{
    const __m128i zero16 = _mm_setzero_si128();

    /// 16-bit mask of the strictly positive bytes in one 16-byte lane.
    const auto lane_mask = [&zero16](const Int8 * lane) -> UInt64
    {
        const __m128i bytes = _mm_loadu_si128(reinterpret_cast<const __m128i *>(lane));
        return static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(bytes, zero16)));
    };

    return lane_mask(bytes64)
        | (lane_mask(bytes64 + 16) << 16)
        | (lane_mask(bytes64 + 32) << 32)
        | (lane_mask(bytes64 + 48) << 48);
}
#endif
size_t countBytesInFilter(const UInt8 * filt, size_t sz)
{
size_t count = 0;
/** NOTE: In theory, `filt` should only contain zeros and ones.
* But, just in case, here the condition > 0 (to signed bytes) is used.
* It would be better to use != 0, then this does not allow SSE2.
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
const Int8 * end = pos + sz;
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + sz / 64 * 64;
for (; pos < end64; pos += 64)
count += __builtin_popcountll(toBits64(pos));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
count += *pos > 0;
return count;
}
/// Convenience overload: counts over the whole filter by passing its data pointer and size
/// to the (pointer, size) overload.
size_t countBytesInFilter(const IColumn::Filter & filt)
{
return countBytesInFilter(filt.data(), filt.size());
}
/// Counts the positions i in [0, filt.size()) where filt[i] is set and null_map[i] is not.
/// "Set" means greater than zero when the byte is read as signed, for both arrays.
/// `null_map` must point to at least filt.size() readable bytes.
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
{
    size_t count = 0;

    /** NOTE: In theory, `filt` should only contain zeros and ones.
      * But, just in case, here the condition > 0 (to signed bytes) is used.
      * It would be better to use != 0, then this does not allow SSE2.
      */

    const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data());
    const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map);
    const Int8 * end = pos + filt.size();

#if defined(__SSE2__) && defined(__POPCNT__)
    const Int8 * end64 = pos + filt.size() / 64 * 64;

    /// 64 rows per step: keep the filter bits whose null bit is clear, then count them.
    for (; pos < end64; pos += 64, pos2 += 64)
        count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2));

    /// TODO Add duff device for tail?
#endif

    /// Scalar tail. Both cursors advance together, so each row is checked against its own
    /// null-map byte. The predicate matches the vector path exactly:
    /// filter byte > 0 and null byte not > 0.
    for (; pos < end; ++pos, ++pos2)
        count += (*pos > 0) && !(*pos2 > 0);

    return count;
}

View File

@ -15,7 +15,9 @@ namespace ErrorCodes
}
/// Counts how many bytes of `filt` are greater than zero.
/// The first overload scans `sz` bytes starting at `filt`; the second scans the whole filter.
size_t countBytesInFilter(const UInt8 * filt, size_t sz);
size_t countBytesInFilter(const IColumn::Filter & filt);
/// Counts positions where the byte of `filt` is set and the byte of `null_map` at the same position is not.
/// `null_map` is read for as many bytes as `filt` holds.
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map);
/// Returns vector with num_columns elements. vector[i] is the count of i values in selector.
/// Selector must contain values from 0 to num_columns - 1. NOTE: this is not checked.

View File

@ -0,0 +1,3 @@
<test>
<query>SELECT countIf(number % 2) FROM numbers(100000000)</query>
</test>

View File

@ -0,0 +1,3 @@
<test>
<query>SELECT sumIf(number, number % 2) FROM numbers(100000000)</query>
</test>