diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 41d715b23f1..bee875d1c74 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -663,7 +663,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & { WaitForDisappearStatePtr state = std::make_shared(); - auto callback = [state](const Coordination::ExistsResponse & response) + auto callback = [state](const Coordination::GetResponse & response) { state->code = int32_t(response.error); if (state->code) @@ -683,8 +683,9 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & while (!condition || !condition()) { - /// NOTE: if the node doesn't exist, the watch will leak. - impl->exists(path, callback, watch); + /// Use getData insteand of exists to avoid watch leak. + impl->get(path, callback, watch); + if (!condition) state->event.wait(); else if (!state->event.tryWait(1000)) diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 2ee0c5aba17..abb8158781b 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -422,6 +422,18 @@ void ZooKeeperRequest::write(WriteBuffer & out) const } +static void removeRootPath(String & path, const String & root_path) +{ + if (root_path.empty()) + return; + + if (path.size() <= root_path.size()) + throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY); + + path = path.substr(root_path.size()); +} + + struct ZooKeeperResponse : virtual Response { virtual ~ZooKeeperResponse() override = default; @@ -1092,8 +1104,6 @@ void ZooKeeper::sendThread() { info.request->has_watch = true; CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch); - std::lock_guard lock(watches_mutex); - watches[info.request->getPath()].emplace_back(std::move(info.watch)); } if (expired) @@ -1278,6 +1288,30 @@ void ZooKeeper::receiveEvent() response->removeRootPath(root_path); } + /// Instead of setting the watch in sendEvent, set it in receiveEvent becuase need to check the response. + /// The watch shouldn't be set if the node does not exist and it will never exist like sequential ephemeral nodes. + /// By using getData() instead of exists(), a watch won't be set if the node doesn't exist. + if (request_info.watch) + { + bool add_watch = false; + /// 3 indicates the ZooKeeperExistsRequest. + // For exists, we set the watch on both node exist and nonexist case. + // For other case like getData, we only set the watch when node exists. + if (request_info.request->getOpNum() == 3) + add_watch = (response->error == Error::ZOK || response->error == Error::ZNONODE); + else + add_watch = response->error == Error::ZOK; + + if (add_watch) + { + /// The key of wathces should exclude the root_path + String req_path = request_info.request->getPath(); + removeRootPath(req_path, root_path); + std::lock_guard lock(watches_mutex); + watches[req_path].emplace_back(std::move(request_info.watch)); + } + } + int32_t actual_length = in->count() - count_before_event; if (length != actual_length) throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), Error::ZMARSHALLINGERROR); diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h index 89715085eda..a5f7ebce84f 100644 --- a/src/Storages/MergeTree/LeaderElection.h +++ b/src/Storages/MergeTree/LeaderElection.h @@ -124,7 +124,8 @@ private: /// Watch for the node in front of us. --my_node_it; - if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback())) + std::string get_path_value; + if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback())) task->schedule(); success = true;