diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h index b7f80cd37cd..37b997b6871 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -6,11 +6,14 @@ namespace DB { +class PartsWithRangesSplitter; /** Выполняет запросы SELECT на данных из merge-дерева. */ class MergeTreeDataSelectExecutor { + friend class PartsWithRangesSplitter; + public: MergeTreeDataSelectExecutor(MergeTreeData & data_); diff --git a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h new file mode 100644 index 00000000000..91601e5cb20 --- /dev/null +++ b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h @@ -0,0 +1,61 @@ +#include + +namespace DB +{ + + /// Разбиваем parts_with_ranges на n частей. + /// Следующие условия должны быть выполнены: + /// - 1 <= n <= settings.max_clusters_count; + /// - каждая из n частей имеет >= min_cluster_size записей. + /// 3 levels: cluster / part / range + + class PartsWithRangesSplitter + { + public: + typedef MergeTreeDataSelectExecutor::RangesInDataParts Cluster; + + public: + PartsWithRangesSplitter(const Cluster & input_cluster_, + size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_); + + ~PartsWithRangesSplitter() = default; + PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; + PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete; + + std::vector perform(); + + private: + void init(); + bool emit(); + bool updateCluster(); + bool updateRange(bool add_part); + void addPart(); + void initRangeInfo(); + void initClusterInfo(); + bool isRangeConsumed() const { return range_begin == range_end; } + bool isClusterConsumed() const { return cluster_begin == cluster_end; } + + private: + // Input data. + const Cluster & input_cluster; + Cluster::const_iterator input_part; + std::vector::const_iterator input_range; + + // Output data. + std::vector output_clusters; + std::vector::iterator current_output_cluster; + MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; + + size_t total_size; + size_t remaining_size; + size_t min_cluster_size; + size_t max_clusters_count; + size_t cluster_size; + + size_t range_begin; + size_t range_end; + + size_t cluster_begin; + size_t cluster_end; + }; +} \ No newline at end of file diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index df0175ef925..a7f5aefcab6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -226,6 +227,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } } + if (settings.parallel_replicas_count > 0) + { + PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek, + settings.parallel_replicas_count); + auto per_replica_parts_with_ranges = splitter.perform(); + + /// Для каждого элемента per_replica_parts_with_ranges[k], вычисляем хэш от RangesInDataParts + /// Сортируем per_replica_parts_with_ranges по хэшу + /// Выбираем per_replica_parts_with_ranges[settings.parallel_replica_offset] + /// Если settings.parallel_replica_offset > (n - 1), то ничего не делаем. + } + LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, " << sum_marks << " marks to read from " << sum_ranges << " ranges"); diff --git a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp new file mode 100644 index 00000000000..d0d9a0d13d8 --- /dev/null +++ b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp @@ -0,0 +1,118 @@ +#include + +namespace DB +{ + + PartsWithRangesSplitter::PartsWithRangesSplitter(const Cluster & input_cluster_, + size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_) + : input_cluster(input_cluster_), + total_size(total_size_), + remaining_size(total_size_), + min_cluster_size(min_cluster_size_), + max_clusters_count(max_clusters_count_) + { + } + + std::vector PartsWithRangesSplitter::perform() + { + init(); + while (emit()) {} + return output_clusters; + } + + void PartsWithRangesSplitter::init() + { + size_t clusters_count = max_clusters_count; + while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count))) + --clusters_count; + + cluster_size = total_size / clusters_count; + + output_clusters.resize(clusters_count); + + // Initialize range reader. + input_part = input_cluster.begin(); + input_range = input_part->ranges.begin(); + initRangeInfo(); + + // Initialize output writer. + current_output_cluster = output_clusters.begin(); + addPart(); + initClusterInfo(); + } + + bool PartsWithRangesSplitter::emit() + { + size_t new_size = std::min(range_end - range_begin, cluster_end - cluster_begin); + current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size)); + + range_begin += new_size; + cluster_begin += new_size; + + if (isClusterConsumed()) + return updateCluster(); + else if (isRangeConsumed()) + return updateRange(true); + else + return false; + } + + bool PartsWithRangesSplitter::updateCluster() + { + ++current_output_cluster; + if (current_output_cluster == output_clusters.end()) + return false; + + if (isRangeConsumed()) + if (!updateRange(false)) + return false; + + addPart(); + initClusterInfo(); + return true; + } + + bool PartsWithRangesSplitter::updateRange(bool add_part) + { + ++input_range; + if (input_range == input_part->ranges.end()) + { + ++input_part; + if (input_part == input_cluster.end()) + return false; + + input_range = input_part->ranges.begin(); + + if (add_part) + addPart(); + } + + initRangeInfo(); + return true; + } + + void PartsWithRangesSplitter::addPart() + { + MergeTreeDataSelectExecutor::RangesInDataPart new_part; + new_part.data_part = input_part->data_part; + current_output_cluster->push_back(new_part); + current_output_part = &(current_output_cluster->back()); + } + + void PartsWithRangesSplitter::initRangeInfo() + { + range_begin = 0; + range_end = input_range->end - input_range->begin; + } + + void PartsWithRangesSplitter::initClusterInfo() + { + cluster_begin = 0; + cluster_end = cluster_size; + + remaining_size -= cluster_size; + if (remaining_size < cluster_size) + cluster_end += remaining_size; + } + +}