From b1619218255f75dfa94ee8494757ddfd9795fcf9 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Mon, 22 Jan 2024 22:33:23 +0100 Subject: [PATCH] impl --- .../ParallelReplicasReadingCoordinator.cpp | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 980b128ec75..484a0b37644 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -56,6 +56,32 @@ takeFromRange(const MarkRange & range, size_t min_number_of_marks, size_t & curr current_marks_amount += range_we_take.getNumberOfMarks(); return range_we_take.getNumberOfMarks(); } + +void sortResponseRanges(RangesInDataPartsDescription & result) +{ + std::ranges::sort(result, [](const auto & lhs, const auto & rhs) { return lhs.info < rhs.info; }); + + RangesInDataPartsDescription new_result; + + /// Aggregate ranges for each part within a single entry + for (auto & ranges_in_part : result) + { + if (new_result.empty() || new_result.back().info != ranges_in_part.info) + new_result.push_back(RangesInDataPartDescription{.info = ranges_in_part.info}); + + new_result.back().ranges.insert( + new_result.back().ranges.end(), + std::make_move_iterator(ranges_in_part.ranges.begin()), + std::make_move_iterator(ranges_in_part.ranges.end())); + ranges_in_part.ranges.clear(); + } + + /// Sort ranges for each part + for (auto & ranges_in_part : new_result) + std::sort(ranges_in_part.ranges.begin(), ranges_in_part.ranges.end()); + + result = std::move(new_result); +} } namespace ProfileEvents @@ -775,6 +801,8 @@ ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest reque } } + sortResponseRanges(response.description); + LOG_DEBUG( log, "Going to respond to replica {} with {}; mine_marks={}, stolen_by_hash={}, stolen_rest={}",