Merge pull request #59089 from nickitat/sort_res_ranges_before_sending_to_replica

Coordinator returns ranges for reading in sorted order
This commit is contained in:
Nikita Taranov 2024-01-24 21:04:53 +01:00 committed by GitHub
commit 1e3f6894a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -56,6 +56,32 @@ takeFromRange(const MarkRange & range, size_t min_number_of_marks, size_t & curr
current_marks_amount += range_we_take.getNumberOfMarks();
return range_we_take.getNumberOfMarks();
}
void sortResponseRanges(RangesInDataPartsDescription & result)
{
std::ranges::sort(result, [](const auto & lhs, const auto & rhs) { return lhs.info < rhs.info; });
RangesInDataPartsDescription new_result;
/// Aggregate ranges for each part within a single entry
for (auto & ranges_in_part : result)
{
if (new_result.empty() || new_result.back().info != ranges_in_part.info)
new_result.push_back(RangesInDataPartDescription{.info = ranges_in_part.info});
new_result.back().ranges.insert(
new_result.back().ranges.end(),
std::make_move_iterator(ranges_in_part.ranges.begin()),
std::make_move_iterator(ranges_in_part.ranges.end()));
ranges_in_part.ranges.clear();
}
/// Sort ranges for each part
for (auto & ranges_in_part : new_result)
std::sort(ranges_in_part.ranges.begin(), ranges_in_part.ranges.end());
result = std::move(new_result);
}
}
namespace ProfileEvents
@ -775,6 +801,8 @@ ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest reque
}
}
sortResponseRanges(response.description);
LOG_DEBUG(
log,
"Going to respond to replica {} with {}; mine_marks={}, stolen_by_hash={}, stolen_rest={}",