From 0c6ce276b2b979822308f958cb365bd2cd156f40 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Nov 2020 17:57:32 +0300 Subject: [PATCH] Add dead watches cleaner --- src/Common/ZooKeeper/TestKeeperStorage.cpp | 52 +++++++++++++++++----- src/Common/ZooKeeper/TestKeeperStorage.h | 11 ++++- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorage.cpp b/src/Common/ZooKeeper/TestKeeperStorage.cpp index a2f7b1a8b2d..146eb374e28 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.cpp +++ b/src/Common/ZooKeeper/TestKeeperStorage.cpp @@ -49,9 +49,9 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & auto it = watches.find(watch_response.path); if (it != watches.end()) { - for (auto & callback : it->second) - if (callback) - callback(std::make_shared(watch_response)); + for (auto & watcher : it->second) + if (watcher.watch_callback) + watcher.watch_callback(std::make_shared(watch_response)); watches.erase(it); } @@ -64,9 +64,9 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & it = list_watches.find(watch_list_response.path); if (it != list_watches.end()) { - for (auto & callback : it->second) - if (callback) - callback(std::make_shared(watch_list_response)); + for (auto & watcher : it->second) + if (watcher.watch_callback) + watcher.watch_callback(std::make_shared(watch_list_response)); list_watches.erase(it); } @@ -529,6 +529,7 @@ void TestKeeperStorage::processingThread() } ephemerals.erase(it); } + clearDeadWatches(info.session_id); } else { @@ -542,11 +543,13 @@ void TestKeeperStorage::processingThread() ? list_watches : watches; - watches_type[zk_request->getPath()].emplace_back(std::move(info.watch_callback)); + watches_type[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback}); + sessions_and_watchers[info.session_id].emplace(zk_request->getPath()); } else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) { - watches[zk_request->getPath()].emplace_back(std::move(info.watch_callback)); + watches[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback}); + sessions_and_watchers[info.session_id].emplace(zk_request->getPath()); } else { @@ -601,13 +604,13 @@ void TestKeeperStorage::finalize() response.state = Coordination::EXPIRED_SESSION; response.error = Coordination::Error::ZSESSIONEXPIRED; - for (auto & callback : watch_pair.second) + for (auto & watcher : watch_pair.second) { - if (callback) + if (watcher.watch_callback) { try { - callback(std::make_shared(response)); + watcher.watch_callback(std::make_shared(response)); } catch (...) { @@ -622,6 +625,7 @@ void TestKeeperStorage::finalize() for (auto & path_watch : list_watches) finish_watch(path_watch); list_watches.clear(); + sessions_and_watchers.clear(); } RequestInfo info; while (requests_queue.tryPop(info)) @@ -755,4 +759,30 @@ TestKeeperStorage::~TestKeeperStorage() } } +void TestKeeperStorage::clearDeadWatches(int64_t session_id) +{ + auto watches_it = sessions_and_watchers.find(session_id); + if (watches_it != sessions_and_watchers.end()) + { + for (const auto & watch_path : watches_it->second) + { + auto watch = watches.find(watch_path); + if (watch != watches.end()) + { + auto & watches_for_path = watch->second; + for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();) + { + if (w_it->session_id == session_id) + w_it = watches_for_path.erase(w_it); + else + ++w_it; + } + if (watches_for_path.empty()) + watches.erase(watch); + } + } + sessions_and_watchers.erase(watches_it); + } +} + } diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h index ee8bc1ae713..80b871554d2 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.h +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -34,14 +34,22 @@ public: int32_t seq_num = 0; }; + struct Watcher + { + int64_t session_id; + ResponseCallback watch_callback; + }; + using Container = std::map; using Ephemerals = std::unordered_map>; + using SessionAndWatcher = std::unordered_map>; - using WatchCallbacks = std::vector; + using WatchCallbacks = std::vector; using Watches = std::map; Container container; Ephemerals ephemerals; + SessionAndWatcher sessions_and_watchers; std::atomic zxid{0}; std::atomic shutdown{false}; @@ -69,6 +77,7 @@ public: ThreadFromGlobalPool processing_thread; void processingThread(); + void clearDeadWatches(int64_t session_id); public: using AsyncResponse = std::future;