Add ability to pass range of rows to Aggregator

v2: fix compiled aggregate functions (seek result to row_start)
v3: fix compiled aggregate functions (seek args to row_start)
v4: change signatures for JIT
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Azat Khuzhin 2022-03-07 16:48:38 +03:00
parent 599a255741
commit 767acd53fb
25 changed files with 497 additions and 255 deletions
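The interface change follows one pattern throughout the diff: every batch method that previously took a single `batch_size` count now takes a half-open `[row_begin, row_end)` range, and existing callers keep their behaviour by passing `(0, size)`. The following standalone sketch only illustrates that migration pattern; the `ToySum` type and its methods are illustrative stand-ins, not the real `IAggregateFunction` interface from this commit.

```cpp
// Minimal standalone sketch of the batch_size -> [row_begin, row_end) migration.
// ToySum is a stand-in aggregate state, not ClickHouse code.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct ToySum
{
    int64_t sum = 0;

    /// Old-style API: always starts at row 0 and processes `batch_size` rows.
    void addBatchSinglePlaceOld(size_t batch_size, const int64_t * column)
    {
        for (size_t i = 0; i < batch_size; ++i)
            sum += column[i];
    }

    /// New-style API: processes the half-open range [row_begin, row_end),
    /// so a caller can aggregate a slice of a block without copying it.
    void addBatchSinglePlace(size_t row_begin, size_t row_end, const int64_t * column)
    {
        assert(row_begin <= row_end);
        for (size_t i = row_begin; i < row_end; ++i)
            sum += column[i];
    }
};

int main()
{
    std::vector<int64_t> column{1, 2, 3, 4, 5, 6};

    /// Existing callers are adapted mechanically:
    ///     f(batch_size, ...)  ->  f(0, batch_size, ...)
    ToySum whole;
    whole.addBatchSinglePlace(0, column.size(), column.data());

    /// New callers (e.g. aggregation in order) can pass a sub-range.
    ToySum part;
    part.addBatchSinglePlace(2, 5, column.data());

    std::cout << whole.sum << ' ' << part.sum << '\n'; /// prints: 21 12
    return 0;
}
```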

View File

@ -225,26 +225,38 @@ public:
}
void
addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const final
addBatchSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const final
{
AggregateFunctionSumData<Numerator> sum_data;
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
sum_data.addManyConditional(column.getData().data(), flags.data(), batch_size);
this->data(place).denominator += countBytesInFilter(flags.data(), batch_size);
sum_data.addManyConditional(column.getData().data(), flags.data(), row_begin, row_end);
this->data(place).denominator += countBytesInFilter(flags.data(), row_begin, row_end);
}
else
{
sum_data.addMany(column.getData().data(), batch_size);
this->data(place).denominator += batch_size;
sum_data.addMany(column.getData().data(), row_begin, row_end);
this->data(place).denominator += (row_end - row_begin);
}
increment(place, sum_data.sum);
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *, ssize_t if_argument_pos)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena *,
ssize_t if_argument_pos)
const final
{
AggregateFunctionSumData<Numerator> sum_data;
@ -253,22 +265,22 @@ public:
{
/// Merge the 2 sets of flags (null and if) into a single one. This allows us to use parallelizable sums when available
const auto * if_flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();
auto final_flags = std::make_unique<UInt8[]>(batch_size);
auto final_flags = std::make_unique<UInt8[]>(row_end);
size_t used_value = 0;
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
UInt8 kept = (!null_map[i]) & !!if_flags[i];
final_flags[i] = kept;
used_value += kept;
}
sum_data.addManyConditional(column.getData().data(), final_flags.get(), batch_size);
sum_data.addManyConditional(column.getData().data(), final_flags.get(), row_begin, row_end);
this->data(place).denominator += used_value;
}
else
{
sum_data.addManyNotNull(column.getData().data(), null_map, batch_size);
this->data(place).denominator += batch_size - countBytesInFilter(null_map, batch_size);
sum_data.addManyNotNull(column.getData().data(), null_map, row_begin, row_end);
this->data(place).denominator += (row_end - row_begin) - countBytesInFilter(null_map, row_begin, row_end);
}
increment(place, sum_data.sum);
}

View File

@ -54,7 +54,12 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
if (if_argument_pos >= 0)
{
@ -63,12 +68,13 @@ public:
}
else
{
data(place).count += batch_size;
data(place).count += row_end - row_begin;
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -78,11 +84,12 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilterWithNull(flags, null_map);
data(place).count += countBytesInFilterWithNull(flags, null_map, row_begin, row_end);
}
else
{
data(place).count += batch_size - countBytesInFilter(null_map, batch_size);
size_t rows = row_end - row_begin;
data(place).count += rows - countBytesInFilter(null_map, row_begin, row_end);
}
}
@ -204,17 +211,23 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
const auto & nc = assert_cast<const ColumnNullable &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilterWithNull(flags, nc.getNullMapData().data());
data(place).count += countBytesInFilterWithNull(flags, nc.getNullMapData().data(), row_begin, row_end);
}
else
{
data(place).count += batch_size - countBytesInFilter(nc.getNullMapData().data(), batch_size);
size_t rows = row_end - row_begin;
data(place).count += rows - countBytesInFilter(nc.getNullMapData().data(), row_begin, row_end);
}
}

View File

@ -200,7 +200,7 @@ public:
arguments_raw[i] = arguments[i].get();
assert(!arguments.empty());
nested_func->addBatchSinglePlace(arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->addBatchSinglePlace(0, arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->insertResultInto(getNestedPlace(place), to, arena);
}

View File

@ -37,18 +37,18 @@ inline TColumn readItem(const IColumn * column, Arena * arena, size_t row)
template <typename TColumn, typename TFilter = void>
size_t
getFirstNElements_low_threshold(const TColumn * data, int num_elements, int threshold, size_t * results, const TFilter * filter = nullptr)
getFirstNElements_low_threshold(const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const TFilter * filter = nullptr)
{
for (int i = 0; i < threshold; i++)
for (size_t i = 0; i < threshold; i++)
{
results[i] = 0;
}
threshold = std::min(num_elements, threshold);
int current_max = 0;
int cur;
int z;
for (int i = 0; i < num_elements; i++)
threshold = std::min(row_end - row_begin, threshold);
size_t current_max = 0;
size_t cur;
size_t z;
for (size_t i = row_begin; i < row_end; i++)
{
if constexpr (!std::is_same_v<TFilter, void>)
{
@ -90,12 +90,12 @@ struct SortableItem
template <typename TColumn, typename TFilter = void>
size_t getFirstNElements_high_threshold(
const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const TFilter * filter = nullptr)
const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const TFilter * filter = nullptr)
{
std::vector<SortableItem<TColumn>> dataIndexed(num_elements);
std::vector<SortableItem<TColumn>> dataIndexed(row_end);
size_t num_elements_filtered = 0;
for (size_t i = 0; i < num_elements; i++)
for (size_t i = row_begin; i < row_end; i++)
{
if constexpr (!std::is_same_v<TFilter, void>)
{
@ -124,21 +124,21 @@ size_t getFirstNElements_high_threshold(
static const size_t THRESHOLD_MAX_CUSTOM_FUNCTION = 1000;
template <typename TColumn>
size_t getFirstNElements(const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const UInt8 * filter = nullptr)
size_t getFirstNElements(const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const UInt8 * filter = nullptr)
{
if (threshold < THRESHOLD_MAX_CUSTOM_FUNCTION)
{
if (filter != nullptr)
return getFirstNElements_low_threshold(data, num_elements, threshold, results, filter);
return getFirstNElements_low_threshold(data, row_begin, row_end, threshold, results, filter);
else
return getFirstNElements_low_threshold(data, num_elements, threshold, results);
return getFirstNElements_low_threshold(data, row_begin, row_end, threshold, results);
}
else
{
if (filter != nullptr)
return getFirstNElements_high_threshold(data, num_elements, threshold, results, filter);
return getFirstNElements_high_threshold(data, row_begin, row_end, threshold, results, filter);
else
return getFirstNElements_high_threshold(data, num_elements, threshold, results);
return getFirstNElements_high_threshold(data, row_begin, row_end, threshold, results);
}
}
@ -203,7 +203,7 @@ public:
template <typename TColumn, bool is_plain, typename TFunc>
void
forFirstRows(size_t batch_size, const IColumn ** columns, size_t data_column, Arena * arena, ssize_t if_argument_pos, TFunc func) const
forFirstRows(size_t row_begin, size_t row_end, const IColumn ** columns, size_t data_column, Arena * arena, ssize_t if_argument_pos, TFunc func) const
{
const TColumn * values = nullptr;
std::unique_ptr<std::vector<TColumn>> values_vector;
@ -211,8 +211,8 @@ public:
if constexpr (std::is_same_v<TColumn, StringRef>)
{
values_vector.reset(new std::vector<TColumn>(batch_size));
for (size_t i = 0; i < batch_size; i++)
values_vector.reset(new std::vector<TColumn>(row_end));
for (size_t i = row_begin; i < row_end; i++)
(*values_vector)[i] = readItem<TColumn, is_plain>(columns[data_column], arena, i);
values = (*values_vector).data();
}
@ -231,7 +231,7 @@ public:
filter = reinterpret_cast<const UInt8 *>(refFilter.data);
}
size_t num_elements = getFirstNElements(values, batch_size, threshold, best_rows.data(), filter);
size_t num_elements = getFirstNElements(values, row_begin, row_end, threshold, best_rows.data(), filter);
for (size_t i = 0; i < num_elements; i++)
{
func(best_rows[i], values);
@ -239,14 +239,19 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos) const override
{
State & data = this->data(place);
if constexpr (use_column_b)
{
forFirstRows<TColumnB, is_plain_b>(
batch_size, columns, 1, arena, if_argument_pos, [columns, &arena, &data](size_t row, const TColumnB * values)
row_begin, row_end, columns, 1, arena, if_argument_pos, [columns, &arena, &data](size_t row, const TColumnB * values)
{
data.add(readItem<TColumnA, is_plain_a>(columns[0], arena, row), values[row]);
});
@ -254,7 +259,7 @@ public:
else
{
forFirstRows<TColumnA, is_plain_a>(
batch_size, columns, 0, arena, if_argument_pos, [&data](size_t row, const TColumnA * values)
row_begin, row_end, columns, 0, arena, if_argument_pos, [&data](size_t row, const TColumnA * values)
{
data.add(values[row]);
});

View File

@ -119,7 +119,13 @@ public:
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
void addBatchSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const UInt8 * null_map = column->getNullMapData().data();
@ -142,25 +148,31 @@ public:
/// Combine the 2 flag arrays so we can call a simplified version (one check vs 2)
/// Note that now the null map will contain 0 if not null and not filtered, or 1 for null or filtered (or both)
auto final_nulls = std::make_unique<UInt8[]>(batch_size);
auto final_nulls = std::make_unique<UInt8[]>(row_end);
if (filter_null_map)
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]) | (!!filter_null_map[i]);
else
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]);
if constexpr (result_is_nullable)
{
if (!memoryIsByte(final_nulls.get(), batch_size, 1))
if (!memoryIsByte(final_nulls.get(), row_begin, row_end, 1))
this->setFlag(place);
else
return; /// No work to do.
}
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), columns_param, final_nulls.get(), arena, -1);
row_begin,
row_end,
this->nestedPlace(place),
columns_param,
final_nulls.get(),
arena,
-1);
}
#if USE_EMBEDDED_COMPILER

View File

@ -98,31 +98,38 @@ public:
}
void addBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
nested_func->addBatch(batch_size, places, place_offset, columns, arena, num_arguments - 1);
nested_func->addBatch(row_begin, row_end, places, place_offset, columns, arena, num_arguments - 1);
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
nested_func->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments - 1);
nested_func->addBatchSinglePlace(row_begin, row_end, place, columns, arena, num_arguments - 1);
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t) const override
{
nested_func->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments - 1);
nested_func->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, num_arguments - 1);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
@ -131,13 +138,14 @@ public:
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_func->mergeBatch(batch_size, places, place_offset, rhs, arena);
nested_func->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -1159,7 +1159,12 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos) const override
{
if constexpr (is_any)
if (this->data(place).has())
@ -1167,7 +1172,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
{
@ -1179,7 +1184,7 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
this->data(place).changeIfBetter(*columns[0], i, arena);
if constexpr (is_any)
@ -1189,7 +1194,8 @@ public:
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -1203,7 +1209,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i] && flags[i])
{
@ -1215,7 +1221,7 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i])
{

View File

@ -307,17 +307,22 @@ public:
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
const UInt8 * null_map = column->getNullMapData().data();
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), &nested_column, null_map, arena, if_argument_pos);
row_begin, row_end, this->nestedPlace(place), &nested_column, null_map, arena, if_argument_pos);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))
if (!memoryIsByte(null_map, row_begin, row_end, 1))
this->setFlag(place);
}

View File

@ -109,7 +109,8 @@ public:
}
void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -119,7 +120,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && places[i])
add(places[i] + place_offset, columns, i, arena);
@ -127,21 +128,26 @@ public:
}
else
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatch(row_begin, row_end, places, place_offset, columns, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
(places[i] + place_offset)[size_of_data] = 1;
}
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
{
@ -152,16 +158,17 @@ public:
}
else
{
if (batch_size)
if (row_end != row_begin)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
nested_function->addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
place[size_of_data] = 1;
}
}
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -171,8 +178,8 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && !null_map[i])
{
@ -183,10 +190,10 @@ public:
}
else
{
if (batch_size)
if (row_end != row_begin)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i])
{
@ -208,14 +215,15 @@ public:
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_function->mergeBatch(batch_size, places, place_offset, rhs, arena);
for (size_t i = 0; i < batch_size; ++i)
nested_function->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
for (size_t i = row_begin; i < row_end; ++i)
(places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
}

View File

@ -59,9 +59,11 @@ struct AggregateFunctionSumData
/// Vectorized version
template <typename Value>
void NO_SANITIZE_UNDEFINED NO_INLINE addMany(const Value * __restrict ptr, size_t count)
void NO_SANITIZE_UNDEFINED NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
{
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
if constexpr (std::is_floating_point_v<T>)
{
@ -87,7 +89,7 @@ struct AggregateFunctionSumData
/// clang cannot vectorize the loop if accumulator is class member instead of local variable.
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
Impl::add(local_sum, *ptr);
++ptr;
@ -97,9 +99,11 @@ struct AggregateFunctionSumData
template <typename Value, bool add_if_zero>
void NO_SANITIZE_UNDEFINED NO_INLINE
addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t start, size_t end)
{
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
if constexpr (
(is_integer<T> && !is_big_int_v<T>)
@ -108,7 +112,7 @@ struct AggregateFunctionSumData
/// For integers we can vectorize the operation if we replace the null check using a multiplication (by 0 for null, 1 for not null)
/// https://quick-bench.com/q/MLTnfTvwC2qZFVeWHfOBR3U7a8I
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
T multiplier = !*condition_map == add_if_zero;
Impl::add(local_sum, *ptr * multiplier);
@ -151,7 +155,7 @@ struct AggregateFunctionSumData
}
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
if (!*condition_map == add_if_zero)
Impl::add(local_sum, *ptr);
@ -162,15 +166,15 @@ struct AggregateFunctionSumData
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, true>(ptr, null_map, count);
return addManyConditionalInternal<Value, true>(ptr, null_map, start, end);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, false>(ptr, cond_map, count);
return addManyConditionalInternal<Value, false>(ptr, cond_map, start, end);
}
void NO_SANITIZE_UNDEFINED merge(const AggregateFunctionSumData & rhs)
@ -220,7 +224,7 @@ struct AggregateFunctionSumKahanData
/// Vectorized version
template <typename Value>
void NO_INLINE addMany(const Value * __restrict ptr, size_t count)
void NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
{
/// Less than in ordinary sum, because the algorithm is more complicated and too large loop unrolling is questionable.
/// But this is just a guess.
@ -228,7 +232,10 @@ struct AggregateFunctionSumKahanData
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
@ -241,7 +248,7 @@ struct AggregateFunctionSumKahanData
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
while (ptr < end_ptr)
{
addImpl(*ptr, sum, compensation);
++ptr;
@ -249,13 +256,16 @@ struct AggregateFunctionSumKahanData
}
template <typename Value, bool add_if_zero>
void NO_INLINE addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
void NO_INLINE addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t start, size_t end)
{
constexpr size_t unroll_count = 4;
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
@ -270,7 +280,7 @@ struct AggregateFunctionSumKahanData
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
while (ptr < end_ptr)
{
if ((!*condition_map) == add_if_zero)
addImpl(*ptr, sum, compensation);
@ -280,15 +290,15 @@ struct AggregateFunctionSumKahanData
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, true>(ptr, null_map, count);
return addManyConditionalInternal<Value, true>(ptr, null_map, start, end);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, false>(ptr, cond_map, count);
return addManyConditionalInternal<Value, false>(ptr, cond_map, start, end);
}
void ALWAYS_INLINE mergeImpl(T & to_sum, T & to_compensation, T from_sum, T from_compensation)
@ -385,22 +395,33 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
this->data(place).addManyConditional(column.getData().data(), flags.data(), batch_size);
this->data(place).addManyConditional(column.getData().data(), flags.data(), row_begin, row_end);
}
else
{
this->data(place).addMany(column.getData().data(), batch_size);
this->data(place).addMany(column.getData().data(), row_begin, row_end);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *, ssize_t if_argument_pos)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena *,
ssize_t if_argument_pos)
const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
@ -408,15 +429,15 @@ public:
{
/// Merge the 2 sets of flags (null and if) into a single one. This allows us to use parallelizable sums when available
const auto * if_flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();
auto final_flags = std::make_unique<UInt8[]>(batch_size);
for (size_t i = 0; i < batch_size; ++i)
auto final_flags = std::make_unique<UInt8[]>(row_end);
for (size_t i = row_begin; i < row_end; ++i)
final_flags[i] = (!null_map[i]) & if_flags[i];
this->data(place).addManyConditional(column.getData().data(), final_flags.get(), batch_size);
this->data(place).addManyConditional(column.getData().data(), final_flags.get(), row_begin, row_end);
}
else
{
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
this->data(place).addManyNotNull(column.getData().data(), null_map, row_begin, row_end);
}
}

View File

@ -175,7 +175,8 @@ public:
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -184,13 +185,16 @@ public:
/// The version of "addBatch", that handle sparse columns as arguments.
virtual void addBatchSparse(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const = 0;
virtual void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
@ -199,17 +203,27 @@ public:
/** The same for single place.
*/
virtual void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0;
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
/// The version of "addBatchSinglePlace", that handle sparse columns as arguments.
virtual void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena) const = 0;
/** The same for single place when need to aggregate only filtered data.
* Instead of using an if-column, the condition is combined inside the null_map
*/
virtual void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -217,7 +231,12 @@ public:
ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSinglePlaceFromInterval( /// NOLINT
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
@ -226,7 +245,8 @@ public:
* "places" contains a large number of same values consecutively.
*/
virtual void addBatchArray(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -237,7 +257,8 @@ public:
* and pointers to aggregation states are stored in AggregateDataPtr[256] lookup table.
*/
virtual void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -251,7 +272,8 @@ public:
* All places that were not inserted must be destroyed if there was exception during insert into result column.
*/
virtual void insertResultIntoBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
@ -261,7 +283,8 @@ public:
/** Destroy batch of aggregate places.
*/
virtual void destroyBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept = 0;
@ -355,7 +378,8 @@ public:
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -365,7 +389,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
@ -373,13 +397,15 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
void addBatchSparse(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -387,33 +413,42 @@ public:
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
size_t batch_size = column_sparse.size();
auto offset_it = column_sparse.begin();
for (size_t i = 0; i < batch_size; ++i, ++offset_it)
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
&values, offset_it.getValueIndex(), arena);
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
@ -421,26 +456,34 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena) const override
{
/// TODO: add values and defaults separately if order of adding isn't important.
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
size_t batch_size = column_sparse.size();
auto offset_it = column_sparse.begin();
for (size_t i = 0; i < batch_size; ++i, ++offset_it)
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(place, &values, offset_it.getValueIndex(), arena);
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -450,26 +493,31 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (!null_map[i] && flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceFromInterval( /// NOLINT
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = batch_begin; i < batch_end; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
@ -477,17 +525,23 @@ public:
}
else
{
for (size_t i = batch_begin; i < batch_end; ++i)
for (size_t i = row_begin; i < row_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
const override
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
@ -498,7 +552,8 @@ public:
}
void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -508,10 +563,10 @@ public:
{
static constexpr size_t UNROLL_COUNT = 8;
size_t i = 0;
size_t i = row_begin;
size_t batch_size_unrolled = batch_size / UNROLL_COUNT * UNROLL_COUNT;
for (; i < batch_size_unrolled; i += UNROLL_COUNT)
size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
for (; i < size_unrolled; i += UNROLL_COUNT)
{
AggregateDataPtr places[UNROLL_COUNT];
for (size_t j = 0; j < UNROLL_COUNT; ++j)
@ -527,7 +582,7 @@ public:
static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
}
for (; i < batch_size; ++i)
for (; i < row_end; ++i)
{
AggregateDataPtr & place = map[key[i]];
if (unlikely(!place))
@ -536,13 +591,20 @@ public:
}
}
void insertResultIntoBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, IColumn & to, Arena * arena, bool destroy_place_after_insert) const override
void insertResultIntoBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena,
bool destroy_place_after_insert) const override
{
size_t batch_index = 0;
size_t batch_index = row_begin;
try
{
for (; batch_index < batch_size; ++batch_index)
for (; batch_index < row_end; ++batch_index)
{
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
@ -552,16 +614,20 @@ public:
}
catch (...)
{
for (size_t destroy_index = batch_index; destroy_index < batch_size; ++destroy_index)
for (size_t destroy_index = batch_index; destroy_index < row_end; ++destroy_index)
static_cast<const Derived *>(this)->destroy(places[destroy_index] + place_offset);
throw;
}
}
void destroyBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset) const noexcept override
void destroyBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept override
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
static_cast<const Derived *>(this)->destroy(places[i] + place_offset);
}
@ -612,7 +678,8 @@ public:
}
void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -626,7 +693,7 @@ public:
if (func.allocatesMemoryInArena() || sizeof(Data) > 16 || func.sizeOfData() != sizeof(Data))
{
IAggregateFunctionHelper<Derived>::addBatchLookupTable8(batch_size, map, place_offset, init, key, columns, arena);
IAggregateFunctionHelper<Derived>::addBatchLookupTable8(row_begin, row_end, map, place_offset, init, key, columns, arena);
return;
}
@ -637,12 +704,12 @@ public:
std::unique_ptr<Data[]> places{new Data[256 * UNROLL_COUNT]};
bool has_data[256 * UNROLL_COUNT]{}; /// Separate flags array to avoid heavy initialization.
size_t i = 0;
size_t i = row_begin;
/// Aggregate data into different lookup tables.
size_t batch_size_unrolled = batch_size / UNROLL_COUNT * UNROLL_COUNT;
for (; i < batch_size_unrolled; i += UNROLL_COUNT)
size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
for (; i < size_unrolled; i += UNROLL_COUNT)
{
for (size_t j = 0; j < UNROLL_COUNT; ++j)
{
@ -676,7 +743,7 @@ public:
/// Process tails and add directly to the final destination.
for (; i < batch_size; ++i)
for (; i < row_end; ++i)
{
size_t k = key[i];
AggregateDataPtr & place = map[k];

View File

@ -54,7 +54,7 @@ MutableColumnPtr ColumnFixedString::cloneResized(size_t size) const
bool ColumnFixedString::isDefaultAt(size_t index) const
{
assert(index < size());
return memoryIsZero(chars.data() + index * n, n);
return memoryIsZero(chars.data() + index * n, 0, n);
}
void ColumnFixedString::insert(const Field & x)

View File

@ -27,7 +27,7 @@ static UInt64 toBits64(const Int8 * bytes64)
}
#endif
size_t countBytesInFilter(const UInt8 * filt, size_t sz)
size_t countBytesInFilter(const UInt8 * filt, size_t start, size_t end)
{
size_t count = 0;
@ -37,18 +37,20 @@ size_t countBytesInFilter(const UInt8 * filt, size_t sz)
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
const Int8 * end = pos + sz;
pos += start;
const Int8 * end_pos = pos + (end - start);
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + sz / 64 * 64;
const Int8 * end_pos64 = pos + (end - start) / 64 * 64;
for (; pos < end64; pos += 64)
for (; pos < end_pos64; pos += 64)
count += __builtin_popcountll(toBits64(pos));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
for (; pos < end_pos; ++pos)
count += *pos != 0;
return count;
@ -56,10 +58,10 @@ size_t countBytesInFilter(const UInt8 * filt, size_t sz)
size_t countBytesInFilter(const IColumn::Filter & filt)
{
return countBytesInFilter(filt.data(), filt.size());
return countBytesInFilter(filt.data(), 0, filt.size());
}
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end)
{
size_t count = 0;
@ -68,20 +70,20 @@ size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * nu
* It would be better to use != 0, then this does not allow SSE2.
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data());
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map);
const Int8 * end = pos + filt.size();
const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data()) + start;
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map) + start;
const Int8 * end_pos = pos + (end - start);
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + filt.size() / 64 * 64;
const Int8 * end_pos64 = pos + (end - start) / 64 * 64;
for (; pos < end64; pos += 64, pos2 += 64)
for (; pos < end_pos64; pos += 64, pos2 += 64)
count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos, ++pos2)
for (; pos < end_pos; ++pos, ++pos2)
count += (*pos & ~*pos2) != 0;
return count;
@ -96,17 +98,18 @@ std::vector<size_t> countColumnsSizeInSelector(IColumn::ColumnIndex num_columns,
return counts;
}
bool memoryIsByte(const void * data, size_t size, uint8_t byte)
bool memoryIsByte(const void * data, size_t start, size_t end, uint8_t byte)
{
size_t size = end - start;
if (size == 0)
return true;
const auto * ptr = reinterpret_cast<const uint8_t *>(data);
const auto * ptr = reinterpret_cast<const uint8_t *>(data) + start;
return *ptr == byte && memcmp(ptr, ptr + 1, size - 1) == 0;
}
bool memoryIsZero(const void * data, size_t size)
bool memoryIsZero(const void * data, size_t start, size_t end)
{
return memoryIsByte(data, size, 0x0);
return memoryIsByte(data, start, end, 0x0);
}
namespace ErrorCodes

View File

@ -53,17 +53,17 @@ inline UInt64 bytes64MaskToBits64Mask(const UInt8 * bytes64)
}
/// Counts how many bytes of `filt` are greater than zero.
size_t countBytesInFilter(const UInt8 * filt, size_t sz);
size_t countBytesInFilter(const UInt8 * filt, size_t start, size_t end);
size_t countBytesInFilter(const IColumn::Filter & filt);
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map);
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end);
/// Returns vector with num_columns elements. vector[i] is the count of i values in selector.
/// Selector must contain values from 0 to num_columns - 1. NOTE: this is not checked.
std::vector<size_t> countColumnsSizeInSelector(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector);
/// Returns true, if the memory contains only zeros.
bool memoryIsZero(const void * data, size_t size);
bool memoryIsByte(const void * data, size_t size, uint8_t byte);
bool memoryIsZero(const void * data, size_t start, size_t end);
bool memoryIsByte(const void * data, size_t start, size_t end, uint8_t byte);
/// The general implementation of `filter` function for ColumnArray and ColumnString.
template <typename T>

View File

@ -3576,7 +3576,7 @@ private:
const auto & nullable_col = assert_cast<const ColumnNullable &>(*col);
const auto & null_map = nullable_col.getNullMapData();
if (!memoryIsZero(null_map.data(), null_map.size()))
if (!memoryIsZero(null_map.data(), 0, null_map.size()))
throw Exception{"Cannot convert NULL value to non-Nullable type",
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
}

View File

@ -186,7 +186,7 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->addBatchArray(input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
that->addBatchArray(0, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)

View File

@ -147,7 +147,7 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
/// Unnest consecutive trailing -State combinators
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->addBatch(input_rows_count, places.data(), 0, aggregate_arguments, arena.get());
that->addBatch(0, input_rows_count, places.data(), 0, aggregate_arguments, arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)

View File

@ -125,7 +125,7 @@ public:
if (in)
{
const auto & in_data = in->getData();
if (!memoryIsZero(in_data.data(), in_data.size() * sizeof(in_data[0])))
if (!memoryIsZero(in_data.data(), 0, in_data.size() * sizeof(in_data[0])))
{
throw Exception(ErrorCodes::FUNCTION_THROW_IF_VALUE_IS_NON_ZERO,
message.value_or("Value passed to '" + getName() + "' function is non zero"));

View File

@ -860,7 +860,8 @@ template <typename Method>
void NO_INLINE Aggregator::executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
@ -873,17 +874,17 @@ void NO_INLINE Aggregator::executeImpl(
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
{
executeImplBatch<false, true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<false, true>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
else
#endif
{
executeImplBatch<false, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
}
else
{
executeImplBatch<true, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<true, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
}
@ -892,7 +893,8 @@ void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const
{
@ -904,7 +906,7 @@ void NO_INLINE Aggregator::executeImplBatch(
/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
for (size_t i = 0; i < rows; ++i)
for (size_t i = row_begin; i < row_end; ++i)
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
return;
}
@ -928,7 +930,8 @@ void NO_INLINE Aggregator::executeImplBatch(
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
rows,
row_begin,
row_end,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
@ -944,10 +947,14 @@ void NO_INLINE Aggregator::executeImplBatch(
}
}
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// NOTE: only row_end-row_start is required, but:
/// - this affects only optimize_aggregation_in_order,
/// - this is just a pointer, so it should not be significant,
/// - and plus this will require other changes in the interface.
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
@ -1032,7 +1039,7 @@ void NO_INLINE Aggregator::executeImplBatch(
}
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(rows, columns_data.data(), places.get());
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
}
#endif
@ -1048,11 +1055,11 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
@ -1060,10 +1067,13 @@ void NO_INLINE Aggregator::executeImplBatch(
template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
size_t row_begin, size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
{
if (row_begin == row_end)
return;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
{
@ -1084,7 +1094,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
}
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(rows, columns_data.data(), res);
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), res);
#if defined(MEMORY_SANITIZER)
@ -1115,11 +1125,23 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(
inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
0,
inst->offsets[static_cast<ssize_t>(row_end - 1)],
res + inst->state_offset,
inst->batch_arguments,
arena);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparseSinglePlace(res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSparseSinglePlace(
row_begin, row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
else
inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlace(
row_begin, row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
}
}
@ -1211,16 +1233,27 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
bool Aggregator::executeOnBlock(const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
return executeOnBlock(block.getColumns(),
/* row_begin= */ 0, block.rows(),
result,
key_columns,
aggregate_columns,
no_more_keys);
}
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
bool Aggregator::executeOnBlock(Columns columns,
size_t row_begin, size_t row_end,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@ -1276,12 +1309,12 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
// #if USE_EMBEDDED_COMPILER
// if (compiled_aggregate_functions_holder)
// {
// executeWithoutKeyImpl<true>(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
// executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
// }
// else
// #endif
{
executeWithoutKeyImpl<false>(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
}
}
else
@ -1291,7 +1324,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), \
no_more_keys, overflow_row_ptr);
if (false) {} // NOLINT
@ -1718,7 +1751,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
auto insert_aggregates_into_columns_function = compiled_functions.insert_aggregates_into_columns_function;
insert_aggregates_into_columns_function(places.size(), columns_data.data(), places.data());
insert_aggregates_into_columns_function(0, places.size(), columns_data.data(), places.data());
}
#endif
@ -1747,7 +1780,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
}
catch (...)
@ -1767,7 +1800,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(0, places.size(), places.data(), offset);
}
if (exception)
@ -2527,6 +2560,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
/// For all rows.
size_t rows = block.rows();
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
for (size_t i = 0; i < rows; ++i)
@ -2565,7 +2599,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
{
/// Merge state of aggregate functions.
aggregate_functions[j]->mergeBatch(
rows, places.get(), offsets_of_aggregate_states[j],
0, rows,
places.get(), offsets_of_aggregate_states[j],
aggregate_columns[j]->data(),
aggregates_pool);
}

View File

@ -1022,12 +1022,17 @@ public:
using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool executeOnBlock(const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool executeOnBlock(Columns columns,
size_t row_begin, size_t row_end,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
/// Used for aggregate projection.
@ -1165,7 +1170,8 @@ private:
void executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
@ -1177,7 +1183,8 @@ private:
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const;
@ -1185,7 +1192,8 @@ private:
template <bool use_compiled_functions>
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
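With the header split into two executeOnBlock overloads, the Block-based one keeps whole-block semantics while the Columns-based one accepts an explicit row range, so a caller can feed one block to the aggregator in several slices without copying columns. A hedged caller-side sketch with stub types (ToyAggregator, executeOnRange and the slice size are illustrative, not part of the ClickHouse API):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Stub standing in for Aggregator::executeOnBlock(Columns, row_begin, row_end, ...):
// it only records which row ranges were handed to it.
struct ToyAggregator
{
    std::vector<std::pair<std::size_t, std::size_t>> ranges;

    bool executeOnRange(std::size_t row_begin, std::size_t row_end)
    {
        ranges.emplace_back(row_begin, row_end);
        return true; // false would mean "abort" (group_by_overflow_mode = 'break')
    }
};

int main()
{
    const std::size_t num_rows = 10;
    const std::size_t slice = 4; // illustrative slice size

    ToyAggregator aggregator;
    for (std::size_t begin = 0; begin < num_rows; begin += slice)
    {
        std::size_t end = std::min(begin + slice, num_rows);
        if (!aggregator.executeOnRange(begin, end))
            break;
    }

    for (auto [b, e] : aggregator.ranges)
        std::cout << '[' << b << ", " << e << ") ";
    std::cout << '\n'; // prints "[0, 4) [4, 8) [8, 10)"
}
```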

View File

@ -309,11 +309,12 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * row_start_arg = arguments++;
llvm::Value * row_end_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * places_arg = arguments++;
@ -322,6 +323,9 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
auto * places_start_arg = entry_builder.CreateInBoundsGEP(nullptr, places_arg, row_start_arg);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
@ -338,7 +342,16 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable())
{
data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg);
}
else
{
data_placeholder.null_init = nullptr;
}
columns.emplace_back(data_placeholder);
}
@ -350,15 +363,15 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
auto * places_phi = b.CreatePHI(places_arg->getType(), 2);
places_phi->addIncoming(places_arg, entry);
auto * places_phi = b.CreatePHI(places_start_arg->getType(), 2);
places_phi->addIncoming(places_start_arg, entry);
for (auto & col : columns)
{
@ -428,7 +441,7 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
@ -443,11 +456,12 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * places_type = b.getInt8Ty()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * row_start_arg = arguments++;
llvm::Value * row_end_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * place_arg = arguments++;
@ -456,6 +470,8 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
@ -472,7 +488,16 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable())
{
data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg);
}
else
{
data_placeholder.null_init = nullptr;
}
columns.emplace_back(data_placeholder);
}
@ -484,12 +509,12 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
for (auto & col : columns)
{
@ -555,7 +580,7 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
@ -600,35 +625,47 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_data_places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), aggregate_data_places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), aggregate_data_places_type }, false);
auto * aggregate_loop_func = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func->args().begin();
llvm::Value * rows_count_arg = &*arguments++;
llvm::Value * row_start_arg = &*arguments++;
llvm::Value * row_end_arg = &*arguments++;
llvm::Value * columns_arg = &*arguments++;
llvm::Value * aggregate_data_places_arg = &*arguments++;
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
std::vector<ColumnDataPlaceholder> columns(functions.size());
for (size_t i = 0; i < functions.size(); ++i)
{
auto return_type = functions[i].function->getReturnType();
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, i));
columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(return_type))->getPointerTo());
columns[i].null_init = return_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
columns[i].data_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].data_init, row_start_arg);
if (return_type->isNullable())
{
columns[i].null_init = b.CreateExtractValue(data, {1});
columns[i].null_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].null_init, row_start_arg);
}
else
{
columns[i].null_init = nullptr;
}
}
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
auto * aggregate_data_place_phi = b.CreatePHI(aggregate_data_places_type, 2);
aggregate_data_place_phi->addIncoming(aggregate_data_places_arg, entry);
@ -682,7 +719,7 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_phi, 1), cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
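In the generated code above, the loops now branch on row_start == row_end, the counter PHI starts at row_start and exits at row_end, and the entry block pre-offsets the places array plus each column's data/null pointers by row_start; that pointer seek is what the "seek result/args to row_start" notes (v2/v3) in the commit message refer to. A rough, self-contained C++ model of what the emitted add-into-aggregate-states loop computes (toy types; the real function is generated LLVM IR and fuses all compiled aggregate functions into the per-row body):

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

using AggregateDataPtr = char *;

// Toy stand-in for a column: values plus an optional null map.
struct ToyColumnData
{
    const std::uint8_t * data = nullptr;
    const std::uint8_t * null = nullptr;
};

// Toy per-row body: here it just sums non-null values into a 64-bit state.
static void add_one_row(AggregateDataPtr place, std::uint8_t value, bool is_null)
{
    if (!is_null)
        *reinterpret_cast<std::int64_t *>(place) += value;
}

// Rough equivalent of the loop emitted by compileAddIntoAggregateStatesFunctions after
// this commit: pointers are seeked to row_start once, then the loop runs to row_end.
static void addIntoAggregateStatesModel(
    std::size_t row_start, std::size_t row_end, const ToyColumnData * column, AggregateDataPtr * places)
{
    AggregateDataPtr * place = places + row_start;                                  // seek result places (v2)
    const std::uint8_t * data = column->data + row_start;                           // seek argument data (v3)
    const std::uint8_t * null = column->null ? column->null + row_start : nullptr;  // seek null map, if any

    for (std::size_t i = row_start; i < row_end; ++i, ++place, ++data)
        add_one_row(*place, *data, null && *null++);
}

int main()
{
    std::uint8_t values[] = {10, 20, 30, 40};
    std::int64_t state = 0;
    AggregateDataPtr place = reinterpret_cast<AggregateDataPtr>(&state);
    AggregateDataPtr places[] = {place, place, place, place}; // one slot per row, single logical state

    ToyColumnData column{values, nullptr};
    addIntoAggregateStatesModel(1, 3, &column, places); // rows 1 and 2 only

    std::cout << state << '\n'; // prints "50"
}
```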

View File

@ -26,6 +26,7 @@ struct ColumnData
*/
ColumnData getColumnData(const IColumn * column);
using ColumnDataRowsOffset = size_t;
using ColumnDataRowsSize = size_t;
using JITCompiledFunction = void (*)(ColumnDataRowsSize, ColumnData *);
@ -51,10 +52,10 @@ struct AggregateFunctionWithOffset
};
using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunctionSinglePlace = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunctionSinglePlace = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr);
using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr);
using JITInsertAggregateStatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITInsertAggregateStatesIntoColumnsFunction = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr *);
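The JIT entry-point typedefs now take two ColumnDataRowsOffset arguments forming a half-open range instead of a single row count, so the same compiled function can be applied to any slice of the materialized columns. A short hedged sketch of calling through such a pointer (the ColumnData struct below is a simplified stand-in, not the header's definition):

```cpp
#include <cstddef>

// Simplified stand-ins for the real types (illustrative only).
struct ColumnData { const char * data = nullptr; const char * null = nullptr; };
using AggregateDataPtr = char *;
using ColumnDataRowsOffset = std::size_t;

// New shape of the compiled entry point: two offsets forming [row_begin, row_end).
using JITAddIntoAggregateStatesFunction
    = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr *);

// Whole-batch callers keep their old behaviour by passing row_begin = 0,
// as the Aggregator.cpp hunks above do for the non-JIT batch methods.
void addRows(JITAddIntoAggregateStatesFunction add_into_states,
             std::size_t row_begin, std::size_t row_end,
             ColumnData * columns, AggregateDataPtr * places)
{
    add_into_states(row_begin, row_end, columns, places);
}
```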
struct CompiledAggregateFunctions
{

View File

@ -159,8 +159,9 @@ void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggrega
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
columns_for_aggregator, no_more_keys);
aggregator->executeOnBlock(
aggregate_chunk, /* row_begin= */ 0, length,
aggregation_result, key_columns, columns_for_aggregator, no_more_keys);
}

View File

@ -533,7 +533,7 @@ void AggregatingTransform::consume(Chunk chunk)
}
else
{
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
}
}

View File

@ -64,7 +64,7 @@ void CheckConstraintsTransform::onConsume(Chunk chunk)
/// Check if constraint value is nullable
const auto & null_map = column_nullable->getNullMapColumn();
const PaddedPODArray<UInt8> & null_map_data = null_map.getData();
bool null_map_contains_null = !memoryIsZero(null_map_data.raw_data(), null_map_data.size() * sizeof(UInt8));
bool null_map_contains_null = !memoryIsZero(null_map_data.raw_data(), 0, null_map_data.size() * sizeof(UInt8));
if (null_map_contains_null)
throw Exception(
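This updated memoryIsZero call, together with the memoryIsByte call in the next hunk, suggests these helpers now take a begin/end byte range rather than a single size, with existing callers passing 0 to keep whole-buffer behaviour. A hedged scalar sketch of what such range-aware helpers could look like (semantics inferred from these call sites; the actual ClickHouse implementations are vectorized and may differ):

```cpp
#include <cstddef>
#include <cstdint>

// Assumed semantics: check bytes in the half-open range [begin, end) of `data`.
static bool memoryIsByteModel(const void * data, std::size_t begin, std::size_t end, std::uint8_t byte)
{
    const auto * bytes = static_cast<const std::uint8_t *>(data);
    for (std::size_t i = begin; i < end; ++i)
        if (bytes[i] != byte)
            return false;
    return true;
}

static bool memoryIsZeroModel(const void * data, std::size_t begin, std::size_t end)
{
    return memoryIsByteModel(data, begin, end, 0);
}

int main()
{
    std::uint8_t null_map[] = {0, 0, 1, 0};
    // Passing 0 as `begin` keeps whole-buffer behaviour, matching the updated calls.
    return memoryIsZeroModel(null_map, 0, sizeof(null_map)) ? 0 : 1; // exits 1: a null is present
}
```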
@ -84,7 +84,7 @@ void CheckConstraintsTransform::onConsume(Chunk chunk)
size_t size = res_column_uint8.size();
/// Is violated.
if (!memoryIsByte(res_data, size, 1))
if (!memoryIsByte(res_data, 0, size, 1))
{
size_t row_idx = 0;
for (; row_idx < size; ++row_idx)