review fixup: don't pass in zookeeper

This commit is contained in:
Jayme Bird 2024-01-05 15:16:06 +00:00
parent 382f6404bf
commit 9734934d84
5 changed files with 7 additions and 9 deletions

View File

@ -1039,8 +1039,7 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
auto sync_timeout = getContext()->getSettingsRef().receive_timeout.totalMilliseconds();
auto zookeeper = getContext()->getZooKeeper();
if (!storage_replicated->waitForProcessingQueue(sync_timeout, query.sync_replica_mode, zookeeper, query.src_replicas))
if (!storage_replicated->waitForProcessingQueue(sync_timeout, query.sync_replica_mode, query.src_replicas))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \

View File

@ -2623,7 +2623,7 @@ String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String &
ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback,
std::unordered_set<String> & out_entry_names, SyncReplicaMode sync_mode,
zkutil::ZooKeeperPtr & zookeeper, std::unordered_set<String> src_replicas)
std::unordered_set<String> src_replicas)
{
std::lock_guard<std::mutex> lock(state_mutex);
std::lock_guard lock_subscribers(subscribers_mutex);
@ -2644,9 +2644,8 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
std::unordered_set<String> existing_replicas;
if (!src_replicas.empty())
{
Coordination::Stat stat;
Strings unfiltered_hosts;
unfiltered_hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
unfiltered_hosts = storage.getZooKeeper()->getChildren(zookeeper_path + "/replicas");
for (const auto & host : unfiltered_hosts)
existing_replicas.insert(host);
}

View File

@ -430,7 +430,7 @@ public:
ActionBlocker pull_log_blocker;
/// Adds a subscriber
SubscriberHandler addSubscriber(SubscriberCallBack && callback, std::unordered_set<String> & out_entry_names, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set<String> src_replicas);
SubscriberHandler addSubscriber(SubscriberCallBack && callback, std::unordered_set<String> & out_entry_names, SyncReplicaMode sync_mode, std::unordered_set<String> src_replicas);
void notifySubscribersOnPartialShutdown();

View File

@ -8551,7 +8551,7 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
cleanup_thread.wakeup();
}
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set<String> source_replicas)
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set<String> source_replicas)
{
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
@ -8592,7 +8592,7 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec
target_entry_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback), wait_for_ids, sync_mode, zookeeper, source_replicas);
const auto handler = queue.addSubscriber(std::move(callback), wait_for_ids, sync_mode, source_replicas);
if (!target_entry_event.tryWait(max_wait_milliseconds))
return false;

View File

@ -208,7 +208,7 @@ public:
/// Wait till replication queue's current last entry is processed or till size becomes 0
/// If timeout is exceeded returns false
bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, zkutil::ZooKeeperPtr & zookeeper, std::unordered_set<String> source_replicas);
bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set<String> source_replicas);
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);