Use Poco::Event to simplify code.

This commit is contained in:
Vitaly Baranov 2022-07-26 09:53:32 +02:00
parent 76599d1231
commit c0ec6fd913

View File

@ -1,5 +1,4 @@
#include <Backups/BackupCoordinationStageSync.h> #include <Backups/BackupCoordinationStageSync.h>
#include <Common/scope_guard_safe.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h> #include <Common/ZooKeeper/KeeperException.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
@ -130,27 +129,8 @@ Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const S
auto zookeeper = get_zookeeper(); auto zookeeper = get_zookeeper();
struct Watch /// Set by ZooKepper when list of zk nodes have changed.
{ auto watch = std::make_shared<Poco::Event>();
std::mutex mutex;
std::condition_variable event;
bool zk_nodes_changed = false;
bool watch_set = false;
};
/// shared_ptr because `watch_callback` can be called by ZooKeeper after leaving this function's scope.
auto watch = std::make_shared<Watch>();
/// Called by ZooKepper when list of zk nodes have changed.
auto watch_callback = [watch](const Coordination::WatchResponse &)
{
std::lock_guard lock{watch->mutex};
watch->zk_nodes_changed = true;
watch->watch_set = false; /// When it's triggered ZooKeeper resets the watch so we need to call getChildrenWatch() again.
watch->event.notify_all();
};
auto zk_nodes_changed = [watch] { return watch->zk_nodes_changed; };
bool use_timeout = timeout.has_value(); bool use_timeout = timeout.has_value();
std::chrono::steady_clock::time_point end_of_timeout; std::chrono::steady_clock::time_point end_of_timeout;
@ -164,12 +144,7 @@ Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const S
for (;;) for (;;)
{ {
/// Get zk nodes and subscribe on their changes. /// Get zk nodes and subscribe on their changes.
{ Strings zk_nodes = zookeeper->getChildren(zookeeper_path, nullptr, watch);
std::lock_guard lock{watch->mutex};
watch->watch_set = true;
watch->zk_nodes_changed = false;
}
Strings zk_nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
/// Read and analyze the current state of zk nodes. /// Read and analyze the current state of zk nodes.
state = readCurrentState(zookeeper, zk_nodes, all_hosts, stage_to_wait); state = readCurrentState(zookeeper, zk_nodes, all_hosts, stage_to_wait);
@ -186,19 +161,17 @@ Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const S
/// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed. /// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed.
{ {
std::unique_lock lock{watch->mutex};
if (use_timeout) if (use_timeout)
{ {
auto current_time = std::chrono::steady_clock::now(); auto current_time = std::chrono::steady_clock::now();
if ((current_time > end_of_timeout) || !watch->event.wait_for(lock, end_of_timeout - current_time, zk_nodes_changed)) if ((current_time > end_of_timeout)
|| !watch->tryWait(std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time).count()))
break; break;
} }
else else
{ {
watch->event.wait(lock, zk_nodes_changed); watch->wait();
} }
assert(watch->zk_nodes_changed);
assert(!watch->watch_set);
} }
} }