
298 lines
11 KiB
Raw Normal View History

2014-03-07 13:50:58 +00:00
#pragma once
#include "Types.h"
#include <Poco/Util/LayeredConfiguration.h>
#include <unordered_set>
#include <future>
#include <memory>
#include <mutex>
2016-01-21 16:30:05 +00:00
#include <string>
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <port/unistd.h>
2016-10-24 13:47:15 +00:00
namespace ProfileEvents
extern const Event CannotRemoveEphemeralNode;
2016-10-24 13:47:15 +00:00
namespace CurrentMetrics
extern const Metric EphemeralNode;
2016-10-24 13:47:15 +00:00
2014-03-07 13:50:58 +00:00
namespace zkutil
/// Preferred size of multi() command (in number of ops)
constexpr size_t MULTI_BATCH_SIZE = 100;
2017-03-16 22:39:52 +00:00
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
/// Poco::Event objects are used for watches. The event is set only once on the first
/// watch notification.
/// Callback-based watch interface is also provided.
/// Read-only methods retry retry_num times if recoverable errors like OperationTimeout
/// or ConnectionLoss are encountered.
/// Modifying methods do not retry, because it leads to problems of the double-delete type.
/// Methods with names not starting at try- raise KeeperException on any error.
class ZooKeeper
2014-03-07 13:50:58 +00:00
using Ptr = std::shared_ptr<ZooKeeper>;
ZooKeeper(const std::string & hosts, const std::string & identity = "",
int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT, const std::string & chroot = "");
/** Config of the form:
2017-08-30 18:21:49 +00:00
<!-- Optional. Chroot suffix. Should exist. -->
<!-- Optional. Zookeeper digest ACL string. -->
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
/// Creates a new session with the same parameters. This method can be used for reconnecting
/// after the session has expired.
/// This object remains unchanged, and the new session is returned.
Ptr startNewSession() const;
/// Returns true, if the session has expired.
bool expired();
/// Create a znode.
/// Throw an exception if something went wrong.
std::string create(const std::string & path, const std::string & data, int32_t mode);
/// Does not throw in the following cases:
/// * The parent for the created node does not exist
/// * The parent is ephemeral.
/// * The node already exists.
/// In case of other errors throws an exception.
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode);
/// Create a Persistent node.
/// Does nothing if the node already exists.
void createIfNotExists(const std::string & path, const std::string & data);
/// Creates all non-existent ancestors of the given path with empty contents.
/// Does not create the node itself.
void createAncestors(const std::string & path);
/// Remove the node if the version matches. (if version == -1, remove any version).
void remove(const std::string & path, int32_t version = -1);
/// Doesn't throw in the following cases:
/// * The node doesn't exist
/// * Versions don't match
/// * The node has children.
int32_t tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
2018-04-05 02:56:11 +00:00
bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);
2018-04-05 02:56:11 +00:00
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
/// Creates the node if it doesn't exist. Updates its contents otherwise.
void createOrUpdate(const std::string & path, const std::string & data, int32_t mode);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
/// * Versions do not match.
int32_t trySet(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
Strings getChildren(const std::string & path,
Stat * stat = nullptr,
const EventPtr & watch = nullptr);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
int32_t tryGetChildren(const std::string & path, Strings & res,
Stat * stat = nullptr,
const EventPtr & watch = nullptr);
/// Performs several operations in a transaction.
/// Throws on every error.
Responses multi(const Requests & requests);
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
int32_t tryMulti(const Requests & requests, Responses & responses);
/// Throws nothing, just alias of multiImpl
int32_t tryMultiNoThrow(const Requests & requests, Responses & responses)
return multiImpl(requests, responses);
Int64 getClientID();
/// Remove the node with the subtree. If someone concurrently adds or removes a node
/// in the subtree, the result is undefined.
void removeRecursive(const std::string & path);
/// Remove the node with the subtree. If someone concurrently removes a node in the subtree,
/// this will not cause errors.
/// For instance, you can call this method twice concurrently for the same node and the end
/// result would be the same as for the single call.
void tryRemoveRecursive(const std::string & path);
/// Wait for the node to disappear or return immediately if it doesn't exist.
void waitForDisappear(const std::string & path);
/// Async interface (a small subset of operations is implemented).
/// Usage:
/// // Non-blocking calls:
/// auto future1 = zk.asyncGet("/path1");
/// auto future2 = zk.asyncGet("/path2");
/// ...
/// // These calls can block until the operations are completed:
/// auto result1 = future1.get();
/// auto result2 = future2.get();
/// Future should not be destroyed before the result is gotten.
std::future<ZooKeeperImpl::ZooKeeper::GetResponse> asyncGet(const std::string & path);
std::future<ZooKeeperImpl::ZooKeeper::GetResponse> asyncTryGet(const std::string & path);
std::future<ZooKeeperImpl::ZooKeeper::ExistsResponse> asyncExists(const std::string & path);
std::future<ZooKeeperImpl::ZooKeeper::ListResponse> asyncGetChildren(const std::string & path);
std::future<ZooKeeperImpl::ZooKeeper::RemoveResponse> asyncRemove(const std::string & path, int32_t version = -1);
/// Doesn't throw in the following cases:
/// * The node doesn't exist
/// * The versions do not match
/// * The node has children
std::future<ZooKeeperImpl::ZooKeeper::RemoveResponse> asyncTryRemove(const std::string & path, int32_t version = -1);
std::future<ZooKeeperImpl::ZooKeeper::MultiResponse> asyncMulti(const Requests & ops);
/// Like the previous one but don't throw any exceptions on future.get()
std::future<ZooKeeperImpl::ZooKeeper::MultiResponse> tryAsyncMulti(const Requests & ops);
static std::string error2string(int32_t code);
2014-03-07 13:50:58 +00:00
friend class EphemeralNodeHolder;
void init(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_, const std::string & chroot_);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
/// The following methods don't throw exceptions but return error codes.
int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
int32_t removeImpl(const std::string & path, int32_t version);
int32_t getImpl(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback);
int32_t setImpl(const std::string & path, const std::string & data, int32_t version, Stat * stat);
int32_t getChildrenImpl(const std::string & path, Strings & res, Stat * stat, WatchCallback watch_callback);
int32_t multiImpl(const Requests & requests, Responses & responses);
int32_t existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback);
std::unique_ptr<ZooKeeperImpl::ZooKeeper> impl;
std::string hosts;
std::string identity;
int32_t session_timeout_ms;
std::string chroot;
std::mutex mutex;
Logger * log = nullptr;
2014-03-07 13:50:58 +00:00
using ZooKeeperPtr = ZooKeeper::Ptr;
2014-04-25 13:55:15 +00:00
2014-03-22 14:44:44 +00:00
2017-03-16 22:39:52 +00:00
/// Creates an ephemeral node in the constructor, removes it in the destructor.
2014-03-22 14:44:44 +00:00
class EphemeralNodeHolder
using Ptr = std::shared_ptr<EphemeralNodeHolder>;
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
if (create)
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
std::string getPath() const
return path;
static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
catch (...)
2014-03-22 14:44:44 +00:00
std::string path;
ZooKeeper & zookeeper;
CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
2014-03-22 14:44:44 +00:00
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
2014-03-22 14:44:44 +00:00
2014-03-07 13:50:58 +00:00