diff --git a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h deleted file mode 100644 index 60fb0233fd6..00000000000 --- a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -using Segments = std::vector; - -/** Этот класс разбивает объект типа RangesInDataParts (см. MergeTreeDataSelectExecutor) - * на не больше, чем указанное количество сегментов. - */ -class PartsWithRangesSplitter final -{ -public: - PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, - UInt64 parallel_replica_offset, - size_t granularity_, size_t min_segment_size_, size_t max_segments_count_); - - ~PartsWithRangesSplitter() = default; - PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; - PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete; - - Segments perform(); - -private: - void init(); - bool emitRange(); - bool switchToNextSegment(); - bool switchToNextRange(bool add_part); - void initRangeInfo(); - void initSegmentInfo(); - void addPart(); - bool isRangeConsumed() const { return range_begin == range_end; } - bool isSegmentConsumed() const { return segment_begin == segment_end; } - -private: - // Входные данные. - const MergeTreeDataSelectExecutor::RangesInDataParts & input; - MergeTreeDataSelectExecutor::RangesInDataParts::const_iterator input_part; - MarkRanges::const_iterator input_range; - - // Выходные данные. - Segments output_segments; - Segments::iterator current_output_segment; - MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part = nullptr; - - size_t total_size = 0; - - const size_t granularity; - const size_t min_segment_size; - const size_t max_segments_count; - - size_t segment_size = 0; - - size_t range_begin = 0; - size_t range_end = 0; - - size_t segment_begin = 0; - size_t segment_end = 0; - - size_t part_index_in_query = 0; -}; - -} diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index faad84ca355..ced5ec68d44 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -10,34 +9,6 @@ #include #include #include -#include - -namespace -{ - -std::pair computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & segment) -{ - SipHash hash; - for (const auto & part_with_ranges : segment) - { - const auto & part = *(part_with_ranges.data_part); - hash.update(part.name.c_str(), part.name.length()); - const auto & ranges = part_with_ranges.ranges; - for (const auto & range : ranges) - { - hash.update(reinterpret_cast(&range.begin), sizeof(range.begin)); - hash.update(reinterpret_cast(&range.end), sizeof(range.end)); - } - } - - UInt64 lo; - UInt64 hi; - - hash.get128(lo, hi); - return std::make_pair(lo, hi); -} - -} namespace DB { @@ -136,7 +107,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( /// Семплирование. Names column_names_to_read = real_column_names; - UInt64 sampling_column_value_limit = 0; typedef Poco::SharedPtr ASTFunctionPtr; ASTFunctionPtr filter_function; ExpressionActionsPtr filter_expression; @@ -178,6 +148,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( relative_sample_size = 0; } + UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count); + UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset); + + if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0)) + relative_sample_size = 1; + if (relative_sample_size != 0) { UInt64 sampling_column_max = 0; @@ -194,20 +170,60 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( else throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); + UInt64 sampling_column_value_lower_limit; + UInt64 sampling_column_value_upper_limit; + UInt64 upper_limit = static_cast(relative_sample_size * sampling_column_max); + + if (parallel_replicas_count > 1) + { + UInt64 step = upper_limit / parallel_replicas_count; + sampling_column_value_lower_limit = parallel_replica_offset * step; + if ((parallel_replica_offset + 1) < parallel_replicas_count) + sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step; + else + sampling_column_value_upper_limit = upper_limit + 1; + } + else + { + sampling_column_value_lower_limit = 0; + sampling_column_value_upper_limit = upper_limit + 1; + } + /// Добавим условие, чтобы отсечь еще что-нибудь при повторном просмотре индекса. - sampling_column_value_limit = static_cast(relative_sample_size * sampling_column_max); if (!key_condition.addCondition(data.sampling_expression->getColumnName(), - Range::createRightBounded(sampling_column_value_limit, true))) + Range::createLeftBounded(sampling_column_value_lower_limit, true))) throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - /// Выражение для фильтрации: sampling_expression <= sampling_column_value_limit + if (!key_condition.addCondition(data.sampling_expression->getColumnName(), + Range::createRightBounded(sampling_column_value_upper_limit, false))) + throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); + + /// Выражение для фильтрации: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit) + + ASTPtr lower_filter_args = new ASTExpressionList; + lower_filter_args->children.push_back(data.sampling_expression); + lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit)); + + ASTFunctionPtr lower_filter_function = new ASTFunction; + lower_filter_function->name = "greaterOrEquals"; + lower_filter_function->arguments = lower_filter_args; + lower_filter_function->children.push_back(lower_filter_function->arguments); + + ASTPtr upper_filter_args = new ASTExpressionList; + upper_filter_args->children.push_back(data.sampling_expression); + upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit)); + + ASTFunctionPtr upper_filter_function = new ASTFunction; + upper_filter_function->name = "less"; + upper_filter_function->arguments = upper_filter_args; + upper_filter_function->children.push_back(upper_filter_function->arguments); ASTPtr filter_function_args = new ASTExpressionList; - filter_function_args->children.push_back(data.sampling_expression); - filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit)); + filter_function_args->children.push_back(lower_filter_function); + filter_function_args->children.push_back(upper_filter_function); filter_function = new ASTFunction; - filter_function->name = "lessOrEquals"; + filter_function->name = "and"; filter_function->arguments = filter_function_args; filter_function->children.push_back(filter_function->arguments); @@ -237,70 +253,21 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( RangesInDataParts parts_with_ranges; /// Найдем, какой диапазон читать из каждого куска. + size_t sum_marks = 0; + size_t sum_ranges = 0; for (auto & part : parts) { RangesInDataPart ranges(part, (*part_index)++); ranges.ranges = markRangesFromPkRange(part->index, key_condition); if (!ranges.ranges.empty()) + { parts_with_ranges.push_back(ranges); - } - UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count); - UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset); - - if (parallel_replicas_count > 1) - { - // Разбиваем массив на не больше, чем N сегментов, где N - количество реплик, - PartsWithRangesSplitter splitter(parts_with_ranges, parallel_replica_offset, - data.index_granularity, data.settings.min_rows_for_seek, - parallel_replicas_count); - auto segments = splitter.perform(); - - if (!segments.empty()) - { - if (parallel_replica_offset >= segments.size()) - return BlockInputStreams(); - - /// Для каждого сегмента, вычисляем его хэш. - /// Сортируем массив сегментов по хэшу. - /// Выбираем сегмент соответствующий текущей реплике. - - using Entry = std::pair, RangesInDataParts *>; - std::vector hashed_segments; - hashed_segments.reserve(segments.size()); - - for (auto & segment : segments) - { - Entry entry = std::make_pair(computeHash(segment), &segment); - hashed_segments.push_back(entry); - } - - std::sort(hashed_segments.begin(), hashed_segments.end(), [](const Entry & lhs, const Entry & rhs) - { - return lhs.first < rhs.first; - }); - - parts_with_ranges = std::move(*(hashed_segments[parallel_replica_offset].second)); + sum_ranges += ranges.ranges.size(); + for (const auto & range : ranges.ranges) + sum_marks += range.end - range.begin; } - else - { - /// Получаем данные только от первой реплики. - if (parallel_replica_offset > 0) - return BlockInputStreams(); - } - } - - /// Считаем количество засечек и диапазонов. - size_t sum_marks = 0; - size_t sum_ranges = 0; - sum_marks = 0; - sum_ranges = 0; - for (const auto & part_with_ranges : parts_with_ranges) - { - sum_ranges += part_with_ranges.ranges.size(); - for (const auto & range : part_with_ranges.ranges) - sum_marks += range.end - range.begin; } LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, " diff --git a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp deleted file mode 100644 index e9eec940e29..00000000000 --- a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp +++ /dev/null @@ -1,144 +0,0 @@ -#include - -namespace DB -{ - -PartsWithRangesSplitter::PartsWithRangesSplitter(const MergeTreeDataSelectExecutor::RangesInDataParts & input_, - UInt64 parallel_replica_offset, - size_t granularity_, size_t min_segment_size_, size_t max_segments_count_) - : input(input_), - granularity(granularity_), - min_segment_size(min_segment_size_), - max_segments_count(max_segments_count_) -{ - total_size = 0; - for (const auto & part_with_ranges : input) - { - const auto & ranges = part_with_ranges.ranges; - if (ranges.empty()) - throw Exception("Missing range in chunk.", ErrorCodes::MISSING_RANGE_IN_CHUNK); - for (const auto & range : ranges) - total_size += range.end - range.begin; - } - part_index_in_query = parallel_replica_offset * total_size; - total_size *= granularity; - - if ((granularity == 0) || (min_segment_size == 0) || (max_segments_count == 0) || (total_size == 0)) - throw Exception("One or more arguments are invalid.", ErrorCodes::BAD_ARGUMENTS); -} - -Segments PartsWithRangesSplitter::perform() -{ - if ((max_segments_count > 1) && (total_size > min_segment_size)) - { - init(); - while (emitRange()) {} - } - return output_segments; -} - -void PartsWithRangesSplitter::init() -{ - output_segments.clear(); - - // Вычислить размер сегментов так, чтобы он был кратен granularity - segment_size = total_size / std::min(max_segments_count, (total_size / min_segment_size)); - size_t scale = segment_size / granularity; - if (segment_size % granularity != 0) { - ++scale; - } - segment_size = granularity * scale; - - // Посчитать количество сегментов. - size_t segments_count = total_size / segment_size; - if (total_size % segment_size != 0) { - ++segments_count; - } - - output_segments.resize(segments_count); - - input_part = input.begin(); - part_index_in_query += input_part->part_index_in_query; - - /// Инициализируем информацию про первый диапазон. - input_range = input_part->ranges.begin(); - initRangeInfo(); - - /// Инициализируем информацию про первый выходной сегмент. - current_output_segment = output_segments.begin(); - initSegmentInfo(); -} - -bool PartsWithRangesSplitter::emitRange() -{ - size_t new_size = std::min((range_end - range_begin) * granularity, segment_end - segment_begin); - size_t end = range_begin + new_size / granularity; - - current_output_part->ranges.push_back(MarkRange(range_begin, end)); - - range_begin = end; - segment_begin += new_size; - - if (isSegmentConsumed()) - return switchToNextSegment(); - else if (isRangeConsumed()) - return switchToNextRange(true); - else - return false; -} - -bool PartsWithRangesSplitter::switchToNextSegment() -{ - ++current_output_segment; - if (current_output_segment == output_segments.end()) - return false; - - if (isRangeConsumed()) - if (!switchToNextRange(false)) - return false; - - initSegmentInfo(); - return true; -} - -bool PartsWithRangesSplitter::switchToNextRange(bool add_part) -{ - ++input_range; - if (input_range == input_part->ranges.end()) - { - ++input_part; - if (input_part == input.end()) - return false; - - input_range = input_part->ranges.begin(); - - if (add_part) - addPart(); - } - - initRangeInfo(); - return true; -} - -void PartsWithRangesSplitter::initRangeInfo() -{ - range_begin = input_range->begin; - range_end = input_range->end; -} - -void PartsWithRangesSplitter::initSegmentInfo() -{ - addPart(); - segment_begin = 0; - segment_end = segment_size; -} - -void PartsWithRangesSplitter::addPart() -{ - MergeTreeDataSelectExecutor::RangesInDataPart part(input_part->data_part, part_index_in_query); - ++part_index_in_query; - current_output_segment->push_back(part); - current_output_part = &(current_output_segment->back()); -} - -}