From 83cf1f81ecd63359fbfe12b312377298e3d64bd7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 30 Nov 2015 19:57:05 +0300 Subject: [PATCH 01/14] dbms: Aggregator: preparation [#METR-17000]. --- .../DataStreams/AggregatingBlockInputStream.h | 8 +- .../MergingAggregatedBlockInputStream.h | 6 +- ...ggregatedMemoryEfficientBlockInputStream.h | 3 +- .../ParallelAggregatingBlockInputStream.h | 10 +- dbms/include/DB/Interpreters/Aggregator.h | 83 +++--- ...regatedMemoryEfficientBlockInputStream.cpp | 7 +- dbms/src/Interpreters/Aggregator.cpp | 256 +++++++++--------- .../Interpreters/InterpreterSelectQuery.cpp | 19 +- 8 files changed, 201 insertions(+), 191 deletions(-) diff --git a/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h index be21624dabd..b600d068e59 100644 --- a/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h @@ -22,12 +22,8 @@ public: * Агрегатные функции ищутся везде в выражении. * Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены. */ - AggregatingBlockInputStream(BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates, - bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, - Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_) - : aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, - compiler_, min_count_to_compile_, group_by_two_level_threshold_), - final(final_) + AggregatingBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params, bool final_) + : aggregator(params), final(final_) { children.push_back(input_); } diff --git a/dbms/include/DB/DataStreams/MergingAggregatedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedBlockInputStream.h index 83f66f341bb..abb5657507e 100644 --- a/dbms/include/DB/DataStreams/MergingAggregatedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingAggregatedBlockInputStream.h @@ -16,10 +16,8 @@ using Poco::SharedPtr; class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream { public: - MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_, - const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_) - : aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0), - final(final_), max_threads(max_threads_) + MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params, bool final_, size_t max_threads_) + : aggregator(params), final(final_), max_threads(max_threads_) { children.push_back(input_); } diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index 2b331b94a34..472123385c7 100644 --- a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -29,8 +29,7 @@ namespace DB class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream { public: - MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_, - const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_); + 
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Aggregator::Params & params, bool final_); String getName() const override { return "MergingAggregatedMemoryEfficient"; } diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index 39cf19b7f33..d0777a536a3 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -23,14 +23,10 @@ public: */ ParallelAggregatingBlockInputStream( BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, - const Names & key_names, const AggregateDescriptions & aggregates, - bool overflow_row_, bool final_, size_t max_threads_, - size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, - Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_) - : aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_, - compiler_, min_count_to_compile_, group_by_two_level_threshold_), + const Aggregator::Params & params, bool final_, size_t max_threads_) + : aggregator(params), final(final_), max_threads(std::min(inputs.size(), max_threads_)), - keys_size(aggregator.getNumberOfKeys()), aggregates_size(aggregator.getNumberOfAggregates()), + keys_size(params.keys_size), aggregates_size(params.aggregates_size), handler(*this), processor(inputs, additional_input_at_end, max_threads, handler) { children = inputs; diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 264cb525724..0c81cd9206b 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -761,18 +761,55 @@ typedef std::vector ManyAggregatedDataVariants; class Aggregator { public: - Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, - size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_, - size_t group_by_two_level_threshold_) - : key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()), - overflow_row(overflow_row_), - max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), - compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_), + struct Params + { + /// Что считать. + Names key_names; + ColumnNumbers keys; /// Номера столбцов - вычисляются позже. + AggregateDescriptions aggregates; + size_t keys_size; + size_t aggregates_size; + + /// Настройки приближённого вычисления GROUP BY. + bool overflow_row; /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by. + size_t max_rows_to_group_by; + OverflowMode group_by_overflow_mode; + + /// Для динамической компиляции. + Compiler * compiler; + UInt32 min_count_to_compile; + + /// Настройки двухуровневой агрегации (используется для большого количества ключей). + /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. + * 0 - никогда не использовать. 
+ */ + size_t group_by_two_level_threshold; + + Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, + size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_, + size_t group_by_two_level_threshold_) + : key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()), + overflow_row(overflow_row_), + max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), + compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_) + { + std::sort(key_names.begin(), key_names.end()); + key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end()); + keys_size = key_names.size(); + } + + /// Только параметры, имеющие значение при мердже. + Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_) + : Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0) {} + + /// Вычислить номера столбцов в keys и aggregates. + void calculateColumnNumbers(const Block & block); + }; + + Aggregator(const Params & params_) + : params(params_), isCancelled([]() { return false; }) { - std::sort(key_names.begin(), key_names.end()); - key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end()); - keys_size = key_names.size(); } /// Агрегировать источник. Получить результат в виде одной из структур данных. @@ -827,15 +864,11 @@ public: /// Для IBlockInputStream. String getID() const; - size_t getNumberOfKeys() const { return keys_size; } - size_t getNumberOfAggregates() const { return aggregates_size; } - protected: friend struct AggregatedDataVariants; - ColumnNumbers keys; - Names key_names; - AggregateDescriptions aggregates; + Params params; + AggregateFunctionsPlainPtrs aggregate_functions; /** Данный массив служит для двух целей. @@ -857,11 +890,6 @@ protected: using AggregateFunctionInstructions = std::vector; - size_t keys_size; - size_t aggregates_size; - /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by. - bool overflow_row; - Sizes offsets_of_aggregate_states; /// Смещение до n-ой агрегатной функции в строке из агрегатных функций. size_t total_size_of_aggregate_states = 0; /// Суммарный размер строки из агрегатных функций. bool all_aggregates_has_trivial_destructor = false; @@ -870,18 +898,10 @@ protected: bool initialized = false; std::mutex mutex; - size_t max_rows_to_group_by; - OverflowMode group_by_overflow_mode; - Block sample; Logger * log = &Logger::get("Aggregator"); - - /** Для динамической компиляции, если предусмотрено. */ - Compiler * compiler = nullptr; - UInt32 min_count_to_compile; - /** Динамически скомпилированная библиотека для агрегации, если есть. * Смысл динамической компиляции в том, чтобы специализировать код * под конкретный список агрегатных функций. @@ -902,11 +922,6 @@ protected: bool compiled_if_possible = false; void compileIfPossible(AggregatedDataVariants::Type type); - /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. - * 0 - никогда не использовать. - */ - size_t group_by_two_level_threshold; - /// Возвращает true, если можно прервать текущую задачу. 
CancellationHook isCancelled; diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 44a5889338b..cc00f788915 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -6,11 +6,8 @@ namespace DB MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream( - BlockInputStreams inputs_, const Names & keys_names_, - const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_) - : aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0), - final(final_), - inputs(inputs_.begin(), inputs_.end()) + BlockInputStreams inputs_, const Aggregator::Params & params, bool final_) + : aggregator(params), final(final_), inputs(inputs_.begin(), inputs_.end()) { children = inputs_; } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 1c50998c3e3..0b71a7fd002 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -59,6 +59,19 @@ void AggregatedDataVariants::convertToTwoLevel() } +void Aggregator::Params::calculateColumnNumbers(const Block & block) +{ + if (keys.empty() && !key_names.empty()) + for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it) + keys.push_back(block.getPositionByName(*it)); + + for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it) + if (it->arguments.empty() && !it->argument_names.empty()) + for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt) + it->arguments.push_back(block.getPositionByName(*jt)); +} + + void Aggregator::initialize(const Block & block) { if (isCancelled()) @@ -71,21 +84,21 @@ void Aggregator::initialize(const Block & block) initialized = true; - aggregate_functions.resize(aggregates_size); - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_functions[i] = &*aggregates[i].function; + aggregate_functions.resize(params.aggregates_size); + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_functions[i] = params.aggregates[i].function.get(); /// Инициализируем размеры состояний и смещения для агрегатных функций. 
- offsets_of_aggregate_states.resize(aggregates_size); + offsets_of_aggregate_states.resize(params.aggregates_size); total_size_of_aggregate_states = 0; all_aggregates_has_trivial_destructor = true; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) { offsets_of_aggregate_states[i] = total_size_of_aggregate_states; - total_size_of_aggregate_states += aggregates[i].function->sizeOfData(); + total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData(); - if (!aggregates[i].function->hasTrivialDestructor()) + if (!params.aggregates[i].function->hasTrivialDestructor()) all_aggregates_has_trivial_destructor = false; } @@ -99,14 +112,7 @@ void Aggregator::initialize(const Block & block) return; /// Преобразуем имена столбцов в номера, если номера не заданы - if (keys.empty() && !key_names.empty()) - for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it) - keys.push_back(block.getPositionByName(*it)); - - for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it) - if (it->arguments.empty() && !it->argument_names.empty()) - for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt) - it->arguments.push_back(block.getPositionByName(*jt)); + params.calculateColumnNumbers(block); if (isCancelled()) return; @@ -114,24 +120,24 @@ void Aggregator::initialize(const Block & block) /// Создадим пример блока, описывающего результат if (!sample) { - for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) { - sample.insert(block.getByPosition(keys[i]).cloneEmpty()); + sample.insert(block.getByPosition(params.keys[i]).cloneEmpty()); if (auto converted = sample.getByPosition(i).column->convertToFullColumnIfConst()) sample.getByPosition(i).column = converted; } - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) { ColumnWithTypeAndName col; - col.name = aggregates[i].column_name; + col.name = params.aggregates[i].column_name; - size_t arguments_size = aggregates[i].arguments.size(); + size_t arguments_size = params.aggregates[i].arguments.size(); DataTypes argument_types(arguments_size); for (size_t j = 0; j < arguments_size; ++j) - argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type; + argument_types[j] = block.getByPosition(params.aggregates[i].arguments[j]).type; - col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters); + col.type = new DataTypeAggregateFunction(params.aggregates[i].function, argument_types, params.aggregates[i].parameters); col.column = col.type->createColumn(); sample.insert(col); @@ -175,7 +181,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) /// Список типов агрегатных функций. std::stringstream aggregate_functions_typenames_str; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) { IAggregateFunction & func = *aggregate_functions[i]; @@ -306,7 +312,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type) * Если счётчик достигнул значения min_count_to_compile, то асинхронно (в отдельном потоке) запускается компиляция, * по окончании которой вызывается колбэк on_ready. 
 */
-	SharedLibraryPtr lib = compiler->getOrCount(key, min_count_to_compile,
+	SharedLibraryPtr lib = params.compiler->getOrCount(key, params.min_count_to_compile,
 		"-include /usr/share/clickhouse/headers/dbms/include/DB/Interpreters/SpecializedAggregator.h",
 		get_code, on_ready);
 
@@ -329,8 +335,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
 	bool has_arrays_of_non_fixed_elems = false;
 	bool all_non_array_keys_are_fixed = true;
 
-	key_sizes.resize(keys_size);
-	for (size_t j = 0; j < keys_size; ++j)
+	key_sizes.resize(params.keys_size);
+	for (size_t j = 0; j < params.keys_size; ++j)
 	{
 		if (key_columns[j]->isFixed())
 		{
@@ -354,11 +360,11 @@
 	}
 
 	/// Если ключей нет
-	if (keys_size == 0)
+	if (params.keys_size == 0)
 		return AggregatedDataVariants::Type::without_key;
 
 	/// Если есть один числовой ключ, который помещается в 64 бита
-	if (keys_size == 1 && key_columns[0]->isNumeric())
+	if (params.keys_size == 1 && key_columns[0]->isNumeric())
 	{
 		size_t size_of_field = key_columns[0]->sizeOfField();
 		if (size_of_field == 1)
@@ -379,10 +385,10 @@
 		return AggregatedDataVariants::Type::keys256;
 
 	/// Если есть один строковый ключ, то используем хэш-таблицу с ним
-	if (keys_size == 1 && typeid_cast<const ColumnString *>(key_columns[0]))
+	if (params.keys_size == 1 && typeid_cast<const ColumnString *>(key_columns[0]))
 		return AggregatedDataVariants::Type::key_string;
 
-	if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0]))
+	if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0]))
 		return AggregatedDataVariants::Type::key_fixed_string;
 
 	/** Если есть массивы.
@@ -401,7 +407,7 @@
 
 void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
 {
-	for (size_t j = 0; j < aggregates_size; ++j)
+	for (size_t j = 0; j < params.aggregates_size; ++j)
 	{
 		try
 		{
@@ -475,7 +481,7 @@ void NO_INLINE Aggregator::executeImplCase(
 		bool overflow = false;	/// Новый ключ не поместился в хэш-таблицу из-за no_more_keys.
 
 		/// Получаем ключ для вставки в хэш-таблицу.
-		typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
+		typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
 
 		if (!no_more_keys)	/// Вставляем.
 		{
@@ -522,7 +528,7 @@
 			/// exception-safety - если не удалось выделить память или создать состояния, то не будут вызываться деструкторы.
 			aggregate_data = nullptr;
 
-			method.onNewKey(*it, keys_size, i, keys, *aggregates_pool);
+			method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool);
 
 			AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
 			createAggregateStates(place);
@@ -549,7 +555,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
 	AggregateFunctionInstruction * aggregate_instructions) const
 {
 	/// Оптимизация в случае единственной агрегатной функции count.
-	AggregateFunctionCount * agg_count = aggregates_size == 1
+	AggregateFunctionCount * agg_count = params.aggregates_size == 1
 		? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
 		: NULL;
@@ -580,8 +586,8 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
 	/// result будет уничтожать состояния агрегатных функций в деструкторе
 	result.aggregator = this;
 
-	for (size_t i = 0; i < aggregates_size; ++i)
-		aggregate_columns[i].resize(aggregates[i].arguments.size());
+	for (size_t i = 0; i < params.aggregates_size; ++i)
+		aggregate_columns[i].resize(params.aggregates[i].arguments.size());
 
 	/** Константные столбцы не поддерживаются напрямую при агрегации.
 	  * Чтобы они всё-равно работали, материализуем их.
 	  */
 	Columns materialized_columns;
 
 	/// Запоминаем столбцы, с которыми будем работать
-	for (size_t i = 0; i < keys_size; ++i)
+	for (size_t i = 0; i < params.keys_size; ++i)
 	{
-		key_columns[i] = block.getByPosition(keys[i]).column;
+		key_columns[i] = block.getByPosition(params.keys[i]).column;
 
 		if (auto converted = key_columns[i]->convertToFullColumnIfConst())
 		{
@@ -600,14 +606,14 @@
 		}
 	}
 
-	AggregateFunctionInstructions aggregate_functions_instructions(aggregates_size + 1);
-	aggregate_functions_instructions[aggregates_size].that = nullptr;
+	AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
+	aggregate_functions_instructions[params.aggregates_size].that = nullptr;
 
-	for (size_t i = 0; i < aggregates_size; ++i)
+	for (size_t i = 0; i < params.aggregates_size; ++i)
 	{
 		for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
 		{
-			aggregate_columns[i][j] = block.getByPosition(aggregates[i].arguments[j]).column;
+			aggregate_columns[i][j] = block.getByPosition(params.aggregates[i].arguments[j]).column;
 
 			if (auto converted = aggregate_columns[i][j]->convertToFullColumnIfConst())
 			{
@@ -631,18 +637,18 @@
 	if (result.empty())
 	{
 		result.init(chooseAggregationMethod(key_columns, key_sizes));
-		result.keys_size = keys_size;
+		result.keys_size = params.keys_size;
 		result.key_sizes = key_sizes;
 		LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
 
-		if (compiler)
+		if (params.compiler)
 			compileIfPossible(result.type);
 	}
 
 	if (isCancelled())
 		return true;
 
-	if ((overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
+	if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
 	{
 		AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states);
 		createAggregateStates(place);
@@ -667,7 +673,7 @@
 	else
 	{
 		/// Сюда пишутся данные, не поместившиеся в max_rows_to_group_by при group_by_overflow_mode = any.
-		AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
+		AggregateDataPtr overflow_row_ptr = params.overflow_row ?
result.without_key : nullptr; bool is_two_level = result.isTwoLevel(); @@ -719,7 +725,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, size_t result_size = result.sizeWithoutOverflowRow(); - if (group_by_two_level_threshold && result.isConvertibleToTwoLevel() && result_size >= group_by_two_level_threshold) + if (params.group_by_two_level_threshold && result.isConvertibleToTwoLevel() && result_size >= params.group_by_two_level_threshold) result.convertToTwoLevel(); /// Проверка ограничений. @@ -732,15 +738,15 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const { - if (!no_more_keys && max_rows_to_group_by && result_size > max_rows_to_group_by) + if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by) { - if (group_by_overflow_mode == OverflowMode::THROW) + if (params.group_by_overflow_mode == OverflowMode::THROW) throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size) - + " rows, maximum: " + toString(max_rows_to_group_by), + + " rows, maximum: " + toString(params.max_rows_to_group_by), ErrorCodes::TOO_MUCH_ROWS); - else if (group_by_overflow_mode == OverflowMode::BREAK) + else if (params.group_by_overflow_mode == OverflowMode::BREAK) return false; - else if (group_by_overflow_mode == OverflowMode::ANY) + else if (params.group_by_overflow_mode == OverflowMode::ANY) no_more_keys = true; else throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); @@ -757,9 +763,9 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re if (isCancelled()) return; - StringRefs key(keys_size); - ConstColumnPlainPtrs key_columns(keys_size); - AggregateColumns aggregate_columns(aggregates_size); + StringRefs key(params.keys_size); + ConstColumnPlainPtrs key_columns(params.keys_size); + AggregateColumns aggregate_columns(params.aggregates_size); Sizes key_sizes; /** Используется, если есть ограничение на максимальное количество строк при агрегации, @@ -827,9 +833,9 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( { for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it) { - method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes); + method.insertKeyIntoColumns(*it, key_columns, params.keys_size, key_sizes); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->insertResultInto( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); @@ -847,9 +853,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( size_t j = 0; for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it, ++j) { - method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes); + method.insertKeyIntoColumns(*it, key_columns, params.keys_size, key_sizes); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) (*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]; } } @@ -864,11 +870,11 @@ Block Aggregator::prepareBlockAndFill( { Block res = sample.cloneEmpty(); - ColumnPlainPtrs key_columns(keys_size); - AggregateColumnsData aggregate_columns(aggregates_size); - ColumnPlainPtrs final_aggregate_columns(aggregates_size); + ColumnPlainPtrs key_columns(params.keys_size); + AggregateColumnsData 
aggregate_columns(params.aggregates_size); + ColumnPlainPtrs final_aggregate_columns(params.aggregates_size); - for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) { key_columns[i] = res.getByPosition(i).column; key_columns[i]->reserve(rows); @@ -876,12 +882,12 @@ Block Aggregator::prepareBlockAndFill( try { - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) { if (!final) { /// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций. - ColumnAggregateFunction & column_aggregate_func = static_cast(*res.getByPosition(i + keys_size).column); + ColumnAggregateFunction & column_aggregate_func = static_cast(*res.getByPosition(i + params.keys_size).column); for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j) column_aggregate_func.addArena(data_variants.aggregates_pools[j]); @@ -891,7 +897,7 @@ Block Aggregator::prepareBlockAndFill( } else { - ColumnWithTypeAndName & column = res.getByPosition(i + keys_size); + ColumnWithTypeAndName & column = res.getByPosition(i + params.keys_size); column.type = aggregate_functions[i]->getReturnType(); column.column = column.type->createColumn(); column.column->reserve(rows); @@ -926,7 +932,7 @@ Block Aggregator::prepareBlockAndFill( * а также деструкторы будут вызываться у AggregatedDataVariants. * Поэтому, вручную "откатываем" их. */ - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) if (aggregate_columns[i]) aggregate_columns[i]->clear(); @@ -948,18 +954,18 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d const Sizes & key_sizes, bool final) { - if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row) + if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) { AggregatedDataWithoutKey & data = data_variants.without_key; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) if (!final) (*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i]; else aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); - if (overflow_row) - for (size_t i = 0; i < keys_size; ++i) + if (params.overflow_row) + for (size_t i = 0; i < params.keys_size; ++i) key_columns[i]->insertDefault(); } }; @@ -1112,7 +1118,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( { for (auto & block : blocks) { - for (size_t column_num = keys_size; column_num < keys_size + aggregates_size; ++column_num) + for (size_t column_num = params.keys_size; column_num < params.keys_size + params.aggregates_size; ++column_num) { IColumn & col = *block.getByPosition(column_num).column; if (ColumnAggregateFunction * col_aggregate = typeid_cast(&col)) @@ -1151,7 +1157,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b if (isCancelled()) return BlocksList(); - if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row) + if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey( data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key)); @@ -1173,7 +1179,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b */ for (auto & block : blocks) { - for (size_t column_num = keys_size; column_num < 
keys_size + aggregates_size; ++column_num) + for (size_t column_num = params.keys_size; column_num < params.keys_size + params.aggregates_size; ++column_num) { IColumn & col = *block.getByPosition(column_num).column; if (ColumnAggregateFunction * col_aggregate = typeid_cast(&col)) @@ -1226,12 +1232,12 @@ void NO_INLINE Aggregator::mergeDataImpl( if (!inserted) { - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i], Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); } @@ -1259,12 +1265,12 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl( ? overflows : Method::getAggregateData(res_it->second); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( res_data + offsets_of_aggregate_states[i], Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); @@ -1286,12 +1292,12 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl( AggregateDataPtr res_data = Method::getAggregateData(res_it->second); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( res_data + offsets_of_aggregate_states[i], Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); @@ -1311,12 +1317,12 @@ void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl( AggregateDataPtr res_data = overflows; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( res_data + offsets_of_aggregate_states[i], Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy( Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); @@ -1336,10 +1342,10 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( AggregatedDataWithoutKey & res_data = res->without_key; AggregatedDataWithoutKey & current_data = non_empty_data[i]->without_key; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i]); - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]); current_data = nullptr; @@ -1451,7 +1457,7 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl( if (task.valid()) task.get_future().get(); - if (no_more_keys && overflow_row) + if (no_more_keys && params.overflow_row) { for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) { @@ -1533,7 +1539,7 @@ 
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va } /// В какой структуре данных агрегированы данные? - if (res->type == AggregatedDataVariants::Type::without_key || overflow_row) + if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row) mergeWithoutKeyDataImpl(non_empty_data); std::unique_ptr thread_pool; @@ -1607,21 +1613,21 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( Table & data, AggregateDataPtr overflow_row) const { - ConstColumnPlainPtrs key_columns(keys_size); - AggregateColumnsData aggregate_columns(aggregates_size); + ConstColumnPlainPtrs key_columns(params.keys_size); + AggregateColumnsData aggregate_columns(params.aggregates_size); /// Запоминаем столбцы, с которыми будем работать - for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) key_columns[i] = block.getByPosition(i).column; - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_columns[i] = &typeid_cast(*block.getByPosition(keys_size + i).column).getData(); + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_columns[i] = &typeid_cast(*block.getByPosition(params.keys_size + i).column).getData(); typename Method::State state; state.init(key_columns); /// Для всех строчек. - StringRefs keys(keys_size); + StringRefs keys(params.keys_size); size_t rows = block.rowsInFirstColumn(); for (size_t i = 0; i < rows; ++i) { @@ -1631,7 +1637,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( bool overflow = false; /// Новый ключ не поместился в хэш-таблицу из-за no_more_keys. /// Получаем ключ для вставки в хэш-таблицу. - auto key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool); + auto key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool); if (!no_more_keys) { @@ -1658,7 +1664,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second); aggregate_data = nullptr; - method.onNewKey(*it, keys_size, i, keys, *aggregates_pool); + method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool); AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states); createAggregateStates(place); @@ -1670,7 +1676,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row; /// Мерджим состояния агрегатных функций. 
- for (size_t j = 0; j < aggregates_size; ++j) + for (size_t j = 0; j < params.aggregates_size; ++j) aggregate_functions[j]->merge( value + offsets_of_aggregate_states[j], (*aggregate_columns[j])[i]); @@ -1701,11 +1707,11 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( Block & block, AggregatedDataVariants & result) const { - AggregateColumnsData aggregate_columns(aggregates_size); + AggregateColumnsData aggregate_columns(params.aggregates_size); /// Запоминаем столбцы, с которыми будем работать - for (size_t i = 0; i < aggregates_size; ++i) - aggregate_columns[i] = &typeid_cast(*block.getByPosition(keys_size + i).column).getData(); + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_columns[i] = &typeid_cast(*block.getByPosition(params.keys_size + i).column).getData(); AggregatedDataWithoutKey & res = result.without_key; if (!res) @@ -1716,7 +1722,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl( } /// Добавляем значения - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0]); /// Пораньше освобождаем память. @@ -1729,10 +1735,10 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants if (isCancelled()) return; - StringRefs key(keys_size); - ConstColumnPlainPtrs key_columns(keys_size); + StringRefs key(params.keys_size); + ConstColumnPlainPtrs key_columns(params.keys_size); - AggregateColumnsData aggregate_columns(aggregates_size); + AggregateColumnsData aggregate_columns(params.aggregates_size); Block empty_block; initialize(empty_block); @@ -1748,7 +1754,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants using BucketToBlocks = std::map; BucketToBlocks bucket_to_blocks; - /// Читаем все данные. TODO memory-savvy режим, при котором в один момент времени обрабатывается только одна корзина. + /// Читаем все данные. LOG_TRACE(log, "Reading blocks of partially aggregated data."); size_t total_input_rows = 0; @@ -1772,7 +1778,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants sample = bucket_to_blocks.begin()->second.front().cloneEmpty(); /// Каким способом выполнять агрегацию? - for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) key_columns[i] = sample.getByPosition(i).column; Sizes key_sizes; @@ -1803,7 +1809,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants result.aggregator = this; result.init(method); - result.keys_size = keys_size; + result.keys_size = params.keys_size; result.key_sizes = key_sizes; bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1); @@ -1923,10 +1929,10 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) if (blocks.empty()) return {}; - StringRefs key(keys_size); - ConstColumnPlainPtrs key_columns(keys_size); + StringRefs key(params.keys_size); + ConstColumnPlainPtrs key_columns(params.keys_size); - AggregateColumnsData aggregate_columns(aggregates_size); + AggregateColumnsData aggregate_columns(params.aggregates_size); Block empty_block; initialize(empty_block); @@ -1935,7 +1941,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) sample = blocks.front().cloneEmpty(); /// Каким способом выполнять агрегацию? 
- for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) key_columns[i] = sample.getByPosition(i).column; Sizes key_sizes; @@ -1948,7 +1954,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) result.aggregator = this; result.init(method); - result.keys_size = keys_size; + result.keys_size = params.keys_size; result.key_sizes = key_sizes; LOG_TRACE(log, "Merging partially aggregated blocks."); @@ -2050,7 +2056,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( for (size_t i = 0; i < rows; ++i) { /// Получаем ключ. Вычисляем на его основе номер корзины. - typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *pool); + typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool); auto hash = method.data.hash(key); auto bucket = method.data.getBucketFromHash(hash); @@ -2102,16 +2108,16 @@ std::vector Aggregator::convertBlockToTwoLevel(const Block & block) AggregatedDataVariants data; - StringRefs key(keys_size); - ConstColumnPlainPtrs key_columns(keys_size); + StringRefs key(params.keys_size); + ConstColumnPlainPtrs key_columns(params.keys_size); Sizes key_sizes; /// Запоминаем столбцы, с которыми будем работать - for (size_t i = 0; i < keys_size; ++i) + for (size_t i = 0; i < params.keys_size; ++i) key_columns[i] = block.getByPosition(i).column; AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes); - data.keys_size = keys_size; + data.keys_size = params.keys_size; data.key_sizes = key_sizes; #define M(NAME) \ @@ -2170,7 +2176,7 @@ void NO_INLINE Aggregator::destroyImpl( if (nullptr == data) continue; - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) if (!aggregate_functions[i]->isState()) aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); } @@ -2185,12 +2191,12 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) LOG_TRACE(log, "Destroying aggregate states"); /// В какой структуре данных агрегированы данные? 
- if (result.type == AggregatedDataVariants::Type::without_key || overflow_row) + if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row) { AggregatedDataWithoutKey & res_data = result.without_key; if (nullptr != res_data) - for (size_t i = 0; i < aggregates_size; ++i) + for (size_t i = 0; i < params.aggregates_size; ++i) if (!aggregate_functions[i]->isState()) aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); } @@ -2211,22 +2217,22 @@ String Aggregator::getID() const { std::stringstream res; - if (keys.empty()) + if (params.keys.empty()) { res << "key_names"; - for (size_t i = 0; i < key_names.size(); ++i) - res << ", " << key_names[i]; + for (size_t i = 0; i < params.key_names.size(); ++i) + res << ", " << params.key_names[i]; } else { res << "keys"; - for (size_t i = 0; i < keys.size(); ++i) - res << ", " << keys[i]; + for (size_t i = 0; i < params.keys.size(); ++i) + res << ", " << params.keys[i]; } res << ", aggregates"; - for (size_t i = 0; i < aggregates.size(); ++i) - res << ", " << aggregates[i].column_name; + for (size_t i = 0; i < params.aggregates_size; ++i) + res << ", " << params.aggregates[i].column_name; return res.str(); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index c52cb773274..c1c3b2d7ebf 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -852,12 +852,15 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, AggregateDescriptions aggregates; query_analyzer->getAggregateInfo(key_names, aggregates); + Aggregator::Params params(key_names, aggregates, + overflow_row, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode, + settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, + streams.size() > 1 ? settings.group_by_two_level_threshold : SettingUInt64(0)); + /// Если источников несколько, то выполняем параллельную агрегацию if (streams.size() > 1) { - streams[0] = new ParallelAggregatingBlockInputStream(streams, stream_with_non_joined_data, key_names, aggregates, overflow_row, final, - settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode, - settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, settings.group_by_two_level_threshold); + streams[0] = new ParallelAggregatingBlockInputStream(streams, stream_with_non_joined_data, params, final, settings.max_threads); stream_with_non_joined_data = nullptr; streams.resize(1); @@ -873,9 +876,7 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, if (stream_with_non_joined_data) inputs.push_back(stream_with_non_joined_data); - streams[0] = new AggregatingBlockInputStream(new ConcatBlockInputStream(inputs), key_names, aggregates, overflow_row, final, - settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode, - settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, 0); + streams[0] = new AggregatingBlockInputStream(new ConcatBlockInputStream(inputs), params, final); stream_with_non_joined_data = nullptr; } @@ -903,17 +904,19 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina * но при этом может работать медленнее. 
*/ + Aggregator::Params params(key_names, aggregates, overflow_row); + if (!settings.distributed_aggregation_memory_efficient) { /// Склеим несколько источников в один, распараллеливая работу. executeUnion(); /// Теперь объединим агрегированные блоки - streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads); + streams[0] = new MergingAggregatedBlockInputStream(streams[0], params, final, original_max_threads); } else { - streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, key_names, aggregates, overflow_row, final); + streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, params, final); streams.resize(1); } } From 1ca3d930dac007d0f66e6cc2b8bc4ae6378ed090 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Wed, 25 Nov 2015 17:22:14 +0300 Subject: [PATCH 02/14] dbms: do not allocate memory for default strings in cache dictionaries [#METR-17328] --- dbms/include/DB/Dictionaries/CacheDictionary.h | 7 ++++--- dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h index dfe2d907cc5..db95813fc73 100644 --- a/dbms/include/DB/Dictionaries/CacheDictionary.h +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -522,7 +522,10 @@ private: else { const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; - map[id] = String{string_ref}; + + if (!cell.isDefault()) + map[id] = String{string_ref}; + total_length += string_ref.size + 1; } } @@ -551,8 +554,6 @@ private: out->getChars().reserve(total_length); - // const auto & null_value = std::get(attribute.null_values); - for (const auto row : ext::range(0, ext::size(ids))) { const auto id = ids[row]; diff --git a/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h b/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h index e0a5981229c..bcae70f721e 100644 --- a/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h +++ b/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h @@ -557,7 +557,10 @@ private: else { const auto string_ref = cell.isDefault() ? 
get_default(row) : attribute_array[cell_idx];
-			map[key] = String{string_ref};
+
+			if (!cell.isDefault())
+				map[key] = String{string_ref};
+
 			total_length += string_ref.size + 1;
 		}
 	}
 
From 386d560d3971b5ce8aa77739402b98b2a6644ddc Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Mon, 30 Nov 2015 18:15:45 +0300
Subject: [PATCH 03/14] dbms: refactor DoubleConverter [#METR-17328]

---
 dbms/include/DB/IO/DoubleConverter.h          | 29 ++++++++++++++-----
 dbms/include/DB/IO/WriteHelpers.h             | 16 +++++-----
 dbms/src/Common/formatReadable.cpp            |  8 ++---
 dbms/src/Core/FieldVisitors.cpp               |  8 ++---
 dbms/src/Functions/FunctionsMiscellaneous.cpp | 12 ++++----
 5 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/dbms/include/DB/IO/DoubleConverter.h b/dbms/include/DB/IO/DoubleConverter.h
index c2079fd8f7d..c381dd6f44e 100644
--- a/dbms/include/DB/IO/DoubleConverter.h
+++ b/dbms/include/DB/IO/DoubleConverter.h
@@ -2,6 +2,7 @@
 
 #include <double-conversion/double-conversion.h>
 
+
 namespace DB
 {
 
@@ -15,14 +16,28 @@ template <> struct DoubleToStringConverterFlags<true>
 	static constexpr auto flags = double_conversion::DoubleToStringConverter::EMIT_TRAILING_DECIMAL_POINT;
 };
 
-template <bool emit_trailing_decimal_point>
-const double_conversion::DoubleToStringConverter & getDoubleToStringConverter()
+template <bool emit_trailing_decimal_point>
+class DoubleConverter
 {
+	DoubleConverter(const DoubleConverter &) = delete;
+	DoubleConverter & operator=(const DoubleConverter &) = delete;
-	static const double_conversion::DoubleToStringConverter instance{
-		DoubleToStringConverterFlags<emit_trailing_decimal_point>::flags, "inf", "nan", 'e', -6, 21, 6, 1
-	};
+	DoubleConverter() = default;
 
-	return instance;
-}
+
+public:
+	/** @todo Add commentary on how this constant is deduced.
+	 *	e.g. it's minus sign, integral zero, decimal point, up to 5 leading zeros and kBase10MaximalLength digits. */
+	static constexpr auto MAX_REPRESENTATION_LENGTH = 26;
+	using BufferType = char[MAX_REPRESENTATION_LENGTH];
+
+	static const auto & instance()
+	{
+		static const double_conversion::DoubleToStringConverter instance{
+			DoubleToStringConverterFlags<emit_trailing_decimal_point>::flags, "inf", "nan", 'e', -6, 21, 6, 1
+		};
+
+		return instance;
+	}
+};
 
 }
diff --git a/dbms/include/DB/IO/WriteHelpers.h b/dbms/include/DB/IO/WriteHelpers.h
index bda31fc7e8b..eff104ed984 100644
--- a/dbms/include/DB/IO/WriteHelpers.h
+++ b/dbms/include/DB/IO/WriteHelpers.h
@@ -87,28 +87,28 @@ inline void writeBoolText(bool x, WriteBuffer & buf)
 
 inline void writeFloatText(double x, WriteBuffer & buf)
 {
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DoubleConverter<false>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
+	const auto result = DoubleConverter<false>::instance().ToShortest(x, &builder);
 
 	if (!result)
 		throw Exception("Cannot print double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
-	buf.write(tmp, builder.position());
+	buf.write(buffer, builder.position());
 }
 
 inline void writeFloatText(float x, WriteBuffer & buf)
 {
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DoubleConverter<false>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = getDoubleToStringConverter<false>().ToShortestSingle(x, &builder);
+	const auto result = DoubleConverter<false>::instance().ToShortestSingle(x, &builder);
 
 	if (!result)
 		throw Exception("Cannot print float number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
-	buf.write(tmp, builder.position());
+	buf.write(buffer, builder.position());
 }
 
diff --git a/dbms/src/Common/formatReadable.cpp b/dbms/src/Common/formatReadable.cpp
index 2ada676e1f4..9c0121e37b6 100644
--- a/dbms/src/Common/formatReadable.cpp
+++ b/dbms/src/Common/formatReadable.cpp
@@ -14,15 +14,15 @@ static void formatReadable(double size, DB::WriteBuffer & out, int precision, co
 	for (; i + 1 < units_size && fabs(size) >= delimiter; ++i)
 		size /= delimiter;
 
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DB::DoubleConverter<false>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = DB::getDoubleToStringConverter<false>().ToFixed(size, precision, &builder);
+	const auto result = DB::DoubleConverter<false>::instance().ToFixed(size, precision, &builder);
 
 	if (!result)
 		throw DB::Exception("Cannot print float or double number", DB::ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
-	out.write(tmp, builder.position());
+	out.write(buffer, builder.position());
 	writeCString(units[i], out);
 }
 
diff --git a/dbms/src/Core/FieldVisitors.cpp b/dbms/src/Core/FieldVisitors.cpp
index b9b7c1cb417..8373b481fa8 100644
--- a/dbms/src/Core/FieldVisitors.cpp
+++ b/dbms/src/Core/FieldVisitors.cpp
@@ -34,15 +34,15 @@ String FieldVisitorDump::operator() (const Array & x) const
 
 String FieldVisitorToString::formatFloat(const Float64 x)
 {
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DoubleConverter<true>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = getDoubleToStringConverter<true>().ToShortest(x, &builder);
+	const auto result = DoubleConverter<true>::instance().ToShortest(x, &builder);
 
 	if (!result)
 		throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
-	return { tmp, tmp + builder.position() };
+	return { buffer, buffer + builder.position() };
 }
 
 String FieldVisitorToString::operator() (const Array & x) const
diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp
index fd8d18b4531..a79c9bd598f 100644
--- a/dbms/src/Functions/FunctionsMiscellaneous.cpp
+++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp
@@ -35,10 +35,10 @@ static void numWidthConstant(T a, UInt64 & c)
 
 inline UInt64 floatWidth(const double x)
 {
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DoubleConverter<false>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
+	const auto result = DoubleConverter<false>::instance().ToShortest(x, &builder);
 
 	if (!result)
 		throw Exception("Cannot print double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
@@ -48,10 +48,10 @@
 
 inline UInt64 floatWidth(const float x)
 {
-	char tmp[25];
-	double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
+	DoubleConverter<false>::BufferType buffer;
+	double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
 
-	const auto result = getDoubleToStringConverter<false>().ToShortestSingle(x, &builder);
+	const auto result = DoubleConverter<false>::instance().ToShortestSingle(x, &builder);
 
 	if (!result)
 		throw Exception("Cannot print float number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
 
From 48e840a199434ded4887303df63f7828391975dd Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 30 Nov 2015 22:57:46 +0300
Subject: [PATCH 04/14] dbms: external aggregation: development [#METR-17000].
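
The mechanism, in brief: aggregation keeps its usual in-memory hash-table state,
but once that state has been converted to the two-level (bucketed) layout and the
memory tracker reports more than max_bytes_before_external_group_by bytes,
writeToTemporaryFile() flushes every non-empty bucket as a block tagged with
block.info.bucket_num into a compressed temporary file under tmp_path, then
re-creates the in-memory state empty. Because the key space is pre-partitioned
by hash, the per-bucket pieces from different flushes (and from memory) can later
be merged one bucket at a time, keeping peak memory bounded.

A minimal standalone sketch of that spilling discipline, with toy stand-ins for
everything ClickHouse-specific: 256 fixed buckets instead of
Method::Data::NUM_BUCKETS, std::unordered_map instead of the hash-table variants,
a crude byte estimate instead of the MemoryTracker, and a raw binary
std::ofstream instead of NativeBlockOutputStream over CompressedWriteBuffer on a
Poco::TemporaryFile. Merging the spilled files back is the second half of the
feature and is out of scope here.

    // Toy model of spill-to-disk aggregation; illustration only, not the patch's code.
    #include <cstdint>
    #include <cstdio>
    #include <fstream>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include <vector>

    static constexpr size_t NUM_BUCKETS = 256;

    struct TwoLevelState
    {
        /// Keys are pre-partitioned by hash: a given key always lands in the
        /// same bucket, so per-bucket pieces can be merged independently later.
        std::unordered_map<uint64_t, uint64_t> impls[NUM_BUCKETS];

        static size_t bucketOf(uint64_t key) { return std::hash<uint64_t>{}(key) % NUM_BUCKETS; }
        void add(uint64_t key) { ++impls[bucketOf(key)][key]; }

        size_t approxBytes() const    /// stand-in for current_memory_tracker->get()
        {
            size_t rows = 0;
            for (const auto & impl : impls)
                rows += impl.size();
            return rows * 2 * sizeof(uint64_t);
        }
    };

    /// Analogue of writeToTemporaryFile(): dump every non-empty bucket, tagged
    /// with its bucket number, then continue aggregating into an empty state.
    void spillToTemporaryFile(TwoLevelState & state, std::vector<std::string> & spill_files)
    {
        const std::string path = "/tmp/agg_spill_" + std::to_string(spill_files.size()) + ".bin";
        std::ofstream out(path, std::ios::binary);

        for (size_t bucket = 0; bucket < NUM_BUCKETS; ++bucket)
        {
            auto & impl = state.impls[bucket];
            if (impl.empty())
                continue;

            /// The bucket tag plays the role of block.info.bucket_num.
            const uint64_t header[2]{static_cast<uint64_t>(bucket), static_cast<uint64_t>(impl.size())};
            out.write(reinterpret_cast<const char *>(header), sizeof(header));
            for (const auto & key_and_count : impl)
                out.write(reinterpret_cast<const char *>(&key_and_count), sizeof(key_and_count));

            impl.clear();
        }

        spill_files.push_back(path);
    }

    int main()
    {
        TwoLevelState state;
        std::vector<std::string> spill_files;
        const size_t max_bytes_before_external_group_by = 1 << 20;    /// 1 MiB for the toy

        for (uint64_t i = 0; i < 8000000; ++i)
        {
            state.add(i % 1000000);

            /// The patch performs this check once per processed block, not per row.
            if (i % 65536 == 0 && state.approxBytes() > max_bytes_before_external_group_by)
                spillToTemporaryFile(state, spill_files);
        }

        std::printf("spilled %zu partial aggregation states\n", spill_files.size());
    }

Flushing whole buckets rather than arbitrary rows is the design point: any given
key can only ever occur in the pieces of a single bucket, so the later merge
never needs more than one bucket's worth of state in memory at a time.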
--- dbms/include/DB/Interpreters/Aggregator.h | 60 +++++++++---- dbms/include/DB/Interpreters/Limits.h | 1 + dbms/include/DB/Interpreters/Settings.h | 5 +- dbms/src/Interpreters/Aggregator.cpp | 89 ++++++++++++++++++- .../Interpreters/InterpreterSelectQuery.cpp | 4 +- 5 files changed, 139 insertions(+), 20 deletions(-) diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 0c81cd9206b..7bbf2c3a9c7 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -4,6 +4,8 @@ #include #include +#include + #include #include @@ -602,7 +604,7 @@ struct AggregatedDataVariants : private boost::noncopyable }; Type type = Type::EMPTY; - AggregatedDataVariants() : aggregates_pools(1, new Arena), aggregates_pool(&*aggregates_pools.back()) {} + AggregatedDataVariants() : aggregates_pools(1, new Arena), aggregates_pool(aggregates_pools.back().get()) {} bool empty() const { return type == Type::EMPTY; } void invalidate() { type = Type::EMPTY; } @@ -627,6 +629,7 @@ struct AggregatedDataVariants : private boost::noncopyable type = type_; } + /// Количество строк (разных ключей). size_t size() const { switch (type) @@ -771,27 +774,37 @@ public: size_t aggregates_size; /// Настройки приближённого вычисления GROUP BY. - bool overflow_row; /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by. - size_t max_rows_to_group_by; - OverflowMode group_by_overflow_mode; + const bool overflow_row; /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by. + const size_t max_rows_to_group_by; + const OverflowMode group_by_overflow_mode; /// Для динамической компиляции. Compiler * compiler; - UInt32 min_count_to_compile; + const UInt32 min_count_to_compile; /// Настройки двухуровневой агрегации (используется для большого количества ключей). - /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. - * 0 - никогда не использовать. + /** При каком количестве ключей или размере состояния агрегации в байтах, + * начинает использоваться двухуровневая агрегация. Достаточно срабатывания хотя бы одного из порогов. + * 0 - соответствующий порог не задан. */ - size_t group_by_two_level_threshold; + const size_t group_by_two_level_threshold; + const size_t group_by_two_level_threshold_bytes; - Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, - size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_, - size_t group_by_two_level_threshold_) + /// Настройки для сброса временных данных в файловую систему (внешняя агрегация). + const size_t max_bytes_before_external_group_by; /// 0 - не использовать внешнюю агрегацию. 
+ const std::string tmp_path; + + Params( + const Names & key_names_, const AggregateDescriptions & aggregates_, + bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, + Compiler * compiler_, UInt32 min_count_to_compile_, + size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, + size_t max_bytes_before_external_group_by_, const std::string & tmp_path_) : key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()), - overflow_row(overflow_row_), - max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), - compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_) + overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), + compiler(compiler_), min_count_to_compile(min_count_to_compile_), + group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_), + max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_) { std::sort(key_names.begin(), key_names.end()); key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end()); @@ -800,7 +813,7 @@ public: /// Только параметры, имеющие значение при мердже. Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_) - : Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0) {} + : Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {} /// Вычислить номера столбцов в keys и aggregates. void calculateColumnNumbers(const Block & block); @@ -894,6 +907,9 @@ protected: size_t total_size_of_aggregate_states = 0; /// Суммарный размер строки из агрегатных функций. bool all_aggregates_has_trivial_destructor = false; + /// Сколько было использовано оперативки для обработки запроса до начала обработки первого блока. + Int64 memory_usage_before_aggregation = 0; + /// Для инициализации от первого блока при конкуррентном использовании. bool initialized = false; std::mutex mutex; @@ -925,6 +941,11 @@ protected: /// Возвращает true, если можно прервать текущую задачу. CancellationHook isCancelled; + /// Для внешней агрегации. + std::vector> temporary_files; + std::mutex temporary_files_mutex; + bool hasTemporaryFiles() const { return !temporary_files.empty(); } + /** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов. * Сформировать блок - пример результата. */ @@ -975,6 +996,15 @@ protected: size_t rows, AggregateFunctionInstruction * aggregate_instructions) const; + void writeToTemporaryFile(AggregatedDataVariants & data_variants); + + template + void writeToTemporaryFileImpl( + AggregatedDataVariants & data_variants, + Method & method, + IBlockOutputStream & out, + const String & path); + public: /// Шаблоны, инстанцирующиеся путём динамической компиляции кода - см. 
SpecializedAggregator.h diff --git a/dbms/include/DB/Interpreters/Limits.h b/dbms/include/DB/Interpreters/Limits.h index efdb2e0e806..107b94e41d8 100644 --- a/dbms/include/DB/Interpreters/Limits.h +++ b/dbms/include/DB/Interpreters/Limits.h @@ -32,6 +32,7 @@ struct Limits \ M(SettingUInt64, max_rows_to_group_by, 0) \ M(SettingOverflowMode, group_by_overflow_mode, OverflowMode::THROW) \ + M(SettingUInt64, max_bytes_before_external_group_by, 0) \ \ M(SettingUInt64, max_rows_to_sort, 0) \ M(SettingUInt64, max_bytes_to_sort, 0) \ diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index daa796c8acc..affbfa57868 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -90,8 +90,11 @@ struct Settings M(SettingBool, compile, false) \ /** Количество одинаковых по структуре запросов перед тем, как инициируется их компиляция. */ \ M(SettingUInt64, min_count_to_compile, 3) \ - /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. 0 - никогда не использовать. */ \ + /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. 0 - порог не выставлен. */ \ M(SettingUInt64, group_by_two_level_threshold, 100000) \ + /** При каком размере состояния агрегации в байтах, начинает использоваться двухуровневая агрегация. 0 - порог не выставлен. \ + * Двухуровневая агрегация начинает использоваться при срабатывании хотя бы одного из порогов. */ \ + M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000) \ /** Включён ли экономный по памяти режим распределённой агрегации. */ \ M(SettingBool, distributed_aggregation_memory_efficient, false) \ \ diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 0b71a7fd002..19f895f406d 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -11,8 +11,12 @@ #include #include #include +#include +#include +#include #include +#include namespace DB @@ -84,6 +88,8 @@ void Aggregator::initialize(const Block & block) initialized = true; + memory_usage_before_aggregation = current_memory_tracker->get(); + aggregate_functions.resize(params.aggregates_size); for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i] = params.aggregates[i].function.get(); @@ -724,18 +730,97 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, } size_t result_size = result.sizeWithoutOverflowRow(); + auto current_memory_usage = current_memory_tracker->get(); + auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; - if (params.group_by_two_level_threshold && result.isConvertibleToTwoLevel() && result_size >= params.group_by_two_level_threshold) + /** Преобразование в двухуровневую структуру данных. + * Она позволяет делать, в последующем, эффективный мердж - либо экономный по памяти, либо распараллеленный. + */ + if (result.isConvertibleToTwoLevel() + && ((params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold) + || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast(params.group_by_two_level_threshold_bytes)))) + { result.convertToTwoLevel(); + } /// Проверка ограничений. if (!checkLimits(result_size, no_more_keys)) return false; + /** Сброс данных на диск, если потребляется слишком много оперативки. + * Данные можно сбросить на диск только если используется двухуровневая структура агрегации. 
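+	  * (Editorial note: the two-level requirement is what keeps the later merge memory-bounded,
+	  * since each spilled bucket can be read back and merged independently of the others.)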
+ */ + if (params.max_bytes_before_external_group_by + && result.isTwoLevel() + && current_memory_usage > static_cast(params.max_bytes_before_external_group_by)) + writeToTemporaryFile(result); + return true; } +void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) +{ + auto file = std::make_unique(params.tmp_path); + const std::string & path = file->path(); + WriteBufferFromFile file_buf(path); + CompressedWriteBuffer compressed_buf(file_buf); + NativeBlockOutputStream block_out(compressed_buf, Revision::get()); + + /// Сбрасываем только двухуровневые данные. + + #define M(NAME) \ + else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ + return writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out, path); + + if (false) {} + APPLY_FOR_VARIANTS_TWO_LEVEL(M) +#undef M + else + throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); + + std::lock_guard lock(temporary_files_mutex); + temporary_files.emplace_back(std::move(file)); +} + + +template +void Aggregator::writeToTemporaryFileImpl( + AggregatedDataVariants & data_variants, + Method & method, + IBlockOutputStream & out, + const String & path) +{ + LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << "."); + + for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) + { + if (method.data.impls[bucket].empty()) + continue; + + Block block = prepareBlockAndFill(data_variants, false, method.data.impls[bucket].size(), + [bucket, &method, this] ( + ColumnPlainPtrs & key_columns, + AggregateColumnsData & aggregate_columns, + ColumnPlainPtrs & final_aggregate_columns, + const Sizes & key_sizes, + bool final) + { + convertToBlockImpl(method, method.data.impls[bucket], + key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final); + }); + + block.info.bucket_num = bucket; + out.write(block); + } + + /// NOTE Вместо освобождения памяти и создания новых хэш-таблиц и арены, можно переиспользовать старые. + data_variants.init(data_variants.type); + data_variants.aggregates_pools = Arenas(1, new Arena); + data_variants.aggregates_pool = data_variants.aggregates_pools.back().get(); +} + + bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const { if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by) @@ -756,8 +841,6 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const } -/** Результат хранится в оперативке и должен полностью помещаться в оперативку. - */ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result) { if (isCancelled()) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index c1c3b2d7ebf..09a1764c1a9 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -855,7 +855,9 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, Aggregator::Params params(key_names, aggregates, overflow_row, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode, settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, - streams.size() > 1 ? settings.group_by_two_level_threshold : SettingUInt64(0)); + streams.size() > 1 ? settings.group_by_two_level_threshold : SettingUInt64(0), + streams.size() > 1 ? 
settings.group_by_two_level_threshold_bytes : SettingUInt64(0), + settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath()); /// Если источников несколько, то выполняем параллельную агрегацию if (streams.size() > 1) From a7387d9ce431a1f288d1a4898fa0c0c36c765abc Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 30 Nov 2015 23:20:37 +0300 Subject: [PATCH 05/14] dbms: improvement [#METR-2944]. --- dbms/src/Storages/StorageBuffer.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 36224ce0b9a..b9ac246c68b 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -151,11 +151,7 @@ static void appendBlock(const Block & from, Block & to) throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no) + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); - if (col_to.empty()) - to.getByPosition(column_no).column = col_from.clone(); - else - for (size_t row_no = 0; row_no < rows; ++row_no) - col_to.insertFrom(col_from, row_no); + col_to.insertRangeFrom(col_from, 0, rows); } } From eba1cb4ca3af3c9359252640ac5e2792d1fa195a Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Tue, 1 Dec 2015 14:16:41 +0300 Subject: [PATCH 06/14] dbms: FieldVisitorToString::formatFloat emit trailing dec pt [#METR-17328] --- dbms/src/Core/FieldVisitors.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Core/FieldVisitors.cpp b/dbms/src/Core/FieldVisitors.cpp index 8373b481fa8..c2c4832ecbf 100644 --- a/dbms/src/Core/FieldVisitors.cpp +++ b/dbms/src/Core/FieldVisitors.cpp @@ -34,10 +34,10 @@ String FieldVisitorDump::operator() (const Array & x) const String FieldVisitorToString::formatFloat(const Float64 x) { - DoubleConverter::BufferType buffer; + DoubleConverter::BufferType buffer; double_conversion::StringBuilder builder{buffer, sizeof(buffer)}; - const auto result = DoubleConverter::instance().ToShortest(x, &builder); + const auto result = DoubleConverter::instance().ToShortest(x, &builder); if (!result) throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER); From 02b16fce069258517ed4f90ae9e481a71ff41cfb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 17:09:05 +0300 Subject: [PATCH 07/14] dbms: external aggregation: initial implementation [#METR-17000]. --- .../DB/DataStreams/OneBlockInputStream.h | 6 +- .../ParallelAggregatingBlockInputStream.h | 87 ++++++++++++++----- dbms/include/DB/Interpreters/Aggregator.h | 13 ++- dbms/src/Interpreters/Aggregator.cpp | 78 +++++++++++++---- 4 files changed, 137 insertions(+), 47 deletions(-) diff --git a/dbms/include/DB/DataStreams/OneBlockInputStream.h b/dbms/include/DB/DataStreams/OneBlockInputStream.h index ea4b9adfefb..0178b8285a6 100644 --- a/dbms/include/DB/DataStreams/OneBlockInputStream.h +++ b/dbms/include/DB/DataStreams/OneBlockInputStream.h @@ -1,17 +1,13 @@ #pragma once -#include - #include namespace DB { -using Poco::SharedPtr; - - /** Поток блоков, из которого можно прочитать один блок. + * Также смотрите BlocksListBlockInputStream. 
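+ * (BlocksListBlockInputStream is introduced in PATCH 08 later in this series.)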
*/ class OneBlockInputStream : public IProfilingBlockInputStream { diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index d0777a536a3..9572fae1b76 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -1,8 +1,14 @@ #pragma once #include +#include +#include #include +#include +#include +#include #include +#include namespace DB @@ -23,8 +29,8 @@ public: */ ParallelAggregatingBlockInputStream( BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, - const Aggregator::Params & params, bool final_, size_t max_threads_) - : aggregator(params), + const Aggregator::Params & params_, bool final_, size_t max_threads_) + : params(params_), aggregator(params), final(final_), max_threads(std::min(inputs.size(), max_threads_)), keys_size(params.keys_size), aggregates_size(params.aggregates_size), handler(*this), processor(inputs, additional_input_at_end, max_threads, handler) @@ -74,25 +80,57 @@ protected: Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); }; aggregator.setCancellationHook(hook); - AggregatedDataVariantsPtr data_variants = executeAndMerge(); + execute(); - if (data_variants) - blocks = aggregator.convertToBlocks(*data_variants, final, max_threads); + if (isCancelled()) + return {}; - it = blocks.begin(); + if (!aggregator.hasTemporaryFiles()) + { + /** Если все частично-агрегированные данные в оперативке, то мерджим их параллельно, тоже в оперативке. + * NOTE Если израсходовано больше половины допустимой памяти, то мерджить следовало бы более экономно. + */ + AggregatedDataVariantsPtr data_variants = aggregator.merge(many_data, max_threads); + + if (data_variants) + impl.reset(new BlocksListBlockInputStream( + aggregator.convertToBlocks(*data_variants, final, max_threads))); + } + else + { + /** Если есть временные файлы с частично-агрегированными данными на диске, + * то читаем и мерджим их, расходуя минимальное количество памяти. + */ + + /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. NOTE Это можно делать параллельно. + for (AggregatedDataVariantsPtr & data : many_data) + { + size_t rows = data->sizeWithoutOverflowRow(); + if (rows) + aggregator.writeToTemporaryFile(*data, rows); + } + + const auto & files = aggregator.getTemporaryFiles(); + BlockInputStreams input_streams; + for (const auto & file : files) + { + temporary_inputs.emplace_back(new TemporaryFileStream(file->path())); + input_streams.emplace_back(temporary_inputs.back()->block_in); + } + + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); + } } Block res; - if (isCancelled() || it == blocks.end()) + if (isCancelled() || !impl) return res; - res = *it; - ++it; - - return res; + return impl->read(); } private: + Aggregator::Params params; Aggregator aggregator; bool final; size_t max_threads; @@ -108,8 +146,22 @@ private: bool no_more_keys = false; bool executed = false; - BlocksList blocks; - BlocksList::iterator it; + + /// Для чтения сброшенных во временный файл данных. 
+ struct TemporaryFileStream + { + ReadBufferFromFile file_in; + CompressedReadBuffer compressed_in; + BlockInputStreamPtr block_in; + + TemporaryFileStream(const std::string & path) + : file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, Revision::get())) {} + }; + std::vector> temporary_inputs; + + /** Отсюда будем доставать готовые блоки после агрегации. + */ + std::unique_ptr impl; Logger * log = &Logger::get("ParallelAggregatingBlockInputStream"); @@ -172,7 +224,7 @@ private: ParallelInputsProcessor processor; - AggregatedDataVariantsPtr executeAndMerge() + void execute() { many_data.resize(max_threads); exceptions.resize(max_threads); @@ -193,7 +245,7 @@ private: rethrowFirstException(exceptions); if (isCancelled()) - return nullptr; + return; double elapsed_seconds = watch.elapsedSeconds(); @@ -216,11 +268,6 @@ private: << "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)" << " in " << elapsed_seconds << " sec." << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); - - if (isCancelled()) - return nullptr; - - return aggregator.merge(many_data, max_threads); } }; diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 7bbf2c3a9c7..ae1fea6fe59 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -877,6 +877,14 @@ public: /// Для IBlockInputStream. String getID() const; + void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows); + + bool hasTemporaryFiles() const { return !temporary_files.empty(); } + + using TemporaryFiles = std::vector>; + + const TemporaryFiles & getTemporaryFiles() const { return temporary_files; } + protected: friend struct AggregatedDataVariants; @@ -942,9 +950,8 @@ protected: CancellationHook isCancelled; /// Для внешней агрегации. - std::vector> temporary_files; + TemporaryFiles temporary_files; std::mutex temporary_files_mutex; - bool hasTemporaryFiles() const { return !temporary_files.empty(); } /** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов. * Сформировать блок - пример результата. @@ -996,8 +1003,6 @@ protected: size_t rows, AggregateFunctionInstruction * aggregate_instructions) const; - void writeToTemporaryFile(AggregatedDataVariants & data_variants); - template void writeToTemporaryFileImpl( AggregatedDataVariants & data_variants, diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 19f895f406d..a479ed30c84 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -731,17 +731,17 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, size_t result_size = result.sizeWithoutOverflowRow(); auto current_memory_usage = current_memory_tracker->get(); - auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; + auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Здесь учитываются все результаты в сумме, из разных потоков. + + bool worth_convert_to_two_level + = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold) + || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast(params.group_by_two_level_threshold_bytes)); /** Преобразование в двухуровневую структуру данных. 
* Она позволяет делать, в последующем, эффективный мердж - либо экономный по памяти, либо распараллеленный. */ - if (result.isConvertibleToTwoLevel() - && ((params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold) - || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast(params.group_by_two_level_threshold_bytes)))) - { + if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level) result.convertToTwoLevel(); - } /// Проверка ограничений. if (!checkLimits(result_size, no_more_keys)) @@ -752,26 +752,33 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, */ if (params.max_bytes_before_external_group_by && result.isTwoLevel() - && current_memory_usage > static_cast(params.max_bytes_before_external_group_by)) - writeToTemporaryFile(result); + && current_memory_usage > static_cast(params.max_bytes_before_external_group_by) + && worth_convert_to_two_level) + { + writeToTemporaryFile(result, result_size); + } return true; } -void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) +void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows) { + Stopwatch watch; + auto file = std::make_unique(params.tmp_path); const std::string & path = file->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); NativeBlockOutputStream block_out(compressed_buf, Revision::get()); + LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << "."); + /// Сбрасываем только двухуровневые данные. - #define M(NAME) \ +#define M(NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ - return writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out, path); + writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out, path); if (false) {} APPLY_FOR_VARIANTS_TWO_LEVEL(M) @@ -779,8 +786,35 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - std::lock_guard lock(temporary_files_mutex); - temporary_files.emplace_back(std::move(file)); + /// NOTE Вместо освобождения памяти и создания новых хэш-таблиц и арены, можно переиспользовать старые. + data_variants.init(data_variants.type); + data_variants.aggregates_pools = Arenas(1, new Arena); + data_variants.aggregates_pool = data_variants.aggregates_pools.back().get(); + + block_out.flush(); + compressed_buf.next(); + file_buf.next(); + + { + std::lock_guard lock(temporary_files_mutex); + temporary_files.emplace_back(std::move(file)); + } + + double elapsed_seconds = watch.elapsedSeconds(); + double compressed_bytes = file_buf.count(); + double uncompressed_bytes = compressed_buf.count(); + + LOG_TRACE(log, std::fixed << std::setprecision(3) + << "Written part in " << elapsed_seconds << " sec., " + << rows << " rows, " + << (uncompressed_bytes / 1048576.0) << " MiB uncompressed, " + << (compressed_bytes / 1048576.0) << " MiB compressed, " + << (uncompressed_bytes / rows) << " uncompressed bytes per row, " + << (compressed_bytes / rows) << " compressed bytes per row, " + << "compression rate: " << (uncompressed_bytes / compressed_bytes) + << " (" << (rows / elapsed_seconds) << " rows/sec., " + << (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, " + << (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. 
compressed)"); } @@ -791,7 +825,8 @@ void Aggregator::writeToTemporaryFileImpl( IBlockOutputStream & out, const String & path) { - LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << "."); + size_t max_temporary_block_size_rows = 0; + size_t max_temporary_block_size_bytes = 0; for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) { @@ -812,12 +847,19 @@ void Aggregator::writeToTemporaryFileImpl( block.info.bucket_num = bucket; out.write(block); + + size_t block_size_rows = block.rowsInFirstColumn(); + size_t block_size_bytes = block.bytes(); + + if (block_size_rows > max_temporary_block_size_rows) + max_temporary_block_size_rows = block.rowsInFirstColumn(); + if (block_size_bytes > max_temporary_block_size_bytes) + max_temporary_block_size_bytes = block_size_bytes; } - /// NOTE Вместо освобождения памяти и создания новых хэш-таблиц и арены, можно переиспользовать старые. - data_variants.init(data_variants.type); - data_variants.aggregates_pools = Arenas(1, new Arena); - data_variants.aggregates_pool = data_variants.aggregates_pools.back().get(); + LOG_TRACE(log, std::fixed << std::setprecision(3) + << "Max size of temporary block: " << max_temporary_block_size_rows << " rows, " + << (max_temporary_block_size_bytes / 1048576.0) << " MiB."); } From eef5fcc4665fe3a14ad0d2bc77426c7dd28be973 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 17:11:31 +0300 Subject: [PATCH 08/14] dbms: addition to prev. revision [#METR-17000]. --- .../DataStreams/BlocksListBlockInputStream.h | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 dbms/include/DB/DataStreams/BlocksListBlockInputStream.h diff --git a/dbms/include/DB/DataStreams/BlocksListBlockInputStream.h b/dbms/include/DB/DataStreams/BlocksListBlockInputStream.h new file mode 100644 index 00000000000..0e62130f123 --- /dev/null +++ b/dbms/include/DB/DataStreams/BlocksListBlockInputStream.h @@ -0,0 +1,49 @@ +#pragma once + +#include + + +namespace DB +{ + +/** Поток блоков, из которого можно прочитать следующий блок из явно предоставленного списка. + * Также смотрите OneBlockInputStream. + */ +class BlocksListBlockInputStream : public IProfilingBlockInputStream +{ +public: + /// Захватывает владение списком блоков. + BlocksListBlockInputStream(BlocksList && list_) + : list(std::move(list_)), it(list.begin()), end(list.end()) {} + + /// Использует лежащий где-то ещё список блоков. + BlocksListBlockInputStream(BlocksList::iterator & begin_, BlocksList::iterator & end_) + : it(begin_), end(end_) {} + + String getName() const override { return "BlocksList"; } + + String getID() const override + { + std::stringstream res; + res << this; + return res.str(); + } + +protected: + Block readImpl() override + { + if (it == end) + return Block(); + + Block res = *it; + ++it; + return res; + } + +private: + BlocksList list; + BlocksList::iterator it; + const BlocksList::iterator end; +}; + +} From fe8f947a435d37eb6a66c78cc643e5153aa898ca Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 17:43:51 +0300 Subject: [PATCH 09/14] dbms: allowed external aggregation with non-parallel aggregation [#METR-17000]. 
--- .../DataStreams/AggregatingBlockInputStream.h | 28 +++++++++++-- .../ParallelAggregatingBlockInputStream.h | 1 + .../AggregatingBlockInputStream.cpp | 41 +++++++++++++++---- .../Interpreters/InterpreterSelectQuery.cpp | 10 ++++- 4 files changed, 65 insertions(+), 15 deletions(-) diff --git a/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h index b600d068e59..2e850cb8dae 100644 --- a/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AggregatingBlockInputStream.h @@ -1,7 +1,11 @@ #pragma once #include +#include +#include #include +#include +#include namespace DB @@ -22,8 +26,8 @@ public: * Агрегатные функции ищутся везде в выражении. * Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены. */ - AggregatingBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params, bool final_) - : aggregator(params), final(final_) + AggregatingBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params_, bool final_) + : params(params_), aggregator(params), final(final_) { children.push_back(input_); } @@ -40,12 +44,28 @@ public: protected: Block readImpl() override; + Aggregator::Params params; Aggregator aggregator; bool final; bool executed = false; - BlocksList blocks; - BlocksList::iterator it; + + /// Для чтения сброшенных во временный файл данных. + struct TemporaryFileStream + { + ReadBufferFromFile file_in; + CompressedReadBuffer compressed_in; + BlockInputStreamPtr block_in; + + TemporaryFileStream(const std::string & path) + : file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, Revision::get())) {} + }; + std::vector> temporary_inputs; + + /** Отсюда будем доставать готовые блоки после агрегации. */ + std::unique_ptr impl; + + Logger * log = &Logger::get("AggregatingBlockInputStream"); }; } diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index 9572fae1b76..499580dcb52 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -118,6 +118,7 @@ protected: input_streams.emplace_back(temporary_inputs.back()->block_in); } + LOG_TRACE(log, "Will merge " << files.size() << " temporary files."); impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); } } diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index 1deb5a3e06c..175f7dc6e32 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -1,5 +1,5 @@ -#include - +#include +#include #include @@ -18,18 +18,41 @@ Block AggregatingBlockInputStream::readImpl() aggregator.setCancellationHook(hook); aggregator.execute(children.back(), data_variants); - blocks = aggregator.convertToBlocks(data_variants, final, 1); - it = blocks.begin(); + + if (!aggregator.hasTemporaryFiles()) + { + impl.reset(new BlocksListBlockInputStream( + aggregator.convertToBlocks(data_variants, final, 1))); + } + else + { + /** Если есть временные файлы с частично-агрегированными данными на диске, + * то читаем и мерджим их, расходуя минимальное количество памяти. + */ + + /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. 
+ size_t rows = data_variants.sizeWithoutOverflowRow(); + if (rows) + aggregator.writeToTemporaryFile(data_variants, rows); + + const auto & files = aggregator.getTemporaryFiles(); + BlockInputStreams input_streams; + for (const auto & file : files) + { + temporary_inputs.emplace_back(new TemporaryFileStream(file->path())); + input_streams.emplace_back(temporary_inputs.back()->block_in); + } + + LOG_TRACE(log, "Will merge " << files.size() << " temporary files."); + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); + } } Block res; - if (isCancelled() || it == blocks.end()) + if (isCancelled() || !impl) return res; - res = *it; - ++it; - - return res; + return impl->read(); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 09a1764c1a9..c4e6cdfd035 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -852,11 +852,17 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, AggregateDescriptions aggregates; query_analyzer->getAggregateInfo(key_names, aggregates); + /** Двухуровневая агрегация полезна в двух случаях: + * 1. Делается параллельная агрегация, и результаты надо параллельно мерджить. + * 2. Делается агрегация с сохранением временных данных на диск, и их нужно мерджить эффективно по памяти. + */ + bool allow_to_use_two_level_group_by = streams.size() > 1 || settings.limits.max_bytes_before_external_group_by != 0; + Aggregator::Params params(key_names, aggregates, overflow_row, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode, settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, - streams.size() > 1 ? settings.group_by_two_level_threshold : SettingUInt64(0), - streams.size() > 1 ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath()); /// Если источников несколько, то выполняем параллельную агрегацию From 88443344238c145afc8271eb369dacd40f7dcfea Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 19:58:15 +0300 Subject: [PATCH 10/14] dbms: external aggregation: development [#METR-17000]. 
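[Editorial aside, not part of the patch series.] The spill format used throughout is a compressed Native stream: writeToTemporaryFile() writes through WriteBufferFromFile, CompressedWriteBuffer and NativeBlockOutputStream, and TemporaryFileStream reads back through the mirror chain. A minimal round-trip under those assumptions (the path is illustrative):

#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>

using namespace DB;

Block spillRoundTrip(const Block & block, const std::string & path)
{
	{
		WriteBufferFromFile file_buf(path);
		CompressedWriteBuffer compressed_buf(file_buf);
		NativeBlockOutputStream out(compressed_buf, Revision::get());

		out.write(block);
		out.flush();
		compressed_buf.next();	/// Flush the compression buffer before the file buffer.
		file_buf.next();
	}

	ReadBufferFromFile file_in(path);
	CompressedReadBuffer compressed_in(file_in);
	NativeBlockInputStream in(compressed_in, Revision::get());
	return in.read();
}

The flush order (block stream, then compressed buffer, then file buffer) matches the order this patch adds to writeToTemporaryFile().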
--- .../ParallelAggregatingBlockInputStream.h | 7 +++++-- dbms/include/DB/Interpreters/Aggregator.h | 12 +++++++++--- .../DataStreams/AggregatingBlockInputStream.cpp | 7 +++++-- dbms/src/Interpreters/Aggregator.cpp | 15 ++++++++++----- .../00284_external_aggregation.reference | 2 ++ .../0_stateless/00284_external_aggregation.sql | 5 +++++ 6 files changed, 36 insertions(+), 12 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00284_external_aggregation.reference create mode 100644 dbms/tests/queries/0_stateless/00284_external_aggregation.sql diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index 499580dcb52..d2da7a2b6f3 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -112,13 +112,16 @@ protected: const auto & files = aggregator.getTemporaryFiles(); BlockInputStreams input_streams; - for (const auto & file : files) + for (const auto & file : files.files) { temporary_inputs.emplace_back(new TemporaryFileStream(file->path())); input_streams.emplace_back(temporary_inputs.back()->block_in); } - LOG_TRACE(log, "Will merge " << files.size() << " temporary files."); + LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " + << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " + << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed."); + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); } } diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index ae1fea6fe59..da6778be79c 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -877,11 +877,18 @@ public: /// Для IBlockInputStream. String getID() const; + /// Для внешней агрегации. void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows); - bool hasTemporaryFiles() const { return !temporary_files.empty(); } + bool hasTemporaryFiles() const { return !temporary_files.files.empty(); } - using TemporaryFiles = std::vector>; + struct TemporaryFiles + { + std::vector> files; + size_t sum_size_uncompressed = 0; + size_t sum_size_compressed = 0; + std::mutex mutex; + }; const TemporaryFiles & getTemporaryFiles() const { return temporary_files; } @@ -951,7 +958,6 @@ protected: /// Для внешней агрегации. TemporaryFiles temporary_files; - std::mutex temporary_files_mutex; /** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов. * Сформировать блок - пример результата. 
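[Editorial aside, not part of the patch series.] With the accounting fields above, a caller can report spill totals once aggregation finishes; the hunks below log exactly these numbers when building the merge streams. A sketch, with aggregator and log assumed to be in scope:

if (aggregator.hasTemporaryFiles())
{
	const Aggregator::TemporaryFiles & files = aggregator.getTemporaryFiles();

	LOG_TRACE(log, "External aggregation produced " << files.files.size() << " files, "
		<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
		<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
}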
diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index 175f7dc6e32..d7ff0fe4146 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -37,13 +37,16 @@ Block AggregatingBlockInputStream::readImpl() const auto & files = aggregator.getTemporaryFiles(); BlockInputStreams input_streams; - for (const auto & file : files) + for (const auto & file : files.files) { temporary_inputs.emplace_back(new TemporaryFileStream(file->path())); input_streams.emplace_back(temporary_inputs.back()->block_in); } - LOG_TRACE(log, "Will merge " << files.size() << " temporary files."); + LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " + << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " + << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed."); + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); } } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index a479ed30c84..3fb4ea3e842 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -795,15 +795,17 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si compressed_buf.next(); file_buf.next(); - { - std::lock_guard lock(temporary_files_mutex); - temporary_files.emplace_back(std::move(file)); - } - double elapsed_seconds = watch.elapsedSeconds(); double compressed_bytes = file_buf.count(); double uncompressed_bytes = compressed_buf.count(); + { + std::lock_guard lock(temporary_files.mutex); + temporary_files.files.emplace_back(std::move(file)); + temporary_files.sum_size_uncompressed += uncompressed_bytes; + temporary_files.sum_size_compressed += compressed_bytes; + } + LOG_TRACE(log, std::fixed << std::setprecision(3) << "Written part in " << elapsed_seconds << " sec., " << rows << " rows, " @@ -857,6 +859,9 @@ void Aggregator::writeToTemporaryFileImpl( max_temporary_block_size_bytes = block_size_bytes; } + /// data_variants не будет уничтожать состояния агрегатных функций в деструкторе. Теперь состояниями владеют ColumnAggregateFunction. 
+ data_variants.aggregator = nullptr; + LOG_TRACE(log, std::fixed << std::setprecision(3) << "Max size of temporary block: " << max_temporary_block_size_rows << " rows, " << (max_temporary_block_size_bytes / 1048576.0) << " MiB."); diff --git a/dbms/tests/queries/0_stateless/00284_external_aggregation.reference b/dbms/tests/queries/0_stateless/00284_external_aggregation.reference new file mode 100644 index 00000000000..48e30e781e0 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00284_external_aggregation.reference @@ -0,0 +1,2 @@ +49999995000000 10000000 +499999500000 1000000 15 diff --git a/dbms/tests/queries/0_stateless/00284_external_aggregation.sql b/dbms/tests/queries/0_stateless/00284_external_aggregation.sql new file mode 100644 index 00000000000..0595b81a0f7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00284_external_aggregation.sql @@ -0,0 +1,5 @@ +SET max_bytes_before_external_group_by = 100000000; +SET max_memory_usage = 200000000; + +SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k); +SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k); From 7c753e1403aad492335af4cd2a93d01ac30063ce Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 20:08:33 +0300 Subject: [PATCH 11/14] dbms: fixed build [#METR-17000]. --- dbms/src/DataStreams/tests/aggregating_stream.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/DataStreams/tests/aggregating_stream.cpp b/dbms/src/DataStreams/tests/aggregating_stream.cpp index 7deea2865ab..79a52db5c12 100644 --- a/dbms/src/DataStreams/tests/aggregating_stream.cpp +++ b/dbms/src/DataStreams/tests/aggregating_stream.cpp @@ -91,9 +91,10 @@ int main(int argc, char ** argv) sample.insert(col); } + Aggregator::Params params(key_column_names, aggregate_descriptions, false); + BlockInputStreamPtr stream = new OneBlockInputStream(block); - stream = new AggregatingBlockInputStream(stream, key_column_names, aggregate_descriptions, false, true, - 0, OverflowMode::THROW, nullptr, 0, 0); + stream = new AggregatingBlockInputStream(stream, params, true); WriteBufferFromOStream ob(std::cout); RowOutputStreamPtr row_out = new TabSeparatedRowOutputStream(ob, sample); From b04fc9bdf0b2fb9196f614865d2be6acd8c8a705 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 1 Dec 2015 20:10:24 +0300 Subject: [PATCH 12/14] dbms: fixed build [#METR-17000]. --- dbms/src/Interpreters/tests/aggregate.cpp | 39 ++++++++++++----------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/dbms/src/Interpreters/tests/aggregate.cpp b/dbms/src/Interpreters/tests/aggregate.cpp index 26ed509c27e..12df0dae7de 100644 --- a/dbms/src/Interpreters/tests/aggregate.cpp +++ b/dbms/src/Interpreters/tests/aggregate.cpp @@ -18,16 +18,18 @@ int main(int argc, char ** argv) { + using namespace DB; + try { size_t n = argc == 2 ? 
atoi(argv[1]) : 10; - DB::Block block; + Block block; - DB::ColumnWithTypeAndName column_x; + ColumnWithTypeAndName column_x; column_x.name = "x"; - column_x.type = new DB::DataTypeInt16; - DB::ColumnInt16 * x = new DB::ColumnInt16; + column_x.type = new DataTypeInt16; + ColumnInt16 * x = new ColumnInt16; column_x.column = x; auto & vec_x = x->getData(); @@ -39,41 +41,42 @@ int main(int argc, char ** argv) const char * strings[] = {"abc", "def", "abcd", "defg", "ac"}; - DB::ColumnWithTypeAndName column_s1; + ColumnWithTypeAndName column_s1; column_s1.name = "s1"; - column_s1.type = new DB::DataTypeString; - column_s1.column = new DB::ColumnString; + column_s1.type = new DataTypeString; + column_s1.column = new ColumnString; for (size_t i = 0; i < n; ++i) column_s1.column->insert(std::string(strings[i % 5])); block.insert(column_s1); - DB::ColumnWithTypeAndName column_s2; + ColumnWithTypeAndName column_s2; column_s2.name = "s2"; - column_s2.type = new DB::DataTypeString; - column_s2.column = new DB::ColumnString; + column_s2.type = new DataTypeString; + column_s2.column = new ColumnString; for (size_t i = 0; i < n; ++i) column_s2.column->insert(std::string(strings[i % 3])); block.insert(column_s2); - DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block); - DB::AggregatedDataVariants aggregated_data_variants; + BlockInputStreamPtr stream = new OneBlockInputStream(block); + AggregatedDataVariants aggregated_data_variants; - DB::Names key_column_names; + Names key_column_names; key_column_names.emplace_back("x"); key_column_names.emplace_back("s1"); - DB::AggregateFunctionFactory factory; + AggregateFunctionFactory factory; - DB::AggregateDescriptions aggregate_descriptions(1); + AggregateDescriptions aggregate_descriptions(1); - DB::DataTypes empty_list_of_types; + DataTypes empty_list_of_types; aggregate_descriptions[0].function = factory.get("count", empty_list_of_types); - DB::Aggregator aggregator(key_column_names, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0, 0); + Aggregator::Params params(key_column_names, aggregate_descriptions, false); + Aggregator aggregator(params); { Poco::Stopwatch stopwatch; @@ -88,7 +91,7 @@ int main(int argc, char ** argv) << std::endl; } } - catch (const DB::Exception & e) + catch (const Exception & e) { std::cerr << e.displayText() << std::endl; } From d76d57dda2fa7912ecfb227bd7a3fe72a7746942 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 2 Dec 2015 00:20:14 +0300 Subject: [PATCH 13/14] dbms: better [#METR-17000]. --- .../ParallelAggregatingBlockInputStream.h | 31 ++++++++++++++----- .../DB/DataStreams/ParallelInputsProcessor.h | 7 +++++ .../DB/DataStreams/UnionBlockInputStream.h | 4 +++ dbms/include/DB/Interpreters/Aggregator.h | 10 ++++-- dbms/src/Interpreters/Aggregator.cpp | 2 +- 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index d2da7a2b6f3..55ee11b50a0 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -102,14 +102,6 @@ protected: * то читаем и мерджим их, расходуя минимальное количество памяти. */ - /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще. NOTE Это можно делать параллельно. 
- for (AggregatedDataVariantsPtr & data : many_data) - { - size_t rows = data->sizeWithoutOverflowRow(); - if (rows) - aggregator.writeToTemporaryFile(*data, rows); - } - const auto & files = aggregator.getTemporaryFiles(); BlockInputStreams input_streams; for (const auto & file : files.files) @@ -211,8 +203,31 @@ private: parent.threads_data[thread_num].src_bytes += block.bytes(); } + void onFinishThread(size_t thread_num) + { + if (parent.aggregator.hasTemporaryFiles()) + { + /// Сбросим имеющиеся в оперативке данные тоже на диск. Так проще их потом объединять. + auto & data = *parent.many_data[thread_num]; + size_t rows = data.sizeWithoutOverflowRow(); + if (rows) + parent.aggregator.writeToTemporaryFile(data, rows); + } + } + void onFinish() { + if (parent.aggregator.hasTemporaryFiles()) + { + /// Может так получиться, что какие-то данные ещё не сброшены на диск, + /// потому что во время вызова onFinishThread ещё никакие данные не были сброшены на диск, а потом какие-то - были. + for (auto & data : parent.many_data) + { + size_t rows = data->sizeWithoutOverflowRow(); + if (rows) + parent.aggregator.writeToTemporaryFile(*data, rows); + } + } } void onException(std::exception_ptr & exception, size_t thread_num) diff --git a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h index f945aa37a23..773d7080e3e 100644 --- a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h +++ b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h @@ -43,6 +43,11 @@ struct ParallelInputsHandler /// Обработка блока данных + дополнительных информаций. void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {} + /// Вызывается для каждого потока, когда потоку стало больше нечего делать. + /// Из-за того, что иссякла часть источников, и сейчас источников осталось меньше, чем потоков. + /// Вызывается, если метод onException не кидает исключение; вызывается до метода onFinish. + void onFinishThread(size_t thread_num) {} + /// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы. /// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение. void onFinish() {} @@ -182,6 +187,8 @@ private: handler.onException(exception, thread_num); } + handler.onFinishThread(thread_num); + /// Последний поток при выходе сообщает, что данных больше нет. if (0 == --active_threads) { diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index e16c2d780bb..f539d0d413d 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -271,6 +271,10 @@ private: parent.output_queue.push(Payload()); } + void onFinishThread(size_t thread_num) + { + } + void onException(std::exception_ptr & exception, size_t thread_num) { //std::cerr << "pushing exception\n"; diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index da6778be79c..da5f4bafd31 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -880,14 +880,20 @@ public: /// Для внешней агрегации. 
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows); - bool hasTemporaryFiles() const { return !temporary_files.files.empty(); } + bool hasTemporaryFiles() const { return !temporary_files.empty(); } struct TemporaryFiles { std::vector> files; size_t sum_size_uncompressed = 0; size_t sum_size_compressed = 0; - std::mutex mutex; + mutable std::mutex mutex; + + bool empty() const + { + std::lock_guard lock(mutex); + return files.empty(); + } }; const TemporaryFiles & getTemporaryFiles() const { return temporary_files; } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 3fb4ea3e842..34858418ad7 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -2087,7 +2087,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) result.keys_size = params.keys_size; result.key_sizes = key_sizes; - LOG_TRACE(log, "Merging partially aggregated blocks."); + LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << blocks.front().info.bucket_num << ")."); for (Block & block : blocks) { From 99d7aa59b83bf6286ad90b6328217112d1ed4ea4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 2 Dec 2015 01:35:48 +0300 Subject: [PATCH 14/14] dbms: allowed to merge partial-aggregated streams memory-efficient in parallel [#METR-17000]. --- ...ggregatedMemoryEfficientBlockInputStream.h | 40 ++++- .../ParallelAggregatingBlockInputStream.h | 2 +- .../AggregatingBlockInputStream.cpp | 2 +- ...regatedMemoryEfficientBlockInputStream.cpp | 142 +++++++++++++++--- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- 5 files changed, 159 insertions(+), 29 deletions(-) diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index 472123385c7..6f738ac7d84 100644 --- a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -1,7 +1,9 @@ #pragma once +#include #include #include +#include namespace DB @@ -19,17 +21,14 @@ namespace DB * удалённых серверов делаются последовательно, при этом, чтение упирается в CPU. * Это несложно исправить. * - * Также, чтения и вычисления (слияние состояний) делаются по очереди. - * Есть возможность делать чтения асинхронно - при этом будет расходоваться в два раза больше памяти, но всё-равно немного. - * Это можно сделать с помощью UnionBlockInputStream. - * * Можно держать в памяти не по одному блоку из каждого источника, а по несколько, и распараллелить мердж. * При этом будет расходоваться кратно больше оперативки. */ class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream { public: - MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Aggregator::Params & params, bool final_); + MergingAggregatedMemoryEfficientBlockInputStream( + BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_); String getName() const override { return "MergingAggregatedMemoryEfficient"; } @@ -41,6 +40,7 @@ protected: private: Aggregator aggregator; bool final; + size_t threads; bool started = false; bool has_two_level = false; @@ -59,6 +59,36 @@ private: }; std::vector inputs; + + using BlocksToMerge = Poco::SharedPtr; + + /// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках. 
+ BlocksToMerge getNextBlocksToMerge(); + + /// Для параллельного мерджа. + struct OutputData + { + Block block; + std::exception_ptr exception; + + OutputData() {} + OutputData(Block && block_) : block(std::move(block_)) {} + OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {} + }; + + struct ParallelMergeData + { + boost::threadpool::pool pool; + std::mutex get_next_blocks_mutex; + ConcurrentBoundedQueue result_queue; + bool exhausted = false; + + ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads) {} + }; + + std::unique_ptr parallel_merge_data; + + void mergeThread(MemoryTracker * memory_tracker); }; } diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index 55ee11b50a0..f1af91a681f 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -114,7 +114,7 @@ protected: << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed."); - impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, max_threads)); } } diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index d7ff0fe4146..7be810aa9db 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -47,7 +47,7 @@ Block AggregatingBlockInputStream::readImpl() << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed."); - impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final)); + impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, 1)); } } diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index cc00f788915..e046ad83733 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -1,3 +1,5 @@ +#include +#include #include @@ -6,8 +8,8 @@ namespace DB MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream( - BlockInputStreams inputs_, const Aggregator::Params & params, bool final_) - : aggregator(params), final(final_), inputs(inputs_.begin(), inputs_.end()) + BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_) + : aggregator(params), final(final_), threads(threads_), inputs(inputs_.begin(), inputs_.end()) { children = inputs_; } @@ -24,17 +26,115 @@ String MergingAggregatedMemoryEfficientBlockInputStream::getID() const Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() { - /// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления. - /** NOTE: Если соединения ещё не установлены, то устанавливает их последовательно. - * И отправляет запрос последовательно. Это медленно. 
- */ - if (!started) + if (threads == 1) { - started = true; - for (auto & child : children) - child->readPrefix(); - } + /// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления. + /** NOTE: Если соединения ещё не установлены, то устанавливает их последовательно. + * И отправляет запрос последовательно. Это медленно. + */ + if (!started) + { + started = true; + for (auto & child : children) + child->readPrefix(); + } + if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge()) + return aggregator.mergeBlocks(*blocks_to_merge, final); + return {}; + } + else + { + /** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа, + * затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты. + */ + + if (!parallel_merge_data) + { + parallel_merge_data.reset(new ParallelMergeData(threads)); + + auto & pool = parallel_merge_data->pool; + + /** Если child - RemoteBlockInputStream, то соединения и отправку запроса тоже будем делать параллельно. + */ + started = true; + size_t num_children = children.size(); + std::vector> tasks(num_children); + for (size_t i = 0; i < num_children; ++i) + { + auto & child = children[i]; + auto & task = tasks[i]; + + task = std::packaged_task([&child] { child->readPrefix(); }); + pool.schedule([&task] { task(); }); + } + + pool.wait(); + for (auto & task : tasks) + task.get_future().get(); + + /** Создаём потоки, которые будут получать и мерджить данные. + */ + + for (size_t i = 0; i < threads; ++i) + pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread, + this, current_memory_tracker)); + } + + OutputData res; + parallel_merge_data->result_queue.pop(res); + + if (res.exception) + std::rethrow_exception(res.exception); + + if (!res.block) + parallel_merge_data->pool.wait(); + + return res.block; + } +} + + +void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker * memory_tracker) +{ + setThreadName("MrgAggMemEffThr"); + current_memory_tracker = memory_tracker; + + try + { + while (true) + { + /// Получение следующих блоков делается последовательно, а мердж - параллельно. + BlocksToMerge blocks_to_merge; + + { + std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); + + if (parallel_merge_data->exhausted) + break; + + blocks_to_merge = getNextBlocksToMerge(); + + if (!blocks_to_merge) + { + parallel_merge_data->exhausted = true; + parallel_merge_data->result_queue.push(Block()); + break; + } + } + + parallel_merge_data->result_queue.push(aggregator.mergeBlocks(*blocks_to_merge, final)); + } + } + catch (...) + { + parallel_merge_data->result_queue.push(std::current_exception()); + } +} + + +MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregatedMemoryEfficientBlockInputStream::getNextBlocksToMerge() +{ /** Имеем несколько источников. 
* Из каждого из них могут приходить следующие данные: * @@ -120,13 +220,13 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() // std::cerr << "merging overflows\n"; has_overflows = false; - BlocksList blocks_to_merge; + BlocksToMerge blocks_to_merge = new BlocksList; for (auto & input : inputs) if (input.overflow_block) - blocks_to_merge.emplace_back(std::move(input.overflow_block)); + blocks_to_merge->emplace_back(std::move(input.overflow_block)); - return aggregator.mergeBlocks(blocks_to_merge, final); + return blocks_to_merge; } else return {}; @@ -180,7 +280,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() continue; /// Теперь собираем блоки для current_bucket_num, чтобы их померджить. - BlocksList blocks_to_merge; + BlocksToMerge blocks_to_merge = new BlocksList; for (auto & input : inputs) { @@ -188,33 +288,33 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() { // std::cerr << "having block for current_bucket_num\n"; - blocks_to_merge.emplace_back(std::move(input.block)); + blocks_to_merge->emplace_back(std::move(input.block)); input.block = Block(); } else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num]) { // std::cerr << "having splitted data for bucket\n"; - blocks_to_merge.emplace_back(std::move(input.splitted_blocks[min_bucket_num])); + blocks_to_merge->emplace_back(std::move(input.splitted_blocks[min_bucket_num])); input.splitted_blocks[min_bucket_num] = Block(); } } - return aggregator.mergeBlocks(blocks_to_merge, final); + return blocks_to_merge; } else { /// Есть только одноуровневые данные. Просто мерджим их. // std::cerr << "don't have two level\n"; - BlocksList blocks_to_merge; + BlocksToMerge blocks_to_merge = new BlocksList; for (auto & input : inputs) if (input.block) - blocks_to_merge.emplace_back(std::move(input.block)); + blocks_to_merge->emplace_back(std::move(input.block)); current_bucket_num = NUM_BUCKETS; - return aggregator.mergeBlocks(blocks_to_merge, final); + return blocks_to_merge; } } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index c4e6cdfd035..de0a7d865ab 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -924,7 +924,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina } else { - streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, params, final); + streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, params, final, original_max_threads); streams.resize(1); } }
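[Editorial aside, not part of the patch series.] The parallel merge added in this final patch follows a classic bounded producer/consumer shape: merge threads produce merged blocks (or a captured exception) into a ConcurrentBoundedQueue, and readImpl() consumes them. A condensed skeleton of that shape; the template parameters, stripped in transit above, are reconstructed here as assumptions:

#include <exception>

#include <DB/Core/Block.h>
#include <DB/Interpreters/Aggregator.h>
#include <DB/Common/ConcurrentBoundedQueue.h>

using namespace DB;

struct MergeOutput
{
	Block block;
	std::exception_ptr exception;
};

/// Producer side: the body of one merge thread.
void produce(ConcurrentBoundedQueue<MergeOutput> & queue, Aggregator & aggregator,
	BlocksList & blocks_to_merge, bool final)
{
	try
	{
		queue.push(MergeOutput{aggregator.mergeBlocks(blocks_to_merge, final), nullptr});
	}
	catch (...)
	{
		queue.push(MergeOutput{Block(), std::current_exception()});
	}
}

/// Consumer side: the shape of readImpl() in the threaded branch.
Block consume(ConcurrentBoundedQueue<MergeOutput> & queue)
{
	MergeOutput res;
	queue.pop(res);		/// Blocks until a producer pushes a result.

	if (res.exception)
		std::rethrow_exception(res.exception);

	return res.block;	/// An empty block is the "exhausted" sentinel.
}

Bounding the queue by max_threads is the design point: producers are throttled whenever the consumer falls behind, so only a handful of merged blocks are ever buffered at once.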