Add dead watches cleaner

This commit is contained in:
alesapin 2020-11-26 17:57:32 +03:00
parent 15a76b4763
commit 0c6ce276b2
2 changed files with 51 additions and 12 deletions

View File

@ -49,9 +49,9 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
auto it = watches.find(watch_response.path);
if (it != watches.end())
{
for (auto & callback : it->second)
if (callback)
callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(watch_response));
for (auto & watcher : it->second)
if (watcher.watch_callback)
watcher.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(watch_response));
watches.erase(it);
}
@ -64,9 +64,9 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches &
it = list_watches.find(watch_list_response.path);
if (it != list_watches.end())
{
for (auto & callback : it->second)
if (callback)
callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(watch_list_response));
for (auto & watcher : it->second)
if (watcher.watch_callback)
watcher.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(watch_list_response));
list_watches.erase(it);
}
@ -529,6 +529,7 @@ void TestKeeperStorage::processingThread()
}
ephemerals.erase(it);
}
clearDeadWatches(info.session_id);
}
else
{
@ -542,11 +543,13 @@ void TestKeeperStorage::processingThread()
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
watches_type[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
watches[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback});
sessions_and_watchers[info.session_id].emplace(zk_request->getPath());
}
else
{
@ -601,13 +604,13 @@ void TestKeeperStorage::finalize()
response.state = Coordination::EXPIRED_SESSION;
response.error = Coordination::Error::ZSESSIONEXPIRED;
for (auto & callback : watch_pair.second)
for (auto & watcher : watch_pair.second)
{
if (callback)
if (watcher.watch_callback)
{
try
{
callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(response));
watcher.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(response));
}
catch (...)
{
@ -622,6 +625,7 @@ void TestKeeperStorage::finalize()
for (auto & path_watch : list_watches)
finish_watch(path_watch);
list_watches.clear();
sessions_and_watchers.clear();
}
RequestInfo info;
while (requests_queue.tryPop(info))
@ -755,4 +759,30 @@ TestKeeperStorage::~TestKeeperStorage()
}
}
void TestKeeperStorage::clearDeadWatches(int64_t session_id)
{
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())
{
for (const auto & watch_path : watches_it->second)
{
auto watch = watches.find(watch_path);
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{
if (w_it->session_id == session_id)
w_it = watches_for_path.erase(w_it);
else
++w_it;
}
if (watches_for_path.empty())
watches.erase(watch);
}
}
sessions_and_watchers.erase(watches_it);
}
}
}

View File

@ -34,14 +34,22 @@ public:
int32_t seq_num = 0;
};
struct Watcher
{
int64_t session_id;
ResponseCallback watch_callback;
};
using Container = std::map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
using WatchCallbacks = std::vector<ResponseCallback>;
using WatchCallbacks = std::vector<Watcher>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
Container container;
Ephemerals ephemerals;
SessionAndWatcher sessions_and_watchers;
std::atomic<int64_t> zxid{0};
std::atomic<bool> shutdown{false};
@ -69,6 +77,7 @@ public:
ThreadFromGlobalPool processing_thread;
void processingThread();
void clearDeadWatches(int64_t session_id);
public:
using AsyncResponse = std::future<Coordination::ZooKeeperResponsePtr>;