diff --git a/libs/libzkutil/include/zkutil/KeeperException.h b/libs/libzkutil/include/zkutil/KeeperException.h new file mode 100644 index 00000000000..ca8a5078fba --- /dev/null +++ b/libs/libzkutil/include/zkutil/KeeperException.h @@ -0,0 +1,21 @@ +#pragma once +#include +#include + + +namespace zkutil +{ + +class KeeperException : public Poco::Exception +{ +public: + KeeperException(const std::string & msg) : Poco::Exception(msg), code(ReturnCode::Ok) {} + KeeperException(const std::string & msg, ReturnCode::type code_) + : Poco::Exception(msg + " (" + ReturnCode::toString(code) + ")"), code(code_) {} + KeeperException(ReturnCode::type code_) + : Poco::Exception(ReturnCode::toString(code)), code(code_) {} + + ReturnCode::type code; +}; + +}; diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h new file mode 100644 index 00000000000..1578c32bb30 --- /dev/null +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -0,0 +1,108 @@ +#pragma once +#include +#include +#include +#include + + +namespace zkutil +{ + +const UInt32 DEFAULT_SESSION_TIMEOUT = 30000; + +namespace zk = org::apache::zookeeper; +namespace CreateMode = zk::CreateMode; +namespace ReturnCode = zk::ReturnCode; +namespace SessionState = zk::SessionState; +namespace WatchEvent = zk::WatchEvent; + +typedef zk::data::Stat Stat; +typedef zk::data::ACL ACL; + +typedef std::vector ACLs; +typedef std::vector Strings; + +typedef boost::function WatchFunction; + +struct WatchEventInfo +{ + WatchEvent::type event; + SessionState::type state; + std::string path; + + WatchEventInfo() {} + WatchEventInfo(WatchEvent::type event_, SessionState::type state_, const std::string & path_) + : event(event_), state(state_), path(path_) {} +}; + +typedef std::future WatchFuture; + +/** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper. + * Вместо callback-ов для watch-ей используются std::future. + * Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке. + * Методы с названиями, начинающимися с try, не бросают исключение только при некторых видах ошибок. + * Например, исключение бросается в любом случае, если сессия разорвалась или если не хватает прав или ресурсов. + */ +class ZooKeeper +{ +public: + ZooKeeper(const std::string & hosts, int32_t sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT, WatchFunction * watch = nullptr); + + /// Возвращает true, если сессия навсегда завершена. + bool disconnected(); + + void setDefaultACL(ACLs & acl); + + /** Создать znode. Используется ACL, выставленный вызовом setDefaultACL (по умолчанию, всем полный доступ). + * Если что-то пошло не так, бросить исключение. + */ + std::string create(const std::string & path, const std::string & data, CreateMode::type mode); + + /** Не бросает исключение при следующих ошибках: + * - Нет родителя создаваемой ноды. + * - Родитель эфемерный. + * - Такая нода уже есть. + * При остальных ошибках бросает исключение. + */ + ReturnCode::type tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated); + + /** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия). + */ + void remove(const std::string & path, int32_t version = -1); + + /** Не бросает исключение при следующих ошибках: + * - Такой ноды нет. + * - У ноды другая версия. + * - У ноды есть дети. + */ + ReturnCode::type tryRemove(const std::string & path, int32_t version = -1); + + bool exists(const std::string & path, Stat * stat = nullptr, WatchFuture * watch = nullptr); + + std::string get(const std::string & path, WatchFuture * watch, Stat * stat); + + /// Возвращает false, если нет такой ноды. При остальных ошибках бросает исключение. + bool tryGet(const std::string & path, std::string & data, Stat * stat = nullptr, WatchFuture * watch = nullptr); + + void set(const std::string & path, const std::string & data, + int32_t version = -1, Stat * stat = nullptr); + + Strings getChildren(const std::string & path, + Stat * stat = nullptr, + WatchFuture * watch = nullptr); + + boost::ptr_vector & multi(const boost::ptr_vector & ops); + +private: + friend struct StateWatch; + + zk::ZooKeeper impl; + ACLs default_acl; + WatchFunction * state_watch; + SessionState::type session_state; + + void stateChanged(WatchEvent::type event, SessionState::type state, const std::string& path); +}; + +} diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp new file mode 100644 index 00000000000..b1bb2da992b --- /dev/null +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -0,0 +1,88 @@ +#include +#include +#include + + +#define CHECKED(x) { ReturnCode::type code = x; if (code) throw KeeperException(code); } + +namespace zkutil +{ + +typedef std::promise WatchPromise; + +struct WatchWithPromise : public zk::Watch +{ + WatchPromise promise; + + void process(WatchEvent::type event, SessionState::type state, const std::string & path) + { + promise.set_value(WatchEventInfo(event, state, path)); + } +}; + +typedef boost::shared_ptr WatchPtr; +typedef boost::shared_ptr WatchWithPromisePtr; + +static WatchPtr watchForFuture(WatchFuture * future) +{ + if (!future) + return nullptr; + WatchWithPromisePtr res = boost::make_shared(); + *future = res->promise.get_future(); + return res; +} + +struct StateWatch : public zk::Watch +{ + ZooKeeper * owner; + + StateWatch(ZooKeeper * owner_) : owner(owner_) {} + + void process(WatchEvent::type event, SessionState::type state, const std::string & path) + { + owner->stateChanged(event, state, path); + } +}; + +ZooKeeper::ZooKeeper(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_) + : state_watch(watch_) +{ + CHECKED(impl.init(hosts, sessionTimeoutMs, boost::make_shared(this))); + + ACL perm; + perm.getid().getscheme() = "world"; + perm.getid().getid() = "anyone"; + perm.setperms(zk::Permission::All); + default_acl.push_back(perm); +} + +void ZooKeeper::stateChanged(WatchEvent::type event, SessionState::type state, const std::string & path) +{ + session_state = state; + if (state_watch) + (*state_watch)(event, state, path); +} + +bool ZooKeeper::disconnected() +{ + return session_state == SessionState::Expired || session_state == SessionState::AuthFailed; +} + +void ZooKeeper::setDefaultACL(ACLs & acl) +{ + default_acl = acl; +} + +std::vector ZooKeeper::getChildren( + const std::string & path, Stat * stat, WatchFuture * watch) +{ + Stat s; + Strings res; + CHECKED(impl.getChildren(path, watchForFuture(watch), res, s)); + if (stat) + *stat = s; + return res; +} + +} + diff --git a/libs/libzkutil/src/tests/zkutil_test.cpp b/libs/libzkutil/src/tests/zkutil_test.cpp new file mode 100644 index 00000000000..a53fcc5a050 --- /dev/null +++ b/libs/libzkutil/src/tests/zkutil_test.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include + + +int main(int argc, char ** argv) +{ + try + { + if (argc != 2) + { + std::cerr << "usage: " << argv[0] << " hosts" << std::endl; + return 2; + } + + zkutil::ZooKeeper zk(argv[1]); + + while (char * line = readline(":3 ")) + { + try + { + std::stringstream ss(line); + + std::string cmd; + if (!(ss >> cmd)) + continue; + + if (cmd == "q" || cmd == "quit" || cmd == "exit" || cmd == ":q") + break; + + if (cmd == "help") + { + std::cout << "commands: q, ls (not yet: stat, get, set, create, remove)" << std::endl; + continue; + } + std::string path; + ss >> path; + if (cmd == "ls") + { + std::vector v = zk.getChildren(path); + for (size_t i = 0; i < v.size(); ++i) + { + std::cout << v[i] << std::endl; + } + } + } + catch (zkutil::KeeperException & e) + { + std::cerr << "KeeperException: " << e.displayText() << std::endl; + } + } + } + catch (zkutil::KeeperException & e) + { + std::cerr << "KeeperException: " << e.displayText() << std::endl; + return 1; + } + + return 0; +}