dbms: more scalable aggregator: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-12-25 23:56:01 +03:00
parent cf3812532e
commit da1974bbf3
3 changed files with 19 additions and 9 deletions

View File

@ -124,7 +124,7 @@ protected:
<< " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
AggregatedDataVariantsPtr res = aggregator->merge(many_data);
AggregatedDataVariantsPtr res = aggregator->merge(many_data, max_threads);
if (isCancelled())
return Block();

View File

@ -584,7 +584,7 @@ public:
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить, пока не будет вызвана функция convertToBlock.
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
*/
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants);
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads);
/** Объединить несколько агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)

View File

@ -766,7 +766,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
}
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants)
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants, size_t max_threads)
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED);
@ -805,22 +805,32 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (res->type == AggregatedDataVariants::WITHOUT_KEY || overflow_row)
mergeWithoutKeyDataImpl(non_empty_data);
boost::threadpool::pool * thread_pool = nullptr;
if (max_threads > 1 && rows > 100000 /// TODO Сделать настраиваемый порог.
&& ( res->type == AggregatedDataVariants::KEY_32
|| res->type == AggregatedDataVariants::KEY_64
|| res->type == AggregatedDataVariants::KEY_STRING
|| res->type == AggregatedDataVariants::KEY_FIXED_STRING
|| res->type == AggregatedDataVariants::KEYS_128
|| res->type == AggregatedDataVariants::HASHED))
thread_pool = new boost::threadpool::pool(max_threads);
if (res->type == AggregatedDataVariants::KEY_8)
mergeSingleLevelDataImpl<AggregationMethodOneNumber<UInt8>>(non_empty_data);
else if (res->type == AggregatedDataVariants::KEY_16)
mergeSingleLevelDataImpl<AggregationMethodOneNumber<UInt16>>(non_empty_data);
else if (res->type == AggregatedDataVariants::KEY_32)
mergeTwoLevelDataImpl<AggregationMethodOneNumber<UInt32>>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodOneNumber<UInt32>>(non_empty_data, thread_pool);
else if (res->type == AggregatedDataVariants::KEY_64)
mergeTwoLevelDataImpl<AggregationMethodOneNumber<UInt64>>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodOneNumber<UInt64>>(non_empty_data, thread_pool);
else if (res->type == AggregatedDataVariants::KEY_STRING)
mergeTwoLevelDataImpl<AggregationMethodString>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodString>(non_empty_data, thread_pool);
else if (res->type == AggregatedDataVariants::KEY_FIXED_STRING)
mergeTwoLevelDataImpl<AggregationMethodFixedString>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodFixedString>(non_empty_data, thread_pool);
else if (res->type == AggregatedDataVariants::KEYS_128)
mergeTwoLevelDataImpl<AggregationMethodKeys128>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodKeys128>(non_empty_data, thread_pool);
else if (res->type == AggregatedDataVariants::HASHED)
mergeTwoLevelDataImpl<AggregationMethodHashed>(non_empty_data, nullptr);
mergeTwoLevelDataImpl<AggregationMethodHashed>(non_empty_data, thread_pool);
else if (res->type != AggregatedDataVariants::WITHOUT_KEY)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);