From 37fbf905dda2cdeb683488b46ff364531d1ac2d5 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Sun, 30 Jun 2024 20:33:43 +0000 Subject: [PATCH] Use only local snapshot for in order coordinator --- .../ParallelReplicasReadingCoordinator.cpp | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index f66cfdafa1a..b510da13d90 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -836,6 +836,7 @@ public: Parts all_parts_to_read; size_t total_rows_to_read = 0; + bool state_initialized{false}; LoggerPtr log = getLogger(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator")); }; @@ -857,6 +858,11 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa { LOG_TRACE(log, "Received an announcement {}", announcement.describe()); + ++stats[announcement.replica_num].number_of_requests; + + if (state_initialized) + return; + size_t new_rows_to_read = 0; /// To get rid of duplicates @@ -886,7 +892,7 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa std::sort(ranges.begin(), ranges.end()); } - ++stats[announcement.replica_num].number_of_requests; + state_initialized = true; // progress_callback is not set when local plan is used for initiator if (progress_callback && new_rows_to_read > 0) @@ -923,8 +929,15 @@ ParallelReadResponse InOrderCoordinator::handleRequest(ParallelReadRequest if (global_part_it == all_parts_to_read.end()) continue; + if (global_part_it->replicas.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Part {} requested by replica {} is not registered in working set", + part.info.getPartNameV1(), + request.replica_num); + if (!global_part_it->replicas.contains(request.replica_num)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} doesn't exist on replica {} according to the global state", part.info.getPartNameV1(), request.replica_num); + continue; size_t current_mark_size = 0;