diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index e88b5558d43..0c92be0991d 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2655,20 +2655,39 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall for (const auto & entry : queue) { bool entry_matches = !lightweight_entries_only || std::find(lightweight_entries.begin(), lightweight_entries.end(), entry->type) != lightweight_entries.end(); - bool source_replica_condition = true; + if (!entry_matches) + continue; - // Check if srcReplica condition should be applied - if (!src_replicas.empty() && entry_matches) + // `src_replicas` is used for specified sets of replicas; however, we also account for + // entries from removed or unknown replicas. This is necessary because the `source_replica` + // field in a replication queue entry doesn't always indicate the current existence or state + // of the part in that replica. Therefore, we include entries from replicas not listed in zookeeper. + // The `need_wait_for_entry` condition ensures: + // 1. Waiting for entries from both specified (`src_replicas`) and potentially removed + // or unknown replicas, as `source_replica` may not reflect the current part status. + // 2. Handling cases where parts become broken (e.g., due to a hard restart) leading to + // changes in the source replica or empty `source_replica` fields. + + // Example Scenario: + // - A part is added on replica1 and fetched by replica2. If the part on replica1 breaks and + // replica1 schedules a re-fetch from another source, a GET_PART entry with an empty + // `source_replica` may be created. + // - If replica3 is added and replica2 (with the intact part) is removed, SYNC .. FROM replica2 + // might not account for the re-fetch need from replica1, risking data inconsistencies. + // - Therefore, `need_wait_for_entry` considers entries with specified sources, those not in + // zookeeper->getChildren(zookeeper_path + "/replicas"), and entries with empty `source_replica`. + + bool is_entry_from_specified_replica = src_replicas.contains(entry->source_replica); + + chassert(!existing_replicas.contains("")); + bool is_entry_from_removed_or_unknown_replica = !existing_replicas.contains(entry->source_replica) || entry->source_replica.empty(); + + bool need_wait_for_entry = is_entry_from_specified_replica || is_entry_from_removed_or_unknown_replica; + + if (need_wait_for_entry) { - // Condition: entry's source_replica is one of the specified ones, or not in the system anymore, or is empty - source_replica_condition = src_replicas.contains(entry->source_replica) - || !existing_replicas.contains(entry->source_replica) || entry->source_replica.empty(); - } - - if (entry_matches && source_replica_condition) - { out_entry_names.insert(entry->znode_name); - } + } } LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));