Merge pull request #69596 from ClickHouse/fix_dedup_in_parallel_replicas_announcement

Optimize complexity of part deduplication in parallel replicas announcement
This commit is contained in:
Alexander Gololobov 2024-09-17 20:48:12 +00:00 committed by GitHub
commit 6597a8ed04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -373,17 +373,20 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann
if (state_initialized) if (state_initialized)
return; return;
for (auto && part : announcement.description)
{ {
auto intersecting_it = std::find_if( /// To speed up the search for adjacent parts
all_parts_to_read.begin(), Parts known_parts(all_parts_to_read.begin(), all_parts_to_read.end());
all_parts_to_read.end(),
[&part](const Part & other) { return !other.description.info.isDisjoint(part.info); });
if (intersecting_it != all_parts_to_read.end()) for (auto && part : announcement.description)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement"); {
auto intersecting_it = known_parts.lower_bound(Part{.description = part, .replicas = {}});
all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}}); if (intersecting_it != known_parts.end() && !intersecting_it->description.info.isDisjoint(part.info))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement");
known_parts.emplace(Part{.description = part, .replicas = {}});
all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
}
} }
std::ranges::sort( std::ranges::sort(
@ -870,8 +873,7 @@ void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRa
/// To get rid of duplicates /// To get rid of duplicates
for (auto && part: announcement.description) for (auto && part: announcement.description)
{ {
auto the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), auto the_same_it = all_parts_to_read.find(Part{.description = part, .replicas = {}});
[&part] (const Part & other) { return other.description.info == part.info; });
/// We have the same part - add the info about presence on the corresponding replica to it /// We have the same part - add the info about presence on the corresponding replica to it
if (the_same_it != all_parts_to_read.end()) if (the_same_it != all_parts_to_read.end())
@ -883,12 +885,28 @@ void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRa
if (state_initialized) if (state_initialized)
continue; continue;
auto covering_or_the_same_it = std::find_if(all_parts_to_read.begin(), all_parts_to_read.end(), /// Look for the first part >= current
[&part] (const Part & other) { return other.description.info.contains(part.info) || part.info.contains(other.description.info); }); auto covering_it = all_parts_to_read.lower_bound(Part{.description = part, .replicas = {}});
/// It is covering part or we have covering - skip it if (covering_it != all_parts_to_read.end())
if (covering_or_the_same_it != all_parts_to_read.end()) {
continue; /// Checks if other part covers this one or this one covers the other
auto is_covered_or_covering = [&part] (const Part & other)
{
return other.description.info.contains(part.info) || part.info.contains(other.description.info);
};
if (is_covered_or_covering(*covering_it))
continue;
/// Also look at the previous part, it could be covering the current one
if (covering_it != all_parts_to_read.begin())
{
--covering_it;
if (is_covered_or_covering(*covering_it))
continue;
}
}
new_rows_to_read += part.rows; new_rows_to_read += part.rows;
@ -897,6 +915,21 @@ void InOrderCoordinator<mode>::doHandleInitialAllRangesAnnouncement(InitialAllRa
std::sort(ranges.begin(), ranges.end()); std::sort(ranges.begin(), ranges.end());
} }
#ifndef NDEBUG
/// Double check that there are no intersecting parts
{
auto intersecting_part_it = std::adjacent_find(all_parts_to_read.begin(), all_parts_to_read.end(),
[] (const Part & lhs, const Part & rhs)
{
return !lhs.description.info.isDisjoint(rhs.description.info);
});
if (intersecting_part_it != all_parts_to_read.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts {} and {} intersect",
intersecting_part_it->description.info.getPartNameV1(), std::next(intersecting_part_it)->description.info.getPartNameV1());
}
#endif
state_initialized = true; state_initialized = true;
// progress_callback is not set when local plan is used for initiator // progress_callback is not set when local plan is used for initiator