This commit is contained in:
Michael Kolupaev 2012-11-29 08:41:20 +00:00
parent 912cf7c4eb
commit f55d19f71a
2 changed files with 17 additions and 4 deletions

View File

@ -57,6 +57,9 @@ struct StorageMergeTreeSettings
/// Куски настолько большого размера объединять нельзя вообще.
size_t max_rows_to_merge_parts;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read;
/// Сколько потоков использовать для объединения кусков.
size_t merging_threads;
@ -65,6 +68,7 @@ struct StorageMergeTreeSettings
max_level_to_merge_different_level_parts(10),
max_rows_to_merge_different_level_parts(10 * 1024 * 1024),
max_rows_to_merge_parts(100 * 1024 * 1024),
min_rows_for_concurrent_read(20 * 8192),
merging_threads(2) {}
};

View File

@ -1042,9 +1042,10 @@ BlockInputStreams StorageMergeTree::read(
size_t cur_part = 0;
/// Сколько зесечек уже забрали из parts[cur_part].
size_t cur_pos = 0;
for (size_t i = 0; i < effective_threads; ++i)
size_t marks_spread = 0;
for (size_t i = 0; i < effective_threads && marks_spread < sum_marks; ++i)
{
size_t need_marks = sum_marks * (i + 1) / effective_threads - sum_marks * i / effective_threads;
size_t need_marks = sum_marks * (i + 1) / effective_threads - marks_spread;
BlockInputStreams streams;
while (need_marks > 0)
{
@ -1060,12 +1061,20 @@ BlockInputStreams StorageMergeTree::read(
}
size_t marks_to_get_from_part = std::min(marks_left_in_part, need_marks);
/// Не будем оставлять в куске слишком мало строк.
if ((marks_left_in_part - marks_to_get_from_part) * index_granularity < settings.min_rows_for_concurrent_read)
marks_to_get_from_part = marks_left_in_part;
streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, part.first_mark + cur_pos,
marks_to_get_from_part * index_granularity));
need_marks -= marks_to_get_from_part;
marks_spread += marks_to_get_from_part;
if (marks_to_get_from_part > need_marks)
need_marks = 0;
else
need_marks -= marks_to_get_from_part;
cur_pos += marks_to_get_from_part;
}
@ -1075,7 +1084,7 @@ BlockInputStreams StorageMergeTree::read(
res.push_back(new ConcatBlockInputStream(streams));
}
if (cur_part + 1 != parts.size() || cur_pos != parts.back().last_mark - parts.back().first_mark + 1)
if (marks_spread != sum_marks || cur_part + 1 != parts.size() || cur_pos != parts.back().last_mark - parts.back().first_mark + 1)
throw Exception("Can't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
return res;