diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h new file mode 100644 index 00000000000..cbe117dbcdb --- /dev/null +++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -0,0 +1,110 @@ +#pragma once + +#include +#include + + +namespace DB +{ + + +/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока. + * Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата. + * + * Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить. + */ +class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream +{ +public: + MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_, + const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_) + : aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0), + final(final_) + { + children = inputs_; + current_blocks.resize(children.size()); + overflow_blocks.resize(children.size()); + is_exhausted.resize(children.size()); + } + + String getName() const override { return "MergingAggregatedMemorySavvy"; } + + String getID() const override + { + std::stringstream res; + res << "MergingAggregatedMemorySavvy(" << aggregator.getID(); + for (size_t i = 0, size = children.size(); i < size; ++i) + res << ", " << children.back()->getID(); + res << ")"; + return res.str(); + } + +protected: + Block readImpl() override + { + /// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления. + if (current_bucket_num == -1) + for (auto & child : children) + child->readPrefix(); + + /// Всё прочитали. + if (current_bucket_num > 255) + return {}; + + /// Читаем следующие блоки для current_bucket_num + for (size_t i = 0, size = children.size(); i < size; ++i) + { + while (!is_exhausted[i] && (!current_blocks[i] || current_blocks[i].info.bucket_num < current_bucket_num)) + { + current_blocks[i] = children[i]->read(); + + if (!current_blocks[i]) + { + is_exhausted[i] = true; + } + else if (current_blocks[i].info.is_overflows) + { + overflow_blocks[i].swap(current_blocks[i]); + } + } + } + + /// Может быть, нет блоков для current_bucket_num, а все блоки имеют больший bucket_num. + Int32 min_bucket_num = 256; + for (size_t i = 0, size = children.size(); i < size; ++i) + if (!is_exhausted[i] && current_blocks[i].info.bucket_num < min_bucket_num) + min_bucket_num = current_blocks[i].info.bucket_num; + + current_bucket_num = min_bucket_num; + + /// Все потоки исчерпаны. + if (current_bucket_num > 255) + return {}; /// TODO overflow_blocks. + + /// TODO Если есть single_level и two_level блоки. + + /// Объединяем все блоки с current_bucket_num. + + BlocksList blocks_to_merge; + for (size_t i = 0, size = children.size(); i < size; ++i) + if (current_blocks[i].info.bucket_num == current_bucket_num) + blocks_to_merge.emplace_back(std::move(current_blocks[i])); + + Block res = aggregator.mergeBlocks(blocks_to_merge, final); + + ++current_bucket_num; + return res; + } + +private: + Aggregator aggregator; + bool final; + + Int32 current_bucket_num = -1; + std::vector current_blocks; + std::vector is_exhausted; + + std::vector overflow_blocks; +}; + +} diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 02ac9c225f4..da58b0b158f 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -111,6 +111,14 @@ public: parallel_replicas->disconnect(); } + + /// Отправляет запрос (инициирует вычисления) раньше, чем read. + void readPrefix() override + { + if (!sent_query) + sendQuery(); + } + protected: /// Отправить на удаленные реплики все временные таблицы void sendExternalTables() @@ -147,19 +155,10 @@ protected: { if (!sent_query) { - createParallelReplicas(); + sendQuery(); if (settings.skip_unavailable_shards && 0 == parallel_replicas->size()) - return Block(); - - established = true; - - parallel_replicas->sendQuery(query, "", stage, true); - - established = false; - sent_query = true; - - sendExternalTables(); + return {}; } while (true) @@ -280,6 +279,23 @@ protected: } private: + void sendQuery() + { + createParallelReplicas(); + + if (settings.skip_unavailable_shards && 0 == parallel_replicas->size()) + return; + + established = true; + + parallel_replicas->sendQuery(query, "", stage, true); + + established = false; + sent_query = true; + + sendExternalTables(); + } + /// ITable::read requires a Context, therefore we should create one if the user can't supply it static Context & getDefaultContext() { diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 5459c06b82b..65fbdce1900 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -674,17 +674,6 @@ typedef SharedPtr AggregatedDataVariantsPtr; typedef std::vector ManyAggregatedDataVariants; -/** Достать вариант агрегации по его типу. */ -template Method & getDataVariant(AggregatedDataVariants & variants); - -#define M(NAME, IS_TWO_LEVEL) \ - template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant(AggregatedDataVariants & variants) { return *variants.NAME; } - -APPLY_FOR_AGGREGATED_VARIANTS(M) - -#undef M - - /** Агрегирует источник блоков. */ class Aggregator @@ -733,11 +722,15 @@ public: */ AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads); - /** Объединить несколько агрегированных блоков в одну структуру данных. + /** Объединить поток частично агрегированных блоков в одну структуру данных. * (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.) */ void mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants & result, size_t max_threads); + /** Объединить несколько частично агрегированных блоков в один. + */ + Block mergeBlocks(BlocksList & blocks, bool final); + using CancellationHook = std::function; /** Установить функцию, которая проверяет, можно ли прервать текущую задачу. @@ -974,4 +967,15 @@ protected: }; +/** Достать вариант агрегации по его типу. */ +template Method & getDataVariant(AggregatedDataVariants & variants); + +#define M(NAME, IS_TWO_LEVEL) \ + template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant(AggregatedDataVariants & variants) { return *variants.NAME; } + +APPLY_FOR_AGGREGATED_VARIANTS(M) + +#undef M + + } diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 2bc4c0bca4a..36d4fc9db6a 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -91,6 +91,8 @@ struct Settings M(SettingUInt64, min_count_to_compile, 3) \ /** При каком количестве ключей, начинает использоваться двухуровневая агрегация. 0 - никогда не использовать. */ \ M(SettingUInt64, group_by_two_level_threshold, 100000) \ + /** Включён ли экономный по памяти режим распределённой агрегации. */ \ + M(SettingBool, distributed_aggregation_memory_efficient, false) \ \ /** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \ M(SettingUInt64, max_parallel_replicas, 1) \ diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index fa164d12a18..e51fe1141d6 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -141,9 +141,12 @@ void Block::insertUnique(const ColumnWithTypeAndName & elem) void Block::erase(size_t position) { + if (index_by_position.empty()) + throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND); + if (position >= index_by_position.size()) throw Exception("Position out of bound in Block::erase(), max position = " - + toString(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND); + + toString(index_by_position.size() - 1), ErrorCodes::POSITION_OUT_OF_BOUND); Container_t::iterator it = index_by_position[position]; index_by_name.erase(index_by_name.find(it->name)); @@ -177,6 +180,9 @@ void Block::erase(const String & name) ColumnWithTypeAndName & Block::getByPosition(size_t position) { + if (index_by_position.empty()) + throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND); + if (position >= index_by_position.size()) throw Exception("Position " + toString(position) + " is out of bound in Block::getByPosition(), max position = " @@ -189,6 +195,9 @@ ColumnWithTypeAndName & Block::getByPosition(size_t position) const ColumnWithTypeAndName & Block::getByPosition(size_t position) const { + if (index_by_position.empty()) + throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND); + if (position >= index_by_position.size()) throw Exception("Position " + toString(position) + " is out of bound in Block::getByPosition(), max position = " diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 74bffca822a..f21656a54de 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -1688,6 +1689,66 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants } +Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) +{ + if (blocks.empty()) + return {}; + + StringRefs key(keys_size); + ConstColumnPlainPtrs key_columns(keys_size); + + AggregateColumnsData aggregate_columns(aggregates_size); + + initialize(blocks.front()); + + /// Каким способом выполнять агрегацию? + for (size_t i = 0; i < keys_size; ++i) + key_columns[i] = sample.getByPosition(i).column; + + Sizes key_sizes; + AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes); + + /// Временные данные для агрегации. + AggregatedDataVariants result; + + /// result будет уничтожать состояния агрегатных функций в деструкторе + result.aggregator = this; + + result.init(method); + result.keys_size = keys_size; + result.key_sizes = key_sizes; + + LOG_TRACE(log, "Merging partially aggregated blocks."); + + for (Block & block : blocks) + { + if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows) + mergeWithoutKeyStreamsImpl(block, result); + + #define M(NAME, IS_TWO_LEVEL) \ + else if (result.type == AggregatedDataVariants::Type::NAME) \ + mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data); + + APPLY_FOR_AGGREGATED_VARIANTS(M) + #undef M + else if (result.type != AggregatedDataVariants::Type::without_key) + throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); + } + + BlocksList merged_block = convertToBlocks(result, final, 1); + + if (merged_block.size() > 1) /// TODO overflows + throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR); + + LOG_TRACE(log, "Merged partially aggregated blocks."); + + if (merged_block.empty()) + return {}; + + return merged_block.front(); +} + + template void NO_INLINE Aggregator::destroyImpl( Method & method) const @@ -1769,4 +1830,5 @@ void Aggregator::setCancellationHook(const CancellationHook cancellation_hook) isCancelled = cancellation_hook; } + } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index a6c66a015d3..6272434ee48 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -856,14 +857,38 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool final) { - /// Склеим несколько источников в один - executeUnion(); - - /// Теперь объединим агрегированные блоки Names key_names; AggregateDescriptions aggregates; query_analyzer->getAggregateInfo(key_names, aggregates); - streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads); + + /** Есть два режима распределённой агрегации. + * + * 1. В разных потоках читать из удалённых серверов блоки. + * Сохранить все блоки в оперативку. Объединить блоки. + * Если агрегация двухуровневая - распараллелить по номерам корзин. + * + * 2. В одном потоке читать по очереди блоки с разных серверов. + * В оперативке хранится только по одному блоку с каждого сервера. + * Если агрегация двухуровневая - последовательно объединяем блоки каждого следующего уровня. + * + * Второй вариант расходует меньше памяти (до 256 раз меньше) + * в случае двухуровневой агрегации, которая используется для больших результатов после GROUP BY, + * но при этом может работать медленнее. + */ + + if (!settings.distributed_aggregation_memory_efficient) + { + /// Склеим несколько источников в один, распараллеливая работу. + executeUnion(); + + /// Теперь объединим агрегированные блоки + streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads); + } + else + { + streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, key_names, aggregates, overflow_row, final); + streams.resize(1); + } }