dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-08-21 18:34:55 +00:00
parent c25456d09c
commit 8b2aabed53
2 changed files with 27 additions and 33 deletions

View File

@ -4,30 +4,24 @@
namespace DB
{
/** Если количество источников inputs больше width,
* то клеит источники друг с другом (с помощью ConcatBlockInputStream),
* чтобы количество источников стало не больше width.
*
* Старается клеить источники друг с другом равномерно.
*/
BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width)
{
if (inputs.size() <= width)
size_t size = inputs.size();
if (size <= width)
return inputs;
std::vector<BlockInputStreams> partitions(width);
/** Для распределения, используем генератор случайных чисел,
* но не инициализируем его, чтобы результат был детерминированным.
*/
drand48_data rand_buf = {{0}};
long rand_res = 0;
for (size_t i = 0, size = inputs.size(); i < size; ++i)
{
lrand48_r(&rand_buf, &rand_res);
partitions[rand_res % width].push_back(inputs[i]); /// Теоретически, не слишком хорошо.
}
typedef std::vector<size_t> Distribution;
Distribution distribution(size);
for (size_t i = 0; i < size; ++i)
distribution[i] = i % width;
std::random_shuffle(distribution.begin(), distribution.end());
for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].push_back(inputs[i]);
BlockInputStreams res(width);
for (size_t i = 0; i < width; ++i)

View File

@ -593,7 +593,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
LOG_TRACE(log, "Merging aggregated data");
AggregatedDataVariants & res = *data_variants[0];
AggregatedDataVariantsPtr res = data_variants[0];
/// Все результаты агрегации соединяем с первым.
for (size_t i = 1, size = data_variants.size(); i < size; ++i)
@ -603,19 +603,19 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (current.empty())
continue;
if (res.empty())
if (res->empty())
{
data_variants[0] = data_variants[i];
res = data_variants[i];
continue;
}
if (res.type != current.type)
if (res->type != current.type)
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
/// В какой структуре данных агрегированы данные?
if (res.type == AggregatedDataVariants::WITHOUT_KEY)
if (res->type == AggregatedDataVariants::WITHOUT_KEY)
{
AggregatedDataWithoutKey & res_data = res.without_key;
AggregatedDataWithoutKey & res_data = res->without_key;
AggregatedDataWithoutKey & current_data = current.without_key;
size_t i = 0;
@ -625,9 +625,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
delete *jt;
}
}
else if (res.type == AggregatedDataVariants::KEY_64)
else if (res->type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res_data = res.key64;
AggregatedDataWithUInt64Key & res_data = res->key64;
AggregatedDataWithUInt64Key & current_data = current.key64;
for (AggregatedDataWithUInt64Key::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
@ -649,9 +649,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
res_it->second = it->second;
}
}
else if (res.type == AggregatedDataVariants::KEY_STRING)
else if (res->type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res_data = res.key_string;
AggregatedDataWithStringKey & res_data = res->key_string;
AggregatedDataWithStringKey & current_data = current.key_string;
for (AggregatedDataWithStringKey::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
@ -673,9 +673,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
res_it->second = it->second;
}
}
else if (res.type == AggregatedDataVariants::HASHED)
else if (res->type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res_data = res.hashed;
AggregatedDataHashed & res_data = res->hashed;
AggregatedDataHashed & current_data = current.hashed;
for (AggregatedDataHashed::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
@ -697,9 +697,9 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
res_it->second = it->second;
}
}
else if (res.type == AggregatedDataVariants::GENERIC)
else if (res->type == AggregatedDataVariants::GENERIC)
{
AggregatedData & res_data = res.generic;
AggregatedData & res_data = res->generic;
AggregatedData & current_data = current.generic;
for (AggregatedData::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
@ -724,7 +724,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
LOG_TRACE(log, "Merged aggregated data");
return data_variants[0];
return res;
}