dbms: added aggregation method 'concat' - performance improvement in generic case [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-02-22 19:09:16 +03:00
parent f8f806ff58
commit dc6259d931
5 changed files with 162 additions and 16 deletions

View File

@ -63,7 +63,7 @@ private:
size_t growth_factor; size_t growth_factor;
size_t linear_growth_threshold; size_t linear_growth_threshold;
/// Последний непрерывный кусок памяти. /// Последний непрерывный кусок памяти.
Chunk * head; Chunk * head;
size_t size_in_bytes; size_t size_in_bytes;
@ -77,7 +77,7 @@ private:
size_t nextSize(size_t min_next_size) const size_t nextSize(size_t min_next_size) const
{ {
size_t size_after_grow = 0; size_t size_after_grow = 0;
if (head->size() < linear_growth_threshold) if (head->size() < linear_growth_threshold)
size_after_grow = head->size() * growth_factor; size_after_grow = head->size() * growth_factor;
else else
@ -119,6 +119,14 @@ public:
return res; return res;
} }
/** Отменить только что сделанное выделение памяти.
* Нужно передать размер не меньше того, который был только что выделен.
*/
void rollback(size_t size)
{
head->pos -= size;
}
/// Вставить строку без выравнивания. /// Вставить строку без выравнивания.
const char * insert(const char * data, size_t size) const char * insert(const char * data, size_t size)
{ {
@ -126,7 +134,7 @@ public:
memcpy(res, data, size); memcpy(res, data, size);
return res; return res;
} }
/// Размер выделенного пула в байтах /// Размер выделенного пула в байтах
size_t size() const size_t size() const
{ {

View File

@ -143,4 +143,41 @@ static inline StringRef * ALWAYS_INLINE extractKeysAndPlaceInPool(
} }
/** Скопировать ключи в пул в непрерывный кусок памяти.
* Потом разместить в пуле StringRef-ы на них.
*
* [key1][key2]...[keyN][ref1][ref2]...[refN]
* ^---------------------| |
* ^---------------------|
* ^---return-value----^
*
* Вернуть StringRef на кусок памяти с ключами (без учёта StringRef-ов после них).
*/
static inline StringRef ALWAYS_INLINE extractKeysAndPlaceInPoolContiguous(
size_t i, size_t keys_size, const ConstColumnPlainPtrs & key_columns, StringRefs & keys, Arena & pool)
{
size_t sum_keys_size = 0;
for (size_t j = 0; j < keys_size; ++j)
{
keys[j] = key_columns[j]->getDataAtWithTerminatingZero(i);
sum_keys_size += keys[j].size;
}
char * res = pool.alloc(sum_keys_size + keys_size * sizeof(StringRef));
char * place = res;
for (size_t j = 0; j < keys_size; ++j)
{
memcpy(place, keys[j].data, keys[j].size);
keys[j].data = place;
place += keys[j].size;
}
/// Размещаем в пуле StringRef-ы на только что скопированные ключи.
memcpy(place, &keys[0], keys_size * sizeof(StringRef));
return {res, sum_keys_size};
}
} }

View File

@ -101,7 +101,8 @@ struct AggregationMethodOneNumber
size_t keys_size, /// Количество ключевых столбцов. size_t keys_size, /// Количество ключевых столбцов.
size_t i, /// Из какой строки блока достать ключ. size_t i, /// Из какой строки блока достать ключ.
const Sizes & key_sizes, /// Если ключи фиксированной длины - их длины. Не используется в методах агрегации по ключам переменной длины. const Sizes & key_sizes, /// Если ключи фиксированной длины - их длины. Не используется в методах агрегации по ключам переменной длины.
StringRefs & keys) const /// Сюда могут быть записаны ссылки на данные ключей в столбцах. Они могут быть использованы в дальнейшем. StringRefs & keys, /// Сюда могут быть записаны ссылки на данные ключей в столбцах. Они могут быть использованы в дальнейшем.
Arena & pool) const
{ {
return unionCastToUInt64(vec[i]); return unionCastToUInt64(vec[i]);
} }
@ -117,6 +118,10 @@ struct AggregationMethodOneNumber
{ {
} }
/** Действие, которое нужно сделать, если ключ не новый. Например, откатить выделение памяти в пуле.
*/
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) {}
/** Вставить ключ из хэш-таблицы в столбцы. /** Вставить ключ из хэш-таблицы в столбцы.
*/ */
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
@ -161,7 +166,8 @@ struct AggregationMethodString
size_t keys_size, size_t keys_size,
size_t i, size_t i,
const Sizes & key_sizes, const Sizes & key_sizes,
StringRefs & keys) const StringRefs & keys,
Arena & pool) const
{ {
return StringRef( return StringRef(
&(*chars)[i == 0 ? 0 : (*offsets)[i - 1]], &(*chars)[i == 0 ? 0 : (*offsets)[i - 1]],
@ -177,6 +183,8 @@ struct AggregationMethodString
value.first.data = pool.insert(value.first.data, value.first.size); value.first.data = pool.insert(value.first.data, value.first.size);
} }
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) {}
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{ {
key_columns[0]->insertData(value.first.data, value.first.size); key_columns[0]->insertData(value.first.data, value.first.size);
@ -219,7 +227,8 @@ struct AggregationMethodFixedString
size_t keys_size, size_t keys_size,
size_t i, size_t i,
const Sizes & key_sizes, const Sizes & key_sizes,
StringRefs & keys) const StringRefs & keys,
Arena & pool) const
{ {
return StringRef(&(*chars)[i * n], n); return StringRef(&(*chars)[i * n], n);
} }
@ -233,6 +242,8 @@ struct AggregationMethodFixedString
value.first.data = pool.insert(value.first.data, value.first.size); value.first.data = pool.insert(value.first.data, value.first.size);
} }
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) {}
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{ {
key_columns[0]->insertData(value.first.data, value.first.size); key_columns[0]->insertData(value.first.data, value.first.size);
@ -268,7 +279,8 @@ struct AggregationMethodKeysFixed
size_t keys_size, size_t keys_size,
size_t i, size_t i,
const Sizes & key_sizes, const Sizes & key_sizes,
StringRefs & keys) const StringRefs & keys,
Arena & pool) const
{ {
return packFixed<Key>(i, keys_size, key_columns, key_sizes); return packFixed<Key>(i, keys_size, key_columns, key_sizes);
} }
@ -281,6 +293,8 @@ struct AggregationMethodKeysFixed
{ {
} }
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) {}
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{ {
size_t offset = 0; size_t offset = 0;
@ -294,6 +308,64 @@ struct AggregationMethodKeysFixed
}; };
/// Для остальных случаев. Агрегирует по конкатенации ключей. (При этом, строки, содержащие нули посередине, могут склеиться.)
template <typename TData>
struct AggregationMethodConcat
{
typedef TData Data;
typedef typename Data::key_type Key;
typedef typename Data::mapped_type Mapped;
typedef typename Data::iterator iterator;
typedef typename Data::const_iterator const_iterator;
Data data;
AggregationMethodConcat() {}
template <typename Other>
AggregationMethodConcat(const Other & other) : data(other.data) {}
struct State
{
void init(ConstColumnPlainPtrs & key_columns)
{
}
Key getKey(
const ConstColumnPlainPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes & key_sizes,
StringRefs & keys,
Arena & pool) const
{
return extractKeysAndPlaceInPoolContiguous(i, keys_size, key_columns, keys, pool);
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, StringRefs & keys, Arena & pool)
{
}
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
{
pool.rollback(key.size + keys.size() * sizeof(keys[0]));
}
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{
/// См. функцию extractKeysAndPlaceInPoolContiguous.
const StringRef * key_refs = reinterpret_cast<const StringRef *>(value.first.data + value.first.size);
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDataWithTerminatingZero(key_refs[i].data, key_refs[i].size);
}
};
/// Для остальных случаев. Агрегирует по 128 битному хэшу от ключа. (При этом, строки, содержащие нули посередине, могут склеиться.) /// Для остальных случаев. Агрегирует по 128 битному хэшу от ключа. (При этом, строки, содержащие нули посередине, могут склеиться.)
template <typename TData> template <typename TData>
struct AggregationMethodHashed struct AggregationMethodHashed
@ -322,7 +394,8 @@ struct AggregationMethodHashed
size_t keys_size, size_t keys_size,
size_t i, size_t i,
const Sizes & key_sizes, const Sizes & key_sizes,
StringRefs & keys) const StringRefs & keys,
Arena & pool) const
{ {
return hash128(i, keys_size, key_columns, keys); return hash128(i, keys_size, key_columns, keys);
} }
@ -336,6 +409,8 @@ struct AggregationMethodHashed
value.second.first = placeKeysInPool(i, keys_size, keys, pool); value.second.first = placeKeysInPool(i, keys_size, keys, pool);
} }
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) {}
static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes)
{ {
for (size_t i = 0; i < keys_size; ++i) for (size_t i = 0; i < keys_size; ++i)
@ -388,6 +463,7 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128; std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256; std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashed>> hashed; std::unique_ptr<AggregationMethodHashed<AggregatedDataHashed>> hashed;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKey>> concat;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level; std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level; std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
@ -396,6 +472,7 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level; std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level; std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodHashed<AggregatedDataHashedTwoLevel>> hashed_two_level; std::unique_ptr<AggregationMethodHashed<AggregatedDataHashedTwoLevel>> hashed_two_level;
std::unique_ptr<AggregationMethodConcat<AggregatedDataWithStringKeyTwoLevel>> concat_two_level;
/// В этом и подобных макросах, вариант without_key не учитывается. /// В этом и подобных макросах, вариант without_key не учитывается.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \ #define APPLY_FOR_AGGREGATED_VARIANTS(M) \
@ -408,13 +485,15 @@ struct AggregatedDataVariants : private boost::noncopyable
M(keys128, false) \ M(keys128, false) \
M(keys256, false) \ M(keys256, false) \
M(hashed, false) \ M(hashed, false) \
M(concat, false) \
M(key32_two_level, true) \ M(key32_two_level, true) \
M(key64_two_level, true) \ M(key64_two_level, true) \
M(key_string_two_level, true) \ M(key_string_two_level, true) \
M(key_fixed_string_two_level, true) \ M(key_fixed_string_two_level, true) \
M(keys128_two_level, true) \ M(keys128_two_level, true) \
M(keys256_two_level, true) \ M(keys256_two_level, true) \
M(hashed_two_level, true) M(hashed_two_level, true) \
M(concat_two_level, true)
enum class Type enum class Type
{ {
@ -527,7 +606,8 @@ struct AggregatedDataVariants : private boost::noncopyable
M(key_fixed_string) \ M(key_fixed_string) \
M(keys128) \ M(keys128) \
M(keys256) \ M(keys256) \
M(hashed) M(hashed) \
M(concat)
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \ #define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key8) \ M(key8) \
@ -557,7 +637,8 @@ struct AggregatedDataVariants : private boost::noncopyable
M(key_fixed_string_two_level) \ M(key_fixed_string_two_level) \
M(keys128_two_level) \ M(keys128_two_level) \
M(keys256_two_level) \ M(keys256_two_level) \
M(hashed_two_level) M(hashed_two_level) \
M(concat_two_level)
}; };
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr; typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;

View File

@ -193,7 +193,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys. bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys.
/// Получаем ключ для вставки в хэш-таблицу. /// Получаем ключ для вставки в хэш-таблицу.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys); typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys) /// Вставляем. if (!no_more_keys) /// Вставляем.
{ {
@ -206,6 +206,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
AggregateFunctionsList::forEach(AggregateFunctionsUpdater( AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i)); aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i));
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
} }
else else
@ -224,7 +225,10 @@ void NO_INLINE Aggregator::executeSpecializedCase(
/// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего. /// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего.
if (no_more_keys && overflow && !overflow_row) if (no_more_keys && overflow && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
}
/// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключом. /// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключом.
if (inserted) if (inserted)
@ -237,6 +241,8 @@ void NO_INLINE Aggregator::executeSpecializedCase(
AggregateFunctionsList::forEach(AggregateFunctionsCreator( AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, aggregate_data)); aggregate_functions, offsets_of_aggregate_states, aggregate_columns, aggregate_data));
} }
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row; AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;

View File

@ -357,8 +357,10 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0])) if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0]))
return AggregatedDataVariants::Type::key_fixed_string; return AggregatedDataVariants::Type::key_fixed_string;
/// Иначе будем агрегировать по хэшу от ключей. /// Иначе будем агрегировать по конкатенации ключей.
return AggregatedDataVariants::Type::hashed; return AggregatedDataVariants::Type::concat;
/// NOTE AggregatedDataVariants::Type::hashed не используется.
} }
@ -439,7 +441,7 @@ void NO_INLINE Aggregator::executeImplCase(
bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys. bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys.
/// Получаем ключ для вставки в хэш-таблицу. /// Получаем ключ для вставки в хэш-таблицу.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys); typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys) /// Вставляем. if (!no_more_keys) /// Вставляем.
{ {
@ -451,6 +453,7 @@ void NO_INLINE Aggregator::executeImplCase(
for (size_t j = 0; j < aggregates_size; ++j) /// NOTE: Заменить индекс на два указателя? for (size_t j = 0; j < aggregates_size; ++j) /// NOTE: Заменить индекс на два указателя?
aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i);
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
} }
else else
@ -469,7 +472,10 @@ void NO_INLINE Aggregator::executeImplCase(
/// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего. /// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего.
if (no_more_keys && overflow && !overflow_row) if (no_more_keys && overflow && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
}
/// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключом. /// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключом.
if (inserted) if (inserted)
@ -480,6 +486,8 @@ void NO_INLINE Aggregator::executeImplCase(
aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states); aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(aggregate_data); createAggregateStates(aggregate_data);
} }
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row; AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
@ -1281,6 +1289,8 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
mergeSingleLevelDataImpl<decltype(res->keys256)::element_type>(non_empty_data); mergeSingleLevelDataImpl<decltype(res->keys256)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::hashed) else if (res->type == AggregatedDataVariants::Type::hashed)
mergeSingleLevelDataImpl<decltype(res->hashed)::element_type>(non_empty_data); mergeSingleLevelDataImpl<decltype(res->hashed)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::concat)
mergeSingleLevelDataImpl<decltype(res->concat)::element_type>(non_empty_data);
else if (res->type == AggregatedDataVariants::Type::key32_two_level) else if (res->type == AggregatedDataVariants::Type::key32_two_level)
mergeTwoLevelDataImpl<decltype(res->key32_two_level)::element_type>(non_empty_data, thread_pool.get()); mergeTwoLevelDataImpl<decltype(res->key32_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::key64_two_level) else if (res->type == AggregatedDataVariants::Type::key64_two_level)
@ -1295,6 +1305,8 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
mergeTwoLevelDataImpl<decltype(res->keys256_two_level)::element_type>(non_empty_data, thread_pool.get()); mergeTwoLevelDataImpl<decltype(res->keys256_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::hashed_two_level) else if (res->type == AggregatedDataVariants::Type::hashed_two_level)
mergeTwoLevelDataImpl<decltype(res->hashed_two_level)::element_type>(non_empty_data, thread_pool.get()); mergeTwoLevelDataImpl<decltype(res->hashed_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type == AggregatedDataVariants::Type::concat_two_level)
mergeTwoLevelDataImpl<decltype(res->concat_two_level)::element_type>(non_empty_data, thread_pool.get());
else if (res->type != AggregatedDataVariants::Type::without_key) else if (res->type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -1341,7 +1353,7 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
bool inserted; /// Вставили новый ключ, или такой ключ уже был? bool inserted; /// Вставили новый ключ, или такой ключ уже был?
/// Получаем ключ для вставки в хэш-таблицу. /// Получаем ключ для вставки в хэш-таблицу.
auto key = state.getKey(key_columns, keys_size, i, result.key_sizes, keys); auto key = state.getKey(key_columns, keys_size, i, result.key_sizes, keys, *aggregates_pool);
data.emplace(key, it, inserted); data.emplace(key, it, inserted);
@ -1353,6 +1365,8 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states); aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(aggregate_data); createAggregateStates(aggregate_data);
} }
else
method.onExistingKey(key, keys, *aggregates_pool);
/// Мерджим состояния агрегатных функций. /// Мерджим состояния агрегатных функций.
for (size_t j = 0; j < aggregates_size; ++j) for (size_t j = 0; j < aggregates_size; ++j)