diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 0375f4e9853..78651f88072 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -19,6 +19,11 @@ #include #include +#include +#include +#include + + namespace DB { @@ -47,8 +52,242 @@ typedef HashMap AggregatedDataWithStringKey; typedef HashMap AggregatedDataWithKeys128; typedef HashMap, UInt128TrivialHash> AggregatedDataHashed; -class Aggregator; +/// Для случая, когда есть один числовой ключ. +struct AggregationMethodKey64 +{ + typedef AggregatedDataWithUInt64Key Data; + typedef Data::key_type Key; + typedef Data::mapped_type Mapped; + typedef Data::iterator iterator; + typedef Data::const_iterator const_iterator; + + Data data; + + const IColumn * column; + + /** Вызывается в начале обработки каждого блока. + * Устанавливает переменные, необходимые для остальных методов, вызываемых во внутренних циклах. + */ + void init(ConstColumnPlainPtrs & key_columns) + { + column = key_columns[0]; + } + + /// Достать из ключевых столбцов ключ для вставки в хэш-таблицу. + Key getKey( + const ConstColumnPlainPtrs & key_columns, /// Ключевые столбцы. + size_t keys_size, /// Количество ключевых столбцов. + size_t i, /// Из какой строки блока достать ключ. + const Sizes & key_sizes, /// Если ключи фиксированной длины - их длины. Не используется в методах агрегации по ключам переменной длины. + StringRefs & keys) const /// Сюда могут быть записаны ссылки на данные ключей в столбцах. Они могут быть использованы в дальнейшем. + { + return column->get64(i); + } + + /// Из значения в хэш-таблице получить AggregateDataPtr. + static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } + + /** Разместить дополнительные данные, если это необходимо, в случае, когда в хэш-таблицу был вставлен новый ключ. + */ + void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys) + { + } + + /** Вставить ключ из хэш-таблицы в столбцы. + */ + static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + { + key_columns[0]->insertData(reinterpret_cast(&it->first), sizeof(it->first)); + } +}; + + +/// Для случая, когда есть один строковый ключ. +struct AggregationMethodString +{ + typedef AggregatedDataWithStringKey Data; + typedef Data::key_type Key; + typedef Data::mapped_type Mapped; + typedef Data::iterator iterator; + typedef Data::const_iterator const_iterator; + + Data data; + Arena string_pool; /// NOTE Может быть лучше вместо этого использовать aggregates_pool? + + const ColumnString::Offsets_t * offsets; + const ColumnString::Chars_t * chars; + + void init(ConstColumnPlainPtrs & key_columns) + { + const IColumn & column = *key_columns[0]; + const ColumnString & column_string = static_cast(column); + offsets = &column_string.getOffsets(); + chars = &column_string.getChars(); + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes, + StringRefs & keys) const + { + return StringRef(&(*chars)[i == 0 ? 0 : (*offsets)[i - 1]], (i == 0 ? (*offsets)[i] : ((*offsets)[i] - (*offsets)[i - 1])) - 1); + } + + static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } + + void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys) + { + it->first.data = string_pool.insert(it->first.data, it->first.size); + } + + static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + { + key_columns[0]->insertData(it->first.data, it->first.size); + } +}; + + +/// Для случая, когда есть один строковый ключ фиксированной длины. +struct AggregationMethodFixedString +{ + typedef AggregatedDataWithStringKey Data; + typedef Data::key_type Key; + typedef Data::mapped_type Mapped; + typedef Data::iterator iterator; + typedef Data::const_iterator const_iterator; + + Data data; + Arena string_pool; /// NOTE Может быть лучше вместо этого использовать aggregates_pool? + + size_t n; + const ColumnFixedString::Chars_t * chars; + + void init(ConstColumnPlainPtrs & key_columns) + { + const IColumn & column = *key_columns[0]; + const ColumnFixedString & column_string = static_cast(column); + n = column_string.getN(); + chars = &column_string.getChars(); + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes, + StringRefs & keys) const + { + return StringRef(&(*chars)[i * n], n); + } + + static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } + + void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys) + { + it->first.data = string_pool.insert(it->first.data, it->first.size); + } + + static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + { + key_columns[0]->insertData(it->first.data, it->first.size); + } +}; + + +/// Для случая, когда все ключи фиксированной длины, и они помещаются в 128 бит. +struct AggregationMethodKeys128 +{ + typedef AggregatedDataWithKeys128 Data; + typedef Data::key_type Key; + typedef Data::mapped_type Mapped; + typedef Data::iterator iterator; + typedef Data::const_iterator const_iterator; + + Data data; + + void init(ConstColumnPlainPtrs & key_columns) + { + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes, + StringRefs & keys) const + { + return pack128(i, keys_size, key_columns, key_sizes); + } + + static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } + + void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys) + { + } + + static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + { + size_t offset = 0; + for (size_t i = 0; i < keys_size; ++i) + { + size_t size = key_sizes[i]; + key_columns[i]->insertData(reinterpret_cast(&it->first) + offset, size); + offset += size; + } + } +}; + + +/// Для остальных случаев. Агрегирует по 128 битному хэшу от ключа. (При этом, строки, содержащие нули посередине, могут склеиться.) +struct AggregationMethodHashed +{ + typedef AggregatedDataHashed Data; + typedef Data::key_type Key; + typedef Data::mapped_type Mapped; + typedef Data::iterator iterator; + typedef Data::const_iterator const_iterator; + + Data data; + Arena keys_pool; /// NOTE Может быть лучше вместо этого использовать aggregates_pool? + + void init(ConstColumnPlainPtrs & key_columns) + { + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes, + StringRefs & keys) const + { + return hash128(i, keys_size, key_columns, keys); + } + + static AggregateDataPtr & getAggregateData(Mapped & value) { return value.second; } + static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value.second; } + + void onNewKey(iterator & it, size_t keys_size, size_t i, StringRefs & keys) + { + it->second.first = placeKeysInPool(i, keys_size, keys, keys_pool); + } + + static void insertKeyIntoColumns(const_iterator & it, ColumnPlainPtrs & key_columns, size_t keys_size, const Sizes & key_sizes) + { + for (size_t i = 0; i < keys_size; ++i) + key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size); + } +}; + + +class Aggregator; struct AggregatedDataVariants : private boost::noncopyable { @@ -69,6 +308,9 @@ struct AggregatedDataVariants : private boost::noncopyable * В этом случае, пул не сможет знать, по каким смещениям хранятся объекты. */ Aggregator * aggregator = nullptr; + + size_t keys_size; /// Количество ключей NOTE нужно ли это поле? + Sizes key_sizes; /// Размеры ключей, если ключи фиксированной длины /// Пулы для состояний агрегатных функций. Владение потом будет передано в ColumnAggregateFunction. Arenas aggregates_pools; @@ -78,34 +320,21 @@ struct AggregatedDataVariants : private boost::noncopyable */ AggregatedDataWithoutKey without_key = nullptr; - /// Специализация для случая, когда есть один числовой ключ. - /// unique_ptr - для ленивой инициализации (так как иначе HashMap в конструкторе выделяет и зануляет слишком много памяти). - std::unique_ptr key64; - - /// Специализация для случая, когда есть один строковый ключ. - std::unique_ptr key_string; - Arena string_pool; - - size_t keys_size; /// Количество ключей - Sizes key_sizes; /// Размеры ключей, если ключи фиксированной длины - - /// Специализация для случая, когда ключи фискированной длины помещаются в 128 бит. - std::unique_ptr keys128; - - /** Агрегирует по 128 битному хэшу от ключа. - * (При этом, строки, содержащие нули посередине, могут склеиться.) - */ - std::unique_ptr hashed; - Arena keys_pool; + std::unique_ptr key64; + std::unique_ptr key_string; + std::unique_ptr key_fixed_string; + std::unique_ptr keys128; + std::unique_ptr hashed; enum Type { - EMPTY = 0, - WITHOUT_KEY = 1, - KEY_64 = 2, - KEY_STRING = 3, - KEYS_128 = 4, - HASHED = 5, + EMPTY = 0, + WITHOUT_KEY = 1, + KEY_64 = 2, + KEY_STRING = 3, + KEY_FIXED_STRING = 4, + KEYS_128 = 5, + HASHED = 6, }; Type type = EMPTY; @@ -120,12 +349,13 @@ struct AggregatedDataVariants : private boost::noncopyable switch (type) { - case EMPTY: break; - case WITHOUT_KEY: break; - case KEY_64: key64 .reset(new AggregatedDataWithUInt64Key); break; - case KEY_STRING: key_string .reset(new AggregatedDataWithStringKey); break; - case KEYS_128: keys128 .reset(new AggregatedDataWithKeys128); break; - case HASHED: hashed .reset(new AggregatedDataHashed); break; + case EMPTY: break; + case WITHOUT_KEY: break; + case KEY_64: key64 .reset(new AggregationMethodKey64); break; + case KEY_STRING: key_string .reset(new AggregationMethodString); break; + case KEY_FIXED_STRING: key_fixed_string.reset(new AggregationMethodFixedString); break; + case KEYS_128: keys128 .reset(new AggregationMethodKeys128); break; + case HASHED: hashed .reset(new AggregationMethodHashed); break; default: throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -136,12 +366,13 @@ struct AggregatedDataVariants : private boost::noncopyable { switch (type) { - case EMPTY: return 0; - case WITHOUT_KEY: return 1; - case KEY_64: return key64->size() + (without_key != nullptr); - case KEY_STRING: return key_string->size() + (without_key != nullptr); - case KEYS_128: return keys128->size() + (without_key != nullptr); - case HASHED: return hashed->size() + (without_key != nullptr); + case EMPTY: return 0; + case WITHOUT_KEY: return 1; + case KEY_64: return key64->data.size() + (without_key != nullptr); + case KEY_STRING: return key_string->data.size() + (without_key != nullptr); + case KEY_FIXED_STRING: return key_fixed_string->data.size() + (without_key != nullptr); + case KEYS_128: return keys128->data.size() + (without_key != nullptr); + case HASHED: return hashed->data.size() + (without_key != nullptr); default: throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -152,12 +383,13 @@ struct AggregatedDataVariants : private boost::noncopyable { switch (type) { - case EMPTY: return "EMPTY"; - case WITHOUT_KEY: return "WITHOUT_KEY"; - case KEY_64: return "KEY_64"; - case KEY_STRING: return "KEY_STRING"; - case KEYS_128: return "KEYS_128"; - case HASHED: return "HASHED"; + case EMPTY: return "EMPTY"; + case WITHOUT_KEY: return "WITHOUT_KEY"; + case KEY_64: return "KEY_64"; + case KEY_STRING: return "KEY_STRING"; + case KEY_FIXED_STRING: return "KEY_FIXED_STRING"; + case KEYS_128: return "KEYS_128"; + case HASHED: return "HASHED"; default: throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -177,7 +409,7 @@ public: Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW) : keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()), - overflow_row(overflow_row_), total_size_of_aggregate_states(0), all_aggregates_has_trivial_destructor(false), initialized(false), + overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), log(&Logger::get("Aggregator")) { @@ -189,7 +421,7 @@ public: Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW) : key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()), - overflow_row(overflow_row_), total_size_of_aggregate_states(0), all_aggregates_has_trivial_destructor(false), initialized(false), + overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), log(&Logger::get("Aggregator")) { @@ -238,11 +470,11 @@ protected: bool overflow_row; Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций. - size_t total_size_of_aggregate_states; /// Суммарный размер строки из агрегатных функций. - bool all_aggregates_has_trivial_destructor; + size_t total_size_of_aggregate_states = 0; /// Суммарный размер строки из агрегатных функций. + bool all_aggregates_has_trivial_destructor = false; /// Для инициализации от первого блока при конкуррентном использовании. - bool initialized; + bool initialized = false; Poco::FastMutex mutex; size_t max_rows_to_group_by; @@ -264,6 +496,52 @@ protected: * Используется в обработчике исключений при агрегации, так как RAII в данном случае не применим. */ void destroyAggregateStates(AggregatedDataVariants & result); + + + typedef std::vector AggregateColumns; + typedef std::vector AggregateColumnsData; + + template + void executeImpl( + Method & method, + Arena * aggregates_pool, + size_t rows, + ConstColumnPlainPtrs & key_columns, + AggregateColumns & aggregate_columns, + const Sizes & key_sizes, + StringRefs & keys, + bool no_more_keys, + AggregateDataPtr overflow_row) const; + + template + void convertToBlockImpl( + Method & method, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row, + bool final) const; + + template + void mergeDataImpl( + Method & method_dst, + Method & method_src) const; + + template + void mergeStreamsImpl( + Method & method, + Arena * aggregates_pool, + size_t start_row, + size_t rows, + ConstColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + const Sizes & key_sizes, + StringRefs & keys) const; + + template + void destroyImpl( + Method & method) const; }; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index f4ff78c934d..e68157fc365 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -3,9 +3,6 @@ #include #include -#include -#include -#include #include #include @@ -19,7 +16,16 @@ namespace DB AggregatedDataVariants::~AggregatedDataVariants() { if (aggregator && !aggregator->all_aggregates_has_trivial_destructor) - aggregator->destroyAggregateStates(*this); + { + try + { + aggregator->destroyAggregateStates(*this); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } } @@ -85,13 +91,13 @@ void Aggregator::initialize(Block & block) DataTypes argument_types(arguments_size); for (size_t j = 0; j < arguments_size; ++j) argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type; - + col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters); col.column = new ColumnAggregateFunction(aggregates[i].function); sample.insert(col); } - } + } } @@ -119,26 +125,205 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu /// Если есть один числовой ключ, который помещается в 64 бита if (keys_size == 1 && key_columns[0]->isNumeric()) - { - return AggregatedDataVariants::KEY_64; - } /// Если ключи помещаются в 128 бит, будем использовать хэш-таблицу по упакованным в 128-бит ключам if (keys_fit_128_bits) return AggregatedDataVariants::KEYS_128; /// Если есть один строковый ключ, то используем хэш-таблицу с ним - if (keys_size == 1 - && (dynamic_cast(key_columns[0]) || dynamic_cast(key_columns[0]) - || dynamic_cast(key_columns[0]))) + if (keys_size == 1 && dynamic_cast(key_columns[0])) return AggregatedDataVariants::KEY_STRING; + if (keys_size == 1 && dynamic_cast(key_columns[0])) + return AggregatedDataVariants::KEY_FIXED_STRING; + /// Иначе будем агрегировать по хэшу от ключей. return AggregatedDataVariants::HASHED; } +template +void Aggregator::executeImpl( + Method & method, + Arena * aggregates_pool, + size_t rows, + ConstColumnPlainPtrs & key_columns, + AggregateColumns & aggregate_columns, + const Sizes & key_sizes, + StringRefs & keys, + bool no_more_keys, + AggregateDataPtr overflow_row) const +{ + method.init(key_columns); + + /// Для всех строчек. + for (size_t i = 0; i < rows; ++i) + { + typename Method::iterator it; + bool inserted; /// Вставили новый ключ, или такой ключ уже был? + bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys. + + /// Получаем ключ для вставки в хэш-таблицу. + typename Method::Key key = method.getKey(key_columns, keys_size, i, key_sizes, keys); + + if (!no_more_keys) /// Вставляем. + method.data.emplace(key, it, inserted); + else + { + /// Будем добавлять только если ключ уже есть. + inserted = false; + it = method.data.find(key); + if (method.data.end() == it) + overflow = true; + } + + /// Если ключ не поместился, и данные не надо агрегировать в отдельную строку, то делать нечего. + if (overflow && !overflow_row) + continue; + + /// Если вставили новый ключ - инициализируем состояния агрегатных функций, и возможно, что-нибудь связанное с ключём. + if (inserted) + { + method.onNewKey(it, keys_size, i, keys); + + AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second); + aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states); + + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); + } + + AggregateDataPtr value = !overflow ? Method::getAggregateData(it->second) : overflow_row; + + /// Добавляем значения в агрегатные функции. + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); + } +} + + +template +void Aggregator::convertToBlockImpl( + Method & method, + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + size_t start_row, + bool final) const +{ + if (!final) + { + size_t j = start_row; + for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j) + { + method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); + + for (size_t i = 0; i < aggregates_size; ++i) + (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; + } + } + else + { + for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) + { + method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes); + + for (size_t i = 0; i < aggregates_size; ++i) + aggregate_functions[i]->insertResultInto( + Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], + *final_aggregate_columns[i]); + } + } +} + + +template +void Aggregator::mergeDataImpl( + Method & method_dst, + Method & method_src) const +{ + for (typename Method::iterator it = method_src.data.begin(); it != method_src.data.end(); ++it) + { + typename Method::iterator res_it; + bool inserted; + method_dst.data.emplace(it->first, res_it, inserted); + + if (!inserted) + { + for (size_t i = 0; i < aggregates_size; ++i) + { + aggregate_functions[i]->merge( + Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i], + Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); + + aggregate_functions[i]->destroy( + Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); + } + } + else + { + res_it->second = it->second; + } + } +} + + +template +void Aggregator::mergeStreamsImpl( + Method & method, + Arena * aggregates_pool, + size_t start_row, + size_t rows, + ConstColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + const Sizes & key_sizes, + StringRefs & keys) const +{ + method.init(key_columns); + + /// Для всех строчек. + for (size_t i = start_row; i < rows; ++i) + { + typename Method::iterator it; + bool inserted; /// Вставили новый ключ, или такой ключ уже был? + + /// Получаем ключ для вставки в хэш-таблицу. + typename Method::Key key = method.getKey(key_columns, keys_size, i, key_sizes, keys); + + method.data.emplace(key, it, inserted); + + if (inserted) + { + method.onNewKey(it, keys_size, i, keys); + + AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second); + aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states); + + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); + } + + /// Мерджим состояния агрегатных функций. + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->merge( + Method::getAggregateData(it->second) + offsets_of_aggregate_states[j], + (*aggregate_columns[j])[i]); + } +} + + +template +void Aggregator::destroyImpl( + Method & method) const +{ + for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it) + for (size_t i = 0; i < aggregates_size; ++i) + aggregate_functions[i]->destroy(Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); +} + + /** Результат хранится в оперативке и должен полностью помещаться в оперативку. */ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result) @@ -146,7 +331,6 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re StringRefs key(keys_size); ConstColumnPlainPtrs key_columns(keys_size); - typedef std::vector AggregateColumns; AggregateColumns aggregate_columns(aggregates_size); /** Используется, если есть ограничение на максимальное количество строк при агрегации, @@ -244,227 +428,24 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re } } } - + + AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr; + if (result.type == AggregatedDataVariants::KEY_64) - { - AggregatedDataWithUInt64Key & res = *result.key64; - const IColumn & column = *key_columns[0]; - - /// Для всех строчек - for (size_t i = 0; i < rows; ++i) - { - /// Строим ключ - UInt64 key = column.get64(i); - - AggregatedDataWithUInt64Key::iterator it; - bool inserted; - bool overflow = false; - - if (!no_more_keys) - res.emplace(key, it, inserted); - else - { - inserted = false; - it = res.find(key); - if (res.end() == it) - overflow = true; - } - - if (overflow && !overflow_row) - continue; - - if (inserted) - { - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - AggregateDataPtr value = overflow ? result.without_key : it->second; - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); - } - } + executeImpl(*result.key64, result.aggregates_pool, rows, key_columns, aggregate_columns, + result.key_sizes, key, no_more_keys, overflow_row_ptr); else if (result.type == AggregatedDataVariants::KEY_STRING) - { - AggregatedDataWithStringKey & res = *result.key_string; - const IColumn & column = *key_columns[0]; - - if (const ColumnString * column_string = dynamic_cast(&column)) - { - const ColumnString::Offsets_t & offsets = column_string->getOffsets(); - const ColumnString::Chars_t & data = column_string->getChars(); - - /// Для всех строчек - for (size_t i = 0; i < rows; ++i) - { - /// Строим ключ - StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1); - - AggregatedDataWithStringKey::iterator it; - bool inserted; - bool overflow = false; - - if (!no_more_keys) - res.emplace(ref, it, inserted); - else - { - inserted = false; - it = res.find(ref); - if (res.end() == it) - overflow = true; - } - - if (overflow && !overflow_row) - continue; - - if (inserted) - { - it->first.data = result.string_pool.insert(ref.data, ref.size); - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - AggregateDataPtr value = overflow ? result.without_key : it->second; - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); - } - } - else if (const ColumnFixedString * column_string = dynamic_cast(&column)) - { - size_t n = column_string->getN(); - const ColumnFixedString::Chars_t & data = column_string->getChars(); - - /// Для всех строчек - for (size_t i = 0; i < rows; ++i) - { - /// Строим ключ - StringRef ref(&data[i * n], n); - - AggregatedDataWithStringKey::iterator it; - bool inserted; - bool overflow = false; - - if (!no_more_keys) - res.emplace(ref, it, inserted); - else - { - inserted = false; - it = res.find(ref); - if (res.end() == it) - overflow = true; - } - - if (overflow && !overflow_row) - continue; - - if (inserted) - { - it->first.data = result.string_pool.insert(ref.data, ref.size); - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - AggregateDataPtr value = overflow ? result.without_key : it->second; - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); - } - } - else - throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN); - } + executeImpl(*result.key_string, result.aggregates_pool, rows, key_columns, aggregate_columns, + result.key_sizes, key, no_more_keys, overflow_row_ptr); + else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING) + executeImpl(*result.key_fixed_string, result.aggregates_pool, rows, key_columns, aggregate_columns, + result.key_sizes, key, no_more_keys, overflow_row_ptr); else if (result.type == AggregatedDataVariants::KEYS_128) - { - AggregatedDataWithKeys128 & res = *result.keys128; - - /// Для всех строчек - for (size_t i = 0; i < rows; ++i) - { - AggregatedDataWithKeys128::iterator it; - bool inserted; - bool overflow = false; - UInt128 key128 = pack128(i, keys_size, key_columns, key_sizes); - - if (!no_more_keys) - res.emplace(key128, it, inserted); - else - { - inserted = false; - it = res.find(key128); - if (res.end() == it) - overflow = true; - } - - if (overflow && !overflow_row) - continue; - - if (inserted) - { - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - AggregateDataPtr value = overflow ? result.without_key : it->second; - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); - } - } + executeImpl(*result.keys128, result.aggregates_pool, rows, key_columns, aggregate_columns, + result.key_sizes, key, no_more_keys, overflow_row_ptr); else if (result.type == AggregatedDataVariants::HASHED) - { - AggregatedDataHashed & res = *result.hashed; - - /// Для всех строчек - for (size_t i = 0; i < rows; ++i) - { - AggregatedDataHashed::iterator it; - bool inserted; - bool overflow = false; - UInt128 key128 = hash128(i, keys_size, key_columns, key); - - if (!no_more_keys) - res.emplace(key128, it, inserted); - else - { - inserted = false; - it = res.find(key128); - if (res.end() == it) - overflow = true; - } - - if (overflow && !overflow_row) - continue; - - if (inserted) - { - it->second.first = placeKeysInPool(i, keys_size, key, result.keys_pool); - it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]); - } - - AggregateDataPtr value = overflow ? result.without_key : it->second.second; - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->add(value + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); - } - } + executeImpl(*result.hashed, result.aggregates_pool, rows, key_columns, aggregate_columns, + result.key_sizes, key, no_more_keys, overflow_row_ptr); else if (result.type != AggregatedDataVariants::WITHOUT_KEY) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -506,10 +487,8 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi if (data_variants.empty()) return Block(); - typedef std::vector AggregateColumns; - ColumnPlainPtrs key_columns(keys_size); - AggregateColumns aggregate_columns(aggregates_size); + AggregateColumnsData aggregate_columns(aggregates_size); ColumnPlainPtrs final_aggregate_columns(aggregates_size); for (size_t i = 0; i < keys_size; ++i) @@ -561,130 +540,20 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi size_t start_row = overflow_row ? 1 : 0; if (data_variants.type == AggregatedDataVariants::KEY_64) - { - AggregatedDataWithUInt64Key & data = *data_variants.key64; - - IColumn & first_column = *key_columns[0]; - - size_t j = start_row; - - if (!final) - { - for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - first_column.insertData(reinterpret_cast(&it->first), sizeof(it->first)); - - for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i]; - } - } - else - { - for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - first_column.insertData(reinterpret_cast(&it->first), sizeof(it->first)); - - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - } - } + convertToBlockImpl(*data_variants.key64, key_columns, aggregate_columns, + final_aggregate_columns, data_variants.key_sizes, start_row, final); else if (data_variants.type == AggregatedDataVariants::KEY_STRING) - { - AggregatedDataWithStringKey & data = *data_variants.key_string; - IColumn & first_column = *key_columns[0]; - - size_t j = start_row; - - if (!final) - { - for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - first_column.insertData(it->first.data, it->first.size); - - for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i]; - } - } - else - { - for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - first_column.insertData(it->first.data, it->first.size); - - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - } - } + convertToBlockImpl(*data_variants.key_string, key_columns, aggregate_columns, + final_aggregate_columns, data_variants.key_sizes, start_row, final); + else if (data_variants.type == AggregatedDataVariants::KEY_FIXED_STRING) + convertToBlockImpl(*data_variants.key_fixed_string, key_columns, aggregate_columns, + final_aggregate_columns, data_variants.key_sizes, start_row, final); else if (data_variants.type == AggregatedDataVariants::KEYS_128) - { - AggregatedDataWithKeys128 & data = *data_variants.keys128; - - size_t j = start_row; - - if (!final) - { - for (AggregatedDataWithKeys128::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - size_t offset = 0; - for (size_t i = 0; i < keys_size; ++i) - { - size_t size = data_variants.key_sizes[i]; - key_columns[i]->insertData(reinterpret_cast(&it->first) + offset, size); - offset += size; - } - - for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[j] = it->second + offsets_of_aggregate_states[i]; - } - } - else - { - for (AggregatedDataWithKeys128::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - size_t offset = 0; - for (size_t i = 0; i < keys_size; ++i) - { - size_t size = data_variants.key_sizes[i]; - key_columns[i]->insertData(reinterpret_cast(&it->first) + offset, size); - offset += size; - } - - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->insertResultInto(it->second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - } - } + convertToBlockImpl(*data_variants.keys128, key_columns, aggregate_columns, + final_aggregate_columns, data_variants.key_sizes, start_row, final); else if (data_variants.type == AggregatedDataVariants::HASHED) - { - AggregatedDataHashed & data = *data_variants.hashed; - - size_t j = start_row; - - if (!final) - { - for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - for (size_t i = 0; i < keys_size; ++i) - key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size); - - for (size_t i = 0; i < aggregates_size; ++i) - (*aggregate_columns[i])[j] = it->second.second + offsets_of_aggregate_states[i]; - } - } - else - { - for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it, ++j) - { - for (size_t i = 0; i < keys_size; ++i) - key_columns[i]->insertDataWithTerminatingZero(it->second.first[i].data, it->second.first[i].size); - - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->insertResultInto(it->second.second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - } - } - } + convertToBlockImpl(*data_variants.hashed, key_columns, aggregate_columns, + final_aggregate_columns, data_variants.key_sizes, start_row, final); else if (data_variants.type != AggregatedDataVariants::WITHOUT_KEY) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -755,103 +624,17 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]); } } - + if (res->type == AggregatedDataVariants::KEY_64) - { - AggregatedDataWithUInt64Key & res_data = *res->key64; - AggregatedDataWithUInt64Key & current_data = *current.key64; - - for (AggregatedDataWithUInt64Key::const_iterator it = current_data.begin(); it != current_data.end(); ++it) - { - AggregatedDataWithUInt64Key::iterator res_it; - bool inserted; - res_data.emplace(it->first, res_it, inserted); - - if (!inserted) - { - for (size_t i = 0; i < aggregates_size; ++i) - { - aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]); - aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - } - } - else - res_it->second = it->second; - } - } + mergeDataImpl(*res->key64, *current.key64); else if (res->type == AggregatedDataVariants::KEY_STRING) - { - AggregatedDataWithStringKey & res_data = *res->key_string; - AggregatedDataWithStringKey & current_data = *current.key_string; - - for (AggregatedDataWithStringKey::const_iterator it = current_data.begin(); it != current_data.end(); ++it) - { - AggregatedDataWithStringKey::iterator res_it; - bool inserted; - res_data.emplace(it->first, res_it, inserted); - - if (!inserted) - { - for (size_t i = 0; i < aggregates_size; ++i) - { - aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]); - aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - } - } - else - res_it->second = it->second; - } - } + mergeDataImpl(*res->key_string, *current.key_string); + else if (res->type == AggregatedDataVariants::KEY_FIXED_STRING) + mergeDataImpl(*res->key_fixed_string, *current.key_fixed_string); else if (res->type == AggregatedDataVariants::KEYS_128) - { - AggregatedDataWithKeys128 & res_data = *res->keys128; - AggregatedDataWithKeys128 & current_data = *current.keys128; - - for (AggregatedDataWithKeys128::iterator it = current_data.begin(); it != current_data.end(); ++it) - { - AggregatedDataWithKeys128::iterator res_it; - bool inserted; - res_data.emplace(it->first, res_it, inserted); - - if (!inserted) - { - for (size_t i = 0; i < aggregates_size; ++i) - { - aggregate_functions[i]->merge(res_it->second + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i]); - aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - } - } - else - { - res_it->second = it->second; - } - } - } + mergeDataImpl(*res->keys128, *current.keys128); else if (res->type == AggregatedDataVariants::HASHED) - { - AggregatedDataHashed & res_data = *res->hashed; - AggregatedDataHashed & current_data = *current.hashed; - - for (AggregatedDataHashed::iterator it = current_data.begin(); it != current_data.end(); ++it) - { - AggregatedDataHashed::iterator res_it; - bool inserted; - res_data.emplace(it->first, res_it, inserted); - - if (!inserted) - { - for (size_t i = 0; i < aggregates_size; ++i) - { - aggregate_functions[i]->merge(res_it->second.second + offsets_of_aggregate_states[i], it->second.second + offsets_of_aggregate_states[i]); - aggregate_functions[i]->destroy(it->second.second + offsets_of_aggregate_states[i]); - } - } - else - { - res_it->second = it->second; - } - } - } + mergeDataImpl(*res->hashed, *current.hashed); else if (res->type != AggregatedDataVariants::WITHOUT_KEY) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -877,9 +660,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu StringRefs key(keys_size); ConstColumnPlainPtrs key_columns(keys_size); - typedef ColumnAggregateFunction::Container_t * AggregateColumn; - typedef std::vector AggregateColumns; - AggregateColumns aggregate_columns(aggregates_size); + AggregateColumnsData aggregate_columns(aggregates_size); Block empty_block; initialize(empty_block); @@ -935,148 +716,15 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu size_t start_row = overflow_row ? 1 : 0; if (result.type == AggregatedDataVariants::KEY_64) - { - AggregatedDataWithUInt64Key & res = *result.key64; - const IColumn & column = *key_columns[0]; - - /// Для всех строчек - for (size_t i = start_row; i < rows; ++i) - { - /// Строим ключ - UInt64 key = column.get64(i); - - AggregatedDataWithUInt64Key::iterator it; - bool inserted; - res.emplace(key, it, inserted); - - if (inserted) - { - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); - } - } + mergeStreamsImpl(*result.key64, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key); else if (result.type == AggregatedDataVariants::KEY_STRING) - { - AggregatedDataWithStringKey & res = *result.key_string; - const IColumn & column = *key_columns[0]; - - if (const ColumnString * column_string = dynamic_cast(&column)) - { - const ColumnString::Offsets_t & offsets = column_string->getOffsets(); - const ColumnString::Chars_t & data = column_string->getChars(); - - /// Для всех строчек - for (size_t i = start_row; i < rows; ++i) - { - /// Строим ключ - StringRef ref(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1); - - AggregatedDataWithStringKey::iterator it; - bool inserted; - res.emplace(ref, it, inserted); - - if (inserted) - { - it->first.data = result.string_pool.insert(ref.data, ref.size); - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); - } - } - else if (const ColumnFixedString * column_string = dynamic_cast(&column)) - { - size_t n = column_string->getN(); - const ColumnFixedString::Chars_t & data = column_string->getChars(); - - /// Для всех строчек - for (size_t i = start_row; i < rows; ++i) - { - /// Строим ключ - StringRef ref(&data[i * n], n); - - AggregatedDataWithStringKey::iterator it; - bool inserted; - res.emplace(ref, it, inserted); - - if (inserted) - { - it->first.data = result.string_pool.insert(ref.data, ref.size); - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); - } - } - else - throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN); - } + mergeStreamsImpl(*result.key_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key); + else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING) + mergeStreamsImpl(*result.key_fixed_string, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key); else if (result.type == AggregatedDataVariants::KEYS_128) - { - AggregatedDataWithKeys128 & res = *result.keys128; - - /// Для всех строчек - for (size_t i = start_row; i < rows; ++i) - { - AggregatedDataWithKeys128::iterator it; - bool inserted; - res.emplace(pack128(i, keys_size, key_columns, key_sizes), it, inserted); - - if (inserted) - { - it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); - } - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->merge(it->second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); - } - } + mergeStreamsImpl(*result.keys128, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key); else if (result.type == AggregatedDataVariants::HASHED) - { - AggregatedDataHashed & res = *result.hashed; - - /// Для всех строчек - for (size_t i = start_row; i < rows; ++i) - { - AggregatedDataHashed::iterator it; - bool inserted; - res.emplace(hash128(i, keys_size, key_columns, key), it, inserted); - - if (inserted) - { - it->second.first = placeKeysInPool(i, keys_size, key, result.keys_pool); - it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states); - - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->create(it->second.second + offsets_of_aggregate_states[j]); - } - - /// Добавляем значения - for (size_t j = 0; j < aggregates_size; ++j) - aggregate_functions[j]->merge(it->second.second + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); - } - } + mergeStreamsImpl(*result.hashed, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key); else if (result.type != AggregatedDataVariants::WITHOUT_KEY) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); @@ -1100,30 +748,19 @@ void Aggregator::destroyAggregateStates(AggregatedDataVariants & result) for (size_t i = 0; i < aggregates_size; ++i) aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); } + if (result.type == AggregatedDataVariants::KEY_64) - { - AggregatedDataWithUInt64Key & res_data = *result.key64; - - for (AggregatedDataWithUInt64Key::const_iterator it = res_data.begin(); it != res_data.end(); ++it) - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - } + destroyImpl(*result.key64); else if (result.type == AggregatedDataVariants::KEY_STRING) - { - AggregatedDataWithStringKey & res_data = *result.key_string; - - for (AggregatedDataWithStringKey::const_iterator it = res_data.begin(); it != res_data.end(); ++it) - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - } + destroyImpl(*result.key_string); + else if (result.type == AggregatedDataVariants::KEY_FIXED_STRING) + destroyImpl(*result.key_fixed_string); + else if (result.type == AggregatedDataVariants::KEYS_128) + destroyImpl(*result.keys128); else if (result.type == AggregatedDataVariants::HASHED) - { - AggregatedDataHashed & res_data = *result.hashed; - - for (AggregatedDataHashed::iterator it = res_data.begin(); it != res_data.end(); ++it) - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i]->destroy(it->second.second + offsets_of_aggregate_states[i]); - } + destroyImpl(*result.hashed); + else if (result.type != AggregatedDataVariants::WITHOUT_KEY) + throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); } diff --git a/dbms/src/Interpreters/SplittingAggregator.cpp b/dbms/src/Interpreters/SplittingAggregator.cpp index bf80db339f0..b9f8cc99138 100644 --- a/dbms/src/Interpreters/SplittingAggregator.cpp +++ b/dbms/src/Interpreters/SplittingAggregator.cpp @@ -14,7 +14,7 @@ namespace DB void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedDataVariants & results) { - Stopwatch watch; + //Stopwatch watch; /// Читаем все данные while (Block block = stream->read()) @@ -49,17 +49,14 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData /// Каким способом выполнять агрегацию? if (method == AggregatedDataVariants::EMPTY) - { method = chooseAggregationMethod(key_columns, key_sizes); - // LOG_TRACE(log, "Aggregation method: " << result.getMethodName()); - } /// Подготавливаем массивы, куда будут складываться ключи или хэши от ключей. if (method == AggregatedDataVariants::KEY_64) { keys64.resize(rows); } - else if (method == AggregatedDataVariants::KEY_STRING) + else if (method == AggregatedDataVariants::KEY_STRING || method == AggregatedDataVariants::KEY_FIXED_STRING) { hashes64.resize(rows); string_refs.resize(rows); @@ -93,9 +90,9 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData /// Параллельно вычисляем хэши и ключи. - LOG_TRACE(log, "Calculating keys and hashes."); + // LOG_TRACE(log, "Calculating keys and hashes."); - watch.start(); + // watch.start(); for (size_t thread_no = 0; thread_no < threads; ++thread_no) pool.schedule(boost::bind(&SplittingAggregator::calculateHashesThread, this, @@ -109,12 +106,12 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData rethrowFirstException(exceptions); - LOG_TRACE(log, "Calculated keys and hashes in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec."); - watch.restart(); + // LOG_TRACE(log, "Calculated keys and hashes in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec."); + // watch.restart(); /// Параллельно агрегируем в независимые хэш-таблицы - LOG_TRACE(log, "Parallel aggregating."); + // LOG_TRACE(log, "Parallel aggregating."); for (size_t thread_no = 0; thread_no < threads; ++thread_no) pool.schedule(boost::bind(&SplittingAggregator::aggregateThread, this, @@ -128,20 +125,13 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData rethrowFirstException(exceptions); - LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec."); + // LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec."); /// Проверка ограничений if (max_rows_to_group_by && size_of_all_results > max_rows_to_group_by && group_by_overflow_mode == OverflowMode::BREAK) break; } - -/* double elapsed_seconds = watch.elapsedSeconds(); - size_t rows = result.size(); - LOG_TRACE(log, std::fixed << std::setprecision(3) - << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" - << " in " << elapsed_seconds << " sec." - << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");*/ } @@ -188,33 +178,32 @@ void SplittingAggregator::calculateHashesThread(Block & block, size_t begin, siz else if (method == AggregatedDataVariants::KEY_STRING) { const IColumn & column = *key_columns[0]; + const ColumnString & column_string = dynamic_cast(column); - if (const ColumnString * column_string = dynamic_cast(&column)) + const ColumnString::Offsets_t & offsets = column_string.getOffsets(); + const ColumnString::Chars_t & data = column_string.getChars(); + + for (size_t i = begin; i < end; ++i) { - const ColumnString::Offsets_t & offsets = column_string->getOffsets(); - const ColumnString::Chars_t & data = column_string->getChars(); - - for (size_t i = begin; i < end; ++i) - { - string_refs[i] = StringRef(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1); - hashes64[i] = hash_func_string(string_refs[i]); - thread_nums[i] = (hashes64[i] >> 32) % threads; - } + string_refs[i] = StringRef(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1); + hashes64[i] = hash_func_string(string_refs[i]); + thread_nums[i] = (hashes64[i] >> 32) % threads; } - else if (const ColumnFixedString * column_string = dynamic_cast(&column)) + } + else if (method == AggregatedDataVariants::KEY_FIXED_STRING) + { + const IColumn & column = *key_columns[0]; + const ColumnFixedString & column_string = dynamic_cast(column); + + size_t n = column_string.getN(); + const ColumnFixedString::Chars_t & data = column_string.getChars(); + + for (size_t i = begin; i < end; ++i) { - size_t n = column_string->getN(); - const ColumnFixedString::Chars_t & data = column_string->getChars(); - - for (size_t i = begin; i < end; ++i) - { - string_refs[i] = StringRef(&data[i * n], n); - hashes64[i] = hash_func_string(string_refs[i]); - thread_nums[i] = (hashes64[i] >> 32) % threads; - } + string_refs[i] = StringRef(&data[i * n], n); + hashes64[i] = hash_func_string(string_refs[i]); + thread_nums[i] = (hashes64[i] >> 32) % threads; } - else - throw Exception("Illegal type of column when aggregating by string key: " + column.getName(), ErrorCodes::ILLEGAL_COLUMN); } else if (method == AggregatedDataVariants::KEYS_128) { @@ -261,7 +250,7 @@ void SplittingAggregator::aggregateThread( if (method == AggregatedDataVariants::KEY_64) { - AggregatedDataWithUInt64Key & res = *result.key64; + AggregatedDataWithUInt64Key & res = result.key64->data; for (size_t i = 0; i < rows; ++i) { @@ -299,7 +288,7 @@ void SplittingAggregator::aggregateThread( } else if (method == AggregatedDataVariants::KEY_STRING) { - AggregatedDataWithStringKey & res = *result.key_string; + AggregatedDataWithStringKey & res = result.key_string->data; for (size_t i = 0; i < rows; ++i) { @@ -323,7 +312,45 @@ void SplittingAggregator::aggregateThread( if (inserted) { - it->first.data = result.string_pool.insert(ref.data, ref.size); + it->first.data = result.key_string->string_pool.insert(ref.data, ref.size); + it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); + + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->create(it->second + offsets_of_aggregate_states[j]); + } + + /// Добавляем значения + for (size_t j = 0; j < aggregates_size; ++j) + aggregate_functions[j]->add(it->second + offsets_of_aggregate_states[j], &aggregate_columns[j][0], i); + } + } + else if (method == AggregatedDataVariants::KEY_FIXED_STRING) + { + AggregatedDataWithStringKey & res = result.key_fixed_string->data; + + for (size_t i = 0; i < rows; ++i) + { + if (thread_nums[i] != thread_no) + continue; + + AggregatedDataWithStringKey::iterator it; + bool inserted; + + StringRef ref = string_refs[i]; + + if (!no_more_keys) + res.emplace(ref, it, inserted, hashes64[i]); + else + { + inserted = false; + it = res.find(ref); + if (res.end() == it) + continue; + } + + if (inserted) + { + it->first.data = result.key_fixed_string->string_pool.insert(ref.data, ref.size); it->second = result.aggregates_pool->alloc(total_size_of_aggregate_states); for (size_t j = 0; j < aggregates_size; ++j) @@ -337,7 +364,7 @@ void SplittingAggregator::aggregateThread( } else if (method == AggregatedDataVariants::KEYS_128) { - AggregatedDataWithKeys128 & res = *result.keys128; + AggregatedDataWithKeys128 & res = result.keys128->data; for (size_t i = 0; i < rows; ++i) { @@ -374,7 +401,7 @@ void SplittingAggregator::aggregateThread( else if (method == AggregatedDataVariants::HASHED) { StringRefs key(keys_size); - AggregatedDataHashed & res = *result.hashed; + AggregatedDataHashed & res = result.hashed->data; for (size_t i = 0; i < rows; ++i) { @@ -397,7 +424,7 @@ void SplittingAggregator::aggregateThread( if (inserted) { - it->second.first = extractKeysAndPlaceInPool(i, keys_size, key_columns, key, result.keys_pool); + it->second.first = extractKeysAndPlaceInPool(i, keys_size, key_columns, key, result.hashed->keys_pool); it->second.second = result.aggregates_pool->alloc(total_size_of_aggregate_states); for (size_t j = 0; j < aggregates_size; ++j)