2017-08-25 16:35:10 +00:00
|
|
|
#include <random>
|
2017-09-09 23:17:38 +00:00
|
|
|
#include <pcg_random.hpp>
|
2014-10-16 01:21:03 +00:00
|
|
|
#include <functional>
|
2017-06-19 20:06:35 +00:00
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
2015-09-29 19:19:54 +00:00
|
|
|
#include <common/logger_useful.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/ProfileEvents.h>
|
|
|
|
#include <Common/StringUtils.h>
|
2017-10-23 15:08:31 +00:00
|
|
|
#include <Common/PODArray.h>
|
2017-09-09 23:17:38 +00:00
|
|
|
#include <Common/randomSeed.h>
|
2014-10-16 01:21:03 +00:00
|
|
|
|
2014-03-07 13:50:58 +00:00
|
|
|
|
2016-10-24 02:02:37 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Event ZooKeeperInit;
|
|
|
|
extern const Event ZooKeeperTransactions;
|
|
|
|
extern const Event ZooKeeperCreate;
|
|
|
|
extern const Event ZooKeeperRemove;
|
|
|
|
extern const Event ZooKeeperExists;
|
|
|
|
extern const Event ZooKeeperMulti;
|
|
|
|
extern const Event ZooKeeperGet;
|
|
|
|
extern const Event ZooKeeperSet;
|
|
|
|
extern const Event ZooKeeperGetChildren;
|
2016-10-24 02:02:37 +00:00
|
|
|
}
|
|
|
|
|
2016-10-24 13:47:15 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Metric ZooKeeperWatch;
|
2016-10-24 13:47:15 +00:00
|
|
|
}
|
|
|
|
|
2016-10-24 02:02:37 +00:00
|
|
|
|
2014-03-07 13:50:58 +00:00
|
|
|
namespace zkutil
|
|
|
|
{
|
|
|
|
|
2014-06-04 13:48:36 +00:00
|
|
|
const int CreateMode::Persistent = 0;
|
|
|
|
const int CreateMode::Ephemeral = ZOO_EPHEMERAL;
|
|
|
|
const int CreateMode::EphemeralSequential = ZOO_EPHEMERAL | ZOO_SEQUENCE;
|
|
|
|
const int CreateMode::PersistentSequential = ZOO_SEQUENCE;
|
|
|
|
|
2017-09-07 21:04:48 +00:00
|
|
|
|
|
|
|
static void check(int32_t code, const std::string & path)
|
2014-06-04 16:48:55 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (code != ZOK)
|
2017-09-07 21:04:48 +00:00
|
|
|
throw KeeperException(code, path);
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-09-07 21:04:48 +00:00
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
struct WatchContext
|
2014-03-07 13:50:58 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
/// ZooKeeper instance exists for the entire WatchContext lifetime.
|
|
|
|
ZooKeeper & zk;
|
|
|
|
WatchCallback callback;
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperWatch};
|
2014-04-08 10:13:49 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
WatchContext(ZooKeeper & zk_, WatchCallback callback_) : zk(zk_), callback(std::move(callback_)) {}
|
2014-03-07 13:50:58 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
void process(int32_t event_type, int32_t state, const char * path)
|
|
|
|
{
|
|
|
|
if (callback)
|
|
|
|
callback(zk, event_type, state, path);
|
|
|
|
}
|
2014-06-30 11:33:06 +00:00
|
|
|
};
|
2014-03-07 13:50:58 +00:00
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
/// Watcher function registered with libzookeeper.
/// `watcher_ctx` is the WatchContext pointer produced by createContext().
void ZooKeeper::processCallback(zhandle_t * zh, int type, int state, const char * path, void * watcher_ctx)
{
    auto * watch_context = static_cast<WatchContext *>(watcher_ctx);
    watch_context->process(type, state, path);

    /// It is guaranteed that non-ZOO_SESSION_EVENT notification will be delivered only once
    /// (https://issues.apache.org/jira/browse/ZOOKEEPER-890),
    /// so the context can be destroyed right after such a notification.
    const bool delivered_once = (type != ZOO_SESSION_EVENT);
    if (delivered_once)
        destroyContext(watch_context);
}
|
2014-03-07 13:50:58 +00:00
|
|
|
|
2017-11-01 22:59:27 +00:00
|
|
|
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
|
|
|
|
int32_t session_timeout_ms_, bool check_root_exists)
|
2014-03-07 13:50:58 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
log = &Logger::get("ZooKeeper");
|
|
|
|
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
|
|
|
|
hosts = hosts_;
|
2017-08-28 17:12:43 +00:00
|
|
|
identity = identity_;
|
2017-04-01 07:20:54 +00:00
|
|
|
session_timeout_ms = session_timeout_ms_;
|
2014-03-13 14:49:17 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!impl)
|
|
|
|
throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts);
|
2014-03-07 13:50:58 +00:00
|
|
|
|
2017-08-28 16:35:57 +00:00
|
|
|
if (!identity.empty())
|
|
|
|
{
|
2017-11-01 22:59:27 +00:00
|
|
|
auto code = zoo_add_auth(impl, "digest", identity.c_str(), static_cast<int>(identity.size()), nullptr, nullptr);
|
2017-08-28 16:35:57 +00:00
|
|
|
if (code != ZOK)
|
|
|
|
throw KeeperException("Zookeeper authentication failed. Hosts are " + hosts, code);
|
|
|
|
|
|
|
|
default_acl = &ZOO_CREATOR_ALL_ACL;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
default_acl = &ZOO_OPEN_ACL_UNSAFE;
|
2017-08-29 19:18:27 +00:00
|
|
|
|
|
|
|
LOG_TRACE(log, "initialized, hosts: " << hosts);
|
2017-11-01 22:59:27 +00:00
|
|
|
|
|
|
|
if (check_root_exists && !exists("/"))
|
|
|
|
throw KeeperException("Zookeeper root doesn't exist. You should create root node before start.");
|
2014-03-07 13:50:58 +00:00
|
|
|
}
|
|
|
|
|
2017-11-01 22:59:27 +00:00
|
|
|
/// Constructs the client from explicit connection parameters; all the work is delegated to init().
ZooKeeper::ZooKeeper(
    const std::string & hosts,
    const std::string & identity,
    int32_t session_timeout_ms,
    bool check_root_exists)
{
    init(hosts, identity, session_timeout_ms, check_root_exists);
}
|
|
|
|
|
|
|
|
struct ZooKeeperArgs
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
|
|
|
|
{
|
|
|
|
Poco::Util::AbstractConfiguration::Keys keys;
|
|
|
|
config.keys(config_name, keys);
|
|
|
|
|
|
|
|
std::vector<std::string> hosts_strings;
|
2017-08-30 14:01:21 +00:00
|
|
|
std::string root;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
|
2017-11-01 22:59:27 +00:00
|
|
|
has_chroot = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
for (const auto & key : keys)
|
|
|
|
{
|
|
|
|
if (startsWith(key, "node"))
|
|
|
|
{
|
|
|
|
hosts_strings.push_back(
|
2017-08-28 16:35:57 +00:00
|
|
|
config.getString(config_name + "." + key + ".host") + ":"
|
|
|
|
+ config.getString(config_name + "." + key + ".port", "2181")
|
|
|
|
);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
else if (key == "session_timeout_ms")
|
|
|
|
{
|
|
|
|
session_timeout_ms = config.getInt(config_name + "." + key);
|
|
|
|
}
|
2017-08-28 16:35:57 +00:00
|
|
|
else if (key == "identity")
|
|
|
|
{
|
|
|
|
identity = config.getString(config_name + "." + key);
|
|
|
|
}
|
2017-08-29 19:18:27 +00:00
|
|
|
else if (key == "root")
|
|
|
|
{
|
2017-08-30 14:01:21 +00:00
|
|
|
root = config.getString(config_name + "." + key);
|
2017-08-29 19:18:27 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
else throw KeeperException(std::string("Unknown key ") + key + " in config file");
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
|
2017-09-09 23:17:38 +00:00
|
|
|
pcg64 rng(randomSeed());
|
|
|
|
std::shuffle(hosts_strings.begin(), hosts_strings.end(), rng);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
for (auto & host : hosts_strings)
|
|
|
|
{
|
|
|
|
if (hosts.size())
|
|
|
|
hosts += ",";
|
2017-08-30 14:01:21 +00:00
|
|
|
hosts += host;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2017-08-30 14:01:21 +00:00
|
|
|
|
2017-08-30 18:04:47 +00:00
|
|
|
if (!root.empty())
|
|
|
|
{
|
|
|
|
if (root.front() != '/')
|
2017-08-30 18:21:49 +00:00
|
|
|
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + root);
|
2017-11-01 22:59:27 +00:00
|
|
|
if (root.back() == '/')
|
|
|
|
root.pop_back();
|
|
|
|
|
2017-08-30 18:04:47 +00:00
|
|
|
hosts += root;
|
2017-11-01 22:59:27 +00:00
|
|
|
has_chroot = true;
|
2017-08-30 18:04:47 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::string hosts;
|
2017-08-28 16:35:57 +00:00
|
|
|
std::string identity;
|
|
|
|
int session_timeout_ms;
|
2017-11-01 22:59:27 +00:00
|
|
|
bool has_chroot;
|
2014-03-13 14:49:17 +00:00
|
|
|
};
|
|
|
|
|
2014-07-08 12:45:10 +00:00
|
|
|
/// Constructs the client from a configuration subtree.
/// The root existence check is requested only when a "root" was configured (args.has_chroot).
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
    const ZooKeeperArgs parsed(config, config_name);
    init(parsed.hosts, parsed.identity, parsed.session_timeout_ms, parsed.has_chroot);
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
/// Wraps an event into a watch callback that sets the event.
/// Returns an empty callback when `event` is null.
WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event)
{
    if (!event)
        return WatchCallback();

    WatchCallback callback = [pending = event](ZooKeeper &, int, int, const char *) mutable
    {
        if (!pending)
            return;

        pending->set();
        /// The event is set only once, even if the callback can fire multiple times due to session events.
        pending.reset();
    };
    return callback;
}
|
|
|
|
|
|
|
|
/** Allocates a WatchContext for a non-empty callback and registers it in watch_context_store.
  * Returns nullptr for an empty callback (no watch is requested in that case).
  *
  * The returned pointer is owned by the store / the watch machinery: it is released either by
  * destroyContext() or by the ZooKeeper destructor.
  */
WatchContext * ZooKeeper::createContext(WatchCallback && callback)
{
    if (!callback)
        return nullptr;

    /// Keep the context in a smart pointer until it is registered, so it is not leaked
    /// if taking the lock or inserting into the store throws.
    auto guarded = std::make_unique<WatchContext>(*this, std::move(callback));
    WatchContext * res = nullptr;

    {
        std::lock_guard<std::mutex> lock(mutex);
        watch_context_store.insert(guarded.get());

        /// From now on the store tracks the context; give up local ownership immediately
        /// so that a later exception cannot free a pointer that is still registered.
        res = guarded.release();

        if (watch_context_store.size() % 10000 == 0)
        {
            LOG_ERROR(log, "There are " << watch_context_store.size() << " active watches. There must be a leak somewhere.");
        }
    }

    return res;
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
/// Unregisters the context from its ZooKeeper object's store and frees it.
/// A null pointer is accepted and ignored.
void ZooKeeper::destroyContext(WatchContext * context)
{
    if (!context)
        return;

    {
        /// The lock covers only the store modification; the object is freed outside of it.
        std::lock_guard<std::mutex> lock(context->zk.mutex);
        context->zk.watch_context_store.erase(context);
    }

    delete context;
}
|
|
|
|
|
2014-06-04 16:48:55 +00:00
|
|
|
int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
|
2017-04-01 07:20:54 +00:00
|
|
|
Stat * stat_,
|
|
|
|
WatchCallback watch_callback)
|
|
|
|
{
|
|
|
|
String_vector strings;
|
|
|
|
int code;
|
|
|
|
Stat stat;
|
|
|
|
watcher_fn watcher = watch_callback ? processCallback : nullptr;
|
|
|
|
WatchContext * context = createContext(std::move(watch_callback));
|
|
|
|
code = zoo_wget_children2(impl, path.c_str(), watcher, context, &strings, &stat);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
|
|
|
|
|
|
|
if (code == ZOK)
|
|
|
|
{
|
|
|
|
if (stat_)
|
|
|
|
*stat_ = stat;
|
|
|
|
res.resize(strings.count);
|
|
|
|
for (int i = 0; i < strings.count; ++i)
|
|
|
|
res[i] = std::string(strings.data[i]);
|
|
|
|
deallocate_String_vector(&strings);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// The call was unsuccessful, so the watch was not set. Destroy the context.
|
|
|
|
destroyContext(context);
|
|
|
|
}
|
|
|
|
|
|
|
|
return code;
|
2014-03-07 19:18:48 +00:00
|
|
|
}
|
2014-06-04 16:48:55 +00:00
|
|
|
/// Returns the children of `path`. Throws KeeperException on any code other than ZOK
/// (including the case when the node does not exist).
Strings ZooKeeper::getChildren(
    const std::string & path, Stat * stat, const EventPtr & watch)
{
    Strings children;
    const int32_t code = tryGetChildren(path, children, stat, watch);
    check(code, path);
    return children;
}
|
2014-03-07 19:18:48 +00:00
|
|
|
|
2014-06-04 16:48:55 +00:00
|
|
|
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
|
2017-04-01 07:20:54 +00:00
|
|
|
Stat * stat_, const EventPtr & watch)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::getChildrenImpl, this, std::ref(path), std::ref(res), stat_, callbackForEvent(watch)));
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK || code == ZNONODE))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2016-04-09 02:03:44 +00:00
|
|
|
int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int code;
|
|
|
|
/// The name of the created node can be longer than path if the sequential node is created.
|
|
|
|
size_t name_buffer_size = path.size() + SEQUENTIAL_SUFFIX_SIZE;
|
|
|
|
char * name_buffer = new char[name_buffer_size];
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
code = zoo_create(impl, path.c_str(), data.c_str(), data.size(), getDefaultACL(), mode, name_buffer, name_buffer_size);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
2014-03-21 18:58:24 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (code == ZOK)
|
|
|
|
{
|
|
|
|
path_created = std::string(name_buffer);
|
|
|
|
}
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
delete[] name_buffer;
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t type)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string path_created;
|
|
|
|
check(tryCreate(path, data, type, path_created), path);
|
|
|
|
return path_created;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
2016-04-09 02:03:44 +00:00
|
|
|
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
|
2014-06-04 16:48:55 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int code = createImpl(path, data, mode, path_created);
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZNODEEXISTS ||
|
|
|
|
code == ZNOCHILDRENFOREPHEMERALS))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 13:48:36 +00:00
|
|
|
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode)
|
2014-05-07 13:58:20 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string path_created;
|
|
|
|
return tryCreate(path, data, mode, path_created);
|
2014-05-07 13:58:20 +00:00
|
|
|
}
|
|
|
|
|
2016-04-09 02:03:44 +00:00
|
|
|
int32_t ZooKeeper::tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, std::string & path_created, size_t* attempt)
|
2014-07-03 17:24:17 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return retry([&path, &data, mode, &path_created, this] { return tryCreate(path, data, mode, path_created); }, attempt);
|
2014-07-03 17:24:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2014-06-27 17:52:50 +00:00
|
|
|
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string path_created;
|
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::createImpl, this, std::ref(path), std::ref(data), zkutil::CreateMode::Persistent, std::ref(path_created)));
|
2014-06-27 17:52:50 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (code == ZOK || code == ZNODEEXISTS)
|
|
|
|
return;
|
|
|
|
else
|
|
|
|
throw KeeperException(code, path);
|
2014-06-27 17:52:50 +00:00
|
|
|
}
|
|
|
|
|
2014-08-11 14:05:38 +00:00
|
|
|
void ZooKeeper::createAncestors(const std::string & path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t pos = 1;
|
|
|
|
while (true)
|
|
|
|
{
|
|
|
|
pos = path.find('/', pos);
|
|
|
|
if (pos == std::string::npos)
|
|
|
|
break;
|
|
|
|
createIfNotExists(path.substr(0, pos), "");
|
|
|
|
++pos;
|
|
|
|
}
|
2014-08-11 14:05:38 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 16:48:55 +00:00
|
|
|
int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = zoo_delete(impl, path.c_str(), version);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
2014-03-07 17:57:53 +00:00
|
|
|
void ZooKeeper::remove(const std::string & path, int32_t version)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
check(tryRemove(path, version), path);
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2015-05-15 14:53:39 +00:00
|
|
|
void ZooKeeper::removeWithRetries(const std::string & path, int32_t version)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t attempt;
|
|
|
|
int code = tryRemoveWithRetries(path, version, &attempt);
|
2015-05-15 14:53:39 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!(code == ZOK || (code == ZNONODE && attempt > 0)))
|
|
|
|
throw KeeperException(code, path);
|
2015-05-15 14:53:39 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 13:48:36 +00:00
|
|
|
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = removeImpl(path, version);
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZBADVERSION ||
|
|
|
|
code == ZNOTEMPTY))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
|
|
|
return code;
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2014-07-03 17:24:17 +00:00
|
|
|
int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t version, size_t * attempt)
|
2014-07-02 15:04:42 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::removeImpl, this, std::ref(path), version), attempt);
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZBADVERSION ||
|
|
|
|
code == ZNOTEMPTY))
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
throw KeeperException(code, path);
|
|
|
|
}
|
2016-04-09 02:03:44 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-07-02 15:04:42 +00:00
|
|
|
}
|
|
|
|
|
2016-04-09 02:03:44 +00:00
|
|
|
int32_t ZooKeeper::tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version, size_t * attempt)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
return tryRemoveWithRetries(path, version, attempt);
|
|
|
|
}
|
|
|
|
catch (const KeeperException &)
|
|
|
|
{
|
|
|
|
/// Set the flag indicating that the session is better treated as expired so that someone
|
|
|
|
/// recreates it and the ephemeral nodes are indeed deleted.
|
|
|
|
is_dirty = true;
|
2016-04-09 02:03:44 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
2016-04-09 02:03:44 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code;
|
|
|
|
Stat stat;
|
|
|
|
watcher_fn watcher = watch_callback ? processCallback : nullptr;
|
|
|
|
WatchContext * context = createContext(std::move(watch_callback));
|
|
|
|
code = zoo_wexists(impl, path.c_str(), watcher, context, &stat);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (code == ZOK)
|
|
|
|
{
|
|
|
|
if (stat_)
|
|
|
|
*stat_ = stat;
|
|
|
|
}
|
|
|
|
if (code != ZOK && code != ZNONODE)
|
|
|
|
{
|
|
|
|
/// The call was unsuccessful, so the watch was not set. Destroy the context.
|
|
|
|
destroyContext(context);
|
|
|
|
}
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
bool ZooKeeper::exists(const std::string & path, Stat * stat_, const EventPtr & watch)
|
2014-06-04 16:48:55 +00:00
|
|
|
{
|
2017-11-10 15:32:43 +00:00
|
|
|
return existsWatch(path, stat_, callbackForEvent(watch));
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchCallback & watch_callback)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::existsImpl, this, path, stat_, watch_callback));
|
2017-03-17 00:44:00 +00:00
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK || code == ZNONODE))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
|
|
|
if (code == ZNONODE)
|
|
|
|
return false;
|
|
|
|
return true;
|
2017-03-17 00:44:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat_, WatchCallback watch_callback)
|
2014-06-04 16:48:55 +00:00
|
|
|
{
|
2017-10-23 15:08:31 +00:00
|
|
|
DB::PODArray<char> buffer;
|
2017-10-23 14:39:29 +00:00
|
|
|
buffer.resize(MAX_NODE_SIZE);
|
2017-04-01 07:20:54 +00:00
|
|
|
int buffer_len = MAX_NODE_SIZE;
|
2017-10-18 22:13:42 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code;
|
|
|
|
Stat stat;
|
|
|
|
watcher_fn watcher = watch_callback ? processCallback : nullptr;
|
|
|
|
WatchContext * context = createContext(std::move(watch_callback));
|
|
|
|
|
2017-10-18 22:13:42 +00:00
|
|
|
code = zoo_wget(impl, path.c_str(), watcher, context, buffer.data(), &buffer_len, &stat);
|
2017-04-01 07:20:54 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
|
|
|
|
|
|
|
if (code == ZOK)
|
|
|
|
{
|
|
|
|
if (stat_)
|
|
|
|
*stat_ = stat;
|
|
|
|
|
|
|
|
if (buffer_len < 0) /// This can happen if the node contains NULL. Do not distinguish it from the empty string.
|
|
|
|
res.clear();
|
|
|
|
else
|
2017-10-18 22:13:42 +00:00
|
|
|
res.assign(buffer.data(), buffer_len);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// The call was unsuccessful, so the watch was not set. Destroy the context.
|
|
|
|
destroyContext(context);
|
|
|
|
}
|
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr & watch)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int code;
|
|
|
|
std::string res;
|
|
|
|
if (tryGet(path, res, stat, watch, &code))
|
|
|
|
return res;
|
|
|
|
else
|
|
|
|
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 00:44:00 +00:00
|
|
|
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, const EventPtr & watch, int * return_code)
|
|
|
|
{
|
2017-11-10 15:32:43 +00:00
|
|
|
return tryGetWatch(path, res, stat_, callbackForEvent(watch), return_code);
|
2017-03-17 00:44:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat_, const WatchCallback & watch_callback, int * return_code)
|
2014-03-07 17:57:53 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::getImpl, this, std::ref(path), std::ref(res), stat_, watch_callback));
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK || code == ZNONODE))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (return_code)
|
|
|
|
*return_code = code;
|
2015-10-01 12:24:44 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code == ZOK;
|
2014-03-07 13:50:58 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 16:48:55 +00:00
|
|
|
int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data,
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t version, Stat * stat_)
|
2014-06-04 16:48:55 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
Stat stat;
|
|
|
|
int32_t code = zoo_set2(impl, path.c_str(), data.c_str(), data.length(), version, &stat);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperSet);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
2014-09-11 20:34:41 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (code == ZOK)
|
|
|
|
{
|
|
|
|
if (stat_)
|
|
|
|
*stat_ = stat;
|
|
|
|
}
|
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
2014-03-07 17:57:53 +00:00
|
|
|
void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Stat * stat)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
check(trySet(path, data, version, stat), path);
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|
|
|
|
|
2015-06-09 12:30:30 +00:00
|
|
|
void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int code = trySet(path, data, -1);
|
|
|
|
if (code == ZNONODE)
|
|
|
|
{
|
|
|
|
create(path, data, mode);
|
|
|
|
}
|
|
|
|
else if (code != ZOK)
|
|
|
|
throw zkutil::KeeperException(code, path);
|
2015-06-09 12:30:30 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 13:48:36 +00:00
|
|
|
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t version, Stat * stat_)
|
2014-03-07 19:18:48 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = setImpl(path, data, version, stat_);
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-10-23 15:08:31 +00:00
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZBADVERSION))
|
2017-04-01 07:20:54 +00:00
|
|
|
throw KeeperException(code, path);
|
|
|
|
return code;
|
2014-03-07 19:18:48 +00:00
|
|
|
}
|
|
|
|
|
2014-06-04 16:48:55 +00:00
|
|
|
int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
|
2014-06-04 13:48:36 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (ops_.empty())
|
|
|
|
return ZOK;
|
2014-07-28 14:31:07 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Workaround of the libzookeeper bug. If the session is expired, zoo_multi sometimes
|
|
|
|
/// segfaults.
|
|
|
|
/// Possibly, there is a race condition and a segfault is still possible if the session
|
|
|
|
/// expires between this check and zoo_multi call.
|
|
|
|
/// TODO: check if the bug is fixed in the latest version of libzookeeper.
|
|
|
|
if (expired())
|
|
|
|
return ZINVALIDSTATE;
|
2014-08-11 11:24:10 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t count = ops_.size();
|
|
|
|
OpResultsPtr out_results(new OpResults(count));
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Copy the struct containing pointers with default copy-constructor.
|
|
|
|
/// It is safe because it hasn't got a destructor.
|
|
|
|
std::vector<zoo_op_t> ops;
|
|
|
|
for (const auto & op : ops_)
|
|
|
|
ops.push_back(*(op->data));
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-08-09 21:09:44 +00:00
|
|
|
int32_t code = zoo_multi(impl, static_cast<int>(ops.size()), ops.data(), out_results->data());
|
2017-04-01 07:20:54 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
|
|
|
|
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
|
2014-06-04 13:48:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (out_results_)
|
|
|
|
*out_results_ = out_results;
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-06-04 16:48:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/** Executes a batch of operations and returns their results.
  * On failure throws KeeperException; when per-operation results are available, the message names
  * the first operation whose error equals the overall code.
  */
OpResultsPtr ZooKeeper::multi(const Ops & ops)
{
    OpResultsPtr results;
    const int code = tryMulti(ops, &results);

    if (code == ZOK)
        return results;

    const bool have_all_results = results && results->size() == ops.size();
    if (have_all_results)
    {
        for (size_t op_idx = 0; op_idx < ops.size(); ++op_idx)
            if (results->at(op_idx).err == code)
                throw KeeperException("multi() failed at op #" + std::to_string(op_idx) + ", " + ops[op_idx]->describe(), code);
    }

    throw KeeperException(code);
}
|
|
|
|
|
|
|
|
int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = multiImpl(ops_, out_results_);
|
2014-06-04 16:48:55 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZNODEEXISTS ||
|
|
|
|
code == ZNOCHILDRENFOREPHEMERALS ||
|
|
|
|
code == ZBADVERSION ||
|
|
|
|
code == ZNOTEMPTY))
|
2017-10-23 15:08:31 +00:00
|
|
|
throw KeeperException(code);
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-03-07 19:18:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-03 11:22:12 +00:00
|
|
|
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt)
|
2014-06-30 14:21:39 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
int32_t code = retry(std::bind(&ZooKeeper::multiImpl, this, std::ref(ops), out_results), attempt);
|
|
|
|
if (!(code == ZOK ||
|
|
|
|
code == ZNONODE ||
|
|
|
|
code == ZNODEEXISTS ||
|
|
|
|
code == ZNOCHILDRENFOREPHEMERALS ||
|
|
|
|
code == ZBADVERSION ||
|
|
|
|
code == ZNOTEMPTY))
|
2017-10-23 15:08:31 +00:00
|
|
|
throw KeeperException(code);
|
2017-04-01 07:20:54 +00:00
|
|
|
return code;
|
2014-06-30 14:21:39 +00:00
|
|
|
}
|
|
|
|
|
2014-05-27 12:08:40 +00:00
|
|
|
void ZooKeeper::removeChildrenRecursive(const std::string & path)
|
2014-03-22 14:44:44 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
Strings children = getChildren(path);
|
|
|
|
while (!children.empty())
|
|
|
|
{
|
|
|
|
zkutil::Ops ops;
|
2017-08-09 21:09:44 +00:00
|
|
|
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
removeChildrenRecursive(path + "/" + children.back());
|
|
|
|
ops.emplace_back(std::make_unique<Op::Remove>(path + "/" + children.back(), -1));
|
|
|
|
children.pop_back();
|
|
|
|
}
|
|
|
|
multi(ops);
|
|
|
|
}
|
2014-05-19 09:21:57 +00:00
|
|
|
}
|
|
|
|
|
2014-07-07 09:51:42 +00:00
|
|
|
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
Strings children;
|
|
|
|
if (tryGetChildren(path, children) != ZOK)
|
|
|
|
return;
|
|
|
|
while (!children.empty())
|
|
|
|
{
|
|
|
|
zkutil::Ops ops;
|
|
|
|
Strings batch;
|
2017-08-09 21:09:44 +00:00
|
|
|
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
batch.push_back(path + "/" + children.back());
|
|
|
|
children.pop_back();
|
|
|
|
tryRemoveChildrenRecursive(batch.back());
|
|
|
|
ops.emplace_back(std::make_unique<Op::Remove>(batch.back(), -1));
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Try to remove the children with a faster method - in bulk. If this fails,
|
|
|
|
/// this means someone is concurrently removing these children and we will have
|
|
|
|
/// to remove them one by one.
|
|
|
|
if (tryMulti(ops) != ZOK)
|
|
|
|
{
|
|
|
|
for (const std::string & child : batch)
|
|
|
|
{
|
|
|
|
tryRemove(child);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-07-07 09:51:42 +00:00
|
|
|
}
|
|
|
|
|
2014-05-19 09:21:57 +00:00
|
|
|
void ZooKeeper::removeRecursive(const std::string & path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
removeChildrenRecursive(path);
|
|
|
|
remove(path);
|
2014-03-22 14:44:44 +00:00
|
|
|
}
|
|
|
|
|
2014-07-07 09:51:42 +00:00
|
|
|
void ZooKeeper::tryRemoveRecursive(const std::string & path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
tryRemoveChildrenRecursive(path);
|
|
|
|
tryRemove(path);
|
2014-07-07 09:51:42 +00:00
|
|
|
}
|
|
|
|
|
2015-09-11 02:13:59 +00:00
|
|
|
|
|
|
|
void ZooKeeper::waitForDisappear(const std::string & path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
while (true)
|
|
|
|
{
|
|
|
|
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
2015-09-11 02:13:59 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string unused;
|
|
|
|
/// Use get instead of exists to prevent watch leak if the node has already disappeared.
|
|
|
|
if (!tryGet(path, unused, nullptr, event))
|
|
|
|
break;
|
2015-09-11 02:13:59 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
event->wait();
|
|
|
|
}
|
2015-09-11 02:13:59 +00:00
|
|
|
}
|
|
|
|
|
2014-06-30 07:58:16 +00:00
|
|
|
/// Closes the libzookeeper session handle and then deletes the WatchContext
/// objects still kept in watch_context_store. A failure to close the session
/// is only logged.
ZooKeeper::~ZooKeeper()
{
    auto * log = &Logger::get("~ZooKeeper");

    LOG_INFO(log, "Closing ZooKeeper session");

    const int close_code = zookeeper_close(impl);
    if (close_code != ZOK)
    {
        LOG_ERROR(log, "Failed to close ZooKeeper session: " << zerror(close_code));
    }

    LOG_INFO(log, "Removing " << watch_context_store.size() << " watches");

    /// Destroy WatchContexts that will never be used.
    for (WatchContext * leftover_context : watch_context_store)
        delete leftover_context;

    LOG_INFO(log, "Removed watches");
}
|
|
|
|
|
2014-04-25 13:55:15 +00:00
|
|
|
/// Creates a new ZooKeeper object constructed from this object's
/// hosts, identity and session_timeout_ms, and returns it.
ZooKeeperPtr ZooKeeper::startNewSession() const
{
    auto fresh_session = std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms);
    return fresh_session;
}
|
|
|
|
|
2017-08-09 23:07:56 +00:00
|
|
|
/// Builds a "create" operation for use in a multi request.
/// Copies of the path and value are stored in the object and their buffers are handed
/// to zoo_create_op_init, together with `created_path` as the output buffer for the
/// resulting node name (sized as path length + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE).
/// NOTE(review): the size of created_path is computed from the member `path`, which
/// presumably is declared before `created_path` in the class - confirm in the header.
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)
    : path(path_)
    , value(value_)
    , acl(acl_)
    , flags(flags_)
    , created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
    zoo_create_op_init(
        data.get(),
        path.c_str(),
        value.c_str(), value.size(),
        acl,
        flags,
        created_path.data(), created_path.size());
}
|
|
|
|
|
2014-11-30 07:01:00 +00:00
|
|
|
/// Returns the current default ACL; the read is done while holding `mutex`.
ACLPtr ZooKeeper::getDefaultACL()
{
    std::lock_guard<std::mutex> guard(mutex);
    ACLPtr current_acl = default_acl;
    return current_acl;
}
|
|
|
|
|
2014-11-30 07:01:00 +00:00
|
|
|
/// Replaces the default ACL with `new_acl`; the write is done while holding `mutex`.
void ZooKeeper::setDefaultACL(ACLPtr new_acl)
{
    std::lock_guard<std::mutex> guard(mutex);
    default_acl = new_acl;
}
|
|
|
|
|
2014-06-04 13:51:40 +00:00
|
|
|
std::string ZooKeeper::error2string(int32_t code)
|
2014-06-04 13:48:36 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return zerror(code);
|
2014-06-04 13:48:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool ZooKeeper::expired()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return is_dirty || zoo_state(impl) == ZOO_EXPIRED_SESSION_STATE;
|
2014-06-04 13:48:36 +00:00
|
|
|
}
|
2014-07-03 17:24:17 +00:00
|
|
|
|
2016-10-26 22:27:38 +00:00
|
|
|
/// Returns the client_id field of the structure that zoo_client_id() gives for the session handle.
Int64 ZooKeeper::getClientID()
{
    const auto * session_id = zoo_client_id(impl);
    return session_id->client_id;
}
|
|
|
|
|
2014-10-16 20:05:26 +00:00
|
|
|
|
|
|
|
/// Starts an asynchronous "get" of the node `path` (without a watch) and returns a future for it.
/// The task stored in the future turns the reply into ValueAndStat and throws
/// KeeperException for any return code other than ZOK.
/// Throws KeeperException right away if zoo_aget itself returns an error.
ZooKeeper::GetFuture ZooKeeper::asyncGet(const std::string & path)
{
    GetFuture future {
        [path] (int rc, const char * value, int value_len, const Stat * stat)
        {
            if (rc != ZOK)
                throw KeeperException(rc, path);

            /// value_len may be non-positive if the node contains NULL.
            /// We don't distinguish it from the empty string.
            std::string node_value;
            if (value_len > 0)
                node_value.assign(value, static_cast<size_t>(value_len));

            Stat node_stat = stat ? *stat : Stat();
            return ValueAndStat{ node_value, node_stat };
        }};

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the reply.
    auto on_complete = [] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
    {
        auto * task_slot = const_cast<GetFuture::TaskPtr *>(static_cast<const GetFuture::TaskPtr *>(data));
        GetFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc, value, value_len, stat);
    };

    const int32_t code = zoo_aget(impl, path.c_str(), 0, on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
|
|
|
|
|
|
|
|
/// Starts an asynchronous "get" of the node `path` (without a watch) and returns a future for it.
/// Unlike asyncGet, the return code ZNONODE is not an error: it is reported through
/// the `exists` flag of ValueAndStatAndExists. Any other code except ZOK makes the task
/// throw KeeperException.
/// Throws KeeperException right away if zoo_aget itself returns an error.
ZooKeeper::TryGetFuture ZooKeeper::asyncTryGet(const std::string & path)
{
    TryGetFuture future {
        [path] (int rc, const char * value, int value_len, const Stat * stat)
        {
            const bool acceptable = (rc == ZOK || rc == ZNONODE);
            if (!acceptable)
                throw KeeperException(rc, path);

            /// value_len may be non-positive if the node contains NULL.
            /// We don't distinguish it from the empty string.
            std::string node_value;
            if (value_len > 0)
                node_value.assign(value, static_cast<size_t>(value_len));

            Stat node_stat = stat ? *stat : Stat();
            const bool exists = (rc != ZNONODE);
            return ValueAndStatAndExists{ node_value, node_stat, exists };
        }};

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the reply.
    auto on_complete = [] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
    {
        auto * task_slot = const_cast<TryGetFuture::TaskPtr *>(static_cast<const TryGetFuture::TaskPtr *>(data));
        TryGetFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc, value, value_len, stat);
    };

    const int32_t code = zoo_aget(impl, path.c_str(), 0, on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
|
|
|
|
|
|
|
|
/// Starts an asynchronous "exists" request for the node `path` (without a watch)
/// and returns a future for it. The task produces StatAndExists; ZNONODE is reported
/// as `exists == false`, any other code except ZOK makes the task throw KeeperException.
/// Throws KeeperException right away if zoo_aexists itself returns an error.
ZooKeeper::ExistsFuture ZooKeeper::asyncExists(const std::string & path)
{
    ExistsFuture future {
        [path] (int rc, const Stat * stat)
        {
            const bool acceptable = (rc == ZOK || rc == ZNONODE);
            if (!acceptable)
                throw KeeperException(rc, path);

            Stat node_stat = stat ? *stat : Stat();
            const bool exists = (rc != ZNONODE);
            return StatAndExists{ node_stat, exists };
        }};

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the reply.
    auto on_complete = [] (int rc, const Stat * stat, const void * data)
    {
        auto * task_slot = const_cast<ExistsFuture::TaskPtr *>(static_cast<const ExistsFuture::TaskPtr *>(data));
        ExistsFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc, stat);
    };

    const int32_t code = zoo_aexists(impl, path.c_str(), 0, on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
|
|
|
|
|
|
|
|
/// Starts an asynchronous "get children" request for the node `path` (without a watch)
/// and returns a future for it. The task converts the returned String_vector into Strings
/// and throws KeeperException for any return code other than ZOK.
/// Throws KeeperException right away if zoo_aget_children itself returns an error.
ZooKeeper::GetChildrenFuture ZooKeeper::asyncGetChildren(const std::string & path)
{
    GetChildrenFuture future {
        [path] (int rc, const String_vector * strings)
        {
            if (rc != ZOK)
                throw KeeperException(rc, path);

            /// Copy every C string of the reply into an owned std::string.
            Strings names;
            names.reserve(strings->count);
            for (int idx = 0; idx < strings->count; ++idx)
                names.emplace_back(strings->data[idx]);

            return names;
        }};

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the reply.
    auto on_complete = [] (int rc, const String_vector * strings, const void * data)
    {
        auto * task_slot = const_cast<GetChildrenFuture::TaskPtr *>(static_cast<const GetChildrenFuture::TaskPtr *>(data));
        GetChildrenFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc, strings);
    };

    const int32_t code = zoo_aget_children(impl, path.c_str(), 0, on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
|
|
|
|
|
2015-09-23 20:51:01 +00:00
|
|
|
/// Starts an asynchronous removal of the node `path` (version -1 is passed to zoo_adelete)
/// and returns a future for it. The task throws KeeperException for any return code other than ZOK.
/// Throws KeeperException right away if zoo_adelete itself returns an error.
ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path)
{
    RemoveFuture future {
        [path] (int rc)
        {
            const bool failed = (rc != ZOK);
            if (failed)
                throw KeeperException(rc, path);
        }};

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the return code.
    auto on_complete = [] (int rc, const void * data)
    {
        auto * task_slot = const_cast<RemoveFuture::TaskPtr *>(static_cast<const RemoveFuture::TaskPtr *>(data));
        RemoveFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc);
    };

    const int32_t code = zoo_adelete(impl, path.c_str(), -1, on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
|
|
|
|
|
2017-08-09 21:09:44 +00:00
|
|
|
/// Common implementation of asyncMulti and tryAsyncMulti.
/// Clones the given operations, submits them with zoo_amulti and returns a future whose
/// task produces OpResultsAndCode (return code, results array, and the cloned operations).
/// If `throw_exception` is true, the task throws KeeperException for any code other than ZOK.
/// An empty list of operations is completed immediately with ZOK without contacting the server.
/// Throws KeeperException(ZINVALIDSTATE) if the session is expired, or KeeperException(code)
/// if zoo_amulti itself returns an error.
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
{
    const size_t ops_count = ops_.size();
    OpResultsPtr results(new OpResults(ops_count));

    /// We need to hold all references to ops data until the end of multi callback
    struct OpsHolder
    {
        std::shared_ptr<zkutil::Ops> ops_ptr = std::make_shared<zkutil::Ops>();
        std::shared_ptr<std::vector<zoo_op_t>> ops_raw_ptr = std::make_shared<std::vector<zoo_op_t>>();
    } holder;

    for (const auto & source_op : ops_)
    {
        /// Keep an owned clone of the operation and a copy of its raw zoo_op_t side by side.
        holder.ops_ptr->emplace_back(source_op->clone());
        const auto & cloned_op = holder.ops_ptr->back();
        holder.ops_raw_ptr->push_back(*cloned_op->data);
    }

    MultiFuture future{ [throw_exception, results, holder] (int rc)
    {
        if (throw_exception && rc != ZOK)
            throw zkutil::KeeperException(rc);

        OpResultsAndCode outcome;
        outcome.code = rc;
        outcome.results = results;
        outcome.ops_ptr = holder.ops_ptr;
        return outcome;
    }};

    /// Nothing to send: run the task right here with ZOK.
    if (ops_.empty())
    {
        (**future.task)(ZOK);
        return future;
    }

    /// Workaround of the libzookeeper bug.
    /// TODO: check if the bug is fixed in the latest version of libzookeeper.
    if (expired())
        throw KeeperException(ZINVALIDSTATE);

    /// Completion callback in the form required by libzookeeper: `data` points to the TaskPtr
    /// kept inside the future; the task is moved out of it and invoked with the return code.
    auto on_complete = [] (int rc, const void * data)
    {
        auto * task_slot = const_cast<MultiFuture::TaskPtr *>(static_cast<const MultiFuture::TaskPtr *>(data));
        MultiFuture::TaskPtr owned_task = std::move(*task_slot);
        (*owned_task)(rc);
    };

    auto & raw_ops = *holder.ops_raw_ptr;

    const int32_t code = zoo_amulti(
        impl, static_cast<int>(raw_ops.size()), raw_ops.data(), results->data(),
        on_complete, future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code);

    return future;
}
|
|
|
|
|
|
|
|
/// Asynchronous multi request whose task does not throw on a non-ZOK return code:
/// forwards to asyncMultiImpl with throw_exception = false.
ZooKeeper::MultiFuture ZooKeeper::tryAsyncMulti(const zkutil::Ops & ops)
{
    const bool throw_exception = false;
    return asyncMultiImpl(ops, throw_exception);
}
|
|
|
|
|
|
|
|
/// Asynchronous multi request whose task throws KeeperException on a non-ZOK return code:
/// forwards to asyncMultiImpl with throw_exception = true.
ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
{
    const bool throw_exception = true;
    return asyncMultiImpl(ops, throw_exception);
}
|
|
|
|
|
2014-03-07 17:57:53 +00:00
|
|
|
}
|