From da1974bbf31bea68e151b02e310f347c93268129 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 25 Dec 2014 23:56:01 +0300 Subject: [PATCH] dbms: more scalable aggregator: development [#METR-2944]. --- .../ParallelAggregatingBlockInputStream.h | 2 +- dbms/include/DB/Interpreters/Aggregator.h | 2 +- dbms/src/Interpreters/Aggregator.cpp | 24 +++++++++++++------ 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index d2625f7da3c..dcfe7ba7d20 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -124,7 +124,7 @@ protected: << " in " << elapsed_seconds << " sec." << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); - AggregatedDataVariantsPtr res = aggregator->merge(many_data); + AggregatedDataVariantsPtr res = aggregator->merge(many_data, max_threads); if (isCancelled()) return Block(); diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 87fd71a17e0..1d732c1e049 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -584,7 +584,7 @@ public: * После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить, пока не будет вызвана функция convertToBlock. * Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации. */ - AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants); + AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads); /** Объединить несколько агрегированных блоков в одну структуру данных. * (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index f56126cd851..d6e33986a3a 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -766,7 +766,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi } -AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants) +AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads) { if (data_variants.empty()) throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED); @@ -805,22 +805,32 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va if (res->type == AggregatedDataVariants::WITHOUT_KEY || overflow_row) mergeWithoutKeyDataImpl(non_empty_data); + boost::threadpool::pool * thread_pool = nullptr; + if (max_threads > 1 && rows > 100000 /// TODO Сделать настраиваемый порог. + && ( res->type == AggregatedDataVariants::KEY_32 + || res->type == AggregatedDataVariants::KEY_64 + || res->type == AggregatedDataVariants::KEY_STRING + || res->type == AggregatedDataVariants::KEY_FIXED_STRING + || res->type == AggregatedDataVariants::KEYS_128 + || res->type == AggregatedDataVariants::HASHED)) + thread_pool = new boost::threadpool::pool(max_threads); + if (res->type == AggregatedDataVariants::KEY_8) mergeSingleLevelDataImpl>(non_empty_data); else if (res->type == AggregatedDataVariants::KEY_16) mergeSingleLevelDataImpl>(non_empty_data); else if (res->type == AggregatedDataVariants::KEY_32) - mergeTwoLevelDataImpl>(non_empty_data, nullptr); + mergeTwoLevelDataImpl>(non_empty_data, thread_pool); else if (res->type == AggregatedDataVariants::KEY_64) - mergeTwoLevelDataImpl>(non_empty_data, nullptr); + mergeTwoLevelDataImpl>(non_empty_data, thread_pool); else if (res->type == AggregatedDataVariants::KEY_STRING) - mergeTwoLevelDataImpl(non_empty_data, nullptr); + mergeTwoLevelDataImpl(non_empty_data, thread_pool); else if (res->type == AggregatedDataVariants::KEY_FIXED_STRING) - mergeTwoLevelDataImpl(non_empty_data, nullptr); + mergeTwoLevelDataImpl(non_empty_data, thread_pool); else if (res->type == AggregatedDataVariants::KEYS_128) - mergeTwoLevelDataImpl(non_empty_data, nullptr); + mergeTwoLevelDataImpl(non_empty_data, thread_pool); else if (res->type == AggregatedDataVariants::HASHED) - mergeTwoLevelDataImpl(non_empty_data, nullptr); + mergeTwoLevelDataImpl(non_empty_data, thread_pool); else if (res->type != AggregatedDataVariants::WITHOUT_KEY) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);