Merge pull request #55781 from ClickHouse/pr-cleanup-annoucement-less-copying

Parallel replicas: cleanup, less copying during announcement
This commit is contained in:
Igor Nikonov 2023-10-18 20:41:18 +02:00 committed by GitHub
commit 9087572204
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 23 deletions

View File

@ -574,7 +574,7 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRang
if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
}
void RemoteQueryExecutor::finish()

View File

@ -134,7 +134,7 @@ public:
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailable(size_t replica_number) override;
void updateReadingState(const InitialAllRangesAnnouncement & announcement);
void updateReadingState(InitialAllRangesAnnouncement announcement);
void finalizeReadingState();
size_t computeConsistentHash(const MergeTreePartInfo & info) const
@ -152,12 +152,12 @@ DefaultCoordinator::~DefaultCoordinator()
LOG_DEBUG(log, "Coordination done: {}", toString(stats));
}
void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement & announcement)
void DefaultCoordinator::updateReadingState(InitialAllRangesAnnouncement announcement)
{
PartRefs parts_diff;
/// To get rid of duplicates
for (const auto & part: announcement.description)
for (auto && part: announcement.description)
{
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
[&part] (const Part & other) { return other.description.info.getPartNameV1() == part.info.getPartNameV1(); });
@ -176,12 +176,7 @@ void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement &
if (covering_or_the_same_it != all_parts_to_read.end())
continue;
auto new_part = Part{
.description = part,
.replicas = {announcement.replica_num}
};
auto [insert_it, _] = all_parts_to_read.insert(new_part);
auto [insert_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
parts_diff.push_back(insert_it);
}
@ -242,12 +237,14 @@ void DefaultCoordinator::finalizeReadingState()
void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
updateReadingState(announcement);
const auto replica_num = announcement.replica_num;
if (announcement.replica_num >= stats.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica number ({}) is bigger than total replicas count ({})", announcement.replica_num, stats.size());
updateReadingState(std::move(announcement));
stats[announcement.replica_num].number_of_requests +=1;
if (replica_num >= stats.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica number ({}) is bigger than total replicas count ({})", replica_num, stats.size());
++stats[replica_num].number_of_requests;
++sent_initial_requests;
LOG_DEBUG(log, "Sent initial requests: {} Replicas count: {}", sent_initial_requests, replicas_count);
@ -385,7 +382,7 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
LOG_TRACE(log, "Received an announcement {}", announcement.describe());
/// To get rid of duplicates
for (const auto & part: announcement.description)
for (auto && part: announcement.description)
{
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(),
[&part] (const Part & other) { return other.description.info == part.info; });
@ -404,13 +401,8 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
if (covering_or_the_same_it != all_parts_to_read.end())
continue;
auto new_part = Part{
.description = part,
.replicas = {announcement.replica_num}
};
auto insert_it = all_parts_to_read.insert(new_part);
auto & ranges = insert_it.first->description.ranges;
auto [inserted_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
auto & ranges = inserted_it->description.ranges;
std::sort(ranges.begin(), ranges.end());
}
}
@ -517,7 +509,7 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
}
return pimpl->handleInitialAllRangesAnnouncement(announcement);
return pimpl->handleInitialAllRangesAnnouncement(std::move(announcement));
}
ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelReadRequest request)