diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index f13da426c45..6bde8e7b3d6 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -1,4 +1,5 @@ #include +#include namespace ProfileEvents { @@ -14,6 +15,11 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsUInt64 index_granularity; +} + MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrder( ParallelReadingExtension extension_, CoordinationMode mode_, @@ -70,16 +76,73 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta task_idx, per_part_infos.size()); const auto & part_info = per_part_infos[task_idx]->data_part->info; - auto get_from_buffer = [&]() -> std::optional + const auto & data_settings = per_part_infos[task_idx]->data_part->storage.getSettings(); + auto get_from_buffer = [&, + rows_granularity = (*data_settings)[MergeTreeSetting::index_granularity], + my_max_block_size = this->block_size_params.max_block_size_rows]() -> std::optional { + const size_t max_marks_in_range = (my_max_block_size + rows_granularity - 1) / rows_granularity; for (auto & desc : buffered_tasks) { if (desc.info == part_info && !desc.ranges.empty()) { - auto result = std::move(desc.ranges); - desc.ranges = MarkRanges{}; - ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks()); - return result; + if (mode == CoordinationMode::WithOrder) + { + /// if already split, just return desc.ranges + if (split) + { + auto result = std::move(desc.ranges); + desc.ranges = MarkRanges{}; + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks()); + return result; + } + + /// else split each range into chunks of doubling size but still return all MarkRanges; the tail of every range must run up to range.end so that no mark is dropped or read past the range + MarkRanges 
result; + for (auto range : desc.ranges) + { + while (!split && range.begin + marks_in_range < range.end) + { + result.emplace_back(range.begin, range.begin + marks_in_range); + range.begin += marks_in_range; + marks_in_range *= 2; + if (marks_in_range > max_marks_in_range) + split = true; + } + result.emplace_back(range.begin, range.end); + } + chassert(!result.empty()); + desc.ranges = MarkRanges{}; + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks()); + return result; + } + else + { + /// for reverse order just return one range + MarkRanges result; + auto & range = desc.ranges.back(); + if (range.begin + marks_in_range < range.end) + { + result.emplace_front(range.end - marks_in_range, range.end); + range.end -= marks_in_range; + marks_in_range = std::min(marks_in_range * 2, max_marks_in_range); + } + else + { + result.emplace_front(range.begin, range.end); + desc.ranges.pop_back(); + } + + /// TODO: do we need to reset marks_in_range for each range? + if (desc.ranges.empty()) + { + marks_in_range = 1; + } + + chassert(result.size() == 1); + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks()); + return result; + } } } return std::nullopt; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h index a05dc54b529..4ec7d5f9334 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h @@ -38,6 +38,8 @@ private: RangesInDataPartsDescription buffered_tasks; mutable std::mutex mutex; + bool split{false}; + size_t marks_in_range{1}; }; };