From 8b2aabed53262b109b77e0a9e634835139e118ee Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 21 Aug 2012 18:34:55 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- .../DataStreams/narrowBlockInputStreams.cpp | 30 ++++++++----------- dbms/src/Interpreters/Aggregator.cpp | 30 +++++++++---------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/dbms/src/DataStreams/narrowBlockInputStreams.cpp b/dbms/src/DataStreams/narrowBlockInputStreams.cpp index cde521aa6c3..cd4d4d00caf 100644 --- a/dbms/src/DataStreams/narrowBlockInputStreams.cpp +++ b/dbms/src/DataStreams/narrowBlockInputStreams.cpp @@ -4,30 +4,24 @@ namespace DB { -/** Если количество источников inputs больше width, - * то клеит источники друг с другом (с помощью ConcatBlockInputStream), - * чтобы количество источников стало не больше width. - * - * Старается клеить источники друг с другом равномерно. - */ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width) { - if (inputs.size() <= width) + size_t size = inputs.size(); + if (size <= width) return inputs; std::vector partitions(width); - /** Для распределения, используем генератор случайных чисел, - * но не инициализируем его, чтобы результат был детерминированным. - */ - drand48_data rand_buf = {{0}}; - long rand_res = 0; - - for (size_t i = 0, size = inputs.size(); i < size; ++i) - { - lrand48_r(&rand_buf, &rand_res); - partitions[rand_res % width].push_back(inputs[i]); /// Теоретически, не слишком хорошо. - } + typedef std::vector Distribution; + Distribution distribution(size); + + for (size_t i = 0; i < size; ++i) + distribution[i] = i % width; + + std::random_shuffle(distribution.begin(), distribution.end()); + + for (size_t i = 0; i < size; ++i) + partitions[distribution[i]].push_back(inputs[i]); BlockInputStreams res(width); for (size_t i = 0; i < width; ++i) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 80fdaffc57e..7f0f8693721 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -593,7 +593,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va LOG_TRACE(log, "Merging aggregated data"); - AggregatedDataVariants & res = *data_variants[0]; + AggregatedDataVariantsPtr res = data_variants[0]; /// Все результаты агрегации соединяем с первым. for (size_t i = 1, size = data_variants.size(); i < size; ++i) @@ -603,19 +603,19 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va if (current.empty()) continue; - if (res.empty()) + if (res->empty()) { - data_variants[0] = data_variants[i]; + res = data_variants[i]; continue; } - if (res.type != current.type) + if (res->type != current.type) throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS); /// В какой структуре данных агрегированы данные? - if (res.type == AggregatedDataVariants::WITHOUT_KEY) + if (res->type == AggregatedDataVariants::WITHOUT_KEY) { - AggregatedDataWithoutKey & res_data = res.without_key; + AggregatedDataWithoutKey & res_data = res->without_key; AggregatedDataWithoutKey & current_data = current.without_key; size_t i = 0; @@ -625,9 +625,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va delete *jt; } } - else if (res.type == AggregatedDataVariants::KEY_64) + else if (res->type == AggregatedDataVariants::KEY_64) { - AggregatedDataWithUInt64Key & res_data = res.key64; + AggregatedDataWithUInt64Key & res_data = res->key64; AggregatedDataWithUInt64Key & current_data = current.key64; for (AggregatedDataWithUInt64Key::const_iterator it = current_data.begin(); it != current_data.end(); ++it) @@ -649,9 +649,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va res_it->second = it->second; } } - else if (res.type == AggregatedDataVariants::KEY_STRING) + else if (res->type == AggregatedDataVariants::KEY_STRING) { - AggregatedDataWithStringKey & res_data = res.key_string; + AggregatedDataWithStringKey & res_data = res->key_string; AggregatedDataWithStringKey & current_data = current.key_string; for (AggregatedDataWithStringKey::const_iterator it = current_data.begin(); it != current_data.end(); ++it) @@ -673,9 +673,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va res_it->second = it->second; } } - else if (res.type == AggregatedDataVariants::HASHED) + else if (res->type == AggregatedDataVariants::HASHED) { - AggregatedDataHashed & res_data = res.hashed; + AggregatedDataHashed & res_data = res->hashed; AggregatedDataHashed & current_data = current.hashed; for (AggregatedDataHashed::const_iterator it = current_data.begin(); it != current_data.end(); ++it) @@ -697,9 +697,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va res_it->second = it->second; } } - else if (res.type == AggregatedDataVariants::GENERIC) + else if (res->type == AggregatedDataVariants::GENERIC) { - AggregatedData & res_data = res.generic; + AggregatedData & res_data = res->generic; AggregatedData & current_data = current.generic; for (AggregatedData::const_iterator it = current_data.begin(); it != current_data.end(); ++it) @@ -724,7 +724,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va LOG_TRACE(log, "Merged aggregated data"); - return data_variants[0]; + return res; }