mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
57036f94c0
commit
79e97417d9
@ -1,65 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using Segments = std::vector<MergeTreeDataSelectExecutor::RangesInDataParts>;
|
||||
|
||||
/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor)
|
||||
* на не больше, чем указанное количество сегментов.
|
||||
*/
|
||||
class PartsWithRangesSplitter final
|
||||
{
|
||||
public:
|
||||
PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
|
||||
UInt64 parallel_replica_offset,
|
||||
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_);
|
||||
|
||||
~PartsWithRangesSplitter() = default;
|
||||
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
|
||||
PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete;
|
||||
|
||||
Segments perform();
|
||||
|
||||
private:
|
||||
void init();
|
||||
bool emitRange();
|
||||
bool switchToNextSegment();
|
||||
bool switchToNextRange(bool add_part);
|
||||
void initRangeInfo();
|
||||
void initSegmentInfo();
|
||||
void addPart();
|
||||
bool isRangeConsumed() const { return range_begin == range_end; }
|
||||
bool isSegmentConsumed() const { return segment_begin == segment_end; }
|
||||
|
||||
private:
|
||||
// Входные данные.
|
||||
const MergeTreeDataSelectExecutor::RangesInDataParts & input;
|
||||
MergeTreeDataSelectExecutor::RangesInDataParts::const_iterator input_part;
|
||||
MarkRanges::const_iterator input_range;
|
||||
|
||||
// Выходные данные.
|
||||
Segments output_segments;
|
||||
Segments::iterator current_output_segment;
|
||||
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part = nullptr;
|
||||
|
||||
size_t total_size = 0;
|
||||
|
||||
const size_t granularity;
|
||||
const size_t min_segment_size;
|
||||
const size_t max_segments_count;
|
||||
|
||||
size_t segment_size = 0;
|
||||
|
||||
size_t range_begin = 0;
|
||||
size_t range_end = 0;
|
||||
|
||||
size_t segment_begin = 0;
|
||||
size_t segment_end = 0;
|
||||
|
||||
size_t part_index_in_query = 0;
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,5 @@
|
||||
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
|
||||
#include <DB/Storages/MergeTree/PartsWithRangesSplitter.h>
|
||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
||||
@ -10,34 +9,6 @@
|
||||
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <DB/Common/VirtualColumnUtils.h>
|
||||
#include <DB/Common/SipHash.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
std::pair<UInt64, UInt64> computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & segment)
|
||||
{
|
||||
SipHash hash;
|
||||
for (const auto & part_with_ranges : segment)
|
||||
{
|
||||
const auto & part = *(part_with_ranges.data_part);
|
||||
hash.update(part.name.c_str(), part.name.length());
|
||||
const auto & ranges = part_with_ranges.ranges;
|
||||
for (const auto & range : ranges)
|
||||
{
|
||||
hash.update(reinterpret_cast<const char *>(&range.begin), sizeof(range.begin));
|
||||
hash.update(reinterpret_cast<const char *>(&range.end), sizeof(range.end));
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 lo;
|
||||
UInt64 hi;
|
||||
|
||||
hash.get128(lo, hi);
|
||||
return std::make_pair(lo, hi);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -136,7 +107,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
|
||||
/// Семплирование.
|
||||
Names column_names_to_read = real_column_names;
|
||||
UInt64 sampling_column_value_limit = 0;
|
||||
typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
|
||||
ASTFunctionPtr filter_function;
|
||||
ExpressionActionsPtr filter_expression;
|
||||
@ -178,6 +148,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
relative_sample_size = 0;
|
||||
}
|
||||
|
||||
UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
|
||||
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
|
||||
|
||||
if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
|
||||
relative_sample_size = 1;
|
||||
|
||||
if (relative_sample_size != 0)
|
||||
{
|
||||
UInt64 sampling_column_max = 0;
|
||||
@ -194,20 +170,60 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
else
|
||||
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
|
||||
UInt64 sampling_column_value_lower_limit;
|
||||
UInt64 sampling_column_value_upper_limit;
|
||||
UInt64 upper_limit = static_cast<UInt64>(relative_sample_size * sampling_column_max);
|
||||
|
||||
if (parallel_replicas_count > 1)
|
||||
{
|
||||
UInt64 step = upper_limit / parallel_replicas_count;
|
||||
sampling_column_value_lower_limit = parallel_replica_offset * step;
|
||||
if ((parallel_replica_offset + 1) < parallel_replicas_count)
|
||||
sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step;
|
||||
else
|
||||
sampling_column_value_upper_limit = upper_limit + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
sampling_column_value_lower_limit = 0;
|
||||
sampling_column_value_upper_limit = upper_limit + 1;
|
||||
}
|
||||
|
||||
/// Добавим условие, чтобы отсечь еще что-нибудь при повторном просмотре индекса.
|
||||
sampling_column_value_limit = static_cast<UInt64>(relative_sample_size * sampling_column_max);
|
||||
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
|
||||
Range::createRightBounded(sampling_column_value_limit, true)))
|
||||
Range::createLeftBounded(sampling_column_value_lower_limit, true)))
|
||||
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
/// Выражение для фильтрации: sampling_expression <= sampling_column_value_limit
|
||||
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
|
||||
Range::createRightBounded(sampling_column_value_upper_limit, false)))
|
||||
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
/// Выражение для фильтрации: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
|
||||
|
||||
ASTPtr lower_filter_args = new ASTExpressionList;
|
||||
lower_filter_args->children.push_back(data.sampling_expression);
|
||||
lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit));
|
||||
|
||||
ASTFunctionPtr lower_filter_function = new ASTFunction;
|
||||
lower_filter_function->name = "greaterOrEquals";
|
||||
lower_filter_function->arguments = lower_filter_args;
|
||||
lower_filter_function->children.push_back(lower_filter_function->arguments);
|
||||
|
||||
ASTPtr upper_filter_args = new ASTExpressionList;
|
||||
upper_filter_args->children.push_back(data.sampling_expression);
|
||||
upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit));
|
||||
|
||||
ASTFunctionPtr upper_filter_function = new ASTFunction;
|
||||
upper_filter_function->name = "less";
|
||||
upper_filter_function->arguments = upper_filter_args;
|
||||
upper_filter_function->children.push_back(upper_filter_function->arguments);
|
||||
|
||||
ASTPtr filter_function_args = new ASTExpressionList;
|
||||
filter_function_args->children.push_back(data.sampling_expression);
|
||||
filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));
|
||||
filter_function_args->children.push_back(lower_filter_function);
|
||||
filter_function_args->children.push_back(upper_filter_function);
|
||||
|
||||
filter_function = new ASTFunction;
|
||||
filter_function->name = "lessOrEquals";
|
||||
filter_function->name = "and";
|
||||
filter_function->arguments = filter_function_args;
|
||||
filter_function->children.push_back(filter_function->arguments);
|
||||
|
||||
@ -237,71 +253,22 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
||||
RangesInDataParts parts_with_ranges;
|
||||
|
||||
/// Найдем, какой диапазон читать из каждого куска.
|
||||
size_t sum_marks = 0;
|
||||
size_t sum_ranges = 0;
|
||||
for (auto & part : parts)
|
||||
{
|
||||
RangesInDataPart ranges(part, (*part_index)++);
|
||||
ranges.ranges = markRangesFromPkRange(part->index, key_condition);
|
||||
|
||||
if (!ranges.ranges.empty())
|
||||
{
|
||||
parts_with_ranges.push_back(ranges);
|
||||
}
|
||||
|
||||
UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
|
||||
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
|
||||
|
||||
if (parallel_replicas_count > 1)
|
||||
{
|
||||
// Разбиваем массив на не больше, чем N сегментов, где N - количество реплик,
|
||||
PartsWithRangesSplitter splitter(parts_with_ranges, parallel_replica_offset,
|
||||
data.index_granularity, data.settings.min_rows_for_seek,
|
||||
parallel_replicas_count);
|
||||
auto segments = splitter.perform();
|
||||
|
||||
if (!segments.empty())
|
||||
{
|
||||
if (parallel_replica_offset >= segments.size())
|
||||
return BlockInputStreams();
|
||||
|
||||
/// Для каждого сегмента, вычисляем его хэш.
|
||||
/// Сортируем массив сегментов по хэшу.
|
||||
/// Выбираем сегмент соответствующий текущей реплике.
|
||||
|
||||
using Entry = std::pair<std::pair<UInt64, UInt64>, RangesInDataParts *>;
|
||||
std::vector<Entry> hashed_segments;
|
||||
hashed_segments.reserve(segments.size());
|
||||
|
||||
for (auto & segment : segments)
|
||||
{
|
||||
Entry entry = std::make_pair(computeHash(segment), &segment);
|
||||
hashed_segments.push_back(entry);
|
||||
}
|
||||
|
||||
std::sort(hashed_segments.begin(), hashed_segments.end(), [](const Entry & lhs, const Entry & rhs)
|
||||
{
|
||||
return lhs.first < rhs.first;
|
||||
});
|
||||
|
||||
parts_with_ranges = std::move(*(hashed_segments[parallel_replica_offset].second));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Получаем данные только от первой реплики.
|
||||
if (parallel_replica_offset > 0)
|
||||
return BlockInputStreams();
|
||||
}
|
||||
}
|
||||
|
||||
/// Считаем количество засечек и диапазонов.
|
||||
size_t sum_marks = 0;
|
||||
size_t sum_ranges = 0;
|
||||
sum_marks = 0;
|
||||
sum_ranges = 0;
|
||||
for (const auto & part_with_ranges : parts_with_ranges)
|
||||
{
|
||||
sum_ranges += part_with_ranges.ranges.size();
|
||||
for (const auto & range : part_with_ranges.ranges)
|
||||
sum_ranges += ranges.ranges.size();
|
||||
for (const auto & range : ranges.ranges)
|
||||
sum_marks += range.end - range.begin;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
|
||||
<< sum_marks << " marks to read from " << sum_ranges << " ranges");
|
||||
|
@ -1,144 +0,0 @@
|
||||
#include <DB/Storages/MergeTree/PartsWithRangesSplitter.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_,
|
||||
UInt64 parallel_replica_offset,
|
||||
size_t granularity_, size_t min_segment_size_, size_t max_segments_count_)
|
||||
: input(input_),
|
||||
granularity(granularity_),
|
||||
min_segment_size(min_segment_size_),
|
||||
max_segments_count(max_segments_count_)
|
||||
{
|
||||
total_size = 0;
|
||||
for (const auto & part_with_ranges : input)
|
||||
{
|
||||
const auto & ranges = part_with_ranges.ranges;
|
||||
if (ranges.empty())
|
||||
throw Exception("Missing range in chunk.", ErrorCodes::MISSING_RANGE_IN_CHUNK);
|
||||
for (const auto & range : ranges)
|
||||
total_size += range.end - range.begin;
|
||||
}
|
||||
part_index_in_query = parallel_replica_offset * total_size;
|
||||
total_size *= granularity;
|
||||
|
||||
if ((granularity == 0) || (min_segment_size == 0) || (max_segments_count == 0) || (total_size == 0))
|
||||
throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
Segments PartsWithRangesSplitter::perform()
|
||||
{
|
||||
if ((max_segments_count > 1) && (total_size > min_segment_size))
|
||||
{
|
||||
init();
|
||||
while (emitRange()) {}
|
||||
}
|
||||
return output_segments;
|
||||
}
|
||||
|
||||
void PartsWithRangesSplitter::init()
|
||||
{
|
||||
output_segments.clear();
|
||||
|
||||
// Вычислить размер сегментов так, чтобы он был кратен granularity
|
||||
segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size));
|
||||
size_t scale = segment_size / granularity;
|
||||
if (segment_size % granularity != 0) {
|
||||
++scale;
|
||||
}
|
||||
segment_size = granularity * scale;
|
||||
|
||||
// Посчитать количество сегментов.
|
||||
size_t segments_count = total_size / segment_size;
|
||||
if (total_size % segment_size != 0) {
|
||||
++segments_count;
|
||||
}
|
||||
|
||||
output_segments.resize(segments_count);
|
||||
|
||||
input_part = input.begin();
|
||||
part_index_in_query += input_part->part_index_in_query;
|
||||
|
||||
/// Инициализируем информацию про первый диапазон.
|
||||
input_range = input_part->ranges.begin();
|
||||
initRangeInfo();
|
||||
|
||||
/// Инициализируем информацию про первый выходной сегмент.
|
||||
current_output_segment = output_segments.begin();
|
||||
initSegmentInfo();
|
||||
}
|
||||
|
||||
bool PartsWithRangesSplitter::emitRange()
|
||||
{
|
||||
size_t new_size = std::min((range_end - range_begin) * granularity, segment_end - segment_begin);
|
||||
size_t end = range_begin + new_size / granularity;
|
||||
|
||||
current_output_part->ranges.push_back(MarkRange(range_begin, end));
|
||||
|
||||
range_begin = end;
|
||||
segment_begin += new_size;
|
||||
|
||||
if (isSegmentConsumed())
|
||||
return switchToNextSegment();
|
||||
else if (isRangeConsumed())
|
||||
return switchToNextRange(true);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PartsWithRangesSplitter::switchToNextSegment()
|
||||
{
|
||||
++current_output_segment;
|
||||
if (current_output_segment == output_segments.end())
|
||||
return false;
|
||||
|
||||
if (isRangeConsumed())
|
||||
if (!switchToNextRange(false))
|
||||
return false;
|
||||
|
||||
initSegmentInfo();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PartsWithRangesSplitter::switchToNextRange(bool add_part)
|
||||
{
|
||||
++input_range;
|
||||
if (input_range == input_part->ranges.end())
|
||||
{
|
||||
++input_part;
|
||||
if (input_part == input.end())
|
||||
return false;
|
||||
|
||||
input_range = input_part->ranges.begin();
|
||||
|
||||
if (add_part)
|
||||
addPart();
|
||||
}
|
||||
|
||||
initRangeInfo();
|
||||
return true;
|
||||
}
|
||||
|
||||
void PartsWithRangesSplitter::initRangeInfo()
|
||||
{
|
||||
range_begin = input_range->begin;
|
||||
range_end = input_range->end;
|
||||
}
|
||||
|
||||
void PartsWithRangesSplitter::initSegmentInfo()
|
||||
{
|
||||
addPart();
|
||||
segment_begin = 0;
|
||||
segment_end = segment_size;
|
||||
}
|
||||
|
||||
void PartsWithRangesSplitter::addPart()
|
||||
{
|
||||
MergeTreeDataSelectExecutor::RangesInDataPart part(input_part->data_part, part_index_in_query);
|
||||
++part_index_in_query;
|
||||
current_output_segment->push_back(part);
|
||||
current_output_part = &(current_output_segment->back());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user