dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-22 17:04:33 +03:00
parent 928b4f3a1d
commit fd4cfb2b2e
4 changed files with 62 additions and 36 deletions

View File

@ -275,6 +275,7 @@ namespace ErrorCodes
UNEXPECTED_REPLICA,
MISMATCH_REPLICAS_DATA_SOURCES,
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
MISSING_RANGE_IN_CHUNK,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -8,11 +8,11 @@ namespace DB
/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor)
* на указанное количество частей.
*/
class PartsWithRangesSplitter
class PartsWithRangesSplitter final
{
public:
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t total_size_, size_t min_segment_size_, size_t max_segments_count_);
size_t min_segment_size_, size_t max_segments_count_);
~PartsWithRangesSplitter() = default;
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
@ -22,30 +22,32 @@ public:
private:
void init();
bool emit();
bool updateSegment();
bool updateRange(bool add_part);
void addPart();
bool emitRange();
bool switchToNextSegment();
bool switchToNextRange(bool add_part);
void initRangeInfo();
void initSegmentInfo();
void addPart();
bool isRangeConsumed() const { return range_begin == range_end; }
bool isSegmentConsumed() const { return segment_begin == segment_end; }
private:
// Input data.
// Входные данные.
const MergeTreeDataSelectExecutor::RangesInDataParts & input;
MergeTreeDataSelectExecutor::RangesInDataParts::const_iterator input_part;
std::vector<MarkRange>::const_iterator input_range;
// Output data.
// Выходные данные.
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> output_segments;
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts>::iterator current_output_segment;
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
size_t total_size;
size_t remaining_size;
size_t min_segment_size;
size_t max_segments_count;
const size_t min_segment_size;
const size_t max_segments_count;
size_t segment_size;
size_t range_begin;
@ -53,6 +55,8 @@ private:
size_t segment_begin;
size_t segment_end;
size_t part_index_in_query;
};
}

View File

@ -256,7 +256,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (settings.parallel_replicas_count > 1)
{
PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek,
PartsWithRangesSplitter splitter(parts_with_ranges, data.settings.min_rows_for_seek,
settings.parallel_replicas_count);
auto segments = splitter.perform();

View File

@ -4,27 +4,46 @@ namespace DB
{
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
size_t total_size_, size_t min_segment_size_, size_t max_segments_count_)
size_t min_segment_size_, size_t max_segments_count_)
: input(input_),
total_size(total_size_),
remaining_size(total_size_),
current_output_part(nullptr),
total_size(0),
remaining_size(0),
min_segment_size(min_segment_size_),
max_segments_count(max_segments_count_)
max_segments_count(max_segments_count_),
segment_size(0),
range_begin(0),
range_end(0),
segment_begin(0),
segment_end(0),
part_index_in_query(0)
{
total_size = 0;
for (const auto & part_with_ranges : input)
{
const auto & ranges = part_with_ranges.ranges;
if (ranges.empty())
throw Exception("Missing range in chunk.", ErrorCodes::MISSING_RANGE_IN_CHUNK);
for (const auto & range : ranges)
total_size += range.end - range.begin;
}
if ((total_size == 0) || (min_segment_size == 0) || (max_segments_count < 2)
|| (total_size < min_segment_size))
throw Exception("One or more parameters are out of bound.", ErrorCodes::PARAMETER_OUT_OF_BOUND);
throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
}
std::vector<MergeTreeDataSelectExecutor::RangesInDataParts> PartsWithRangesSplitter::perform()
{
init();
while (emit()) {}
while (emitRange()) {}
return output_segments;
}
void PartsWithRangesSplitter::init()
{
remaining_size = total_size;
size_t segments_count = max_segments_count;
while ((segments_count > 0) && (total_size < (min_segment_size * segments_count)))
--segments_count;
@ -32,18 +51,19 @@ void PartsWithRangesSplitter::init()
segment_size = total_size / segments_count;
output_segments.resize(segments_count);
/// Инициализируем информацию про первый диапазон.
input_part = input.begin();
part_index_in_query = input_part->part_index_in_query;
/// Инициализируем информацию про первый диапазон.
input_range = input_part->ranges.begin();
initRangeInfo();
/// Инициализируем информацию про первый выходной сегмент.
current_output_segment = output_segments.begin();
addPart();
initSegmentInfo();
}
bool PartsWithRangesSplitter::emit()
bool PartsWithRangesSplitter::emitRange()
{
size_t new_size = std::min(range_end - range_begin, segment_end - segment_begin);
current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size));
@ -52,29 +72,28 @@ bool PartsWithRangesSplitter::emit()
segment_begin += new_size;
if (isSegmentConsumed())
return updateSegment();
return switchToNextSegment();
else if (isRangeConsumed())
return updateRange(true);
return switchToNextRange(true);
else
return false;
}
bool PartsWithRangesSplitter::updateSegment()
bool PartsWithRangesSplitter::switchToNextSegment()
{
++current_output_segment;
if (current_output_segment == output_segments.end())
return false;
if (isRangeConsumed())
if (!updateRange(false))
if (!switchToNextRange(false))
return false;
addPart();
initSegmentInfo();
return true;
}
bool PartsWithRangesSplitter::updateRange(bool add_part)
bool PartsWithRangesSplitter::switchToNextRange(bool add_part)
{
++input_range;
if (input_range == input_part->ranges.end())
@ -93,22 +112,16 @@ bool PartsWithRangesSplitter::updateRange(bool add_part)
return true;
}
void PartsWithRangesSplitter::addPart()
{
MergeTreeDataSelectExecutor::RangesInDataPart new_part;
new_part.data_part = input_part->data_part;
current_output_segment->push_back(new_part);
current_output_part = &(current_output_segment->back());
}
void PartsWithRangesSplitter::initRangeInfo()
{
range_begin = 0;
range_end = input_range->end - input_range->begin;
range_begin = input_range->begin;
range_end = input_range->end;
}
void PartsWithRangesSplitter::initSegmentInfo()
{
addPart();
segment_begin = 0;
segment_end = segment_size;
@ -117,4 +130,12 @@ void PartsWithRangesSplitter::initSegmentInfo()
segment_end += remaining_size;
}
void PartsWithRangesSplitter::addPart()
{
MergeTreeDataSelectExecutor::RangesInDataPart part(input_part->data_part, part_index_in_query);
++part_index_in_query;
current_output_segment->push_back(part);
current_output_part = &(current_output_segment->back());
}
}