From 2650a2062899f5f232176ee56814cce66c800139 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Fri, 13 Sep 2024 16:21:17 +0200 Subject: [PATCH 1/2] Make dedup logic O(n*log(n)) instead of O(n^2) --- .../ParallelReplicasReadingCoordinator.cpp | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 8abf735b49f..c9fb09cd0ba 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -869,8 +869,7 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa /// To get rid of duplicates for (auto && part: announcement.description) { - auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), - [&part] (const Part & other) { return other.description.info == part.info; }); + auto the_same_it = all_parts_to_read.find(Part{.description = part}); /// We have the same part - add the info about presence on the corresponding replica to it if (the_same_it != all_parts_to_read.end()) @@ -882,12 +881,28 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa if (state_initialized) continue; - auto covering_or_the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), - [&part] (const Part & other) { return other.description.info.contains(part.info) || part.info.contains(other.description.info); }); + /// Look for the first part >= current + auto covering_it = all_parts_to_read.lower_bound(Part{.description = part}); - /// It is covering part or we have covering - skip it - if (covering_or_the_same_it != all_parts_to_read.end()) - continue; + if (covering_it != all_parts_to_read.end()) + { + /// Checks if other part covers this one or this one covers the other + auto is_covered_or_covering = [&part] (const Part & other) + { + return other.description.info.contains(part.info) || part.info.contains(other.description.info); + }; + + if (is_covered_or_covering(*covering_it)) + continue; + + /// Also look at the previous part, it could be covering the current one + if (covering_it != all_parts_to_read.begin()) + { + --covering_it; + if (is_covered_or_covering(*covering_it)) + continue; + } + } new_rows_to_read += part.rows; From e13247b67ee66d510af988cf0799a7286dab4ea4 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Fri, 13 Sep 2024 16:50:43 +0200 Subject: [PATCH 2/2] Fix clang-18 build --- .../MergeTree/ParallelReplicasReadingCoordinator.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index c9fb09cd0ba..ddbed5db7dc 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -869,7 +869,7 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa /// To get rid of duplicates for (auto && part: announcement.description) { - auto the_same_it = all_parts_to_read.find(Part{.description = part}); + auto the_same_it = all_parts_to_read.find(Part{.description = part, .replicas = {}}); /// We have the same part - add the info about presence on the corresponding replica to it if (the_same_it != all_parts_to_read.end()) @@ -882,14 +882,14 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa continue; /// Look for the first part >= current - auto covering_it = all_parts_to_read.lower_bound(Part{.description = part}); + auto covering_it = all_parts_to_read.lower_bound(Part{.description = part, .replicas = {}}); if (covering_it != all_parts_to_read.end()) { /// Checks if other part covers this one or this one covers the other auto is_covered_or_covering = [&part] (const Part & other) { - return other.description.info.contains(part.info) || part.info.contains(other.description.info); + return other.description.info.contains(part.info) || part.info.contains(other.description.info); }; if (is_covered_or_covering(*covering_it))