Less copying while handling annoucement

This commit is contained in:
Igor Nikonov 2023-10-18 12:20:14 +00:00
parent 56e62cb3fc
commit 01fa15c33f
2 changed files with 5 additions and 10 deletions

View File

@ -574,7 +574,7 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRang
if (!extension || !extension->parallel_reading_coordinator) if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement); extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
} }
void RemoteQueryExecutor::finish() void RemoteQueryExecutor::finish()

View File

@ -385,7 +385,7 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
LOG_TRACE(log, "Received an announcement {}", announcement.describe()); LOG_TRACE(log, "Received an announcement {}", announcement.describe());
/// To get rid of duplicates /// To get rid of duplicates
for (const auto & part: announcement.description) for (auto && part: announcement.description)
{ {
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
[&part] (const Part & other) { return other.description.info == part.info; }); [&part] (const Part & other) { return other.description.info == part.info; });
@ -404,13 +404,8 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
if (covering_or_the_same_it != all_parts_to_read.end()) if (covering_or_the_same_it != all_parts_to_read.end())
continue; continue;
auto new_part = Part{ auto [inserted_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
.description = part, auto & ranges = inserted_it->description.ranges;
.replicas = {announcement.replica_num}
};
auto insert_it = all_parts_to_read.insert(new_part);
auto & ranges = insert_it.first->description.ranges;
std::sort(ranges.begin(), ranges.end()); std::sort(ranges.begin(), ranges.end());
} }
} }
@ -517,7 +512,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
} }
return pimpl->handleInitialAllRangesAnnouncement(announcement); return pimpl->handleInitialAllRangesAnnouncement(std::move(announcement));
} }
ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelReadRequest request) ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelReadRequest request)