Added fully unrolled method

This commit is contained in:
Alexey Milovidov 2020-07-30 04:27:49 +03:00
parent a7e5feb8b4
commit a9998746ea

View File

@ -337,6 +337,89 @@ public:
{
return alignof(Data);
}
void addBatchLookupTable8(
size_t batch_size,
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
const UInt8 * key,
const IColumn ** columns,
Arena * arena) const override
{
const Derived & func = *static_cast<const Derived *>(this);
if (func.allocatesMemoryInArena() || sizeof(Data) > 16)
{
IAggregateFunctionHelper<Derived>::addBatchLookupTable8(batch_size, map, place_offset, init, key, columns, arena);
return;
}
static constexpr size_t UNROLL_COUNT = 8;
Data places[256 * UNROLL_COUNT];
bool has_data[256 * UNROLL_COUNT]{};
size_t i = 0;
size_t batch_size_unrolled = batch_size / UNROLL_COUNT * UNROLL_COUNT;
for (; i < batch_size_unrolled; i += UNROLL_COUNT)
{
for (size_t j = 0; j < UNROLL_COUNT; ++j)
{
size_t idx = j * 256 + key[i + j];
if (unlikely(!has_data[idx]))
{
new (&places[idx]) Data;
has_data[idx] = true;
}
func.add(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
}
}
for (size_t k = 0; k < 256; ++k)
{
for (size_t j = 1; j < UNROLL_COUNT; ++j)
{
if (has_data[j * 256 + k])
{
if (unlikely(!has_data[k]))
{
new (&places[k]) Data;
has_data[k] = true;
}
func.merge(
reinterpret_cast<char *>(&places[k]),
reinterpret_cast<const char *>(&places[256 * j + k]),
nullptr);
}
}
}
for (; i < batch_size; ++i)
{
size_t idx = key[i];
if (unlikely(!has_data[idx]))
{
new (&places[idx]) Data;
has_data[idx] = true;
}
func.add(reinterpret_cast<char *>(&places[idx]), columns, i, nullptr);
}
for (size_t k = 0; k < 256; ++k)
{
if (has_data[k])
{
AggregateDataPtr & place = map[k];
if (unlikely(!place))
init(place);
func.merge(place + place_offset, reinterpret_cast<const char *>(&places[k]), arena);
}
}
}
};