ClickHouse/dbms/src/Common/ZooKeeper/ZooKeeper.cpp

987 lines
29 KiB
C++
Raw Normal View History

#include "ZooKeeper.h"
#include <random>
#include <pcg_random.hpp>
#include <functional>
2015-09-29 19:19:54 +00:00
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/PODArray.h>
#include <Common/randomSeed.h>
2014-03-07 13:50:58 +00:00
/// Profile event counters incremented by the ZooKeeper wrapper in this file.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ProfileEvents
{
extern const Event ZooKeeperInit;
/// Incremented together with the per-operation counter for each request issued below.
extern const Event ZooKeeperTransactions;
extern const Event ZooKeeperCreate;
extern const Event ZooKeeperRemove;
extern const Event ZooKeeperExists;
extern const Event ZooKeeperMulti;
extern const Event ZooKeeperGet;
extern const Event ZooKeeperSet;
extern const Event ZooKeeperGetChildren;
}
2016-10-24 13:47:15 +00:00
/// Metric declared (extern) for this translation unit; it is defined elsewhere.
/// NOTE(review): not referenced by the code visible in this file - confirm it is still needed.
namespace CurrentMetrics
{
extern const Metric ZooKeeperWatch;
2016-10-24 13:47:15 +00:00
}
/// Error codes from the DB namespace that this file passes to DB::Exception.
namespace DB
{
namespace ErrorCodes
{
/// Used by getFailedOpIndex and the OpResult constructor for "cannot happen" conditions.
extern const int LOGICAL_ERROR;
}
}
2014-03-07 13:50:58 +00:00
namespace zkutil
{
/// Numeric values of the node creation modes.
/// createImpl splits the mode into two flags, (mode & 1) and (mode & 2);
/// judging by the names, bit 0 presumably means "ephemeral" and bit 1 "sequential".
const int CreateMode::Persistent = 0;
const int CreateMode::Ephemeral = 1;
const int CreateMode::PersistentSequential = 2;
const int CreateMode::EphemeralSequential = 3;
/// Throws KeeperException(code, path) for any non-zero code; a zero code is a no-op.
static void check(int32_t code, const std::string & path)
{
    if (code == 0)
        return;

    throw KeeperException(code, path);
}
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, const std::string & chroot_)
2014-03-07 13:50:58 +00:00
{
log = &Logger::get("ZooKeeper");
hosts = hosts_;
identity = identity_;
session_timeout_ms = session_timeout_ms_;
chroot = chroot_;
std::string hosts_for_lib = hosts + chroot;
impl = zookeeper_init(hosts_for_lib.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
if (!identity.empty())
{
auto code = zoo_add_auth(impl, "digest", identity.c_str(), static_cast<int>(identity.size()), nullptr, nullptr);
if (code != ZOK)
throw KeeperException("Zookeeper authentication failed. Hosts are " + hosts, code);
default_acl = &ZOO_CREATOR_ALL_ACL;
}
else
default_acl = &ZOO_OPEN_ACL_UNSAFE;
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
if (!chroot.empty() && !exists("/"))
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.");
2014-03-07 13:50:58 +00:00
}
/// Constructs the client from explicit connection parameters; all the work is done by init().
ZooKeeper::ZooKeeper(
    const std::string & hosts,
    const std::string & identity,
    int32_t session_timeout_ms,
    const std::string & chroot)
{
    this->init(hosts, identity, session_timeout_ms, chroot);
}
/** Connection parameters parsed from a configuration subtree.
  *
  * Recognised keys under config_name:
  *   node*              - one server; "<key>.host" is mandatory, "<key>.port" defaults to "2181";
  *   session_timeout_ms - session timeout (DEFAULT_SESSION_TIMEOUT if absent);
  *   identity           - authentication identity;
  *   root               - chroot path, must start with '/'; one trailing '/' is stripped.
  * Any other key results in a KeeperException.
  */
struct ZooKeeperArgs
{
    ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
    {
        Poco::Util::AbstractConfiguration::Keys config_keys;
        config.keys(config_name, config_keys);

        session_timeout_ms = DEFAULT_SESSION_TIMEOUT;

        std::vector<std::string> endpoints;
        for (const auto & key : config_keys)
        {
            const std::string full_key = config_name + "." + key;

            if (startsWith(key, "node"))
            {
                const std::string host = config.getString(full_key + ".host");
                const std::string port = config.getString(full_key + ".port", "2181");
                endpoints.push_back(host + ":" + port);
                continue;
            }

            if (key == "session_timeout_ms")
                session_timeout_ms = config.getInt(full_key);
            else if (key == "identity")
                identity = config.getString(full_key);
            else if (key == "root")
                chroot = config.getString(full_key);
            else
                throw KeeperException(std::string("Unknown key ") + key + " in config file");
        }

        /// Shuffle the hosts to distribute the load among ZooKeeper nodes.
        pcg64 rng(randomSeed());
        std::shuffle(endpoints.begin(), endpoints.end(), rng);

        /// Join the endpoints into a comma-separated list.
        for (const auto & endpoint : endpoints)
        {
            if (!hosts.empty())
                hosts += ",";
            hosts += endpoint;
        }

        if (chroot.empty())
            return;

        if (chroot.front() != '/')
            throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot);

        if (chroot.back() == '/')
            chroot.pop_back();
    }

    std::string hosts;
    std::string identity;
    int session_timeout_ms;
    std::string chroot;
};
/// Constructs the client from a configuration subtree: the settings are parsed by ZooKeeperArgs
/// and then forwarded to init().
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
    const ZooKeeperArgs parsed(config, config_name);
    init(parsed.hosts, parsed.identity, parsed.session_timeout_ms, parsed.chroot);
}
/** Adapts an event object to a watch callback that sets the event when the watch fires.
  *
  * A null `watch` means the caller does not want a watch, so an empty callback is returned.
  * Previously a callback was produced even for a null pointer, and it would dereference
  * that null pointer once invoked.
  * NOTE(review): assumes the implementation treats an empty WatchCallback as "no watch" - confirm.
  */
static WatchCallback callbackForEvent(const EventPtr & watch)
{
    if (!watch)
        return {};

    /// The event pointer is captured by value so it stays alive as long as the callback does.
    return [watch](const ZooKeeperImpl::ZooKeeper::WatchResponse &) { watch->set(); };
}
int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Stat * stat,
WatchCallback watch_callback)
{
int32_t code = 0;
Poco::Event event;
auto callback = [&](const ZooKeeperImpl::ZooKeeper::ListResponse & response)
{
code = response.error;
if (!code)
{
res = response.names;
if (stat)
*stat = response.stat;
}
event.set();
};
impl.list(path, callback, watch_callback);
event.wait();
return code;
}
/// Returns the children of `path`. Any non-zero code returned by tryGetChildren
/// is turned into a KeeperException.
Strings ZooKeeper::getChildren(const std::string & path, Stat * stat, const EventPtr & watch)
{
    Strings children;
    const int32_t code = tryGetChildren(path, children, stat, watch);
    check(code, path);
    return children;
}
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Stat * stat, const EventPtr & watch)
2014-03-07 17:57:53 +00:00
{
int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
if (!(code == ZOK || code == ZNONODE))
throw KeeperException(code, path);
return code;
2014-03-07 17:57:53 +00:00
}
int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
2014-03-07 17:57:53 +00:00
{
int32_t code = 0;
Poco::Event event;
2014-03-21 18:58:24 +00:00
auto callback = [&](const ZooKeeperImpl::ZooKeeper::CreateResponse & response)
{
code = response.error;
if (!code)
path_created = response.path_created;
event.set();
};
impl.create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
/// Creates a node and returns the created path.
/// Any non-zero code returned by tryCreate is turned into a KeeperException.
std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t type)
{
    std::string created;
    const int32_t code = tryCreate(path, data, type, created);
    check(code, path);
    return created;
}
/// Creates a node; `path_created` receives the resulting path on success.
/// Returns ZOK, ZNONODE, ZNODEEXISTS or ZNOCHILDRENFOREPHEMERALS; any other code is thrown.
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{
    const int32_t code = createImpl(path, data, mode, path_created);

    if (code != ZOK
        && code != ZNONODE
        && code != ZNODEEXISTS
        && code != ZNOCHILDRENFOREPHEMERALS)
    {
        throw KeeperException(code, path);
    }

    return code;
}
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode)
2014-05-07 13:58:20 +00:00
{
std::string path_created;
return tryCreate(path, data, mode, path_created);
2014-05-07 13:58:20 +00:00
}
/// Creates a persistent node; an already existing node (ZNODEEXISTS) is not an error.
/// Every other non-ZOK code is thrown as KeeperException.
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
{
    std::string ignored_created_path;
    const int32_t code = createImpl(path, data, zkutil::CreateMode::Persistent, ignored_created_path);

    if (code != ZOK && code != ZNODEEXISTS)
        throw KeeperException(code, path);
}
2014-08-11 14:05:38 +00:00
/// Creates (if missing) every proper prefix of `path` that ends just before a '/',
/// from the shortest to the longest. The last component of `path` itself is not created.
void ZooKeeper::createAncestors(const std::string & path)
{
    /// The search starts at index 1 so that a leading '/' is never taken as a separator.
    for (size_t slash = path.find('/', 1); slash != std::string::npos; slash = path.find('/', slash + 1))
        createIfNotExists(path.substr(0, slash), "");
}
int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
{
int32_t code = 0;
Poco::Event event;
auto callback = [&](const ZooKeeperImpl::ZooKeeper::RemoveResponse & response)
{
if (response.error)
code = response.error;
event.set();
};
impl.remove(path, version, callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
2014-03-07 17:57:53 +00:00
/// Removes the node; any non-zero code returned by tryRemove is turned into a KeeperException.
void ZooKeeper::remove(const std::string & path, int32_t version)
{
    const int32_t code = tryRemove(path, version);
    check(code, path);
}
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
2014-03-07 17:57:53 +00:00
{
int32_t code = removeImpl(path, version);
if (!(code == ZOK ||
code == ZNONODE ||
code == ZBADVERSION ||
code == ZNOTEMPTY))
throw KeeperException(code, path);
return code;
2014-03-07 17:57:53 +00:00
}
int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat, WatchCallback watch_callback)
{
int32_t code = 0;
Poco::Event event;
auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response)
{
code = response.error;
if (!code && stat)
*stat = response.stat;
event.set();
};
impl.exists(path, callback, watch_callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
/// Event-based variant of existsWatch: the event is wrapped by callbackForEvent.
bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & watch)
{
    const bool node_exists = existsWatch(path, stat, callbackForEvent(watch));
    return node_exists;
}
/// Returns true for ZOK and false for ZNONODE; any other code is thrown as KeeperException.
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback)
{
    const int32_t code = existsImpl(path, stat, watch_callback);

    if (code == ZOK)
        return true;
    if (code == ZNONODE)
        return false;

    throw KeeperException(code, path);
}
int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback)
{
int32_t code = 0;
Poco::Event event;
auto callback = [&](const ZooKeeperImpl::ZooKeeper::GetResponse & response)
{
code = response.error;
if (!code)
{
res = response.data;
if (stat)
*stat = response.stat;
}
event.set();
};
impl.get(path, callback, watch_callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr & watch)
2014-03-07 17:57:53 +00:00
{
int32_t code = 0;
std::string res;
if (tryGet(path, res, stat, watch, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
2014-03-07 17:57:53 +00:00
}
/// Event-based variant of tryGetWatch: the event is wrapped by callbackForEvent.
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, const EventPtr & watch, int * return_code)
{
    const bool got_value = tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
    return got_value;
}
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code)
2014-03-07 17:57:53 +00:00
{
int32_t code = getImpl(path, res, stat, watch_callback);
if (!(code == ZOK || code == ZNONODE))
throw KeeperException(code, path);
if (return_code)
*return_code = code;
return code == ZOK;
2014-03-07 13:50:58 +00:00
}
int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data,
int32_t version, Stat * stat)
{
int32_t code = 0;
Poco::Event event;
auto callback = [&](const ZooKeeperImpl::ZooKeeper::SetResponse & response)
{
code = response.error;
if (!code && stat)
*stat = response.stat;
event.set();
};
impl.set(path, data, version, callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperSet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
2014-03-07 17:57:53 +00:00
/// Writes the node data; any non-zero code returned by trySet is turned into a KeeperException.
void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Stat * stat)
{
    const int32_t code = trySet(path, data, version, stat);
    check(code, path);
}
/// Sets the node data regardless of version (-1); if the node does not exist, creates it with `mode`.
/// Any other non-ZOK result of the set is thrown as KeeperException.
void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
{
    const int32_t code = trySet(path, data, -1);

    if (code == ZOK)
        return;

    if (code != ZNONODE)
        throw zkutil::KeeperException(code, path);

    create(path, data, mode);
}
/// Writes the node data. Returns ZOK, ZNONODE or ZBADVERSION; any other code is thrown.
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
                          int32_t version, Stat * stat)
{
    const int32_t code = setImpl(path, data, version, stat);

    if (code != ZOK
        && code != ZNONODE
        && code != ZBADVERSION)
    {
        throw KeeperException(code, path);
    }

    return code;
}
int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_op_results, MultiTransactionInfo * out_info)
{
if (ops_.empty())
return ZOK;
2014-07-28 14:31:07 +00:00
size_t count = ops_.size();
std::vector<zoo_op_result_t> out_results_native(count);
/// Copy the struct containing pointers with default copy-constructor.
/// It is safe because it hasn't got a destructor.
std::vector<zoo_op_t> ops;
ops.reserve(ops_.size());
for (const auto & op : ops_)
ops.push_back(*(op->data));
int32_t code = zoo_multi(impl, static_cast<int>(ops.size()), ops.data(), out_results_native.data());
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
if (out_op_results || out_info)
{
OpResultsPtr op_results;
convertOpResults(out_results_native, op_results, this);
if (out_op_results)
*out_op_results = op_results;
if (out_info)
*out_info = MultiTransactionInfo(code, ops_, op_results);
}
return code;
}
/// Executes the operations and returns their results; the outcome is validated
/// by KeeperMultiException::check, which throws for a non-ZOK code.
OpResultsPtr ZooKeeper::multi(const Ops & ops)
{
    OpResultsPtr results;
    const int32_t code = multiImpl(ops, &results);

    KeeperMultiException::check(code, ops, results);
    return results;
}
/// Executes the operations. Returns ZOK, ZNONODE, ZNODEEXISTS, ZNOCHILDRENFOREPHEMERALS,
/// ZBADVERSION or ZNOTEMPTY; any other code is thrown as KeeperException.
int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
{
    const int32_t code = multiImpl(ops_, out_results_);

    if (code != ZOK
        && code != ZNONODE
        && code != ZNODEEXISTS
        && code != ZNOCHILDRENFOREPHEMERALS
        && code != ZBADVERSION
        && code != ZNOTEMPTY)
    {
        throw KeeperException(code);
    }

    return code;
}
/// Same contract as tryMulti, but multiImpl is invoked through retry(); `attempt` is passed on to retry().
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt)
{
    const int32_t code = retry(
        std::bind(&ZooKeeper::multiImpl, this, std::ref(ops), out_results, nullptr),
        attempt);

    const bool is_tolerated =
        code == ZOK
        || code == ZNONODE
        || code == ZNODEEXISTS
        || code == ZNOCHILDRENFOREPHEMERALS
        || code == ZBADVERSION
        || code == ZNOTEMPTY;

    if (!is_tolerated)
        throw KeeperException(code);

    return code;
}
2014-05-27 12:08:40 +00:00
void ZooKeeper::removeChildrenRecursive(const std::string & path)
2014-03-22 14:44:44 +00:00
{
Strings children = getChildren(path);
while (!children.empty())
{
zkutil::Ops ops;
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeChildrenRecursive(path + "/" + children.back());
ops.emplace_back(std::make_unique<Op::Remove>(path + "/" + children.back(), -1));
children.pop_back();
}
multi(ops);
}
}
2014-07-07 09:51:42 +00:00
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
{
Strings children;
if (tryGetChildren(path, children) != ZOK)
return;
while (!children.empty())
{
zkutil::Ops ops;
Strings batch;
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
batch.push_back(path + "/" + children.back());
children.pop_back();
tryRemoveChildrenRecursive(batch.back());
ops.emplace_back(std::make_unique<Op::Remove>(batch.back(), -1));
}
/// Try to remove the children with a faster method - in bulk. If this fails,
/// this means someone is concurrently removing these children and we will have
/// to remove them one by one.
if (tryMulti(ops) != ZOK)
{
for (const std::string & child : batch)
{
tryRemove(child);
}
}
}
2014-07-07 09:51:42 +00:00
}
void ZooKeeper::removeRecursive(const std::string & path)
{
removeChildrenRecursive(path);
remove(path);
2014-03-22 14:44:44 +00:00
}
2014-07-07 09:51:42 +00:00
void ZooKeeper::tryRemoveRecursive(const std::string & path)
{
tryRemoveChildrenRecursive(path);
tryRemove(path);
2014-07-07 09:51:42 +00:00
}
void ZooKeeper::waitForDisappear(const std::string & path)
{
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string unused;
/// Use get instead of exists to prevent watch leak if the node has already disappeared.
if (!tryGet(path, unused, nullptr, event))
break;
event->wait();
}
}
2014-06-30 07:58:16 +00:00
/// Closes the session (a failure is only logged) and frees the stored watch contexts.
ZooKeeper::~ZooKeeper()
{
    auto * dtor_log = &Logger::get("~ZooKeeper");

    LOG_INFO(dtor_log, "Closing ZooKeeper session");

    const int32_t close_code = zookeeper_close(impl);
    if (close_code != ZOK)
    {
        LOG_ERROR(dtor_log, "Failed to close ZooKeeper session: " << zerror(close_code));
    }

    LOG_INFO(dtor_log, "Removing " << watch_context_store.size() << " watches");

    /// Destroy WatchContexts that will never be used.
    for (WatchContext * context : watch_context_store)
        delete context;

    LOG_INFO(dtor_log, "Removed watches");
}
2014-04-25 13:55:15 +00:00
/// Creates a new ZooKeeper object with the same hosts, identity, session timeout and chroot as this one.
ZooKeeperPtr ZooKeeper::startNewSession() const
{
    ZooKeeperPtr fresh_session = std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, chroot);
    return fresh_session;
}
/// Stores the arguments and initialises the native "create" op with pointers into this object's members.
/// `created_path` is sized as the pattern length plus ZooKeeper::SEQUENTIAL_SUFFIX_SIZE
/// and handed to the library as the output buffer.
Op::Create::Create(const std::string & path_pattern_, const std::string & value_, ACLPtr acl_, int32_t flags_)
    : path_pattern(path_pattern_)
    , value(value_)
    , acl(acl_)
    , flags(flags_)
    , created_path(path_pattern_.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
    zoo_create_op_init(
        data.get(),
        path_pattern.c_str(),
        value.c_str(), value.size(),
        acl,
        flags,
        created_path.data(), created_path.size());
}
2014-11-30 07:01:00 +00:00
/// Returns the default ACL; the member is read while holding `mutex`.
ACLPtr ZooKeeper::getDefaultACL()
{
    std::lock_guard<std::mutex> guard(mutex);
    ACLPtr current_acl = default_acl;
    return current_acl;
}
2014-06-04 13:51:40 +00:00
/// Returns the text that zerror() gives for `code`.
std::string ZooKeeper::error2string(int32_t code)
{
    const char * description = zerror(code);
    return description;
}
/// True if the object is flagged as dirty or the library reports ZOO_EXPIRED_SESSION_STATE.
bool ZooKeeper::expired()
{
    if (is_dirty)
        return true;

    return zoo_state(impl) == ZOO_EXPIRED_SESSION_STATE;
}
2014-07-03 17:24:17 +00:00
/// Returns the `client_id` field of the structure that zoo_client_id() reports for this handle.
Int64 ZooKeeper::getClientID()
{
    const auto * client = zoo_client_id(impl);
    return client->client_id;
}
/** Starts an asynchronous read of `path` via zoo_aget (no watch is set).
  *
  * The task stored in the returned future converts the completion arguments into ValueAndStat
  * and throws KeeperException for any code other than ZOK.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::GetFuture ZooKeeper::asyncGet(const std::string & path)
{
    GetFuture future {
        [path] (int rc, const char * value, int value_len, const Stat * stat)
        {
            if (rc != ZOK)
                throw KeeperException(rc, path);

            /// A non-positive length may mean the node contains NULL; it is not distinguished from the empty string.
            std::string value_str;
            if (value_len > 0)
                value_str.assign(value, static_cast<size_t>(value_len));

            return ValueAndStat{ value_str, stat ? *stat : Stat() };
        }};

    const int32_t code = zoo_aget(
        impl, path.c_str(), 0,
        [] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const GetFuture::TaskPtr *>(data);
            auto & slot = const_cast<GetFuture::TaskPtr &>(*const_slot);
            GetFuture::TaskPtr task = std::move(slot);
            (*task)(rc, value, value_len, stat);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous read of `path` via zoo_aget (no watch is set).
  *
  * The task stored in the returned future yields ValueAndStatAndExists, whose last field is
  * false when the code is ZNONODE; codes other than ZOK and ZNONODE are thrown.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::TryGetFuture ZooKeeper::asyncTryGet(const std::string & path)
{
    TryGetFuture future {
        [path] (int rc, const char * value, int value_len, const Stat * stat)
        {
            const bool is_tolerated = (rc == ZOK) || (rc == ZNONODE);
            if (!is_tolerated)
                throw KeeperException(rc, path);

            /// A non-positive length may mean the node contains NULL; it is not distinguished from the empty string.
            std::string value_str;
            if (value_len > 0)
                value_str.assign(value, static_cast<size_t>(value_len));

            return ValueAndStatAndExists{ value_str, stat ? *stat : Stat(), rc != ZNONODE };
        }};

    const int32_t code = zoo_aget(
        impl, path.c_str(), 0,
        [] (int rc, const char * value, int value_len, const Stat * stat, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const TryGetFuture::TaskPtr *>(data);
            auto & slot = const_cast<TryGetFuture::TaskPtr &>(*const_slot);
            TryGetFuture::TaskPtr task = std::move(slot);
            (*task)(rc, value, value_len, stat);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous existence check of `path` via zoo_aexists (no watch is set).
  *
  * The task stored in the returned future yields StatAndExists, whose second field is false
  * when the code is ZNONODE; codes other than ZOK and ZNONODE are thrown.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::ExistsFuture ZooKeeper::asyncExists(const std::string & path)
{
    ExistsFuture future {
        [path] (int rc, const Stat * stat)
        {
            const bool is_tolerated = (rc == ZOK) || (rc == ZNONODE);
            if (!is_tolerated)
                throw KeeperException(rc, path);

            return StatAndExists{ stat ? *stat : Stat(), rc != ZNONODE };
        }};

    const int32_t code = zoo_aexists(
        impl, path.c_str(), 0,
        [] (int rc, const Stat * stat, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const ExistsFuture::TaskPtr *>(data);
            auto & slot = const_cast<ExistsFuture::TaskPtr &>(*const_slot);
            ExistsFuture::TaskPtr task = std::move(slot);
            (*task)(rc, stat);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous listing of the children of `path` via zoo_aget_children (no watch is set).
  *
  * The task stored in the returned future copies the returned names into Strings
  * and throws KeeperException for any code other than ZOK.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::GetChildrenFuture ZooKeeper::asyncGetChildren(const std::string & path)
{
    GetChildrenFuture future {
        [path] (int rc, const String_vector * strings)
        {
            if (rc != ZOK)
                throw KeeperException(rc, path);

            const int names_count = strings->count;

            Strings res;
            res.resize(names_count);
            for (int i = 0; i < names_count; ++i)
                res[i] = std::string(strings->data[i]);

            return res;
        }};

    const int32_t code = zoo_aget_children(
        impl, path.c_str(), 0,
        [] (int rc, const String_vector * strings, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const GetChildrenFuture::TaskPtr *>(data);
            auto & slot = const_cast<GetChildrenFuture::TaskPtr &>(*const_slot);
            GetChildrenFuture::TaskPtr task = std::move(slot);
            (*task)(rc, strings);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous removal of `path` with the given `version` via zoo_adelete.
  *
  * The task stored in the returned future throws KeeperException for any code other than ZOK.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path, int32_t version)
{
    RemoveFuture future {
        [path] (int rc)
        {
            if (rc == ZOK)
                return;

            throw KeeperException(rc, path);
        }};

    const int32_t code = zoo_adelete(
        impl, path.c_str(), version,
        [] (int rc, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const RemoveFuture::TaskPtr *>(data);
            auto & slot = const_cast<RemoveFuture::TaskPtr &>(*const_slot);
            RemoveFuture::TaskPtr task = std::move(slot);
            (*task)(rc);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous removal of `path` with the given `version` via zoo_adelete.
  *
  * The task stored in the returned future returns the code when it is ZOK, ZNONODE,
  * ZBADVERSION or ZNOTEMPTY and throws KeeperException otherwise.
  * Throws KeeperException right away if the request could not be submitted.
  */
ZooKeeper::TryRemoveFuture ZooKeeper::asyncTryRemove(const std::string & path, int32_t version)
{
    TryRemoveFuture future {
        [path] (int rc)
        {
            const bool is_tolerated =
                rc == ZOK
                || rc == ZNONODE
                || rc == ZBADVERSION
                || rc == ZNOTEMPTY;

            if (!is_tolerated)
                throw KeeperException(rc, path);

            return rc;
        }};

    const int32_t code = zoo_adelete(
        impl, path.c_str(), version,
        [] (int rc, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const TryRemoveFuture::TaskPtr *>(data);
            auto & slot = const_cast<TryRemoveFuture::TaskPtr &>(*const_slot);
            TryRemoveFuture::TaskPtr task = std::move(slot);
            (*task)(rc);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code, path);

    return future;
}
/** Starts an asynchronous multi-op transaction via zoo_amulti.
  *
  * The task stored in the returned future converts the native results into OpResultsAndCode;
  * with `throw_exception` set it throws KeeperException for a non-ZOK code instead.
  * An empty op list is completed immediately with ZOK, without contacting the server.
  * Throws KeeperException(ZINVALIDSTATE) if the session is expired, and KeeperException(code)
  * if the request could not be submitted.
  */
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
{
    /// We need to hold all references to ops data until the end of multi callback
    struct OpsHolder
    {
        std::shared_ptr<zkutil::Ops> ops_ptr;
        std::shared_ptr<std::vector<zoo_op_t>> ops_native;
        std::shared_ptr<std::vector<zoo_op_result_t>> op_results_native;
    } holder;

    /// Copy ops (shallow copy)
    holder.ops_ptr = std::make_shared<zkutil::Ops>(ops_);

    /// Copy native ops to contiguous vector
    holder.ops_native = std::make_shared<std::vector<zoo_op_t>>();
    for (const OpPtr & op : *holder.ops_ptr)
        holder.ops_native->push_back(*op->data);

    /// Allocate native result holders
    const size_t ops_count = holder.ops_ptr->size();
    holder.op_results_native = std::make_shared<std::vector<zoo_op_result_t>>(ops_count);

    /// The holder is captured by value, so the buffers live as long as the task does.
    MultiFuture future{ [throw_exception, holder, zookeeper=this] (int rc)
    {
        OpResultsAndCode res;
        res.code = rc;
        convertOpResults(*holder.op_results_native, res.results, zookeeper);
        res.ops_ptr = holder.ops_ptr;

        if (throw_exception && rc != ZOK)
            throw zkutil::KeeperException(rc);

        return res;
    }};

    /// Nothing to send: run the task right here with a successful code.
    if (ops_.empty())
    {
        (**future.task)(ZOK);
        return future;
    }

    /// Workaround of the libzookeeper bug.
    /// TODO: check if the bug is fixed in the latest version of libzookeeper.
    if (expired())
        throw KeeperException(ZINVALIDSTATE);

    const int32_t code = zoo_amulti(
        impl,
        static_cast<int>(holder.ops_native->size()),
        holder.ops_native->data(),
        holder.op_results_native->data(),
        [] (int rc, const void * data)
        {
            /// `data` is the address of the future's task slot: move the task out of it, then run it.
            const auto * const_slot = static_cast<const MultiFuture::TaskPtr *>(data);
            auto & slot = const_cast<MultiFuture::TaskPtr &>(*const_slot);
            MultiFuture::TaskPtr task = std::move(slot);
            (*task)(rc);
        },
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code);

    return future;
}
/// Asynchronous multi whose task reports a non-ZOK code in its result instead of throwing.
ZooKeeper::MultiFuture ZooKeeper::tryAsyncMulti(const zkutil::Ops & ops)
{
    const bool throw_on_error = false;
    return asyncMultiImpl(ops, throw_on_error);
}
/// Asynchronous multi whose task throws KeeperException for a non-ZOK code.
ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
{
    const bool throw_on_error = true;
    return asyncMultiImpl(ops, throw_on_error);
}
/** Returns the index of the first op result whose `err` is not ZOK.
  *
  * Throws DB::Exception (LOGICAL_ERROR) if the results are null or empty, or if no result failed;
  * in the latter case the message depends on whether the transaction code is a user error.
  */
size_t getFailedOpIndex(const OpResultsPtr & op_results, int32_t transaction_return_code)
{
    if (!op_results || op_results->empty())
        throw DB::Exception("OpResults is empty", DB::ErrorCodes::LOGICAL_ERROR);

    size_t index = 0;
    for (const auto & result : *op_results)
    {
        if (result.err != ZOK)
            return index;
        ++index;
    }

    /// No failed op was found: report why that is unexpected.
    if (isUserError(transaction_return_code))
        throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);

    throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(transaction_return_code) + "' is not valid response code for that",
                        DB::ErrorCodes::LOGICAL_ERROR);
}
/** Converts a native op result into OpResult.
  *
  * The value (if present) is copied, and when `zookeeper` has a non-empty chroot that prefix
  * is stripped from it; a value without the expected prefix is a LOGICAL_ERROR.
  * The stat (if present) is copied as well.
  */
OpResult::OpResult(const zoo_op_result_t & op_result, const ZooKeeper * zookeeper)
    : err(op_result.err)
{
    if (op_result.value)
    {
        value.assign(op_result.value, op_result.value + op_result.valuelen);

        /// Current version of libzookeeper does not cut chroot path prefixes
        /// We do it here manually
        const bool has_chroot = zookeeper && !zookeeper->chroot.empty();
        if (has_chroot)
        {
            const std::string & prefix = zookeeper->chroot;

            if (!startsWith(value, prefix))
                throw DB::Exception("Expected ZooKeeper path with chroot " + prefix + ", got " + value,
                                    DB::ErrorCodes::LOGICAL_ERROR);

            value = value.substr(prefix.length());
        }
    }

    if (op_result.stat)
        stat = std::make_unique<Stat>(*op_result.stat);
}
/// Builds the exception for a failed transaction: the message names the failed op index and its
/// description, the code is the transaction code. `info_.ops.at()` throws if the index is out of range.
KeeperMultiException::KeeperMultiException(const MultiTransactionInfo & info_, size_t failed_op_index_)
    : KeeperException(
        "Transaction failed at op #" + std::to_string(failed_op_index_) + ": " + info_.ops.at(failed_op_index_)->describe(),
        info_.code)
    , info(info_)
{
}
/// Does nothing for ZOK. For user errors throws KeeperMultiException pointing at the failed op;
/// for any other code throws a plain KeeperException.
void KeeperMultiException::check(int32_t code, const Ops & ops, const OpResultsPtr & op_results)
{
    if (code == ZOK)
        return;

    if (!zkutil::isUserError(code))
        throw KeeperException(code);

    throw KeeperMultiException(MultiTransactionInfo(code, ops, op_results), getFailedOpIndex(op_results, code));
}
/// Same as the (code, ops, results) overload, for an already assembled MultiTransactionInfo.
void KeeperMultiException::check(const MultiTransactionInfo & info)
{
    if (info.code == ZOK)
        return;

    if (!zkutil::isUserError(info.code))
        throw KeeperException(info.code);

    throw KeeperMultiException(info, getFailedOpIndex(info.op_results, info.code));
}
/// Returns the op at the index reported by getFailedOpIndex for this transaction.
const Op & MultiTransactionInfo::getFailedOp() const
{
    const size_t failed_index = getFailedOpIndex(op_results, code);
    return *ops.at(failed_index);
}
2014-03-07 17:57:53 +00:00
}