diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index f45c30a6c55..ebda123575b 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -28,9 +28,22 @@ namespace DB /** Разные структуры данных, которые могут использоваться для агрегации - * Для эффективности сами данные для агрегации кладутся в пул. + * Для эффективности, сами данные для агрегации кладутся в пул. * Владение данными (состояний агрегатных функций) и пулом * захватывается позднее - в функции convertToBlock, объектом ColumnAggregateFunction. + * + * Большинство структур данных существует в двух вариантах: обычном и двухуровневом (TwoLevel). + * Двухуровневая хэш-таблица работает чуть медленнее при маленьком количестве различных ключей, + * но при большом количестве различных ключей лучше масштабируется, так как позволяет + * распараллелить некоторые операции (слияние, пост-обработку) естественным образом. + * + * Чтобы обеспечить эффективную работу в большом диапазоне условий, + * сначала используются одноуровневые хэш-таблицы, + * а при достижении количеством различных ключей достаточно большого размера, + * они конвертируются в двухуровневые. + * + * PS. Существует много различных подходов к эффективной реализации параллельной и распределённой агрегации, + * лучшим образом подходящих для разных случаев, и этот подход - всего лишь один из них, выбранный по совокупности причин. */ typedef AggregateDataPtr AggregatedDataWithoutKey; @@ -55,7 +68,10 @@ struct TrivialHash } }; -/// Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи. +/** Превращает хэш-таблицу в что-то типа lookup-таблицы. Остаётся неоптимальность - в ячейках хранятся ключи. + * Также компилятору не удаётся полностью удалить код хождения по цепочке разрешения коллизий, хотя он не нужен. + * TODO Переделать в полноценную lookup-таблицу. + */ template struct HashTableFixedGrower { @@ -153,9 +169,9 @@ struct AggregationMethodOneNumber /** Вставить ключ из хэш-таблицы в столбцы. */ - static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) { - static_cast *>(key_columns[0])->insertData(reinterpret_cast(&it->first), sizeof(it->first)); + static_cast *>(key_columns[0])->insertData(reinterpret_cast(&value.first), sizeof(value.first)); } }; @@ -206,9 +222,9 @@ struct AggregationMethodString it->first.data = pool.insert(it->first.data, it->first.size); } - static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) { - key_columns[0]->insertData(it->first.data, it->first.size); + key_columns[0]->insertData(value.first.data, value.first.size); } }; @@ -259,9 +275,9 @@ struct AggregationMethodFixedString it->first.data = pool.insert(it->first.data, it->first.size); } - static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) { - key_columns[0]->insertData(it->first.data, it->first.size); + key_columns[0]->insertData(value.first.data, value.first.size); } }; @@ -304,13 +320,13 @@ struct AggregationMethodKeys128 { } - static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) { size_t offset = 0; for (size_t i = 0; i < keys_size; ++i) { size_t size = key_sizes[i]; - key_columns[i]->insertData(reinterpret_cast(&it->first) + offset, size); + key_columns[i]->insertData(reinterpret_cast(&value.first) + offset, size); offset += size; } } @@ -356,10 +372,10 @@ struct AggregationMethodHashed it->second.first = placeKeysInPool(i, keys_size, keys, pool); } - static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + static void insertKeyIntoColumns(const typename Data::value_type & value, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) { for (size_t i = 0; i < keys_size; ++i) - key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size); + key_columns[i]->insertDataWithTerminatingZero(value.second.first[i].data, value.second.first[i].size); } }; @@ -730,6 +746,26 @@ protected: const Sizes & key_sizes, size_t start_row, bool final) const; + template + void convertToBlockImplFinal( + Method & method, + Table & data, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row) const; + + template + void convertToBlockImplNotFinal( + Method & method, + Table & data, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row) const; + template void destroyImpl( Method & method) const; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index bf474e29943..40c94479644 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -304,43 +304,6 @@ void NO_INLINE Aggregator::executeImplCase( #pragma GCC diagnostic pop - -template -void NO_INLINE Aggregator::convertToBlockImpl( - Method & method, - ColumnPlainPtrs & key_columns, - AggregateColumnsData & aggregate_columns, - ColumnPlainPtrs & final_aggregate_columns, - const Sizes & key_sizes, - size_t start_row, - bool final) const -{ - if (!final) - { - size_t j = start_row; - for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j) - { - method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); - - for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; - } - } - else - { - for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) - { - method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); - - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->insertResultInto( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], - *final_aggregate_columns[i]); - } - } -} - - template void NO_INLINE Aggregator::mergeDataImpl( Table & table_dst, @@ -507,6 +470,65 @@ void NO_INLINE Aggregator::mergeStreamsImpl( } +template +void Aggregator::convertToBlockImpl( + Method & method, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row, + bool final) const +{ + if (final) + convertToBlockImplFinal(method, method.data, key_columns, aggregate_columns, final_aggregate_columns, key_sizes, start_row); + else + convertToBlockImplNotFinal(method, method.data, key_columns, aggregate_columns, final_aggregate_columns, key_sizes, start_row); +} + + +template +void NO_INLINE Aggregator::convertToBlockImplFinal( + Method & method, + Table & data, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row) const +{ + for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it) + { + method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes); + + for (size_t i = 0; i < aggregates_size; ++i) + aggregate_functions[i]->insertResultInto( + Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], + *final_aggregate_columns[i]); + } +} + +template +void NO_INLINE Aggregator::convertToBlockImplNotFinal( + Method & method, + Table & data, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row) const +{ + size_t j = start_row; + for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it, ++j) + { + method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes); + + for (size_t i = 0; i < aggregates_size; ++i) + (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; + } +} + + template void NO_INLINE Aggregator::destroyImpl( Method & method) const