diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 2993c1d562f..d28ad0e66cb 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -275,6 +275,7 @@ namespace ErrorCodes UNEXPECTED_REPLICA, MISMATCH_REPLICAS_DATA_SOURCES, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS, + MISSING_RANGE_IN_CHUNK, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h index a7ff8341732..d8b5c46534d 100644 --- a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h +++ b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h @@ -8,11 +8,11 @@ namespace DB /** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor) * на указанное количество частей. */ -class PartsWithRangesSplitter +class PartsWithRangesSplitter final { public: PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, - size_t total_size_, size_t min_segment_size_, size_t max_segments_count_); + size_t min_segment_size_, size_t max_segments_count_); ~PartsWithRangesSplitter() = default; PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; @@ -22,30 +22,32 @@ public: private: void init(); - bool emit(); - bool updateSegment(); - bool updateRange(bool add_part); - void addPart(); + bool emitRange(); + bool switchToNextSegment(); + bool switchToNextRange(bool add_part); void initRangeInfo(); void initSegmentInfo(); + void addPart(); bool isRangeConsumed() const { return range_begin == range_end; } bool isSegmentConsumed() const { return segment_begin == segment_end; } private: - // Input data. + // Входные данные. const MergeTreeDataSelectExecutor::RangesInDataParts & input; MergeTreeDataSelectExecutor::RangesInDataParts::const_iterator input_part; std::vector::const_iterator input_range; - // Output data. + // Выходные данные. std::vector output_segments; std::vector::iterator current_output_segment; MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; size_t total_size; size_t remaining_size; - size_t min_segment_size; - size_t max_segments_count; + + const size_t min_segment_size; + const size_t max_segments_count; + size_t segment_size; size_t range_begin; @@ -53,6 +55,8 @@ private: size_t segment_begin; size_t segment_end; + + size_t part_index_in_query; }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 911c3f6094a..ab974f9c636 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -256,7 +256,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (settings.parallel_replicas_count > 1) { - PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek, + PartsWithRangesSplitter splitter(parts_with_ranges, data.settings.min_rows_for_seek, settings.parallel_replicas_count); auto segments = splitter.perform(); diff --git a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp index 49b8a1fd087..9f07e0c705d 100644 --- a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp +++ b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp @@ -4,27 +4,46 @@ namespace DB { PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, - size_t total_size_, size_t min_segment_size_, size_t max_segments_count_) + size_t min_segment_size_, size_t max_segments_count_) : input(input_), - total_size(total_size_), - remaining_size(total_size_), + current_output_part(nullptr), + total_size(0), + remaining_size(0), min_segment_size(min_segment_size_), - max_segments_count(max_segments_count_) + max_segments_count(max_segments_count_), + segment_size(0), + range_begin(0), + range_end(0), + segment_begin(0), + segment_end(0), + part_index_in_query(0) { + total_size = 0; + for (const auto & part_with_ranges : input) + { + const auto & ranges = part_with_ranges.ranges; + if (ranges.empty()) + throw Exception("Missing range in chunk.", ErrorCodes::MISSING_RANGE_IN_CHUNK); + for (const auto & range : ranges) + total_size += range.end - range.begin; + } + if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2) || (total_size < min_segment_size)) - throw Exception("One or more parameters are out of bound.", ErrorCodes::PARAMETER_OUT_OF_BOUND); + throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS); } std::vector PartsWithRangesSplitter::perform() { init(); - while (emit()) {} + while (emitRange()) {} return output_segments; } void PartsWithRangesSplitter::init() { + remaining_size = total_size; + size_t segments_count = max_segments_count; while ((segments_count > 0) && (total_size < (min_segment_size * segments_count))) --segments_count; @@ -32,18 +51,19 @@ void PartsWithRangesSplitter::init() segment_size = total_size / segments_count; output_segments.resize(segments_count); - /// Инициализируем информацию про первый диапазон. input_part = input.begin(); + part_index_in_query = input_part->part_index_in_query; + + /// Инициализируем информацию про первый диапазон. input_range = input_part->ranges.begin(); initRangeInfo(); /// Инициализируем информацию про первый выходной сегмент. current_output_segment = output_segments.begin(); - addPart(); initSegmentInfo(); } -bool PartsWithRangesSplitter::emit() +bool PartsWithRangesSplitter::emitRange() { size_t new_size = std::min(range_end - range_begin, segment_end - segment_begin); current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size)); @@ -52,29 +72,28 @@ bool PartsWithRangesSplitter::emit() segment_begin += new_size; if (isSegmentConsumed()) - return updateSegment(); + return switchToNextSegment(); else if (isRangeConsumed()) - return updateRange(true); + return switchToNextRange(true); else return false; } -bool PartsWithRangesSplitter::updateSegment() +bool PartsWithRangesSplitter::switchToNextSegment() { ++current_output_segment; if (current_output_segment == output_segments.end()) return false; if (isRangeConsumed()) - if (!updateRange(false)) + if (!switchToNextRange(false)) return false; - addPart(); initSegmentInfo(); return true; } -bool PartsWithRangesSplitter::updateRange(bool add_part) +bool PartsWithRangesSplitter::switchToNextRange(bool add_part) { ++input_range; if (input_range == input_part->ranges.end()) @@ -93,22 +112,16 @@ bool PartsWithRangesSplitter::updateRange(bool add_part) return true; } -void PartsWithRangesSplitter::addPart() -{ - MergeTreeDataSelectExecutor::RangesInDataPart new_part; - new_part.data_part = input_part->data_part; - current_output_segment->push_back(new_part); - current_output_part = &(current_output_segment->back()); -} - void PartsWithRangesSplitter::initRangeInfo() { - range_begin = 0; - range_end = input_range->end - input_range->begin; + range_begin = input_range->begin; + range_end = input_range->end; } void PartsWithRangesSplitter::initSegmentInfo() { + addPart(); + segment_begin = 0; segment_end = segment_size; @@ -117,4 +130,12 @@ void PartsWithRangesSplitter::initSegmentInfo() segment_end += remaining_size; } +void PartsWithRangesSplitter::addPart() +{ + MergeTreeDataSelectExecutor::RangesInDataPart part(input_part->data_part, part_index_in_query); + ++part_index_in_query; + current_output_segment->push_back(part); + current_output_part = &(current_output_segment->back()); +} + }