dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-30 19:46:05 +03:00
parent b0c4727d7a
commit 7cba335206
3 changed files with 16 additions and 20 deletions

View File

@ -14,6 +14,7 @@ class PartsWithRangesSplitter final
{ {
public: public:
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
UInt64 parallel_replica_offset,
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_); size_t granularity_, size_t min_segment_size_, size_t max_segments_count_);
~PartsWithRangesSplitter() = default; ~PartsWithRangesSplitter() = default;
@ -42,23 +43,23 @@ private:
// Выходные данные. // Выходные данные.
Segments output_segments; Segments output_segments;
Segments::iterator current_output_segment; Segments::iterator current_output_segment;
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part = nullptr;
size_t total_size; size_t total_size = 0;
const size_t granularity; const size_t granularity;
const size_t min_segment_size; const size_t min_segment_size;
const size_t max_segments_count; const size_t max_segments_count;
size_t segment_size; size_t segment_size = 0;
size_t range_begin; size_t range_begin = 0;
size_t range_end; size_t range_end = 0;
size_t segment_begin; size_t segment_begin = 0;
size_t segment_end; size_t segment_end = 0;
size_t part_index_in_query; size_t part_index_in_query = 0;
}; };
} }

View File

@ -252,7 +252,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (parallel_replicas_count > 1) if (parallel_replicas_count > 1)
{ {
// Разбиваем массив на не больше, чем N сегментов, где N - количество реплик, // Разбиваем массив на не больше, чем N сегментов, где N - количество реплик,
PartsWithRangesSplitter splitter(parts_with_ranges, data.index_granularity, data.settings.min_rows_for_seek, PartsWithRangesSplitter splitter(parts_with_ranges, parallel_replica_offset,
data.index_granularity, data.settings.min_rows_for_seek,
parallel_replicas_count); parallel_replicas_count);
auto segments = splitter.perform(); auto segments = splitter.perform();

View File

@ -4,19 +4,12 @@ namespace DB
{ {
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
UInt64 parallel_replica_offset,
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_) size_t granularity_, size_t min_segment_size_, size_t max_segments_count_)
: input(input_), : input(input_),
current_output_part(nullptr),
total_size(0),
granularity(granularity_), granularity(granularity_),
min_segment_size(min_segment_size_), min_segment_size(min_segment_size_),
max_segments_count(max_segments_count_), max_segments_count(max_segments_count_)
segment_size(0),
range_begin(0),
range_end(0),
segment_begin(0),
segment_end(0),
part_index_in_query(0)
{ {
total_size = 0; total_size = 0;
for (const auto & part_with_ranges : input) for (const auto & part_with_ranges : input)
@ -27,6 +20,7 @@ PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecut
for (const auto & range : ranges) for (const auto & range : ranges)
total_size += range.end - range.begin; total_size += range.end - range.begin;
} }
part_index_in_query = parallel_replica_offset * total_size;
total_size *= granularity; total_size *= granularity;
if ((granularity == 0) || (min_segment_size == 0) || (max_segments_count == 0) || (total_size == 0)) if ((granularity == 0) || (min_segment_size == 0) || (max_segments_count == 0) || (total_size == 0))
@ -49,7 +43,7 @@ void PartsWithRangesSplitter::init()
// Вычислить размер сегментов так, чтобы он был кратен granularity // Вычислить размер сегментов так, чтобы он был кратен granularity
segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size)); segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size));
unsigned int scale = segment_size / granularity; size_t scale = segment_size / granularity;
if (segment_size % granularity != 0) { if (segment_size % granularity != 0) {
++scale; ++scale;
} }
@ -64,7 +58,7 @@ void PartsWithRangesSplitter::init()
output_segments.resize(segments_count); output_segments.resize(segments_count);
input_part = input.begin(); input_part = input.begin();
part_index_in_query = input_part->part_index_in_query; part_index_in_query += input_part->part_index_in_query;
/// Инициализируем информацию про первый диапазон. /// Инициализируем информацию про первый диапазон.
input_range = input_part->ranges.begin(); input_range = input_part->ranges.begin();