#pragma once #include #include #include #include #include #include namespace zkutil { const UInt32 DEFAULT_SESSION_TIMEOUT = 30000; const UInt32 MEDIUM_SESSION_TIMEOUT = 120000; const UInt32 BIG_SESSION_TIMEOUT = 600000; const UInt32 DEFAULT_RETRY_NUM = 3; struct WatchWithEvent; /** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper. * Вместо callback-ов для watch-ей используются Poco::Event. Для указанного события вызывается set() только при первом вызове watch. * Методы на чтение при восстанавливаемых ошибках OperationTimeout, ConnectionLoss пытаются еще retry_num раз. * Методы на запись не пытаются повторить при восстанавливаемых ошибках, т.к. это приводит к проблеммам типа удаления дважды одного и того же. * * Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке. */ class ZooKeeper { public: typedef Poco::SharedPtr Ptr; ZooKeeper(const std::string & hosts, int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT); /** конфиг вида example1 2181 example2 2181 30000 */ ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name); ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, int32_t session_timeout_ms); ~ZooKeeper(); /** Создает новую сессию с теми же параметрами. Можно использовать для переподключения, если сессия истекла. * Новой сессии соответствует только возвращенный экземпляр ZooKeeper, этот экземпляр не изменяется. */ Ptr startNewSession() const; int state(); /** Возвращает true, если сессия навсегда завершена. * Это возможно только если соединение было установлено, потом разорвалось, потом снова восстановилось, но слишком поздно. * Это достаточно редкая ситуация. * С другой стороны, если, например, указан неправильный сервер или порт, попытки соединения будут продолжаться бесконечно, * expired() будет возвращать false, и все вызовы будут выбрасывать исключение ConnectionLoss. */ bool expired(); ACLPtr getDefaultACL(); void setDefaultACL(ACLPtr new_acl); /** Создать znode. Используется ACL, выставленный вызовом setDefaultACL (по умолчанию, всем полный доступ). * Если что-то пошло не так, бросить исключение. */ std::string create(const std::string & path, const std::string & data, int32_t mode); /** Не бросает исключение при следующих ошибках: * - Нет родителя создаваемой ноды. * - Родитель эфемерный. * - Такая нода уже есть. * При остальных ошибках бросает исключение. */ int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated); int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode); int32_t tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated, size_t * attempt = nullptr); /** создает Persistent ноду. * Игнорирует, если нода уже создана. * Пытается сделать retry при ConnectionLoss или OperationTimeout */ void createIfNotExists(const std::string & path, const std::string & data); /** Создает всех еще не существующих предков ноды, с пустыми данными. Саму указанную ноду не создает. */ void createAncestors(const std::string & path); /** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия). */ void remove(const std::string & path, int32_t version = -1); /** Не бросает исключение при следующих ошибках: * - Такой ноды нет. * - У ноды другая версия. * - У ноды есть дети. */ int32_t tryRemove(const std::string & path, int32_t version = -1); /// Если есть проблемы с сетью может сам удалить ноду и вернуть ZNONODE int32_t tryRemoveWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr); bool exists(const std::string & path, Stat * stat = nullptr, EventPtr watch = nullptr); std::string get(const std::string & path, Stat * stat = nullptr, EventPtr watch = nullptr); /** Не бросает исключение при следующих ошибках: * - Такой ноды нет. В таком случае возвращает false. */ bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, EventPtr watch = nullptr); void set(const std::string & path, const std::string & data, int32_t version = -1, Stat * stat = nullptr); /** Не бросает исключение при следующих ошибках: * - Такой ноды нет. * - У ноды другая версия. */ int32_t trySet(const std::string & path, const std::string & data, int32_t version = -1, Stat * stat = nullptr); Strings getChildren(const std::string & path, Stat * stat = nullptr, EventPtr watch = nullptr); /** Не бросает исключение при следующих ошибках: * - Такой ноды нет. */ int32_t tryGetChildren(const std::string & path, Strings & res, Stat * stat = nullptr, EventPtr watch = nullptr); /** Транзакционно выполняет несколько операций. При любой ошибке бросает исключение. */ OpResultsPtr multi(const Ops & ops); /** Бросает исключение только если какая-нибудь операция вернула "неожиданную" ошибку - такую ошибку, * увидев которую соответствующий метод try* бросил бы исключение. */ int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr); /** Использовать только для методов на чтение */ int32_t tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results = nullptr, size_t * attempt = nullptr); int64_t getClientID(); /** Удаляет ноду вместе с поддеревом. Если в это время кто-то добавит иили удалит ноду в поддереве, результат не определен. */ void removeRecursive(const std::string & path); /** Удаляет ноду вместе с поддеревом. Если в это время кто-то будет тоже удалять какие-нибудь ноды в поддереве, не будет ошибок. * Например, можно вызвать одновременно дважды для одной ноды, и результат будет тот же, как если вызвать один раз. */ void tryRemoveRecursive(const std::string & path); /** Асинхронный интерфейс (реализовано небольшое подмножество операций). * * Использование: * * // Эти вызовы не блокируются. * auto future1 = zk.asyncGet("/path1"); * auto future2 = zk.asyncGet("/path2"); * ... * * // Эти вызовы могут заблокироваться до выполнения операции. * auto result1 = future1.get(); * auto result2 = future2.get(); * * future не должна быть уничтожена до получения результата. * Результат обязательно необходимо получать. */ template class Future { friend class ZooKeeper; private: using Task = std::packaged_task; using TaskPtr = std::unique_ptr; using TaskPtrPtr = std::unique_ptr; /** Всё очень сложно. * * В асинхронном интерфейсе libzookeeper, функция (например, zoo_aget) * принимает указатель на свободную функцию-коллбэк и void* указатель на данные. * Указатель на данные потом передаётся в callback. * Это значит, что мы должны сами обеспечить, чтобы данные жили во время работы этой функции и до конца работы callback-а, * и не можем просто так передать владение данными внутрь функции. * Для этого, мы засовываем данные в объект Future, который возвращается пользователю. Данные будут жить, пока живёт объект Future. * Данные засунуты в unique_ptr, чтобы при возврате объекта Future из функции, их адрес (который передаётся в libzookeeper) не менялся. * * Вторая проблема состоит в том, что после того, как std::promise был удовлетворён, и пользователь получил результат из std::future, * объект Future может быть уничтожен, при чём раньше, чем завершит работу в другом потоке функция, которая удовлетворяет promise. * См. http://stackoverflow.com/questions/10843304/race-condition-in-pthread-once * Чтобы этого избежать, используется второй unique_ptr. Внутри callback-а, void* данные преобразуются в unique_ptr, и * перемещаются в локальную переменную unique_ptr, чтобы продлить время жизни данных. */ TaskPtrPtr task; std::future future; template Future(Args &&... args) : task(new TaskPtr(new Task(std::forward(args)...))), future((*task)->get_future()) {} public: Result get() { return future.get(); } }; struct ValueAndStat { std::string value; Stat stat; }; using GetFuture = Future; GetFuture asyncGet(const std::string & path); struct ValueAndStatAndExists { std::string value; Stat stat; bool exists; }; using TryGetFuture = Future; TryGetFuture asyncTryGet(const std::string & path); struct StatAndExists { Stat stat; bool exists; }; using ExistsFuture = Future; ExistsFuture asyncExists(const std::string & path); using GetChildrenFuture = Future; GetChildrenFuture asyncGetChildren(const std::string & path); static std::string error2string(int32_t code); /// максимальный размер данных в узле в байтах /// В версии 3.4.5. максимальный размер узла 1 Mb static const size_t MAX_NODE_SIZE = 1048576; /// Размер прибавляемого ZooKeeper суффикса при создании Sequential ноды /// На самом деле размер меньше, но для удобства округлим в верхнюю сторону static const size_t SEQUENTIAL_SUFFIX_SIZE = 64; private: friend struct WatchWithEvent; friend class EphemeralNodeHolder; void init(const std::string & hosts, int32_t session_timeout_ms); void removeChildrenRecursive(const std::string & path); void tryRemoveChildrenRecursive(const std::string & path); void * watchForEvent(EventPtr event); watcher_fn callbackForEvent(EventPtr event); static void processEvent(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx); template int32_t retry(T && operation, size_t * attempt = nullptr) { int32_t code = operation(); if (attempt) *attempt = 0; for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++i) { if (attempt) *attempt = i; /// если потеряно соединение подождем timeout/3, авось восстановится static const int MAX_SLEEP_TIME = 10; if (code == ZCONNECTIONLOSS) usleep(std::min(session_timeout_ms * 1000 / 3, MAX_SLEEP_TIME * 1000 * 1000)); LOG_WARNING(log, "Error on attempt " << i << ": " << error2string(code) << ". Retry"); code = operation(); } return code; } /// методы не бросают исключений, а возвращают коды ошибок int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated); int32_t removeImpl(const std::string & path, int32_t version = -1); int32_t getImpl(const std::string & path, std::string & res, Stat * stat = nullptr, EventPtr watch = nullptr); int32_t setImpl(const std::string & path, const std::string & data, int32_t version = -1, Stat * stat = nullptr); int32_t getChildrenImpl(const std::string & path, Strings & res, Stat * stat = nullptr, EventPtr watch = nullptr); int32_t multiImpl(const Ops & ops, OpResultsPtr * out_results = nullptr); int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr); std::string hosts; int32_t session_timeout_ms; Poco::FastMutex mutex; ACLPtr default_acl; zhandle_t * impl; std::unordered_set watch_store; /// Количество попыток повторить операцию чтения при OperationTimeout, ConnectionLoss size_t retry_num = 3; Logger * log = nullptr; }; typedef ZooKeeper::Ptr ZooKeeperPtr; /** В конструкторе создает эфемерную ноду, в деструкторе - удаляет. */ class EphemeralNodeHolder { public: typedef Poco::SharedPtr Ptr; EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data) : path(path_), zookeeper(zookeeper_) { if (create) path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral); } std::string getPath() const { return path; } static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "") { return new EphemeralNodeHolder(path, zookeeper, true, false, data); } static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "") { return new EphemeralNodeHolder(path, zookeeper, true, true, data); } static Ptr existing(const std::string & path, ZooKeeper & zookeeper) { return new EphemeralNodeHolder(path, zookeeper, false, false, ""); } ~EphemeralNodeHolder() { try { zookeeper.tryRemove(path); } catch (const KeeperException & e) { LOG_ERROR(zookeeper.log, "~EphemeralNodeHolder(): " << e.displayText()); } } private: std::string path; ZooKeeper & zookeeper; }; typedef EphemeralNodeHolder::Ptr EphemeralNodeHolderPtr; }