diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index adce05a8f6b..fbdbe8204f7 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1039,8 +1039,7 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query) { LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed"); auto sync_timeout = getContext()->getSettingsRef().receive_timeout.totalMilliseconds(); - auto zookeeper = getContext()->getZooKeeper(); - if (!storage_replicated->waitForProcessingQueue(sync_timeout, query.sync_replica_mode, zookeeper, query.src_replicas)) + if (!storage_replicated->waitForProcessingQueue(sync_timeout, query.sync_replica_mode, query.src_replicas)) { LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs()); throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 0836ca1e5b4..e88b5558d43 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2623,7 +2623,7 @@ String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String & ReplicatedMergeTreeQueue::SubscriberHandler ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback, std::unordered_set & out_entry_names, SyncReplicaMode sync_mode, - zkutil::ZooKeeperPtr & zookeeper, std::unordered_set src_replicas) + std::unordered_set src_replicas) { std::lock_guard lock(state_mutex); std::lock_guard lock_subscribers(subscribers_mutex); @@ -2644,9 +2644,8 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall std::unordered_set existing_replicas; if (!src_replicas.empty()) { - Coordination::Stat stat; Strings unfiltered_hosts; - unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat); + unfiltered_hosts = storage.getZooKeeper()->getChildren(zookeeper_path + "/replicas"); for (const auto & host : unfiltered_hosts) existing_replicas.insert(host); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 845c5580790..92201b11d37 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -430,7 +430,7 @@ public: ActionBlocker pull_log_blocker; /// Adds a subscriber - SubscriberHandler addSubscriber(SubscriberCallBack && callback, std::unordered_set & out_entry_names, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set src_replicas); + SubscriberHandler addSubscriber(SubscriberCallBack && callback, std::unordered_set & out_entry_names, SyncReplicaMode sync_mode, std::unordered_set src_replicas); void notifySubscribersOnPartialShutdown(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e1bfa9e4c92..ef0e844b748 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8551,7 +8551,7 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio cleanup_thread.wakeup(); } -bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set source_replicas) +bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set source_replicas) { /// Let's fetch new log entries firstly queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); @@ -8592,7 +8592,7 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec target_entry_event.set(); }; - const auto handler = queue.addSubscriber(std::move(callback), wait_for_ids, sync_mode, zookeeper, source_replicas); + const auto handler = queue.addSubscriber(std::move(callback), wait_for_ids, sync_mode, source_replicas); if (!target_entry_event.tryWait(max_wait_milliseconds)) return false; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index de06c04ce44..18530490164 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -208,7 +208,7 @@ public: /// Wait till replication queue's current last entry is processed or till size becomes 0 /// If timeout is exceeded returns false - bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set source_replicas); + bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set source_replicas); /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK. void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);