diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h index fae443d8405..5cc9cc3d6bf 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -6,14 +6,11 @@ namespace DB { -class PartsWithRangesSplitter; /** Выполняет запросы SELECT на данных из merge-дерева. */ class MergeTreeDataSelectExecutor { - friend class PartsWithRangesSplitter; - public: MergeTreeDataSelectExecutor(MergeTreeData & data_); @@ -30,11 +27,7 @@ public: unsigned threads = 1, size_t * inout_part_index = nullptr); -private: - MergeTreeData & data; - - Logger * log; - +public: struct RangesInDataPart { MergeTreeData::DataPartPtr data_part; @@ -49,10 +42,13 @@ private: } }; -public: typedef std::vector RangesInDataParts; private: + MergeTreeData & data; + + Logger * log; + size_t min_marks_for_seek; size_t min_marks_for_concurrent_read; size_t max_marks_to_use_cache; diff --git a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h index 91601e5cb20..dfb7ffe01c4 100644 --- a/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h +++ b/dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h @@ -1,61 +1,63 @@ +#pragma once + #include namespace DB { - /// Разбиваем parts_with_ranges на n частей. - /// Следующие условия должны быть выполнены: - /// - 1 <= n <= settings.max_clusters_count; - /// - каждая из n частей имеет >= min_cluster_size записей. - /// 3 levels: cluster / part / range +/// Разбиваем parts_with_ranges на n частей. +/// Следующие условия должны быть выполнены: +/// - 1 <= n <= settings.max_clusters_count; +/// - каждая из n частей имеет >= min_cluster_size записей. +/// 3 levels: cluster / part / range +class PartsWithRangesSplitter +{ +public: + typedef MergeTreeDataSelectExecutor::RangesInDataParts Cluster; - class PartsWithRangesSplitter - { - public: - typedef MergeTreeDataSelectExecutor::RangesInDataParts Cluster; +public: + PartsWithRangesSplitter(const Cluster & input_cluster_, + size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_); - public: - PartsWithRangesSplitter(const Cluster & input_cluster_, - size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_); + ~PartsWithRangesSplitter() = default; + PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; + PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete; - ~PartsWithRangesSplitter() = default; - PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete; - PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete; + std::vector perform(); - std::vector perform(); +private: + void init(); + bool emit(); + bool updateCluster(); + bool updateRange(bool add_part); + void addPart(); + void initRangeInfo(); + void initClusterInfo(); + bool isRangeConsumed() const { return range_begin == range_end; } + bool isClusterConsumed() const { return cluster_begin == cluster_end; } - private: - void init(); - bool emit(); - bool updateCluster(); - bool updateRange(bool add_part); - void addPart(); - void initRangeInfo(); - void initClusterInfo(); - bool isRangeConsumed() const { return range_begin == range_end; } - bool isClusterConsumed() const { return cluster_begin == cluster_end; } +private: + // Input data. + const Cluster & input_cluster; + Cluster::const_iterator input_part; + std::vector::const_iterator input_range; - private: - // Input data. - const Cluster & input_cluster; - Cluster::const_iterator input_part; - std::vector::const_iterator input_range; + // Output data. + std::vector output_clusters; + std::vector::iterator current_output_cluster; + MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; - // Output data. - std::vector output_clusters; - std::vector::iterator current_output_cluster; - MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part; + size_t total_size; + size_t remaining_size; + size_t min_cluster_size; + size_t max_clusters_count; + size_t cluster_size; - size_t total_size; - size_t remaining_size; - size_t min_cluster_size; - size_t max_clusters_count; - size_t cluster_size; + size_t range_begin; + size_t range_end; - size_t range_begin; - size_t range_end; + size_t cluster_begin; + size_t cluster_end; +}; - size_t cluster_begin; - size_t cluster_end; - }; -} \ No newline at end of file +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index b0d7fdd7d53..00ee7df4b1f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -14,27 +14,29 @@ namespace { - std::pair computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & cluster) + +std::pair computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & cluster) +{ + SipHash hash; + for (const auto & part_with_ranges : cluster) { - SipHash hash; - for (const auto & part_with_ranges : cluster) + const auto & part = *(part_with_ranges.data_part); + hash.update(part.name.c_str(), part.name.length()); + const auto & ranges = part_with_ranges.ranges; + for (const auto & range : ranges) { - const auto & part = *(part_with_ranges.data_part); - hash.update(part.name.c_str(), part.name.length()); - const auto & ranges = part_with_ranges.ranges; - for (const auto & range : ranges) - { - hash.update(reinterpret_cast(&range.begin), sizeof(range.begin)); - hash.update(reinterpret_cast(&range.end), sizeof(range.end)); - } + hash.update(reinterpret_cast(&range.begin), sizeof(range.begin)); + hash.update(reinterpret_cast(&range.end), sizeof(range.end)); } - - UInt64 lo; - UInt64 hi; - - hash.get128(lo, hi); - return std::make_pair(lo, hi); } + + UInt64 lo; + UInt64 hi; + + hash.get128(lo, hi); + return std::make_pair(lo, hi); +} + } namespace DB diff --git a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp index d0d9a0d13d8..381aa28213e 100644 --- a/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp +++ b/dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp @@ -3,116 +3,116 @@ namespace DB { - PartsWithRangesSplitter::PartsWithRangesSplitter(const Cluster & input_cluster_, - size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_) - : input_cluster(input_cluster_), - total_size(total_size_), - remaining_size(total_size_), - min_cluster_size(min_cluster_size_), - max_clusters_count(max_clusters_count_) +PartsWithRangesSplitter::PartsWithRangesSplitter(const Cluster & input_cluster_, + size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_) +: input_cluster(input_cluster_), +total_size(total_size_), +remaining_size(total_size_), +min_cluster_size(min_cluster_size_), +max_clusters_count(max_clusters_count_) +{ +} + +std::vector PartsWithRangesSplitter::perform() +{ + init(); + while (emit()) {} + return output_clusters; +} + +void PartsWithRangesSplitter::init() +{ + size_t clusters_count = max_clusters_count; + while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count))) + --clusters_count; + + cluster_size = total_size / clusters_count; + + output_clusters.resize(clusters_count); + + // Initialize range reader. + input_part = input_cluster.begin(); + input_range = input_part->ranges.begin(); + initRangeInfo(); + + // Initialize output writer. + current_output_cluster = output_clusters.begin(); + addPart(); + initClusterInfo(); +} + +bool PartsWithRangesSplitter::emit() +{ + size_t new_size = std::min(range_end - range_begin, cluster_end - cluster_begin); + current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size)); + + range_begin += new_size; + cluster_begin += new_size; + + if (isClusterConsumed()) + return updateCluster(); + else if (isRangeConsumed()) + return updateRange(true); + else + return false; +} + +bool PartsWithRangesSplitter::updateCluster() +{ + ++current_output_cluster; + if (current_output_cluster == output_clusters.end()) + return false; + + if (isRangeConsumed()) + if (!updateRange(false)) + return false; + + addPart(); + initClusterInfo(); + return true; +} + +bool PartsWithRangesSplitter::updateRange(bool add_part) +{ + ++input_range; + if (input_range == input_part->ranges.end()) { - } + ++input_part; + if (input_part == input_cluster.end()) + return false; - std::vector PartsWithRangesSplitter::perform() - { - init(); - while (emit()) {} - return output_clusters; - } - - void PartsWithRangesSplitter::init() - { - size_t clusters_count = max_clusters_count; - while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count))) - --clusters_count; - - cluster_size = total_size / clusters_count; - - output_clusters.resize(clusters_count); - - // Initialize range reader. - input_part = input_cluster.begin(); input_range = input_part->ranges.begin(); - initRangeInfo(); - // Initialize output writer. - current_output_cluster = output_clusters.begin(); - addPart(); - initClusterInfo(); + if (add_part) + addPart(); } - bool PartsWithRangesSplitter::emit() - { - size_t new_size = std::min(range_end - range_begin, cluster_end - cluster_begin); - current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size)); + initRangeInfo(); + return true; +} - range_begin += new_size; - cluster_begin += new_size; +void PartsWithRangesSplitter::addPart() +{ + MergeTreeDataSelectExecutor::RangesInDataPart new_part; + new_part.data_part = input_part->data_part; + current_output_cluster->push_back(new_part); + current_output_part = &(current_output_cluster->back()); +} - if (isClusterConsumed()) - return updateCluster(); - else if (isRangeConsumed()) - return updateRange(true); - else - return false; - } +void PartsWithRangesSplitter::initRangeInfo() +{ + range_begin = 0; + range_end = input_range->end - input_range->begin; +} - bool PartsWithRangesSplitter::updateCluster() - { - ++current_output_cluster; - if (current_output_cluster == output_clusters.end()) - return false; +void PartsWithRangesSplitter::initClusterInfo() +{ + cluster_begin = 0; + cluster_end = cluster_size; - if (isRangeConsumed()) - if (!updateRange(false)) - return false; - - addPart(); - initClusterInfo(); - return true; - } - - bool PartsWithRangesSplitter::updateRange(bool add_part) - { - ++input_range; - if (input_range == input_part->ranges.end()) - { - ++input_part; - if (input_part == input_cluster.end()) - return false; - - input_range = input_part->ranges.begin(); - - if (add_part) - addPart(); - } - - initRangeInfo(); - return true; - } - - void PartsWithRangesSplitter::addPart() - { - MergeTreeDataSelectExecutor::RangesInDataPart new_part; - new_part.data_part = input_part->data_part; - current_output_cluster->push_back(new_part); - current_output_part = &(current_output_cluster->back()); - } - - void PartsWithRangesSplitter::initRangeInfo() - { - range_begin = 0; - range_end = input_range->end - input_range->begin; - } - - void PartsWithRangesSplitter::initClusterInfo() - { - cluster_begin = 0; - cluster_end = cluster_size; - - remaining_size -= cluster_size; - if (remaining_size < cluster_size) - cluster_end += remaining_size; - } + remaining_size -= cluster_size; + if (remaining_size < cluster_size) + cluster_end += remaining_size; +} }