From 2650a2062899f5f232176ee56814cce66c800139 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Fri, 13 Sep 2024 16:21:17 +0200 Subject: [PATCH 1/6] Make dedup logic O(n*log(n)) instead of O(n^2) --- .../ParallelReplicasReadingCoordinator.cpp | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 8abf735b49f..c9fb09cd0ba 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -869,8 +869,7 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa /// To get rid of duplicates for (auto && part: announcement.description) { - auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), - [&part] (const Part & other) { return other.description.info == part.info; }); + auto the_same_it = all_parts_to_read.find(Part{.description = part}); /// We have the same part - add the info about presence on the corresponding replica to it if (the_same_it != all_parts_to_read.end()) @@ -882,12 +881,28 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa if (state_initialized) continue; - auto covering_or_the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), - [&part] (const Part & other) { return other.description.info.contains(part.info) || part.info.contains(other.description.info); }); + /// Look for the first part >= current + auto covering_it = all_parts_to_read.lower_bound(Part{.description = part}); - /// It is covering part or we have covering - skip it - if (covering_or_the_same_it != all_parts_to_read.end()) - continue; + if (covering_it != all_parts_to_read.end()) + { + /// Checks if other part covers this one or this one covers the other + auto is_covered_or_covering = [&part] (const Part & other) + { + return other.description.info.contains(part.info) || part.info.contains(other.description.info); + }; + + if (is_covered_or_covering(*covering_it)) + continue; + + /// Also look at the previous part, it could be covering the current one + if (covering_it != all_parts_to_read.begin()) + { + --covering_it; + if (is_covered_or_covering(*covering_it)) + continue; + } + } new_rows_to_read += part.rows; From e13247b67ee66d510af988cf0799a7286dab4ea4 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Fri, 13 Sep 2024 16:50:43 +0200 Subject: [PATCH 2/6] Fix clang-18 build --- .../MergeTree/ParallelReplicasReadingCoordinator.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index c9fb09cd0ba..ddbed5db7dc 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -869,7 +869,7 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa /// To get rid of duplicates for (auto && part: announcement.description) { - auto the_same_it = all_parts_to_read.find(Part{.description = part}); + auto the_same_it = all_parts_to_read.find(Part{.description = part, .replicas = {}}); /// We have the same part - add the info about presence on the corresponding replica to it if (the_same_it != all_parts_to_read.end()) @@ -882,14 +882,14 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa continue; /// Look for the first part >= current - auto covering_it = all_parts_to_read.lower_bound(Part{.description = part}); + auto covering_it = all_parts_to_read.lower_bound(Part{.description = part, .replicas = {}}); if (covering_it != all_parts_to_read.end()) { /// Checks if other part covers this one or this one covers the other auto is_covered_or_covering = [&part] (const Part & other) { - return other.description.info.contains(part.info) || part.info.contains(other.description.info); + return other.description.info.contains(part.info) || part.info.contains(other.description.info); }; if (is_covered_or_covering(*covering_it)) From aba7de5091ffc52c864cc004c44ff4be966bb126 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Tue, 17 Sep 2024 16:53:32 +0200 Subject: [PATCH 3/6] Verify that there are no intersecting parts in the resulting all_parts_to_read --- .../ParallelReplicasReadingCoordinator.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index ddbed5db7dc..0a25874cfd9 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -911,6 +911,24 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa std::sort(ranges.begin(), ranges.end()); } +#ifndef NDEBUG + /// Double check that there are no intersecting parts + { + auto part_it = all_parts_to_read.begin(); + auto next_part_it = part_it; + if (next_part_it != all_parts_to_read.end()) + ++next_part_it; + while (next_part_it != all_parts_to_read.end()) + { + chassert(part_it->description.info.isDisjoint(next_part_it->description.info), + fmt::format("Parts {} and {} intersect", + part_it->description.info.getPartNameV1(), next_part_it->description.info.getPartNameV1())); + ++part_it; + ++next_part_it; + } + } +#endif + state_initialized = true; // progress_callback is not set when local plan is used for initiator From 190d3f04c9fb03e8a8c64ce1b25d5536e7835ad7 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Tue, 17 Sep 2024 16:54:49 +0200 Subject: [PATCH 4/6] More optimal check for intrsecting parts in DefaultCoordinator init --- .../ParallelReplicasReadingCoordinator.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 0a25874cfd9..603584af6c2 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -376,17 +376,20 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann if (state_initialized) return; - for (auto && part : announcement.description) { - auto intersecting_it = std::find_if( - all_parts_to_read.begin(), - all_parts_to_read.end(), - [&part](const Part & other) { return !other.description.info.isDisjoint(part.info); }); + /// To speedup search for adjacent parts + Parts known_parts(all_parts_to_read.begin(), all_parts_to_read.end()); - if (intersecting_it != all_parts_to_read.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement"); + for (auto && part : announcement.description) + { + auto intersecting_it = known_parts.lower_bound(Part{.description = part, .replicas = {}}); - all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}}); + if (intersecting_it != known_parts.end() && !intersecting_it->description.info.isDisjoint(part.info)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement"); + + all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}}); + known_parts.emplace(Part{.description = part, .replicas = {}}); + } } std::ranges::sort( From 3674c97ebba63bc88c6bc03f630124afa314053a Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Tue, 17 Sep 2024 17:49:02 +0200 Subject: [PATCH 5/6] Fix for using part after std::move from it --- src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 603584af6c2..98f28430ecc 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -387,8 +387,8 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann if (intersecting_it != known_parts.end() && !intersecting_it->description.info.isDisjoint(part.info)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement"); - all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}}); known_parts.emplace(Part{.description = part, .replicas = {}}); + all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}}); } } From 574a26c63ba24e4632b428827642b40db48424e4 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Tue, 17 Sep 2024 17:56:44 +0200 Subject: [PATCH 6/6] Use adjacent_find to check adjacent parts --- .../ParallelReplicasReadingCoordinator.cpp | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 98f28430ecc..26f2273d196 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -917,18 +917,15 @@ void InOrderCoordinator::doHandleInitialAllRangesAnnouncement(InitialAllRa #ifndef NDEBUG /// Double check that there are no intersecting parts { - auto part_it = all_parts_to_read.begin(); - auto next_part_it = part_it; - if (next_part_it != all_parts_to_read.end()) - ++next_part_it; - while (next_part_it != all_parts_to_read.end()) - { - chassert(part_it->description.info.isDisjoint(next_part_it->description.info), - fmt::format("Parts {} and {} intersect", - part_it->description.info.getPartNameV1(), next_part_it->description.info.getPartNameV1())); - ++part_it; - ++next_part_it; - } + auto intersecting_part_it = std::adjacent_find(all_parts_to_read.begin(), all_parts_to_read.end(), + [] (const Part & lhs, const Part & rhs) + { + return !lhs.description.info.isDisjoint(rhs.description.info); + }); + + if (intersecting_part_it != all_parts_to_read.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts {} and {} intersect", + intersecting_part_it->description.info.getPartNameV1(), std::next(intersecting_part_it)->description.info.getPartNameV1()); } #endif