Merge branch 'master' into aggregator-jit-lock-fix

This commit is contained in:
mergify[bot] 2022-05-02 16:16:36 +00:00 committed by GitHub
commit 93f5aa7488
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 1148 additions and 526 deletions

View File

@ -225,26 +225,38 @@ public:
}
void
addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const final
addBatchSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const final
{
AggregateFunctionSumData<Numerator> sum_data;
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
sum_data.addManyConditional(column.getData().data(), flags.data(), batch_size);
this->data(place).denominator += countBytesInFilter(flags.data(), batch_size);
sum_data.addManyConditional(column.getData().data(), flags.data(), row_begin, row_end);
this->data(place).denominator += countBytesInFilter(flags.data(), row_begin, row_end);
}
else
{
sum_data.addMany(column.getData().data(), batch_size);
this->data(place).denominator += batch_size;
sum_data.addMany(column.getData().data(), row_begin, row_end);
this->data(place).denominator += (row_end - row_begin);
}
increment(place, sum_data.sum);
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *, ssize_t if_argument_pos)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena *,
ssize_t if_argument_pos)
const final
{
AggregateFunctionSumData<Numerator> sum_data;
@ -253,22 +265,22 @@ public:
{
/// Merge the 2 sets of flags (null and if) into a single one. This allows us to use parallelizable sums when available
const auto * if_flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();
auto final_flags = std::make_unique<UInt8[]>(batch_size);
auto final_flags = std::make_unique<UInt8[]>(row_end);
size_t used_value = 0;
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
UInt8 kept = (!null_map[i]) & !!if_flags[i];
final_flags[i] = kept;
used_value += kept;
}
sum_data.addManyConditional(column.getData().data(), final_flags.get(), batch_size);
sum_data.addManyConditional(column.getData().data(), final_flags.get(), row_begin, row_end);
this->data(place).denominator += used_value;
}
else
{
sum_data.addManyNotNull(column.getData().data(), null_map, batch_size);
this->data(place).denominator += batch_size - countBytesInFilter(null_map, batch_size);
sum_data.addManyNotNull(column.getData().data(), null_map, row_begin, row_end);
this->data(place).denominator += (row_end - row_begin) - countBytesInFilter(null_map, row_begin, row_end);
}
increment(place, sum_data.sum);
}

View File

@ -54,7 +54,12 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
if (if_argument_pos >= 0)
{
@ -63,12 +68,13 @@ public:
}
else
{
data(place).count += batch_size;
data(place).count += row_end - row_begin;
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -78,11 +84,12 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilterWithNull(flags, null_map);
data(place).count += countBytesInFilterWithNull(flags, null_map, row_begin, row_end);
}
else
{
data(place).count += batch_size - countBytesInFilter(null_map, batch_size);
size_t rows = row_end - row_begin;
data(place).count += rows - countBytesInFilter(null_map, row_begin, row_end);
}
}
@ -204,17 +211,23 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
const auto & nc = assert_cast<const ColumnNullable &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
data(place).count += countBytesInFilterWithNull(flags, nc.getNullMapData().data());
data(place).count += countBytesInFilterWithNull(flags, nc.getNullMapData().data(), row_begin, row_end);
}
else
{
data(place).count += batch_size - countBytesInFilter(nc.getNullMapData().data(), batch_size);
size_t rows = row_end - row_begin;
data(place).count += rows - countBytesInFilter(nc.getNullMapData().data(), row_begin, row_end);
}
}

View File

@ -200,7 +200,7 @@ public:
arguments_raw[i] = arguments[i].get();
assert(!arguments.empty());
nested_func->addBatchSinglePlace(arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->addBatchSinglePlace(0, arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->insertResultInto(getNestedPlace(place), to, arena);
}

View File

@ -37,18 +37,18 @@ inline TColumn readItem(const IColumn * column, Arena * arena, size_t row)
template <typename TColumn, typename TFilter = void>
size_t
getFirstNElements_low_threshold(const TColumn * data, int num_elements, int threshold, size_t * results, const TFilter * filter = nullptr)
getFirstNElements_low_threshold(const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const TFilter * filter = nullptr)
{
for (int i = 0; i < threshold; i++)
for (size_t i = 0; i < threshold; i++)
{
results[i] = 0;
}
threshold = std::min(num_elements, threshold);
int current_max = 0;
int cur;
int z;
for (int i = 0; i < num_elements; i++)
threshold = std::min(row_end - row_begin, threshold);
size_t current_max = 0;
size_t cur;
size_t z;
for (size_t i = row_begin; i < row_end; i++)
{
if constexpr (!std::is_same_v<TFilter, void>)
{
@ -90,12 +90,12 @@ struct SortableItem
template <typename TColumn, typename TFilter = void>
size_t getFirstNElements_high_threshold(
const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const TFilter * filter = nullptr)
const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const TFilter * filter = nullptr)
{
std::vector<SortableItem<TColumn>> dataIndexed(num_elements);
std::vector<SortableItem<TColumn>> dataIndexed(row_end);
size_t num_elements_filtered = 0;
for (size_t i = 0; i < num_elements; i++)
for (size_t i = row_begin; i < row_end; i++)
{
if constexpr (!std::is_same_v<TFilter, void>)
{
@ -124,21 +124,21 @@ size_t getFirstNElements_high_threshold(
static const size_t THRESHOLD_MAX_CUSTOM_FUNCTION = 1000;
template <typename TColumn>
size_t getFirstNElements(const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const UInt8 * filter = nullptr)
size_t getFirstNElements(const TColumn * data, size_t row_begin, size_t row_end, size_t threshold, size_t * results, const UInt8 * filter = nullptr)
{
if (threshold < THRESHOLD_MAX_CUSTOM_FUNCTION)
{
if (filter != nullptr)
return getFirstNElements_low_threshold(data, num_elements, threshold, results, filter);
return getFirstNElements_low_threshold(data, row_begin, row_end, threshold, results, filter);
else
return getFirstNElements_low_threshold(data, num_elements, threshold, results);
return getFirstNElements_low_threshold(data, row_begin, row_end, threshold, results);
}
else
{
if (filter != nullptr)
return getFirstNElements_high_threshold(data, num_elements, threshold, results, filter);
return getFirstNElements_high_threshold(data, row_begin, row_end, threshold, results, filter);
else
return getFirstNElements_high_threshold(data, num_elements, threshold, results);
return getFirstNElements_high_threshold(data, row_begin, row_end, threshold, results);
}
}
@ -203,7 +203,7 @@ public:
template <typename TColumn, bool is_plain, typename TFunc>
void
forFirstRows(size_t batch_size, const IColumn ** columns, size_t data_column, Arena * arena, ssize_t if_argument_pos, TFunc func) const
forFirstRows(size_t row_begin, size_t row_end, const IColumn ** columns, size_t data_column, Arena * arena, ssize_t if_argument_pos, TFunc func) const
{
const TColumn * values = nullptr;
std::unique_ptr<std::vector<TColumn>> values_vector;
@ -211,8 +211,8 @@ public:
if constexpr (std::is_same_v<TColumn, StringRef>)
{
values_vector.reset(new std::vector<TColumn>(batch_size));
for (size_t i = 0; i < batch_size; i++)
values_vector.reset(new std::vector<TColumn>(row_end));
for (size_t i = row_begin; i < row_end; i++)
(*values_vector)[i] = readItem<TColumn, is_plain>(columns[data_column], arena, i);
values = (*values_vector).data();
}
@ -231,7 +231,7 @@ public:
filter = reinterpret_cast<const UInt8 *>(refFilter.data);
}
size_t num_elements = getFirstNElements(values, batch_size, threshold, best_rows.data(), filter);
size_t num_elements = getFirstNElements(values, row_begin, row_end, threshold, best_rows.data(), filter);
for (size_t i = 0; i < num_elements; i++)
{
func(best_rows[i], values);
@ -239,14 +239,19 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos) const override
{
State & data = this->data(place);
if constexpr (use_column_b)
{
forFirstRows<TColumnB, is_plain_b>(
batch_size, columns, 1, arena, if_argument_pos, [columns, &arena, &data](size_t row, const TColumnB * values)
row_begin, row_end, columns, 1, arena, if_argument_pos, [columns, &arena, &data](size_t row, const TColumnB * values)
{
data.add(readItem<TColumnA, is_plain_a>(columns[0], arena, row), values[row]);
});
@ -254,7 +259,7 @@ public:
else
{
forFirstRows<TColumnA, is_plain_a>(
batch_size, columns, 0, arena, if_argument_pos, [&data](size_t row, const TColumnA * values)
row_begin, row_end, columns, 0, arena, if_argument_pos, [&data](size_t row, const TColumnA * values)
{
data.add(values[row]);
});

View File

@ -119,7 +119,13 @@ public:
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
void addBatchSinglePlace(
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const UInt8 * null_map = column->getNullMapData().data();
@ -142,25 +148,31 @@ public:
/// Combine the 2 flag arrays so we can call a simplified version (one check vs 2)
/// Note that now the null map will contain 0 if not null and not filtered, or 1 for null or filtered (or both)
auto final_nulls = std::make_unique<UInt8[]>(batch_size);
auto final_nulls = std::make_unique<UInt8[]>(row_end);
if (filter_null_map)
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]) | (!!filter_null_map[i]);
else
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
final_nulls[i] = (!!null_map[i]) | (!filter_values[i]);
if constexpr (result_is_nullable)
{
if (!memoryIsByte(final_nulls.get(), batch_size, 1))
if (!memoryIsByte(final_nulls.get(), row_begin, row_end, 1))
this->setFlag(place);
else
return; /// No work to do.
}
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), columns_param, final_nulls.get(), arena, -1);
row_begin,
row_end,
this->nestedPlace(place),
columns_param,
final_nulls.get(),
arena,
-1);
}
#if USE_EMBEDDED_COMPILER

View File

@ -98,31 +98,38 @@ public:
}
void addBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
nested_func->addBatch(batch_size, places, place_offset, columns, arena, num_arguments - 1);
nested_func->addBatch(row_begin, row_end, places, place_offset, columns, arena, num_arguments - 1);
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t) const override
{
nested_func->addBatchSinglePlace(batch_size, place, columns, arena, num_arguments - 1);
nested_func->addBatchSinglePlace(row_begin, row_end, place, columns, arena, num_arguments - 1);
}
void addBatchSinglePlaceNotNull(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena * arena,
ssize_t) const override
{
nested_func->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, num_arguments - 1);
nested_func->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, num_arguments - 1);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
@ -131,13 +138,14 @@ public:
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_func->mergeBatch(batch_size, places, place_offset, rhs, arena);
nested_func->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override

View File

@ -1159,7 +1159,12 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos) const override
{
if constexpr (is_any)
if (this->data(place).has())
@ -1167,7 +1172,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
{
@ -1179,7 +1184,7 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
this->data(place).changeIfBetter(*columns[0], i, arena);
if constexpr (is_any)
@ -1189,7 +1194,8 @@ public:
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -1203,7 +1209,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i] && flags[i])
{
@ -1215,7 +1221,7 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i])
{

View File

@ -307,17 +307,22 @@ public:
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
const UInt8 * null_map = column->getNullMapData().data();
this->nested_function->addBatchSinglePlaceNotNull(
batch_size, this->nestedPlace(place), &nested_column, null_map, arena, if_argument_pos);
row_begin, row_end, this->nestedPlace(place), &nested_column, null_map, arena, if_argument_pos);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))
if (!memoryIsByte(null_map, row_begin, row_end, 1))
this->setFlag(place);
}

View File

@ -109,7 +109,8 @@ public:
}
void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -119,7 +120,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && places[i])
add(places[i] + place_offset, columns, i, arena);
@ -127,21 +128,26 @@ public:
}
else
{
nested_function->addBatch(batch_size, places, place_offset, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatch(row_begin, row_end, places, place_offset, columns, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
(places[i] + place_offset)[size_of_data] = 1;
}
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
{
@ -152,16 +158,17 @@ public:
}
else
{
if (batch_size)
if (row_end != row_begin)
{
nested_function->addBatchSinglePlace(batch_size, place, columns, arena, if_argument_pos);
nested_function->addBatchSinglePlace(row_begin, row_end, place, columns, arena, if_argument_pos);
place[size_of_data] = 1;
}
}
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -171,8 +178,8 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && !null_map[i])
{
@ -183,10 +190,10 @@ public:
}
else
{
if (batch_size)
if (row_end != row_begin)
{
nested_function->addBatchSinglePlaceNotNull(batch_size, place, columns, null_map, arena, if_argument_pos);
for (size_t i = 0; i < batch_size; ++i)
nested_function->addBatchSinglePlaceNotNull(row_begin, row_end, place, columns, null_map, arena, if_argument_pos);
for (size_t i = row_begin; i < row_end; ++i)
{
if (!null_map[i])
{
@ -208,14 +215,15 @@ public:
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
nested_function->mergeBatch(batch_size, places, place_offset, rhs, arena);
for (size_t i = 0; i < batch_size; ++i)
nested_function->mergeBatch(row_begin, row_end, places, place_offset, rhs, arena);
for (size_t i = row_begin; i < row_end; ++i)
(places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
}

View File

@ -59,9 +59,11 @@ struct AggregateFunctionSumData
/// Vectorized version
template <typename Value>
void NO_SANITIZE_UNDEFINED NO_INLINE addMany(const Value * __restrict ptr, size_t count)
void NO_SANITIZE_UNDEFINED NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
{
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
if constexpr (std::is_floating_point_v<T>)
{
@ -87,7 +89,7 @@ struct AggregateFunctionSumData
/// clang cannot vectorize the loop if accumulator is class member instead of local variable.
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
Impl::add(local_sum, *ptr);
++ptr;
@ -97,9 +99,11 @@ struct AggregateFunctionSumData
template <typename Value, bool add_if_zero>
void NO_SANITIZE_UNDEFINED NO_INLINE
addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t start, size_t end)
{
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
if constexpr (
(is_integer<T> && !is_big_int_v<T>)
@ -108,7 +112,7 @@ struct AggregateFunctionSumData
/// For integers we can vectorize the operation if we replace the null check using a multiplication (by 0 for null, 1 for not null)
/// https://quick-bench.com/q/MLTnfTvwC2qZFVeWHfOBR3U7a8I
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
T multiplier = !*condition_map == add_if_zero;
Impl::add(local_sum, *ptr * multiplier);
@ -151,7 +155,7 @@ struct AggregateFunctionSumData
}
T local_sum{};
while (ptr < end)
while (ptr < end_ptr)
{
if (!*condition_map == add_if_zero)
Impl::add(local_sum, *ptr);
@ -162,15 +166,15 @@ struct AggregateFunctionSumData
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, true>(ptr, null_map, count);
return addManyConditionalInternal<Value, true>(ptr, null_map, start, end);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, false>(ptr, cond_map, count);
return addManyConditionalInternal<Value, false>(ptr, cond_map, start, end);
}
void NO_SANITIZE_UNDEFINED merge(const AggregateFunctionSumData & rhs)
@ -220,7 +224,7 @@ struct AggregateFunctionSumKahanData
/// Vectorized version
template <typename Value>
void NO_INLINE addMany(const Value * __restrict ptr, size_t count)
void NO_INLINE addMany(const Value * __restrict ptr, size_t start, size_t end)
{
/// Less than in ordinary sum, because the algorithm is more complicated and too large loop unrolling is questionable.
/// But this is just a guess.
@ -228,7 +232,10 @@ struct AggregateFunctionSumKahanData
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
@ -241,7 +248,7 @@ struct AggregateFunctionSumKahanData
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
while (ptr < end_ptr)
{
addImpl(*ptr, sum, compensation);
++ptr;
@ -249,13 +256,16 @@ struct AggregateFunctionSumKahanData
}
template <typename Value, bool add_if_zero>
void NO_INLINE addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t count)
void NO_INLINE addManyConditionalInternal(const Value * __restrict ptr, const UInt8 * __restrict condition_map, size_t start, size_t end)
{
constexpr size_t unroll_count = 4;
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
ptr += start;
size_t count = end - start;
const auto * end_ptr = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
@ -270,7 +280,7 @@ struct AggregateFunctionSumKahanData
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
while (ptr < end_ptr)
{
if ((!*condition_map) == add_if_zero)
addImpl(*ptr, sum, compensation);
@ -280,15 +290,15 @@ struct AggregateFunctionSumKahanData
}
template <typename Value>
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
void ALWAYS_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, true>(ptr, null_map, count);
return addManyConditionalInternal<Value, true>(ptr, null_map, start, end);
}
template <typename Value>
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t count)
void ALWAYS_INLINE addManyConditional(const Value * __restrict ptr, const UInt8 * __restrict cond_map, size_t start, size_t end)
{
return addManyConditionalInternal<Value, false>(ptr, cond_map, count);
return addManyConditionalInternal<Value, false>(ptr, cond_map, start, end);
}
void ALWAYS_INLINE mergeImpl(T & to_sum, T & to_compensation, T from_sum, T from_compensation)
@ -385,22 +395,33 @@ public:
}
void addBatchSinglePlace(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena *,
ssize_t if_argument_pos) const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
this->data(place).addManyConditional(column.getData().data(), flags.data(), batch_size);
this->data(place).addManyConditional(column.getData().data(), flags.data(), row_begin, row_end);
}
else
{
this->data(place).addMany(column.getData().data(), batch_size);
this->data(place).addMany(column.getData().data(), row_begin, row_end);
}
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *, ssize_t if_argument_pos)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
Arena *,
ssize_t if_argument_pos)
const override
{
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
@ -408,15 +429,15 @@ public:
{
/// Merge the 2 sets of flags (null and if) into a single one. This allows us to use parallelizable sums when available
const auto * if_flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();
auto final_flags = std::make_unique<UInt8[]>(batch_size);
for (size_t i = 0; i < batch_size; ++i)
auto final_flags = std::make_unique<UInt8[]>(row_end);
for (size_t i = row_begin; i < row_end; ++i)
final_flags[i] = (!null_map[i]) & if_flags[i];
this->data(place).addManyConditional(column.getData().data(), final_flags.get(), batch_size);
this->data(place).addManyConditional(column.getData().data(), final_flags.get(), row_begin, row_end);
}
else
{
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
this->data(place).addManyNotNull(column.getData().data(), null_map, row_begin, row_end);
}
}

View File

@ -175,7 +175,8 @@ public:
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -184,13 +185,16 @@ public:
/// The version of "addBatch", that handle sparse columns as arguments.
virtual void addBatchSparse(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const = 0;
virtual void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
@ -199,17 +203,27 @@ public:
/** The same for single place.
*/
virtual void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const = 0;
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const = 0;
/// The version of "addBatchSinglePlace", that handle sparse columns as arguments.
virtual void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena) const = 0;
/** The same for single place when need to aggregate only filtered data.
* Instead of using an if-column, the condition is combined inside the null_map
*/
virtual void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -217,7 +231,12 @@ public:
ssize_t if_argument_pos = -1) const = 0;
virtual void addBatchSinglePlaceFromInterval( /// NOLINT
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
@ -226,7 +245,8 @@ public:
* "places" contains a large number of same values consecutively.
*/
virtual void addBatchArray(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -237,7 +257,8 @@ public:
* and pointers to aggregation states are stored in AggregateDataPtr[256] lookup table.
*/
virtual void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -251,7 +272,8 @@ public:
* All places that were not inserted must be destroyed if there was exception during insert into result column.
*/
virtual void insertResultIntoBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
@ -261,7 +283,8 @@ public:
/** Destroy batch of aggregate places.
*/
virtual void destroyBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept = 0;
@ -355,7 +378,8 @@ public:
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -365,7 +389,7 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i] && places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
@ -373,13 +397,15 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
}
void addBatchSparse(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
@ -387,33 +413,42 @@ public:
{
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
size_t batch_size = column_sparse.size();
auto offset_it = column_sparse.begin();
for (size_t i = 0; i < batch_size; ++i, ++offset_it)
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(places[offset_it.getCurrentRow()] + place_offset,
&values, offset_it.getValueIndex(), arena);
}
void mergeBatch(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const AggregateDataPtr * rhs,
Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
}
void addBatchSinglePlace( /// NOLINT
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
@ -421,26 +456,34 @@ public:
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSparseSinglePlace(
AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena) const override
{
/// TODO: add values and defaults separately if order of adding isn't important.
const auto & column_sparse = assert_cast<const ColumnSparse &>(*columns[0]);
const auto * values = &column_sparse.getValuesColumn();
size_t batch_size = column_sparse.size();
auto offset_it = column_sparse.begin();
for (size_t i = 0; i < batch_size; ++i, ++offset_it)
/// FIXME: make it more optimal
for (size_t i = 0; i < row_begin; ++i, ++offset_it)
;
for (size_t i = 0; i < row_end; ++i, ++offset_it)
static_cast<const Derived *>(this)->add(place, &values, offset_it.getValueIndex(), arena);
}
void addBatchSinglePlaceNotNull( /// NOLINT
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
const UInt8 * null_map,
@ -450,26 +493,31 @@ public:
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (!null_map[i] && flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
else
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchSinglePlaceFromInterval( /// NOLINT
size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1)
size_t row_begin,
size_t row_end,
AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1)
const override
{
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = batch_begin; i < batch_end; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
if (flags[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
@ -477,17 +525,23 @@ public:
}
else
{
for (size_t i = batch_begin; i < batch_end; ++i)
for (size_t i = row_begin; i < row_end; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
}
void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
const override
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
@ -498,7 +552,8 @@ public:
}
void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -508,10 +563,10 @@ public:
{
static constexpr size_t UNROLL_COUNT = 8;
size_t i = 0;
size_t i = row_begin;
size_t batch_size_unrolled = batch_size / UNROLL_COUNT * UNROLL_COUNT;
for (; i < batch_size_unrolled; i += UNROLL_COUNT)
size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
for (; i < size_unrolled; i += UNROLL_COUNT)
{
AggregateDataPtr places[UNROLL_COUNT];
for (size_t j = 0; j < UNROLL_COUNT; ++j)
@ -527,7 +582,7 @@ public:
static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
}
for (; i < batch_size; ++i)
for (; i < row_end; ++i)
{
AggregateDataPtr & place = map[key[i]];
if (unlikely(!place))
@ -536,13 +591,20 @@ public:
}
}
void insertResultIntoBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, IColumn & to, Arena * arena, bool destroy_place_after_insert) const override
void insertResultIntoBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena,
bool destroy_place_after_insert) const override
{
size_t batch_index = 0;
size_t batch_index = row_begin;
try
{
for (; batch_index < batch_size; ++batch_index)
for (; batch_index < row_end; ++batch_index)
{
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
@ -552,16 +614,20 @@ public:
}
catch (...)
{
for (size_t destroy_index = batch_index; destroy_index < batch_size; ++destroy_index)
for (size_t destroy_index = batch_index; destroy_index < row_end; ++destroy_index)
static_cast<const Derived *>(this)->destroy(places[destroy_index] + place_offset);
throw;
}
}
void destroyBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset) const noexcept override
void destroyBatch(
size_t row_begin,
size_t row_end,
AggregateDataPtr * places,
size_t place_offset) const noexcept override
{
for (size_t i = 0; i < batch_size; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
static_cast<const Derived *>(this)->destroy(places[i] + place_offset);
}
@ -612,7 +678,8 @@ public:
}
void addBatchLookupTable8(
size_t batch_size,
size_t row_begin,
size_t row_end,
AggregateDataPtr * map,
size_t place_offset,
std::function<void(AggregateDataPtr &)> init,
@ -626,7 +693,7 @@ public:
if (func.allocatesMemoryInArena() || sizeof(Data) > 16 || func.sizeOfData() != sizeof(Data))
{
IAggregateFunctionHelper<Derived>::addBatchLookupTable8(batch_size, map, place_offset, init, key, columns, arena);
IAggregateFunctionHelper<Derived>::addBatchLookupTable8(row_begin, row_end, map, place_offset, init, key, columns, arena);
return;
}
@ -637,12 +704,12 @@ public:
std::unique_ptr<Data[]> places{new Data[256 * UNROLL_COUNT]};
bool has_data[256 * UNROLL_COUNT]{}; /// Separate flags array to avoid heavy initialization.
size_t i = 0;
size_t i = row_begin;
/// Aggregate data into different lookup tables.
size_t batch_size_unrolled = batch_size / UNROLL_COUNT * UNROLL_COUNT;
for (; i < batch_size_unrolled; i += UNROLL_COUNT)
size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
for (; i < size_unrolled; i += UNROLL_COUNT)
{
for (size_t j = 0; j < UNROLL_COUNT; ++j)
{
@ -676,7 +743,7 @@ public:
/// Process tails and add directly to the final destination.
for (; i < batch_size; ++i)
for (; i < row_end; ++i)
{
size_t k = key[i];
AggregateDataPtr & place = map[k];

View File

@ -54,7 +54,7 @@ MutableColumnPtr ColumnFixedString::cloneResized(size_t size) const
bool ColumnFixedString::isDefaultAt(size_t index) const
{
assert(index < size());
return memoryIsZero(chars.data() + index * n, n);
return memoryIsZero(chars.data() + index * n, 0, n);
}
void ColumnFixedString::insert(const Field & x)

View File

@ -27,7 +27,7 @@ static UInt64 toBits64(const Int8 * bytes64)
}
#endif
size_t countBytesInFilter(const UInt8 * filt, size_t sz)
size_t countBytesInFilter(const UInt8 * filt, size_t start, size_t end)
{
size_t count = 0;
@ -37,18 +37,20 @@ size_t countBytesInFilter(const UInt8 * filt, size_t sz)
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
const Int8 * end = pos + sz;
pos += start;
const Int8 * end_pos = pos + (end - start);
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + sz / 64 * 64;
const Int8 * end_pos64 = pos + (end - start) / 64 * 64;
for (; pos < end64; pos += 64)
for (; pos < end_pos64; pos += 64)
count += __builtin_popcountll(toBits64(pos));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos)
for (; pos < end_pos; ++pos)
count += *pos != 0;
return count;
@ -56,10 +58,10 @@ size_t countBytesInFilter(const UInt8 * filt, size_t sz)
size_t countBytesInFilter(const IColumn::Filter & filt)
{
return countBytesInFilter(filt.data(), filt.size());
return countBytesInFilter(filt.data(), 0, filt.size());
}
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end)
{
size_t count = 0;
@ -68,20 +70,20 @@ size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * nu
* It would be better to use != 0, then this does not allow SSE2.
*/
const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data());
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map);
const Int8 * end = pos + filt.size();
const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data()) + start;
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map) + start;
const Int8 * end_pos = pos + (end - start);
#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + filt.size() / 64 * 64;
const Int8 * end_pos64 = pos + (end - start) / 64 * 64;
for (; pos < end64; pos += 64, pos2 += 64)
for (; pos < end_pos64; pos += 64, pos2 += 64)
count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2));
/// TODO Add duff device for tail?
#endif
for (; pos < end; ++pos, ++pos2)
for (; pos < end_pos; ++pos, ++pos2)
count += (*pos & ~*pos2) != 0;
return count;
@ -96,17 +98,18 @@ std::vector<size_t> countColumnsSizeInSelector(IColumn::ColumnIndex num_columns,
return counts;
}
bool memoryIsByte(const void * data, size_t size, uint8_t byte)
bool memoryIsByte(const void * data, size_t start, size_t end, uint8_t byte)
{
size_t size = end - start;
if (size == 0)
return true;
const auto * ptr = reinterpret_cast<const uint8_t *>(data);
const auto * ptr = reinterpret_cast<const uint8_t *>(data) + start;
return *ptr == byte && memcmp(ptr, ptr + 1, size - 1) == 0;
}
bool memoryIsZero(const void * data, size_t size)
bool memoryIsZero(const void * data, size_t start, size_t end)
{
return memoryIsByte(data, size, 0x0);
return memoryIsByte(data, start, end, 0x0);
}
namespace ErrorCodes

View File

@ -53,17 +53,17 @@ inline UInt64 bytes64MaskToBits64Mask(const UInt8 * bytes64)
}
/// Counts how many bytes of `filt` are greater than zero.
size_t countBytesInFilter(const UInt8 * filt, size_t sz);
size_t countBytesInFilter(const UInt8 * filt, size_t start, size_t end);
size_t countBytesInFilter(const IColumn::Filter & filt);
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map);
size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end);
/// Returns vector with num_columns elements. vector[i] is the count of i values in selector.
/// Selector must contain values from 0 to num_columns - 1. NOTE: this is not checked.
std::vector<size_t> countColumnsSizeInSelector(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector);
/// Returns true, if the memory contains only zeros.
bool memoryIsZero(const void * data, size_t size);
bool memoryIsByte(const void * data, size_t size, uint8_t byte);
bool memoryIsZero(const void * data, size_t start, size_t end);
bool memoryIsByte(const void * data, size_t start, size_t end, uint8_t byte);
/// The general implementation of `filter` function for ColumnArray and ColumnString.
template <typename T>

View File

@ -256,10 +256,10 @@ private:
return lut[toLUTIndex(v)];
}
template <typename T, typename Divisor>
inline T roundDown(T x, Divisor divisor) const
template <typename DateOrTime, typename Divisor>
inline DateOrTime roundDown(DateOrTime x, Divisor divisor) const
{
static_assert(std::is_integral_v<T> && std::is_integral_v<Divisor>);
static_assert(std::is_integral_v<DateOrTime> && std::is_integral_v<Divisor>);
assert(divisor > 0);
if (likely(offset_is_whole_number_of_hours_during_epoch))
@ -273,7 +273,15 @@ private:
}
Time date = find(x).date;
return date + (x - date) / divisor * divisor;
Time res = date + (x - date) / divisor * divisor;
if constexpr (std::is_unsigned_v<DateOrTime> || std::is_same_v<DateOrTime, DayNum>)
{
if (unlikely(res < 0))
return 0;
return res;
}
else
return res;
}
public:
@ -564,11 +572,16 @@ public:
}
/// NOTE: Assuming timezone offset is a multiple of 15 minutes.
inline Time toStartOfMinute(Time t) const { return toStartOfMinuteInterval(t, 1); }
inline Time toStartOfFiveMinutes(Time t) const { return toStartOfMinuteInterval(t, 5); }
inline Time toStartOfFifteenMinutes(Time t) const { return toStartOfMinuteInterval(t, 15); }
inline Time toStartOfTenMinutes(Time t) const { return toStartOfMinuteInterval(t, 10); }
inline Time toStartOfHour(Time t) const { return roundDown(t, 3600); }
template <typename DateOrTime>
DateOrTime toStartOfMinute(DateOrTime t) const { return toStartOfMinuteInterval(t, 1); }
template <typename DateOrTime>
DateOrTime toStartOfFiveMinutes(DateOrTime t) const { return toStartOfMinuteInterval(t, 5); }
template <typename DateOrTime>
DateOrTime toStartOfFifteenMinutes(DateOrTime t) const { return toStartOfMinuteInterval(t, 15); }
template <typename DateOrTime>
DateOrTime toStartOfTenMinutes(DateOrTime t) const { return toStartOfMinuteInterval(t, 10); }
template <typename DateOrTime>
DateOrTime toStartOfHour(DateOrTime t) const { return roundDown(t, 3600); }
/** Number of calendar day since the beginning of UNIX epoch (1970-01-01 is zero)
* We use just two bytes for it. It covers the range up to 2105 and slightly more.
@ -953,7 +966,8 @@ public:
return lut[toLUTIndex(ExtendedDayNum(d / days * days))].date;
}
inline Time toStartOfHourInterval(Time t, UInt64 hours) const
template <typename DateOrTime>
DateOrTime toStartOfHourInterval(DateOrTime t, UInt64 hours) const
{
if (hours == 1)
return toStartOfHour(t);
@ -993,10 +1007,19 @@ public:
time = time / seconds * seconds;
}
return values.date + time;
Time res = values.date + time;
if constexpr (std::is_unsigned_v<DateOrTime> || std::is_same_v<DateOrTime, DayNum>)
{
if (unlikely(res < 0))
return 0;
return res;
}
else
return res;
}
inline Time toStartOfMinuteInterval(Time t, UInt64 minutes) const
template <typename DateOrTime>
DateOrTime toStartOfMinuteInterval(DateOrTime t, UInt64 minutes) const
{
UInt64 divisor = 60 * minutes;
if (likely(offset_is_whole_number_of_minutes_during_epoch))
@ -1007,10 +1030,19 @@ public:
}
Time date = find(t).date;
return date + (t - date) / divisor * divisor;
Time res = date + (t - date) / divisor * divisor;
if constexpr (std::is_unsigned_v<DateOrTime> || std::is_same_v<DateOrTime, DayNum>)
{
if (unlikely(res < 0))
return 0;
return res;
}
else
return res;
}
inline Time toStartOfSecondInterval(Time t, UInt64 seconds) const
template <typename DateOrTime>
DateOrTime toStartOfSecondInterval(DateOrTime t, UInt64 seconds) const
{
if (seconds == 1)
return t;

View File

@ -117,6 +117,25 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
bytes_read += size_decompressed;
bytes += size_decompressed;
}
else if (nextimpl_working_buffer_offset > 0)
{
/// Need to skip some bytes in decompressed data (seek happened before readBig call).
size_compressed = new_size_compressed;
bytes += offset();
/// This is for clang static analyzer.
assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
/// Read partial data from first block. Won't run here at second block.
/// Avoid to call nextImpl and unnecessary memcpy in read when the second block fits entirely to output buffer.
size_t size_partial = std::min((size_decompressed - nextimpl_working_buffer_offset),(n - bytes_read));
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;
bytes_read += read(to + bytes_read, size_partial);
}
else
{
size_compressed = new_size_compressed;
@ -124,17 +143,12 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// This is for clang static analyzer.
assert(size_decompressed + additional_size_at_the_end_of_buffer > 0);
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
/// Manually take nextimpl_working_buffer_offset into account, because we don't use
/// nextImpl in this method.
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;
///Read partial data from last block.
pos = working_buffer.begin();
bytes_read += read(to + bytes_read, n - bytes_read);
break;
}

View File

@ -10,8 +10,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int INCONSISTENT_RESERVATIONS;
extern const int NO_RESERVATIONS_PROVIDED;
}
IVolume::IVolume(
@ -45,43 +43,4 @@ UInt64 IVolume::getMaxUnreservedFreeSpace() const
return res;
}
MultiDiskReservation::MultiDiskReservation(Reservations & reservations_, UInt64 size_)
: reservations(std::move(reservations_))
, size(size_)
{
if (reservations.empty())
{
throw Exception("At least one reservation must be provided to MultiDiskReservation", ErrorCodes::NO_RESERVATIONS_PROVIDED);
}
for (auto & reservation : reservations)
{
if (reservation->getSize() != size_)
{
throw Exception("Reservations must have same size", ErrorCodes::INCONSISTENT_RESERVATIONS);
}
}
}
Disks MultiDiskReservation::getDisks() const
{
Disks res;
res.reserve(reservations.size());
for (const auto & reservation : reservations)
{
res.push_back(reservation->getDisk());
}
return res;
}
void MultiDiskReservation::update(UInt64 new_size)
{
for (auto & reservation : reservations)
{
reservation->update(new_size);
}
size = new_size;
}
}

View File

@ -81,22 +81,4 @@ public:
bool perform_ttl_move_on_insert = true;
};
/// Reservation for multiple disks at once. Can be used in RAID1 implementation.
class MultiDiskReservation : public IReservation
{
public:
MultiDiskReservation(Reservations & reservations, UInt64 size);
UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override { return reservations[i]->getDisk(); }
Disks getDisks() const override;
void update(UInt64 new_size) override;
private:
Reservations reservations;
UInt64 size;
};
}

View File

@ -1,6 +1,7 @@
#include "StoragePolicy.h"
#include "DiskFactory.h"
#include "DiskLocal.h"
#include "createVolume.h"
#include <Interpreters/Context.h>
#include <Common/escapeForFileName.h>

View File

@ -5,7 +5,6 @@
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/VolumeRAID1.h>
#include <Disks/SingleDiskVolume.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentMetrics.h>

View File

@ -1,29 +0,0 @@
#include "VolumeRAID1.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
namespace DB
{
ReservationPtr VolumeRAID1::reserve(UInt64 bytes)
{
/// This volume can not store data which size is greater than `max_data_part_size`
/// to ensure that parts of size greater than that go to another volume(s).
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
Reservations res(disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
res[i] = disks[i]->reserve(bytes);
if (!res[i])
return {};
}
return std::make_unique<MultiDiskReservation>(res, bytes);
}
}

View File

@ -1,49 +0,0 @@
#pragma once
#include <Disks/createVolume.h>
#include <Disks/VolumeJBOD.h>
namespace DB
{
class VolumeRAID1;
using VolumeRAID1Ptr = std::shared_ptr<VolumeRAID1>;
/// Volume which reserves space on each underlying disk.
///
/// NOTE: Just interface implementation, doesn't used in codebase,
/// also not available for user.
class VolumeRAID1 : public VolumeJBOD
{
public:
VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_, bool are_merges_avoided_in_config_)
: VolumeJBOD(name_, disks_, max_data_part_size_, are_merges_avoided_in_config_)
{
}
VolumeRAID1(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: VolumeJBOD(name_, config, config_prefix, disk_selector)
{
}
VolumeRAID1(
VolumeRAID1 & volume_raid1,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector)
: VolumeJBOD(volume_raid1, config, config_prefix, disk_selector)
{
}
VolumeType getType() const override { return VolumeType::RAID1; }
ReservationPtr reserve(UInt64 bytes) override;
};
}

View File

@ -2,7 +2,6 @@
#include <Disks/SingleDiskVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/VolumeRAID1.h>
#include <boost/algorithm/string.hpp>
@ -23,11 +22,6 @@ VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, Volume
/// for such type of reservation will be with one disk.
return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk(), other_volume->max_data_part_size);
}
if (other_volume->getType() == VolumeType::RAID1)
{
auto volume = std::dynamic_pointer_cast<VolumeRAID1>(other_volume);
return std::make_shared<VolumeRAID1>(volume->getName(), reservation->getDisks(), volume->max_data_part_size, volume->are_merges_avoided);
}
return nullptr;
}

View File

@ -3576,7 +3576,7 @@ private:
const auto & nullable_col = assert_cast<const ColumnNullable &>(*col);
const auto & null_map = nullable_col.getNullMapData();
if (!memoryIsZero(null_map.data(), null_map.size()))
if (!memoryIsZero(null_map.data(), 0, null_map.size()))
throw Exception{"Cannot convert NULL value to non-Nullable type",
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
}

View File

@ -186,7 +186,7 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->addBatchArray(input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
that->addBatchArray(0, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)

View File

@ -147,7 +147,7 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
/// Unnest consecutive trailing -State combinators
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->addBatch(input_rows_count, places.data(), 0, aggregate_arguments, arena.get());
that->addBatch(0, input_rows_count, places.data(), 0, aggregate_arguments, arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)

View File

@ -125,7 +125,7 @@ public:
if (in)
{
const auto & in_data = in->getData();
if (!memoryIsZero(in_data.data(), in_data.size() * sizeof(in_data[0])))
if (!memoryIsZero(in_data.data(), 0, in_data.size() * sizeof(in_data[0])))
{
throw Exception(ErrorCodes::FUNCTION_THROW_IF_VALUE_IS_NON_ZERO,
message.value_or("Value passed to '" + getName() + "' function is non zero"));

View File

@ -164,7 +164,7 @@ namespace
return time_zone.toStartOfHourInterval(t, hours);
}
static UInt32 execute(Int64 t, Int64 hours, const DateLUTImpl & time_zone, Int64 scale_multiplier)
static Int64 execute(Int64 t, Int64 hours, const DateLUTImpl & time_zone, Int64 scale_multiplier)
{
return time_zone.toStartOfHourInterval(t / scale_multiplier, hours);
}
@ -182,7 +182,7 @@ namespace
return time_zone.toStartOfMinuteInterval(t, minutes);
}
static UInt32 execute(Int64 t, Int64 minutes, const DateLUTImpl & time_zone, Int64 scale_multiplier)
static Int64 execute(Int64 t, Int64 minutes, const DateLUTImpl & time_zone, Int64 scale_multiplier)
{
return time_zone.toStartOfMinuteInterval(t / scale_multiplier, minutes);
}
@ -200,7 +200,7 @@ namespace
return time_zone.toStartOfSecondInterval(t, seconds);
}
static UInt32 execute(Int64 t, Int64 seconds, const DateLUTImpl & time_zone, Int64 scale_multiplier)
static Int64 execute(Int64 t, Int64 seconds, const DateLUTImpl & time_zone, Int64 scale_multiplier)
{
return time_zone.toStartOfSecondInterval(t / scale_multiplier, seconds);
}

View File

@ -354,11 +354,6 @@ Aggregator::Params::StatsCollectingParams::StatsCollectingParams(
{
}
bool Aggregator::Params::StatsCollectingParams::isCollectionAndUseEnabled() const
{
return key != 0;
}
Block Aggregator::getHeader(bool final) const
{
return params.getHeader(final);
@ -849,6 +844,45 @@ bool Aggregator::hasSparseArguments(AggregateFunctionInstruction * aggregate_ins
return false;
}
void Aggregator::executeOnBlockSmall(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
initDataVariantsWithSizeHint(result, method_chosen, params);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
}
executeImpl(result, row_begin, row_end, key_columns, aggregate_instructions);
}
void Aggregator::executeImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, overflow_row);
if (false) {} // NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
/** It's interesting - if you remove `noinline`, then gcc for some reason will inline this function, and the performance decreases (~ 10%).
* (Probably because after the inline of this function, more internal functions no longer be inlined.)
* Inline does not make sense, since the inner loop is entirely inside this function.
@ -857,7 +891,8 @@ template <typename Method>
void NO_INLINE Aggregator::executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
@ -870,17 +905,17 @@ void NO_INLINE Aggregator::executeImpl(
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
{
executeImplBatch<false, true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<false, true>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
else
#endif
{
executeImplBatch<false, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
}
else
{
executeImplBatch<true, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch<true, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
}
}
@ -889,7 +924,8 @@ void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const
{
@ -901,7 +937,7 @@ void NO_INLINE Aggregator::executeImplBatch(
/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
for (size_t i = 0; i < rows; ++i)
for (size_t i = row_begin; i < row_end; ++i)
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
return;
}
@ -925,7 +961,8 @@ void NO_INLINE Aggregator::executeImplBatch(
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
rows,
row_begin,
row_end,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
@ -941,10 +978,14 @@ void NO_INLINE Aggregator::executeImplBatch(
}
}
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// NOTE: only row_end-row_start is required, but:
/// - this affects only optimize_aggregation_in_order,
/// - this is just a pointer, so it should not be significant,
/// - and plus this will require other changes in the interface.
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
@ -1029,7 +1070,7 @@ void NO_INLINE Aggregator::executeImplBatch(
}
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(rows, columns_data.data(), places.get());
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
}
#endif
@ -1045,11 +1086,11 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
@ -1057,10 +1098,13 @@ void NO_INLINE Aggregator::executeImplBatch(
template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
size_t row_begin, size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
{
if (row_begin == row_end)
return;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
{
@ -1081,7 +1125,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
}
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(rows, columns_data.data(), res);
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), res);
#if defined(MEMORY_SANITIZER)
@ -1112,11 +1156,23 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(
inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
0,
inst->offsets[static_cast<ssize_t>(row_end - 1)],
res + inst->state_offset,
inst->batch_arguments,
arena);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparseSinglePlace(res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSparseSinglePlace(
row_begin, row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
else
inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlace(
row_begin, row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
}
}
@ -1208,16 +1264,27 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
bool Aggregator::executeOnBlock(const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
return executeOnBlock(block.getColumns(),
/* row_begin= */ 0, block.rows(),
result,
key_columns,
aggregate_columns,
no_more_keys);
}
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
bool Aggregator::executeOnBlock(Columns columns,
size_t row_begin, size_t row_end,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@ -1273,27 +1340,19 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
// #if USE_EMBEDDED_COMPILER
// if (compiled_aggregate_functions_holder)
// {
// executeWithoutKeyImpl<true>(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
// executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
// }
// else
// #endif
{
executeWithoutKeyImpl<false>(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
}
}
else
{
/// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
no_more_keys, overflow_row_ptr);
if (false) {} // NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
}
size_t result_size = result.sizeWithoutOverflowRow();
@ -1715,7 +1774,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
auto insert_aggregates_into_columns_function = compiled_functions.insert_aggregates_into_columns_function;
insert_aggregates_into_columns_function(places.size(), columns_data.data(), places.data());
insert_aggregates_into_columns_function(0, places.size(), columns_data.data(), places.data());
}
#endif
@ -1744,7 +1803,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
}
catch (...)
@ -1764,7 +1823,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
}
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(0, places.size(), places.data(), offset);
}
if (exception)
@ -2524,6 +2583,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
/// For all rows.
size_t rows = block.rows();
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
for (size_t i = 0; i < rows; ++i)
@ -2562,7 +2622,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
{
/// Merge state of aggregate functions.
aggregate_functions[j]->mergeBatch(
rows, places.get(), offsets_of_aggregate_states[j],
0, rows,
places.get(), offsets_of_aggregate_states[j],
aggregate_columns[j]->data(),
aggregates_pool);
}

View File

@ -941,9 +941,10 @@ public:
size_t max_entries_for_hash_table_stats_,
size_t max_size_to_preallocate_for_aggregation_);
bool isCollectionAndUseEnabled() const;
bool isCollectionAndUseEnabled() const { return key != 0; }
void disable() { key = 0; }
const UInt64 key = 0;
UInt64 key = 0;
const size_t max_entries_for_hash_table_stats = 0;
const size_t max_size_to_preallocate_for_aggregation = 0;
};
@ -1022,12 +1023,17 @@ public:
using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool executeOnBlock(const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool executeOnBlock(Columns columns,
size_t row_begin, size_t row_end,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
/// Used for aggregate projection.
@ -1160,12 +1166,33 @@ private:
void destroyAllAggregateStates(AggregatedDataVariants & result) const;
/// Used for optimize_aggregation_in_order:
/// - No two-level aggregation
/// - No external aggregation
/// - No without_key support (it is implemented using executeOnIntervalWithoutKeyImpl())
void executeOnBlockSmall(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions) const;
void executeImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys = false,
AggregateDataPtr overflow_row = nullptr) const;
/// Process one data block, aggregate the data into a hash table.
template <typename Method>
void executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
@ -1177,7 +1204,8 @@ private:
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const;
@ -1185,7 +1213,8 @@ private:
template <bool use_compiled_functions>
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;

View File

@ -309,11 +309,12 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * row_start_arg = arguments++;
llvm::Value * row_end_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * places_arg = arguments++;
@ -322,6 +323,9 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
auto * places_start_arg = entry_builder.CreateInBoundsGEP(nullptr, places_arg, row_start_arg);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
@ -338,7 +342,16 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable())
{
data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg);
}
else
{
data_placeholder.null_init = nullptr;
}
columns.emplace_back(data_placeholder);
}
@ -350,15 +363,15 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
auto * places_phi = b.CreatePHI(places_arg->getType(), 2);
places_phi->addIncoming(places_arg, entry);
auto * places_phi = b.CreatePHI(places_start_arg->getType(), 2);
places_phi->addIncoming(places_start_arg, entry);
for (auto & col : columns)
{
@ -428,7 +441,7 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
@ -443,11 +456,12 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * places_type = b.getInt8Ty()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), places_type }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * row_start_arg = arguments++;
llvm::Value * row_end_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * place_arg = arguments++;
@ -456,6 +470,8 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
@ -472,7 +488,16 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
data_placeholder.data_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.data_init, row_start_arg);
if (argument_type->isNullable())
{
data_placeholder.null_init = b.CreateExtractValue(data, {1});
data_placeholder.null_init = entry_builder.CreateInBoundsGEP(nullptr, data_placeholder.null_init, row_start_arg);
}
else
{
data_placeholder.null_init = nullptr;
}
columns.emplace_back(data_placeholder);
}
@ -484,12 +509,12 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
for (auto & col : columns)
{
@ -555,7 +580,7 @@ static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & mod
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
@ -600,35 +625,47 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * aggregate_data_places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), aggregate_data_places_type }, false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, size_type, column_data_type->getPointerTo(), aggregate_data_places_type }, false);
auto * aggregate_loop_func = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func->args().begin();
llvm::Value * rows_count_arg = &*arguments++;
llvm::Value * row_start_arg = &*arguments++;
llvm::Value * row_end_arg = &*arguments++;
llvm::Value * columns_arg = &*arguments++;
llvm::Value * aggregate_data_places_arg = &*arguments++;
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func);
b.SetInsertPoint(entry);
llvm::IRBuilder<> entry_builder(entry);
std::vector<ColumnDataPlaceholder> columns(functions.size());
for (size_t i = 0; i < functions.size(); ++i)
{
auto return_type = functions[i].function->getReturnType();
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, i));
columns[i].data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(return_type))->getPointerTo());
columns[i].null_init = return_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
columns[i].data_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].data_init, row_start_arg);
if (return_type->isNullable())
{
columns[i].null_init = b.CreateExtractValue(data, {1});
columns[i].null_init = entry_builder.CreateInBoundsGEP(nullptr, columns[i].null_init, row_start_arg);
}
else
{
columns[i].null_init = nullptr;
}
}
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.CreateCondBr(b.CreateICmpEQ(row_start_arg, row_end_arg), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
auto * counter_phi = b.CreatePHI(row_start_arg->getType(), 2);
counter_phi->addIncoming(row_start_arg, entry);
auto * aggregate_data_place_phi = b.CreatePHI(aggregate_data_places_type, 2);
aggregate_data_place_phi->addIncoming(aggregate_data_places_arg, entry);
@ -682,7 +719,7 @@ static void compileInsertAggregatesIntoResultColumns(llvm::Module & module, cons
aggregate_data_place_phi->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_place_phi, 1), cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.CreateCondBr(b.CreateICmpEQ(value, row_end_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();

View File

@ -26,6 +26,7 @@ struct ColumnData
*/
ColumnData getColumnData(const IColumn * column);
using ColumnDataRowsOffset = size_t;
using ColumnDataRowsSize = size_t;
using JITCompiledFunction = void (*)(ColumnDataRowsSize, ColumnData *);
@ -51,10 +52,10 @@ struct AggregateFunctionWithOffset
};
using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunctionSinglePlace = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunctionSinglePlace = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr);
using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr);
using JITInsertAggregateStatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITInsertAggregateStatesIntoColumnsFunction = void (*)(ColumnDataRowsOffset, ColumnDataRowsOffset, ColumnData *, AggregateDataPtr *);
struct CompiledAggregateFunctions
{

View File

@ -57,6 +57,17 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
pipeline.dropTotalsAndExtremes();
bool allow_to_use_two_level_group_by = pipeline.getNumStreams() > 1 || params.max_bytes_before_external_group_by != 0;
/// optimize_aggregation_in_order
if (group_by_info)
{
/// two-level aggregation is not supported anyway for in order aggregation.
allow_to_use_two_level_group_by = false;
/// It is incorrect for in order aggregation.
params.stats_collecting_params.disable();
}
if (!allow_to_use_two_level_group_by)
{
params.group_by_two_level_threshold = 0;
@ -71,78 +82,71 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
if (group_by_info)
{
bool need_finish_sorting = (group_by_info->order_key_prefix_descr.size() < group_by_sort_description.size());
if (need_finish_sorting)
if (pipeline.getNumStreams() > 1)
{
/// TOO SLOW
/** The pipeline is the following:
*
* --> AggregatingInOrder --> MergingAggregatedBucket
* --> AggregatingInOrder --> FinishAggregatingInOrder --> ResizeProcessor --> MergingAggregatedBucket
* --> AggregatingInOrder --> MergingAggregatedBucket
*/
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
/// We want to merge aggregated data in batches of size
/// not greater than 'aggregation_in_order_max_block_bytes'.
/// So, we reduce 'max_bytes' value for aggregation in 'merge_threads' times.
return std::make_shared<AggregatingInOrderTransform>(
header, transform_params,
group_by_info, group_by_sort_description,
max_block_size, aggregation_in_order_max_block_bytes / merge_threads,
many_data, counter++);
});
aggregating_in_order = collector.detachProcessors(0);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
group_by_sort_description,
max_block_size,
aggregation_in_order_max_block_bytes);
pipeline.addTransform(std::move(transform));
/// Do merge of aggregated data in parallel.
pipeline.resize(merge_threads);
pipeline.addSimpleTransform([&](const Block &)
{
return std::make_shared<MergingAggregatedBucketTransform>(transform_params);
});
aggregating_sorted = collector.detachProcessors(1);
}
else
{
if (pipeline.getNumStreams() > 1)
pipeline.addSimpleTransform([&](const Block & header)
{
/** The pipeline is the following:
*
* --> AggregatingInOrder --> MergingAggregatedBucket
* --> AggregatingInOrder --> FinishAggregatingInOrder --> ResizeProcessor --> MergingAggregatedBucket
* --> AggregatingInOrder --> MergingAggregatedBucket
*/
return std::make_shared<AggregatingInOrderTransform>(
header, transform_params,
group_by_info, group_by_sort_description,
max_block_size, aggregation_in_order_max_block_bytes);
});
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
/// We want to merge aggregated data in batches of size
/// not greater than 'aggregation_in_order_max_block_bytes'.
/// So, we reduce 'max_bytes' value for aggregation in 'merge_threads' times.
return std::make_shared<AggregatingInOrderTransform>(
header, transform_params, group_by_sort_description,
max_block_size, aggregation_in_order_max_block_bytes / merge_threads,
many_data, counter++);
});
aggregating_in_order = collector.detachProcessors(0);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
group_by_sort_description,
max_block_size,
aggregation_in_order_max_block_bytes);
pipeline.addTransform(std::move(transform));
/// Do merge of aggregated data in parallel.
pipeline.resize(merge_threads);
pipeline.addSimpleTransform([&](const Block &)
{
return std::make_shared<MergingAggregatedBucketTransform>(transform_params);
});
aggregating_sorted = collector.detachProcessors(1);
}
else
pipeline.addSimpleTransform([&](const Block & header)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(
header, transform_params, group_by_sort_description,
max_block_size, aggregation_in_order_max_block_bytes);
});
return std::make_shared<FinalizeAggregatedTransform>(header, transform_params);
});
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizeAggregatedTransform>(header, transform_params);
});
aggregating_in_order = collector.detachProcessors(0);
}
finalizing = collector.detachProcessors(2);
return;
aggregating_in_order = collector.detachProcessors(0);
}
finalizing = collector.detachProcessors(2);
return;
}
/// If there are several sources, then we perform parallel aggregation

View File

@ -159,8 +159,9 @@ void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggrega
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
columns_for_aggregator, no_more_keys);
aggregator->executeOnBlock(
aggregate_chunk, /* row_begin= */ 0, length,
aggregation_result, key_columns, columns_for_aggregator, no_more_keys);
}

View File

@ -1,6 +1,8 @@
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Storages/SelectQueryInfo.h>
#include <Core/SortCursor.h>
#include <Interpreters/sortBlock.h>
#include <base/range.h>
namespace DB
@ -9,16 +11,19 @@ namespace DB
AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header,
AggregatingTransformParamsPtr params_,
InputOrderInfoPtr group_by_info_,
const SortDescription & group_by_description_,
size_t max_block_size_, size_t max_block_bytes_)
: AggregatingInOrderTransform(std::move(header), std::move(params_),
group_by_description_, max_block_size_, max_block_bytes_,
group_by_info_, group_by_description_,
max_block_size_, max_block_bytes_,
std::make_unique<ManyAggregatedData>(1), 0)
{
}
AggregatingInOrderTransform::AggregatingInOrderTransform(
Block header, AggregatingTransformParamsPtr params_,
InputOrderInfoPtr group_by_info_,
const SortDescription & group_by_description_,
size_t max_block_size_, size_t max_block_bytes_,
ManyAggregatedDataPtr many_data_, size_t current_variant)
@ -26,6 +31,8 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
, max_block_size(max_block_size_)
, max_block_bytes(max_block_bytes_)
, params(std::move(params_))
, group_by_info(group_by_info_)
, sort_description(group_by_description_)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
, variants(*many_data->variants[current_variant])
@ -33,8 +40,18 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
/// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform
res_header = params->getCustomHeader(false);
for (const auto & column_description : group_by_description_)
for (size_t i = 0; i < group_by_info->order_key_prefix_descr.size(); ++i)
{
const auto & column_description = group_by_description_[i];
group_by_description.emplace_back(column_description, res_header.getPositionByName(column_description.column_name));
}
if (group_by_info->order_key_prefix_descr.size() < group_by_description_.size())
{
group_by_key = true;
/// group_by_description may contains duplicates, so we use keys_size from Aggregator::params
key_columns_raw.resize(params->params.keys_size);
}
}
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
@ -70,6 +87,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
{
materialized_columns.push_back(chunk.getColumns().at(params->params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back();
if (group_by_key)
key_columns_raw[i] = materialized_columns.back().get();
}
Aggregator::NestedColumnsHolder nested_columns_holder;
@ -83,16 +102,19 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (!cur_block_size)
{
res_key_columns.resize(params->params.keys_size);
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
for (size_t i = 0; i < params->params.aggregates_size; ++i)
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
params->aggregator.addArenasToAggregateColumns(variants, res_aggregate_columns);
if (!group_by_key)
{
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.aggregates_size; ++i)
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
params->aggregator.addArenasToAggregateColumns(variants, res_aggregate_columns);
}
++cur_block_size;
}
@ -113,18 +135,26 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
/// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
if (key_begin != key_end)
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
{
if (group_by_key)
params->aggregator.executeOnBlockSmall(variants, key_begin, key_end, key_columns_raw, aggregate_function_instructions.data());
else
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;
/// We finalize last key aggregation state if a new key found.
if (key_end != rows)
{
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
if (!group_by_key)
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
/// If max_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk.
if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes)
{
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
cur_block_bytes += current_memory_usage;
finalizeCurrentChunk(std::move(chunk), key_end);
return;
@ -155,7 +185,7 @@ void AggregatingInOrderTransform::finalizeCurrentChunk(Chunk chunk, size_t key_e
block_end_reached = true;
need_generate = true;
variants.without_key = nullptr;
variants.invalidate();
}
void AggregatingInOrderTransform::work()
@ -234,19 +264,34 @@ void AggregatingInOrderTransform::generate()
{
if (cur_block_size && is_consume_finished)
{
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
variants.without_key = nullptr;
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false);
else
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
variants.invalidate();
}
Block res = res_header.cloneEmpty();
bool group_by_key_needs_empty_block = is_consume_finished && !cur_block_size;
if (!group_by_key || group_by_key_needs_empty_block)
{
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < res_key_columns.size(); ++i)
res.getByPosition(i).column = std::move(res_key_columns[i]);
for (size_t i = 0; i < res_key_columns.size(); ++i)
res.getByPosition(i).column = std::move(res_key_columns[i]);
for (size_t i = 0; i < res_aggregate_columns.size(); ++i)
res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]);
for (size_t i = 0; i < res_aggregate_columns.size(); ++i)
res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]);
to_push_chunk = convertToChunk(res);
}
else
{
/// Sorting is required after aggregation, for proper merging, via
/// FinishAggregatingInOrderTransform/MergingAggregatedBucketTransform
sortBlock(group_by_block, sort_description);
to_push_chunk = convertToChunk(group_by_block);
}
to_push_chunk = convertToChunk(res);
if (!to_push_chunk.getNumRows())
return;

View File

@ -9,6 +9,9 @@
namespace DB
{
struct InputOrderInfo;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
struct ChunkInfoWithAllocatedBytes : public ChunkInfo
{
explicit ChunkInfoWithAllocatedBytes(Int64 allocated_bytes_)
@ -20,12 +23,14 @@ class AggregatingInOrderTransform : public IProcessor
{
public:
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params,
const SortDescription & group_by_description,
InputOrderInfoPtr group_by_info_,
const SortDescription & group_by_description_,
size_t max_block_size_, size_t max_block_bytes_,
ManyAggregatedDataPtr many_data, size_t current_variant);
AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params,
const SortDescription & group_by_description,
InputOrderInfoPtr group_by_info_,
const SortDescription & group_by_description_,
size_t max_block_size_, size_t max_block_bytes_);
~AggregatingInOrderTransform() override;
@ -51,7 +56,14 @@ private:
MutableColumns res_aggregate_columns;
AggregatingTransformParamsPtr params;
InputOrderInfoPtr group_by_info;
/// For sortBlock()
SortDescription sort_description;
SortDescriptionWithPositions group_by_description;
bool group_by_key = false;
Block group_by_block;
ColumnRawPtrs key_columns_raw;
Aggregator::AggregateColumns aggregate_columns;

View File

@ -533,7 +533,7 @@ void AggregatingTransform::consume(Chunk chunk)
}
else
{
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), 0, num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
}
}

View File

@ -64,7 +64,7 @@ void CheckConstraintsTransform::onConsume(Chunk chunk)
/// Check if constraint value is nullable
const auto & null_map = column_nullable->getNullMapColumn();
const PaddedPODArray<UInt8> & null_map_data = null_map.getData();
bool null_map_contains_null = !memoryIsZero(null_map_data.raw_data(), null_map_data.size() * sizeof(UInt8));
bool null_map_contains_null = !memoryIsZero(null_map_data.raw_data(), 0, null_map_data.size() * sizeof(UInt8));
if (null_map_contains_null)
throw Exception(
@ -84,7 +84,7 @@ void CheckConstraintsTransform::onConsume(Chunk chunk)
size_t size = res_column_uint8.size();
/// Is violated.
if (!memoryIsByte(res_data, size, 1))
if (!memoryIsByte(res_data, 0, size, 1))
{
size_t row_idx = 0;
for (; row_idx < size; ++row_idx)

View File

@ -59,6 +59,7 @@
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Disks/createVolume.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>

View File

@ -93,7 +93,7 @@ def check_settings(
node_name,
sleep_in_send_tables_status_ms,
sleep_in_send_data_ms,
sleep_after_receiving_query_ms
sleep_after_receiving_query_ms,
):
attempts = 0
while attempts < 1000:
@ -171,19 +171,19 @@ def update_configs(
"node_1",
node_1_sleep_in_send_tables_status,
node_1_sleep_in_send_data,
node_1_sleep_after_receiving_query
node_1_sleep_after_receiving_query,
)
check_settings(
"node_2",
node_2_sleep_in_send_tables_status,
node_2_sleep_in_send_data,
node_2_sleep_after_receiving_query
node_2_sleep_after_receiving_query,
)
check_settings(
"node_3",
node_3_sleep_in_send_tables_status,
node_3_sleep_in_send_data,
node_3_sleep_after_receiving_query
node_3_sleep_after_receiving_query,
)

View File

@ -0,0 +1,24 @@
1969-12-31 20:30:00
1970-01-01
1970-01-01
1970-01-01
1970-01-01
1970-01-01
1970-01-02 20:30:00
1969-12-31 20:30:00
1969-12-31 20:30:00
1969-12-31 20:30:00
1969-12-31 20:30:00
1969-12-31 20:30:00
1969-12-31 23:15:30
1970-01-01
1970-01-01
1970-01-01
1970-01-01
1970-01-01
1970-01-02 23:15:30
1969-12-31 23:15:30
1969-12-31 23:15:30
1969-12-31 23:15:30
1969-12-31 23:15:30
1969-12-31 23:15:30

View File

@ -0,0 +1,27 @@
-- America/Paramaribo : partial hours timezones
select toDateTime(0, 'America/Paramaribo');
select toMonday(toDateTime(0, 'America/Paramaribo'));
select toStartOfWeek(toDateTime(0, 'America/Paramaribo'));
select toStartOfMonth(toDateTime(0, 'America/Paramaribo'));
select toStartOfQuarter(toDateTime(0, 'America/Paramaribo'));
select toStartOfYear(toDateTime(0, 'America/Paramaribo'));
select toTime(toDateTime(0, 'America/Paramaribo'), 'America/Paramaribo');
select toStartOfMinute(toDateTime(0, 'America/Paramaribo'));
select toStartOfFiveMinute(toDateTime(0, 'America/Paramaribo'));
select toStartOfTenMinutes(toDateTime(0, 'America/Paramaribo'));
select toStartOfFifteenMinutes(toDateTime(0, 'America/Paramaribo'));
select toStartOfHour(toDateTime(0, 'America/Paramaribo'));
-- Africa/Monrovia : partial minutes timezones
select toDateTime(0, 'Africa/Monrovia');
select toMonday(toDateTime(0, 'Africa/Monrovia'));
select toStartOfWeek(toDateTime(0, 'Africa/Monrovia'));
select toStartOfMonth(toDateTime(0, 'Africa/Monrovia'));
select toStartOfQuarter(toDateTime(0, 'Africa/Monrovia'));
select toStartOfYear(toDateTime(0, 'Africa/Monrovia'));
select toTime(toDateTime(0, 'Africa/Monrovia'), 'Africa/Monrovia');
select toStartOfMinute(toDateTime(0, 'Africa/Monrovia'));
select toStartOfFiveMinute(toDateTime(0, 'Africa/Monrovia'));
select toStartOfTenMinutes(toDateTime(0, 'Africa/Monrovia'));
select toStartOfFifteenMinutes(toDateTime(0, 'Africa/Monrovia'));
select toStartOfHour(toDateTime(0, 'Africa/Monrovia'));

View File

@ -0,0 +1,205 @@
0 0 0
-- { echoOn }
insert into data_02233 select number%10, number%3, number from numbers(100);
explain pipeline select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
(Expression)
ExpressionTransform × 2
(Sorting)
MergeSortingTransform
LimitsCheckingTransform
PartialSortingTransform
(Expression)
ExpressionTransform × 2
(TotalsHaving)
TotalsHavingTransform 1 → 2
(Aggregating)
FinalizeAggregatedTransform
AggregatingInOrderTransform
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree)
MergeTreeInOrder 0 → 1
explain pipeline select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
(Expression)
ExpressionTransform × 2
(Sorting)
MergeSortingTransform
LimitsCheckingTransform
PartialSortingTransform
(Expression)
ExpressionTransform × 2
(TotalsHaving)
TotalsHavingTransform 1 → 2
(Aggregating)
AggregatingTransform
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree)
MergeTreeInOrder 0 → 1
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
0 0 4
0 1 3
0 2 3
1 0 3
1 1 4
1 2 3
2 0 3
2 1 3
2 2 4
3 0 4
3 1 3
3 2 3
4 0 3
4 1 4
4 2 3
5 0 3
5 1 3
5 2 4
6 0 4
6 1 3
6 2 3
7 0 3
7 1 4
7 2 3
8 0 3
8 1 3
8 2 4
9 0 4
9 1 3
9 2 3
0 0 100
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1, max_block_size=1;
0 0 4
0 1 3
0 2 3
1 0 3
1 1 4
1 2 3
2 0 3
2 1 3
2 2 4
3 0 4
3 1 3
3 2 3
4 0 3
4 1 4
4 2 3
5 0 3
5 1 3
5 2 4
6 0 4
6 1 3
6 2 3
7 0 3
7 1 4
7 2 3
8 0 3
8 1 3
8 2 4
9 0 4
9 1 3
9 2 3
0 0 100
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
0 0 4
0 1 3
0 2 3
1 0 3
1 1 4
1 2 3
2 0 3
2 1 3
2 2 4
3 0 4
3 1 3
3 2 3
4 0 3
4 1 4
4 2 3
5 0 3
5 1 3
5 2 4
6 0 4
6 1 3
6 2 3
7 0 3
7 1 4
7 2 3
8 0 3
8 1 3
8 2 4
9 0 4
9 1 3
9 2 3
0 0 100
-- fuzzer
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
0 0 0
0 1 0
0 2 0
0 3 0
0 4 0
0 5 0
0 6 0
0 7 0
0 8 0
0 9 0
1 0 1
1 1 1
1 2 1
1 3 1
1 4 1
1 5 1
1 6 1
1 7 1
1 8 1
1 9 1
2 0 2
2 1 2
2 2 2
2 3 2
2 4 2
2 5 2
2 6 2
2 7 2
2 8 2
2 9 2
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key WITH TOTALS ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
0 0 0
0 1 0
0 2 0
0 3 0
0 4 0
0 5 0
0 6 0
0 7 0
0 8 0
0 9 0
1 0 1
1 1 1
1 2 1
1 3 1
1 4 1
1 5 1
1 6 1
1 7 1
1 8 1
1 9 1
2 0 2
2 1 2
2 2 2
2 3 2
2 4 2
2 5 2
2 6 2
2 7 2
2 8 2
2 9 2
0 0 0

View File

@ -0,0 +1,21 @@
drop table if exists data_02233;
create table data_02233 (parent_key Int, child_key Int, value Int) engine=MergeTree() order by parent_key;
-- before inserting data, it may produce empty header
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key ORDER BY parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key WITH TOTALS ORDER BY parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
-- { echoOn }
insert into data_02233 select number%10, number%3, number from numbers(100);
explain pipeline select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
explain pipeline select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1, max_block_size=1;
select parent_key, child_key, count() from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
-- fuzzer
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key WITH TOTALS ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
-- { echoOff }
drop table data_02233;

View File

@ -0,0 +1,70 @@
-- { echoOn }
explain pipeline select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
(Expression)
ExpressionTransform × 2
(Sorting)
MergeSortingTransform
LimitsCheckingTransform
PartialSortingTransform
(Expression)
ExpressionTransform × 2
(TotalsHaving)
TotalsHavingTransform 1 → 2
(Aggregating)
MergingAggregatedBucketTransform
FinishAggregatingInOrderTransform 2 → 1
AggregatingInOrderTransform × 2
(Expression)
ExpressionTransform × 2
(SettingQuotaAndLimits)
(ReadFromMergeTree)
MergeTreeInOrder × 2 0 → 1
explain pipeline select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
(Expression)
ExpressionTransform × 2
(Sorting)
MergeSortingTransform
LimitsCheckingTransform
PartialSortingTransform
(Expression)
ExpressionTransform × 2
(TotalsHaving)
TotalsHavingTransform 1 → 2
(Aggregating)
AggregatingTransform
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1;
[1,2] 10 100 2000
[1,2] 20 200 4000
[1,1,2,2] 0 0 6000
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1, max_block_size=1;
[1,2] 10 100 2000
[1,2] 20 200 4000
[1,1,2,2] 0 0 6000
-- sum() can be compiled, check that compiled version works correctly
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1, compile_aggregate_expressions=1, min_count_to_compile_aggregate_expression=0;
[1,2] 10 100 2000
[1,2] 20 200 4000
[1,1,2,2] 0 0 6000
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key;
[1,2] 10 100 2000
[1,2] 20 200 4000
[1,1,2,2] 0 0 6000
-- fuzzer
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
100 10 100
200 20 200
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key WITH TOTALS ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
100 10 100
200 20 200
0 0 0

View File

@ -0,0 +1,21 @@
drop table if exists data_02233;
create table data_02233 (partition Int, parent_key Int, child_key Int, value Int) engine=MergeTree() partition by partition order by parent_key;
insert into data_02233 values (1, 10, 100, 1000)(1, 20, 200, 2000);
insert into data_02233 values (2, 10, 100, 1000)(2, 20, 200, 2000);
-- { echoOn }
explain pipeline select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1, optimize_aggregation_in_order=1;
explain pipeline select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings max_threads=1;
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1;
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1, max_block_size=1;
-- sum() can be compiled, check that compiled version works correctly
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key settings optimize_aggregation_in_order=1, compile_aggregate_expressions=1, min_count_to_compile_aggregate_expression=0;
select groupArraySorted(partition), parent_key, child_key, sum(value) from data_02233 group by parent_key, child_key with totals order by parent_key, child_key;
-- fuzzer
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
SELECT child_key, parent_key, child_key FROM data_02233 GROUP BY parent_key, child_key, child_key WITH TOTALS ORDER BY child_key, parent_key ASC NULLS LAST SETTINGS max_threads = 1, optimize_aggregation_in_order = 1;
-- { echoOff }
drop table data_02233;