review fixup: simplify conditions and add meaningful comment on why we have these conditions

This commit is contained in:
Jayme Bird 2024-01-05 16:50:35 +00:00
parent d169383c6c
commit f67e283968

View File

@@ -2655,20 +2655,39 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
for (const auto & entry : queue)
{
bool entry_matches = !lightweight_entries_only || std::find(lightweight_entries.begin(), lightweight_entries.end(), entry->type) != lightweight_entries.end();
bool source_replica_condition = true;
if (!entry_matches)
continue;
// Check if srcReplica condition should be applied
if (!src_replicas.empty() && entry_matches)
// `src_replicas` is used for specified sets of replicas; however, we also account for
// entries from removed or unknown replicas. This is necessary because the `source_replica`
// field in a replication queue entry doesn't always indicate the current existence or state
// of the part on that replica. Therefore, we include entries from replicas not listed in ZooKeeper.
// The `need_wait_for_entry` condition ensures:
// 1. Waiting for entries from both specified (`src_replicas`) and potentially removed
// or unknown replicas, as `source_replica` may not reflect the current part status.
// 2. Handling cases where parts become broken (e.g., due to a hard restart) leading to
// changes in the source replica or empty `source_replica` fields.
// Example Scenario:
// - A part is added on replica1 and fetched by replica2. If the part on replica1 breaks and
// replica1 schedules a re-fetch from another source, a GET_PART entry with an empty
// `source_replica` may be created.
// - If replica3 is added and replica2 (with the intact part) is removed, SYNC .. FROM replica2
// might not account for the re-fetch need from replica1, risking data inconsistencies.
// - Therefore, `need_wait_for_entry` considers entries with specified sources, those not in
// zookeeper->getChildren(zookeeper_path + "/replicas"), and entries with empty `source_replica`.
bool is_entry_from_specified_replica = src_replicas.contains(entry->source_replica);
chassert(!existing_replicas.contains(""));
bool is_entry_from_removed_or_unknown_replica = !existing_replicas.contains(entry->source_replica) || entry->source_replica.empty();
bool need_wait_for_entry = is_entry_from_specified_replica || is_entry_from_removed_or_unknown_replica;
if (need_wait_for_entry)
{
// Condition: entry's source_replica is one of the specified ones, or not in the system anymore, or is empty
source_replica_condition = src_replicas.contains(entry->source_replica)
|| !existing_replicas.contains(entry->source_replica) || entry->source_replica.empty();
}
if (entry_matches && source_replica_condition)
{
out_entry_names.insert(entry->znode_name);
}
}
}
LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));