diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 0efa83237ca..5d738612a4c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -111,6 +111,18 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP bool found_part_with_the_same_max_block = false; Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas"); + /// Move our replica to the end of replicas + for (auto it = replicas.begin(); it != replicas.end(); ++it) + { + String replica_path = storage.zookeeper_path + "/replicas/" + *it; + if (replica_path == storage.replica_path) + { + std::iter_swap(it, replicas.begin() + replicas.size() - 1); + break; + } + } + + /// Check all replicas and our replica must be this last one for (const String & replica : replicas) { String replica_path = storage.zookeeper_path + "/replicas/" + replica; @@ -146,7 +158,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP if (found_part_with_the_same_min_block && found_part_with_the_same_max_block) { /// FIXME It may never appear - LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name); + LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica); return MissingPartSearchResult::FoundAndDontNeedFetch; } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 88bade29759..b5ab13933d3 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -55,13 +55,15 @@ void ReplicatedMergeTreeQueue::clear() mutation_pointer.clear(); } -void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts) +void ReplicatedMergeTreeQueue::initialize(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard lock(state_mutex); - for (const auto & part : parts) + + Strings parts = zookeeper->getChildren(replica_path + "/parts"); + for (const auto & part_name : parts) { - current_parts.add(part->name, nullptr); - virtual_parts.add(part->name, nullptr); + current_parts.add(part_name, nullptr); + virtual_parts.add(part_name, nullptr); } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 57e1e658665..37abd0a1668 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -277,8 +277,8 @@ public: /// Clears queue state void clear(); - /// Put a set of (already existing) parts in virtual_parts. - void initialize(const MergeTreeData::DataParts & parts); + /// Get set of parts from zookeeper + void initialize(zkutil::ZooKeeperPtr zookeeper); /** Inserts an action to the end of the queue. * To restore broken parts during operation. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9ac4864b87f..8085f110759 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1224,34 +1224,24 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) Coordination::Requests ops; - String has_replica = findReplicaHavingPart(part_name, true); - if (!has_replica.empty()) + LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name); + time_t part_create_time = 0; + Coordination::ExistsResponse exists_resp = exists_futures[i].get(); + if (exists_resp.error == Coordination::Error::ZOK) { - LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name); - time_t part_create_time = 0; - Coordination::ExistsResponse exists_resp = exists_futures[i].get(); - if (exists_resp.error == Coordination::Error::ZOK) - { - part_create_time = exists_resp.stat.ctime / 1000; - removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0); - } - LogEntry log_entry; - log_entry.type = LogEntry::GET_PART; - log_entry.source_replica = ""; - log_entry.new_part_name = part_name; - log_entry.create_time = part_create_time; - - /// We assume that this occurs before the queue is loaded (queue.initialize). - ops.emplace_back(zkutil::makeCreateRequest( - fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - enqueue_futures.emplace_back(zookeeper->asyncMulti(ops)); - } - else - { - LOG_ERROR(log, "Not found active replica having part {}", part_name); - enqueuePartForCheck(part_name); + part_create_time = exists_resp.stat.ctime / 1000; + removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0); } + LogEntry log_entry; + log_entry.type = LogEntry::GET_PART; + log_entry.source_replica = ""; + log_entry.new_part_name = part_name; + log_entry.create_time = part_create_time; + /// We assume that this occurs before the queue is loaded (queue.initialize). + ops.emplace_back(zkutil::makeCreateRequest( + fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + enqueue_futures.emplace_back(zookeeper->asyncMulti(ops)); } for (auto & future : enqueue_futures) @@ -4298,7 +4288,7 @@ void StorageReplicatedMergeTree::startup() try { - queue.initialize(getDataParts()); + queue.initialize(getZooKeeper()); InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared(*this); [[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);