zkutil: use apache C client library instead of libzkcpp

This commit is contained in:
Pavel Kartavyy 2014-06-04 17:48:36 +04:00
parent 80403594bc
commit 07861aca9b
6 changed files with 380 additions and 274 deletions

View File

@ -9,19 +9,19 @@ namespace zkutil
class KeeperException : public DB::Exception
{
public:
KeeperException(const std::string & msg) : DB::Exception(msg), code(ReturnCode::Ok) {}
KeeperException(const std::string & msg, ReturnCode::type code_)
: DB::Exception(msg + " (" + ReturnCode::toString(code_) + ")"), code(code_) {}
KeeperException(ReturnCode::type code_)
: DB::Exception(ReturnCode::toString(code_)), code(code_) {}
KeeperException(ReturnCode::type code_, const std::string & path_)
: DB::Exception(ReturnCode::toString(code_) + " path: " + path_), code(code_) {}
KeeperException(const std::string & msg) : DB::Exception(msg), code(ZOK) {}
KeeperException(const std::string & msg, int32_t code_)
: DB::Exception(msg + " (" + zerror(code_) + ")"), code(code_) {}
KeeperException(int32_t code_)
: DB::Exception(zerror(code_)), code(code_) {}
KeeperException(int32_t code_, const std::string & path_)
: DB::Exception(std::string(zerror(code_)) + " path: " + path_), code(code_) {}
const char * name() const throw() { return "zkutil::KeeperException"; }
const char * className() const throw() { return "zkutil::KeeperException"; }
KeeperException * clone() const { return new KeeperException(message(), code); }
ReturnCode::type code;
int32_t code;
};
};

View File

@ -1,44 +1,110 @@
#pragma once
#include <zookeeper/zookeeper.hh>
#include <Yandex/Common.h>
#include <boost/function.hpp>
#include <future>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <zookeeper/zookeeper.h>
namespace zkutil
{
typedef const ACL_vector * AclPtr;
typedef Stat Stat;
namespace zk = org::apache::zookeeper;
namespace CreateMode = zk::CreateMode;
namespace ReturnCode = zk::ReturnCode;
namespace SessionState = zk::SessionState;
namespace WatchEvent = zk::WatchEvent;
struct Op
{
public:
Op() : data(new zoo_op_t) {}
virtual ~Op() {}
typedef zk::data::Stat Stat;
typedef zk::data::ACL ACL;
typedef zk::Op Op;
typedef zk::OpResult OpResult;
std::unique_ptr<zoo_op_t> data;
class Remove;
class Create;
class SetData;
class Check;
};
struct Op::Remove : public Op
{
Remove(const std::string & path_, int32_t version) :
path(path_)
{
zoo_delete_op_init(data.get(), path.c_str(), version);
}
private:
std::string path;
};
struct Op::Create : public Op
{
Create(const std::string & path_, const std::string & value_, AclPtr acl, int32_t flags);
std::string getPathCreated()
{
return created_path.data();
}
private:
std::string path;
std::string value;
std::vector<char> created_path;
};
struct Op::SetData : public Op
{
SetData(const std::string & path_, const std::string & value_, int32_t version) :
path(path_), value(value_)
{
zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat);
}
private:
std::string path;
std::string value;
Stat stat;
};
struct Op::Check : public Op
{
Check(const std::string & path_, int32_t version) :
path(path_)
{
zoo_check_op_init(data.get(), path.c_str(), version);
}
private:
std::string path;
};
struct OpResult : public zoo_op_result_t, boost::noncopyable
{
};
typedef std::vector<ACL> ACLs;
typedef boost::ptr_vector<Op> Ops;
typedef boost::ptr_vector<OpResult> OpResults;
typedef std::shared_ptr<boost::ptr_vector<OpResult> > OpResultsPtr;
typedef std::vector<OpResult> OpResults;
typedef std::shared_ptr<OpResults> OpResultsPtr;
typedef std::vector<std::string> Strings;
typedef boost::function<void (WatchEvent::type event, SessionState::type state,
const std::string & path)> WatchFunction;
typedef void (*WatchFunction)(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
namespace CreateMode
{
extern const int Persistent;
extern const int Ephemeral;
extern const int EphemeralSequential;
extern const int PersistentSequential;
}
struct WatchEventInfo
{
WatchEvent::type event;
SessionState::type state;
int32_t event;
int32_t state;
std::string path;
WatchEventInfo() {}
WatchEventInfo(WatchEvent::type event_, SessionState::type state_, const std::string & path_)
WatchEventInfo(int32_t event_, int32_t state_, const char * path_)
: event(event_), state(state_), path(path_) {}
};
typedef std::future<WatchEventInfo> WatchFuture;
}

View File

@ -2,6 +2,7 @@
#include <zkutil/Types.h>
#include <zkutil/KeeperException.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <unordered_set>
namespace zkutil
@ -9,6 +10,11 @@ namespace zkutil
const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
struct WatchWithPromise;
/// Из-за вызова С кода легче самому явно управлять памятью
typedef WatchWithPromise * WatchWithPromisePtr;
/** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper.
* Вместо callback-ов для watch-ей используются std::future.
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке.
@ -45,6 +51,8 @@ public:
*/
Ptr startNewSession() const;
int state();
/** Возвращает true, если сессия навсегда завершена.
* Это возможно только если соединение было установлено, потом разорвалось, потом снова восстановилось, но слишком поздно.
* Это достаточно редкая ситуация.
@ -53,14 +61,14 @@ public:
*/
bool expired();
void setDefaultACL(ACLs & acl);
AclPtr getDefaultACL();
ACLs getDefaultACL();
void setDefaultACL(AclPtr new_acl);
/** Создать znode. Используется ACL, выставленный вызовом setDefaultACL (по умолчанию, всем полный доступ).
* Если что-то пошло не так, бросить исключение.
*/
std::string create(const std::string & path, const std::string & data, CreateMode::type mode);
std::string create(const std::string & path, const std::string & data, int32_t mode);
/** Не бросает исключение при следующих ошибках:
* - Нет родителя создаваемой ноды.
@ -68,8 +76,8 @@ public:
* - Такая нода уже есть.
* При остальных ошибках бросает исключение.
*/
ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated);
ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode);
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated);
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode);
/** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия).
*/
@ -80,7 +88,7 @@ public:
* - У ноды другая версия.
* - У ноды есть дети.
*/
ReturnCode::type tryRemove(const std::string & path, int32_t version = -1);
int32_t tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr);
@ -98,7 +106,7 @@ public:
* - Такой ноды нет.
* - У ноды другая версия.
*/
ReturnCode::type trySet(const std::string & path, const std::string & data,
int32_t trySet(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
Strings getChildren(const std::string & path,
@ -108,7 +116,7 @@ public:
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет. В таком случае возвращает false.
*/
bool tryGetChildren(const std::string & path, Strings & res,
int32_t tryGetChildren(const std::string & path, Strings & res,
Stat * stat = nullptr,
WatchFuture * watch = nullptr);
@ -120,36 +128,37 @@ public:
/** Бросает исключение только если какая-нибудь операция вернула "неожиданную" ошибку - такую ошибку,
* увидев которую соответствующий метод try* бросил бы исключение. */
ReturnCode::type tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr);
int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr);
/** Удаляет ноду вместе с поддеревом. Если в это время кто-то добавит иили удалит ноду в поддереве, результат не определен.
*/
void removeRecursive(const std::string & path);
static std::string error2string(int32_t code);
/// максимальный размер данных в узле в байтах
/// В версии 3.4.5. максимальный размер узла 1 Mb
static const size_t MAX_NODE_SIZE = 1048576;
/// Размер прибавляемого ZooKeeper суффикса при создании Sequential ноды
/// На самом деле размер меньше, но для удобства округлим в верхнюю сторону
static const size_t SEQUENTIAL_SUFFIX_SIZE = 64;
private:
void init(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_);
friend struct StateWatch;
zk::ZooKeeper impl;
void removeChildrenRecursive(const std::string & path);
WatchWithPromisePtr watchForFuture(WatchFuture * future);
static void processPromise(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx);
std::string hosts;
int32_t sessionTimeoutMs;
WatchFunction * state_watch;
Poco::FastMutex mutex;
ACLs default_acl;
SessionState::type session_state;
AclPtr default_acl;
zhandle_t * impl;
void stateChanged(WatchEvent::type event, SessionState::type state, const std::string& path);
/** Бросает исключение, если сессия истекла. Почему-то zkcpp этого не делает, а вместо этого виснет (смотри zkcpp_expiration_test).
* Не очень надежно: возможно, вызов к zkcpp все же может повиснуть, если между проверкой и вызовом состояние успеет поменяться.
* Если это окажется проблемой, возможно, стоит избавиться от zkcpp.
*/
void checkNotExpired();
void removeChildrenRecursive(const std::string & path);
WatchFunction * state_watch;
std::unordered_set<WatchWithPromise *> watch_store;
};
typedef ZooKeeper::Ptr ZooKeeperPtr;

View File

@ -2,28 +2,34 @@
#include <boost/make_shared.hpp>
#include <Yandex/logger_useful.h>
#define CHECKED(x, path) { ReturnCode::type code = x; if (code != ReturnCode::Ok) throw KeeperException(code, path); }
#define CHECKED_WITHOUT_PATH(x) { ReturnCode::type code = x; if (code != ReturnCode::Ok) throw KeeperException(code); }
namespace zkutil
{
const int CreateMode::Persistent = 0;
const int CreateMode::Ephemeral = ZOO_EPHEMERAL;
const int CreateMode::EphemeralSequential = ZOO_EPHEMERAL | ZOO_SEQUENCE;
const int CreateMode::PersistentSequential = ZOO_SEQUENCE;
#define CHECK(x, path) { int32_t code = x; if (code != ZOK) throw KeeperException(code, path); }
#define CHECK_NO_PATH(x) { int32_t code = x; if (code != ZOK) throw KeeperException(code); }
typedef std::promise<WatchEventInfo> WatchPromise;
struct WatchWithPromise : public zk::Watch
struct WatchWithPromise
{
WatchPromise promise;
bool notified;
/// существует все время существования WatchWithPromise
ZooKeeper & zk;
WatchWithPromise() : notified(false) {}
WatchWithPromise(ZooKeeper & zk_) : notified(false), zk(zk_) {}
void process(WatchEvent::type event, SessionState::type state, const std::string & path)
void process(zhandle_t * zh, int32_t event, int32_t state, const char * path)
{
if (notified)
{
LOG_WARNING(&Logger::get("WatchWithPromise"), "Ignoring event " << WatchEvent::toString(event) << " with state "
<< SessionState::toString(state) << (path.empty() ? "" : " for path " + path));
LOG_WARNING(&Logger::get("WatchWithPromise"), "Ignoring event " << event << " with state "
<< state << (path ? std::string(" for path ") + path : ""));
return;
}
promise.set_value(WatchEventInfo(event, state, path));
@ -31,29 +37,27 @@ struct WatchWithPromise : public zk::Watch
}
};
typedef boost::shared_ptr<zk::Watch> WatchPtr;
typedef boost::shared_ptr<WatchWithPromise> WatchWithPromisePtr;
static WatchPtr watchForFuture(WatchFuture * future)
WatchWithPromisePtr ZooKeeper::watchForFuture(WatchFuture * future)
{
if (!future)
return nullptr;
WatchWithPromisePtr res = boost::make_shared<WatchWithPromise>();
WatchWithPromisePtr res = new WatchWithPromise(*this);
watch_store.insert(res);
*future = res->promise.get_future();
return res;
}
struct StateWatch : public zk::Watch
void ZooKeeper::processPromise(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx)
{
ZooKeeper * owner;
StateWatch(ZooKeeper * owner_) : owner(owner_) {}
void process(WatchEvent::type event, SessionState::type state, const std::string & path)
if (watcherCtx)
{
owner->stateChanged(event, state, path);
WatchWithPromise * watch = reinterpret_cast<WatchWithPromise *>(watcherCtx);
watch->process(zh, type, state, path);
delete watch;
watch->zk.watch_store.erase(watch);
}
};
}
void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, WatchFunction * watch_)
{
@ -61,17 +65,20 @@ void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, Watc
sessionTimeoutMs = sessionTimeoutMs_;
state_watch = watch_;
CHECKED_WITHOUT_PATH(impl.init(hosts, sessionTimeoutMs, boost::make_shared<StateWatch>(this)));
if (watch_)
impl = zookeeper_init(hosts.c_str(), *watch_, sessionTimeoutMs, nullptr, nullptr, 0);
else
impl = zookeeper_init(hosts.c_str(), nullptr, sessionTimeoutMs, nullptr, nullptr, 0);
ACL perm;
perm.getid().getscheme() = "world";
perm.getid().getid() = "anyone";
perm.setperms(zk::Permission::All);
default_acl.push_back(perm);
if (!impl)
throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts);
default_acl = &ZOO_OPEN_ACL_UNSAFE;
}
ZooKeeper::ZooKeeper(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_)
{
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
init(hosts, sessionTimeoutMs, watch_);
}
@ -111,207 +118,195 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
init(args.hosts, args.session_timeout_ms, watch);
}
void ZooKeeper::stateChanged(WatchEvent::type event, SessionState::type state, const std::string & path)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
session_state = state;
if (state_watch)
(*state_watch)(event, state, path);
}
void ZooKeeper::checkNotExpired()
{
if (expired())
throw KeeperException(ReturnCode::SessionExpired);
}
bool ZooKeeper::expired()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return session_state == SessionState::Expired;
}
void ZooKeeper::setDefaultACL(ACLs & acl)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
default_acl = acl;
}
ACLs ZooKeeper::getDefaultACL()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return default_acl;
}
Strings ZooKeeper::getChildren(
const std::string & path, Stat * stat, WatchFuture * watch)
{
checkNotExpired();
Stat s;
Strings res;
CHECKED(impl.getChildren(path, watchForFuture(watch), res, s), path);
if (stat)
*stat = s;
CHECK(tryGetChildren(path, res, stat, watch), path);
return res;
}
bool ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Stat * stat, WatchFuture * watch)
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Stat * stat_, WatchFuture * watch)
{
checkNotExpired();
Stat s;
ReturnCode::type code = impl.getChildren(path, watchForFuture(watch), res, s);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode))
String_vector strings;
int code;
Stat stat;
if (watch)
{
code = zoo_wget_children2(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), &strings, &stat);
}
else
{
code = zoo_wget_children2(impl, path.c_str(), nullptr, nullptr, &strings, &stat);
}
if (stat_)
*stat_ = stat;
if (code == ZOK)
{
res.resize(strings.count);
for (int i = 0; i < strings.count; ++i)
res[i] = std::string(strings.data[i]);
deallocate_String_vector(&strings);
}
else if (code == ZNONODE)
throw KeeperException(code, path);
if (code == ReturnCode::NoNode)
return false;
if (stat)
*stat = s;
return true;
}
std::string ZooKeeper::create(const std::string & path, const std::string & data, CreateMode::type mode)
{
checkNotExpired();
std::string res;
CHECKED(impl.create(path, data, getDefaultACL(), mode, res), path);
return res;
}
ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated)
{
checkNotExpired();
ReturnCode::type code = impl.create(path, data, getDefaultACL(), mode, pathCreated);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode ||
code == ReturnCode::NodeExists ||
code == ReturnCode::NoChildrenForEphemerals))
throw KeeperException(code, path);
return code;
}
ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode)
std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t type)
{
std::string path_created;
return tryCreate(path, data, mode, path_created);
std::string pathCreated;
CHECK(tryCreate(path, data, type, pathCreated), path);
return pathCreated;
}
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated)
{
int code;
/// имя ноды может быть больше переданного пути, если создается sequential нода.
size_t name_buffer_size = path.size() + SEQUENTIAL_SUFFIX_SIZE;
char * name_buffer = new char[name_buffer_size];
code = zoo_create(impl, path.c_str(), data.c_str(), data.size(), default_acl, mode, name_buffer, name_buffer_size);
if (!( code == ZOK ||
code == ZNONODE ||
code == ZNODEEXISTS ||
code == ZNOCHILDRENFOREPHEMERALS))
throw KeeperException(code, path);
pathCreated = std::string(name_buffer);
delete[] name_buffer;
return code;
}
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode)
{
std::string pathCreated;
return tryCreate(path, data, mode, pathCreated);
}
void ZooKeeper::remove(const std::string & path, int32_t version)
{
checkNotExpired();
CHECKED(impl.remove(path, version), path);
CHECK(tryRemove(path, version), path);
}
ReturnCode::type ZooKeeper::tryRemove(const std::string & path, int32_t version)
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
{
checkNotExpired();
ReturnCode::type code = impl.remove(path, version);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode ||
code == ReturnCode::BadVersion ||
code == ReturnCode::NotEmpty))
int32_t code = zoo_delete(impl, path.c_str(), version);
if (!( code == ZOK ||
code == ZNONODE ||
code == ZBADVERSION ||
code == ZNOTEMPTY))
throw KeeperException(code, path);
return code;
}
bool ZooKeeper::exists(const std::string & path, Stat * stat, WatchFuture * watch)
bool ZooKeeper::exists(const std::string & path, Stat * stat_, WatchFuture * watch)
{
checkNotExpired();
Stat s;
ReturnCode::type code = impl.exists(path, watchForFuture(watch), s);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode))
int32_t code;
Stat stat;
if (watch)
code = zoo_wexists(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), &stat);
else
code = zoo_wexists(impl, path.c_str(), nullptr, nullptr, &stat);
if (stat_)
*stat_ = stat;
if (!( code == ZOK ||
code == ZNONODE))
throw KeeperException(code, path);
if (code == ReturnCode::NoNode)
if (code == ZNONODE)
return false;
if (stat)
*stat = s;
return true;
}
std::string ZooKeeper::get(const std::string & path, Stat * stat, WatchFuture * watch)
{
checkNotExpired();
std::string res;
Stat s;
CHECKED(impl.get(path, watchForFuture(watch), res, s), path);
if (stat)
*stat = s;
return res;
if (tryGet(path, res, stat, watch))
return res;
else
throw KeeperException("Fail to get data for node " + path);
}
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, WatchFuture * watch)
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, WatchFuture * watch)
{
checkNotExpired();
Stat s;
ReturnCode::type code = impl.get(path, watchForFuture(watch), res, s);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode))
char buffer[MAX_NODE_SIZE];
int buffer_len = MAX_NODE_SIZE;
int32_t code;
Stat stat;
if (watch)
code = zoo_wget(impl, path.c_str(), processPromise, reinterpret_cast<void *>(watchForFuture(watch)), buffer, &buffer_len, &stat);
else
code = zoo_wget(impl, path.c_str(), nullptr, nullptr, buffer, &buffer_len, &stat);
if (stat_)
*stat_ = stat;
if (!( code == ZOK ||
code == ZNONODE))
throw KeeperException(code, path);
if (code == ReturnCode::NoNode)
res = std::string(buffer, buffer_len);
if (code == ZOK)
return true;
else
return false;
if (stat)
*stat = s;
return true;
}
void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Stat * stat)
{
checkNotExpired();
Stat s;
CHECKED(impl.set(path, data, version, s), path);
if (stat)
*stat = s;
CHECK(trySet(path, data, version, stat), path);
}
ReturnCode::type ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Stat * stat)
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Stat * stat_)
{
checkNotExpired();
Stat s;
ReturnCode::type code = impl.set(path, data, version, s);
if (!( code == ReturnCode::Ok ||
code == ReturnCode::NoNode ||
code == ReturnCode::BadVersion))
Stat stat;
int32_t code = zoo_set2(impl, path.c_str(), data.c_str(), data.length(), version, &stat);
if (stat_)
*stat_ = stat;
if (!( code == ZOK ||
code == ZNONODE ||
code == ZBADVERSION))
throw KeeperException(code, path);
if (stat)
*stat = s;
return code;
}
OpResultsPtr ZooKeeper::multi(const Ops & ops)
{
checkNotExpired();
OpResultsPtr res = std::make_shared<OpResults>();
CHECKED_WITHOUT_PATH(impl.multi(ops, *res));
for (size_t i = 0; i < res->size(); ++i)
{
if ((*res)[i].getReturnCode() != ReturnCode::Ok)
throw KeeperException((*res)[i].getReturnCode());
}
return res;
OpResultsPtr results;
CHECK_NO_PATH(tryMulti(ops, &results));
return results;
}
ReturnCode::type ZooKeeper::tryMulti(const Ops & ops, OpResultsPtr * out_results)
int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
{
checkNotExpired();
OpResultsPtr results = std::make_shared<OpResults>();
ReturnCode::type code = impl.multi(ops, *results);
if (out_results)
*out_results = results;
if (code != ReturnCode::Ok &&
code != ReturnCode::NoNode &&
code != ReturnCode::NodeExists &&
code != ReturnCode::NoChildrenForEphemerals &&
code != ReturnCode::BadVersion &&
code != ReturnCode::NotEmpty)
size_t count = ops_.size();
OpResultsPtr out_results(new OpResults(count));
/// копируем структуру содержащую указатели дефолтным конструктором копирования
/// это безопасно, т.к. у нее нет деструктора
std::vector<zoo_op_t> ops;
for (const Op & op : ops_)
ops.push_back(*(op.data));
int32_t code = zoo_multi(impl, ops.size(), ops.data(), out_results->data());
if (out_results_)
*out_results_ = out_results;
if (code != ZOK &&
code != ZNONODE &&
code != ZNODEEXISTS &&
code != ZNOCHILDRENFOREPHEMERALS &&
code != ZBADVERSION &&
code != ZNOTEMPTY)
throw KeeperException(code);
return code;
}
@ -320,10 +315,12 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path)
{
Strings children = getChildren(path);
zkutil::Ops ops;
Strings strings;
for (const std::string & child : children)
{
removeChildrenRecursive(path + "/" + child);
ops.push_back(new Op::Remove(path + "/" + child, -1));
strings.push_back(path + "/" + child);
ops.push_back(new Op::Remove(strings.back(), -1));
}
multi(ops);
}
@ -336,7 +333,12 @@ void ZooKeeper::removeRecursive(const std::string & path)
void ZooKeeper::close()
{
CHECKED_WITHOUT_PATH(impl.close());
CHECK_NO_PATH(zookeeper_close(impl));
/// удаляем WatchWithPromise которые уже никогда не будут обработаны
for (WatchWithPromise * watch : watch_store)
delete watch;
watch_store.clear();
}
ZooKeeper::~ZooKeeper()
@ -356,4 +358,36 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
return new ZooKeeper(hosts, sessionTimeoutMs, state_watch);
}
Op::Create::Create(const std::string & path_, const std::string & value_, AclPtr acl, int32_t flags)
: path(path_), value(value_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
}
AclPtr ZooKeeper::getDefaultACL()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return default_acl;
}
void ZooKeeper::setDefaultACL(AclPtr new_acl)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
default_acl = new_acl;
}
static std::string ZooKeeper::error2string(int32_t code)
{
return zerror(code);
}
int ZooKeeper::state()
{
return zoo_state(impl);
}
bool ZooKeeper::expired()
{
return state() == ZOO_EXPIRED_SESSION_STATE;
}
}

View File

@ -1,47 +1,44 @@
#include <zookeeper/zookeeper.hh>
#include <zkutil/ZooKeeper.h>
#include <iostream>
#include <unistd.h>
namespace zk = org::apache::zookeeper;
/** Проверяет, правда ли, что вызовы в zkcpp при просроченной сессии блокируются навсегда.
using namespace zkutil;
/** Проверяет, правда ли, что вызовы при просроченной сессии блокируются навсегда.
* Разорвать сессию можно, например, так: `./nozk.sh && sleep 6s && ./yeszk.sh`
*/
void stateChanged(zk::WatchEvent::type event, zk::SessionState::type state, const std::string & path)
void watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx)
{
std::cout << "state changed; event: " << zk::WatchEvent::toString(event) << ", state: " << zk::SessionState::toString(state)
<< ", path: " << path << std::endl;
}
int main()
{
zk::ZooKeeper zookeeper;
zookeeper.init("example1:2181,example2:2181,example3:2181", 5000, nullptr);
std::vector<std::string> children;
zk::data::Stat stat;
zk::ReturnCode::type ret = zookeeper.getChildren("/", nullptr, children, stat);
std::cout << "getChildren returned " << zk::ReturnCode::toString(ret) << std::endl;
std::cout << "children of /:" << std::endl;
for (const auto & s : children)
try
{
std::cout << s << std::endl;
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", 5000);
Strings children;
children = zk.getChildren("/");
for (auto s : children)
{
std::cout << s << std::endl;
}
sleep(5);
children = zk.getChildren("/");
for (auto s : children)
{
std::cout << s << std::endl;
}
Ops ops;
std::string node = "/test";
std::string value = "dummy";
ops.push_back(new Op::Create(node, value, zk.getDefaultACL(), CreateMode::PersistentSequential));
OpResultsPtr res = zk.multi(ops);
std::cout << "path created: " << dynamic_cast<Op::Create &>(ops[0]).getPathCreated() << std::endl;
}
std::cout << "break connection to example1:2181,example2:2181,example3:2181 for at least 5 seconds and enter something" << std::endl;
std::string unused;
std::cin >> unused;
children.clear();
std::cout << "will getChildren (this call will either succeded either return OperationTimeout)" << std::endl;
ret = zookeeper.getChildren("/", nullptr, children, stat);
std::cout << "getChildren returned " << zk::ReturnCode::toString(ret) << std::endl;
std::cout << "children of /:" << std::endl;
for (const auto & s : children)
catch (KeeperException & e)
{
std::cout << s << std::endl;
std::cerr << "KeeperException " << e.what() << " " << e.message() << std::endl;
}
return 0;
}

View File

@ -13,24 +13,24 @@
void printStat(const zkutil::Stat & s)
{
std::cout << "Stat:\n";
std::cout << " czxid: " << s.getczxid() << '\n';
std::cout << " mzxid: " << s.getmzxid() << '\n';
std::cout << " ctime: " << s.getctime() << '\n';
std::cout << " mtime: " << s.getmtime() << '\n';
std::cout << " version: " << s.getversion() << '\n';
std::cout << " cversion: " << s.getcversion() << '\n';
std::cout << " aversion: " << s.getaversion() << '\n';
std::cout << " ephemeralOwner: " << s.getephemeralOwner() << '\n';
std::cout << " dataLength: " << s.getdataLength() << '\n';
std::cout << " numChildren: " << s.getnumChildren() << '\n';
std::cout << " pzxid: " << s.getpzxid() << std::endl;
std::cout << " czxid: " << s.czxid << '\n';
std::cout << " mzxid: " << s.mzxid << '\n';
std::cout << " ctime: " << s.ctime << '\n';
std::cout << " mtime: " << s.mtime << '\n';
std::cout << " version: " << s.version << '\n';
std::cout << " cversion: " << s.cversion << '\n';
std::cout << " aversion: " << s.aversion << '\n';
std::cout << " ephemeralOwner: " << s.ephemeralOwner << '\n';
std::cout << " dataLength: " << s.dataLength << '\n';
std::cout << " numChildren: " << s.numChildren << '\n';
std::cout << " pzxid: " << s.pzxid << std::endl;
}
void waitForWatch(zkutil::WatchFuture & future)
{
std::cout << "waiting for watch" << std::endl;
zkutil::WatchEventInfo res = future.get();
std::cout << "event: " << zkutil::WatchEvent::toString(res.event) << std::endl;
std::cout << "event: " << res.event << std::endl;
}
@ -118,7 +118,7 @@ int main(int argc, char ** argv)
DB::skipWhitespaceIfAny(in);
readUntilSpace(mode, in);
zkutil::CreateMode::type m;
int32_t m;
if (mode == "p")
m = zkutil::CreateMode::Persistent;
else if (mode == "ps")