dbms: fixed error [#METR-14604].

This commit is contained in:
Alexey Milovidov 2015-01-17 07:49:13 +03:00
parent 25f003e464
commit 0c6dc2629e
2 changed files with 58 additions and 4 deletions

View File

@ -1454,6 +1454,8 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
result.keys_size = keys_size; result.keys_size = keys_size;
result.key_sizes = key_sizes; result.key_sizes = key_sizes;
bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1);
/// Сначала параллельно мерджим для отдельных bucket-ов. Затем домердживаем данные, не распределённые по bucket-ам. /// Сначала параллельно мерджим для отдельных bucket-ов. Затем домердживаем данные, не распределённые по bucket-ам.
if (has_two_level) if (has_two_level)
{ {
@ -1464,7 +1466,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
&& has_two_level) && has_two_level)
thread_pool.reset(new boost::threadpool::pool(max_threads)); thread_pool.reset(new boost::threadpool::pool(max_threads));
auto merge_bucket = [&bucket_to_blocks, &result, this](size_t bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker) auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
{ {
current_memory_tracker = memory_tracker; current_memory_tracker = memory_tracker;
@ -1485,11 +1487,15 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток. /// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
std::vector<std::packaged_task<void()>> tasks; std::vector<std::packaged_task<void()>> tasks;
tasks.reserve(bucket_to_blocks.size()); tasks.reserve(bucket_to_blocks.size() - has_blocks_with_unknown_bucket);
for (auto & bucket_blocks : bucket_to_blocks) for (auto & bucket_blocks : bucket_to_blocks)
{ {
size_t bucket = bucket_blocks.first; auto bucket = bucket_blocks.first;
if (bucket == -1)
continue;
result.aggregates_pools.push_back(new Arena); result.aggregates_pools.push_back(new Arena);
Arena * aggregates_pool = result.aggregates_pools.back().get(); Arena * aggregates_pool = result.aggregates_pools.back().get();
@ -1510,7 +1516,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
LOG_TRACE(log, "Merged partially aggregated two-level data."); LOG_TRACE(log, "Merged partially aggregated two-level data.");
} }
if (bucket_to_blocks.count(-1)) if (has_blocks_with_unknown_bucket)
{ {
LOG_TRACE(log, "Merging partially aggregated single-level data."); LOG_TRACE(log, "Merging partially aggregated single-level data.");

View File

@ -0,0 +1,48 @@
0 600000 499999
1 4 3
2 4 5
3 4 7
4 4 9
5 4 11
6 4 13
7 4 15
8 4 17
9 4 19
0 1599996 499999
0 600000 499999
1 4 3
2 4 5
3 4 7
4 4 9
5 4 11
6 4 13
7 4 15
8 4 17
9 4 19
0 1599996 499999
0 4 1
1 4 3
2 4 5
3 4 7
4 4 9
5 4 11
6 4 13
7 4 15
8 4 17
9 4 19
0 400004 200001
0 600000 499999
1 4 3
2 4 5
3 4 7
4 4 9
5 4 11
6 4 13
7 4 15
8 4 17
9 4 19
0 1599996 499999