diff --git a/dbms/src/Common/ZooKeeper/Lock.cpp b/dbms/src/Common/ZooKeeper/Lock.cpp index 62190fef0ef..e6f002c7805 100644 --- a/dbms/src/Common/ZooKeeper/Lock.cpp +++ b/dbms/src/Common/ZooKeeper/Lock.cpp @@ -11,29 +11,14 @@ bool Lock::tryLock() if (tryCheck() != Status::LOCKED_BY_ME) locked.reset(nullptr); } - - if (!locked) + else { - size_t attempt; std::string dummy; - - /// TODO: ошибка. можно создать эфемерную ноду, но при этом не получить подтверждения даже после нескольких попыток. - /// тогда все последующие локи будут неуспешные из-за существования ноды. - int32_t code = zookeeper->tryCreateWithRetries(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy, &attempt); + int32_t code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy); if (code == ZNODEEXISTS) { - if (attempt == 0) - locked.reset(nullptr); - else - { - zkutil::Stat stat; - zookeeper->get(lock_path, &stat); - if (stat.ephemeralOwner == zookeeper->getClientID()) - locked.reset(new ZooKeeperHandler(zookeeper)); - else - locked.reset(nullptr); - } + locked.reset(nullptr); } else if (code == ZOK) { @@ -52,34 +37,8 @@ void Lock::unlock() if (locked) { auto zookeeper = zookeeper_holder->getZooKeeper(); - try - { - if (tryCheck() == Status::LOCKED_BY_ME) - { - size_t attempt; - int32_t code = zookeeper->tryRemoveEphemeralNodeWithRetries(lock_path, -1, &attempt); - - if (attempt) - { - if (code != ZOK) - throw zkutil::KeeperException(code); - } - else - { - if (code == ZNONODE) - LOG_ERROR(log, "Node " << lock_path << " has been already removed. Probably due to network error."); - else if (code != ZOK) - throw zkutil::KeeperException(code); - } - } - } - catch (const zkutil::KeeperException & e) - { - /// если сессия находится в невостанавливаемом состоянии, то эфемерные ноды нам больше не принадлежат - /// и лок через таймаут будет отпущен - if (!e.isUnrecoverable()) - throw; - } + if (tryCheck() == Status::LOCKED_BY_ME) + zookeeper->remove(lock_path, -1); locked.reset(nullptr); } } @@ -97,47 +56,17 @@ Lock::Status Lock::tryCheck() const else { if (stat.ephemeralOwner == zookeeper->getClientID()) - { lock_status = LOCKED_BY_ME; - } else - { lock_status = LOCKED_BY_OTHER; - } } if (locked && lock_status != LOCKED_BY_ME) - LOG_WARNING(log, "Lock is lost. It is normal if session was reinitialized. Path: " << lock_path << "/" << lock_message); + LOG_WARNING(log, "Lock is lost. It is normal if session was expired. Path: " << lock_path << "/" << lock_message); return lock_status; } -std::string Lock::status2String(Status status) -{ - if (status >= END) - throw zkutil::KeeperException("Wrong status code: " + std::to_string(status)); - static const char * names[] = {"Unlocked", "Locked by me", "Locked by other"}; - return names[status]; -} - -void Lock::unlockOrMoveIfFailed(std::vector & failed_to_unlock_locks) -{ - try - { - unlock(); - } - catch (const zkutil::KeeperException & e) - { - if (e.isTemporaryError()) - { - LOG_WARNING(log, "Fail to unlock lock. Move lock to vector to remove later. Path: " << getPath()); - failed_to_unlock_locks.emplace_back(std::move(*this)); - } - else - throw; - } -} - void Lock::unlockAssumeLockNodeRemovedManually() { locked.reset(nullptr); diff --git a/dbms/src/Common/ZooKeeper/Lock.h b/dbms/src/Common/ZooKeeper/Lock.h index 1fca50a5293..17ded48d26b 100644 --- a/dbms/src/Common/ZooKeeper/Lock.h +++ b/dbms/src/Common/ZooKeeper/Lock.h @@ -51,15 +51,12 @@ namespace zkutil UNLOCKED, LOCKED_BY_ME, LOCKED_BY_OTHER, - END }; - std::string status2String(Status status); /// проверяет создана ли эфемерная нода и кто ее владелец. Status tryCheck() const; void unlock(); - void unlockOrMoveIfFailed(std::vector & failed_to_unlock_locks); void unlockAssumeLockNodeRemovedManually(); bool tryLock(); diff --git a/dbms/src/Common/ZooKeeper/Types.h b/dbms/src/Common/ZooKeeper/Types.h index 1938081bb2e..9bd3dd2b557 100644 --- a/dbms/src/Common/ZooKeeper/Types.h +++ b/dbms/src/Common/ZooKeeper/Types.h @@ -3,184 +3,17 @@ #include #include #include -#include +#include #include namespace zkutil { -using ACLPtr = const ACL_vector *; -using Stat = ::Stat; -class ZooKeeper; - - -struct Op -{ -public: - Op() : data(new zoo_op_t) {} - virtual ~Op() {} - - virtual std::shared_ptr clone() const = 0; - - virtual std::string getPath() const = 0; - - virtual std::string describe() const = 0; - - std::unique_ptr data; - - struct Remove; - struct Create; - struct SetData; - struct Check; -}; - -using OpPtr = std::shared_ptr; - - -struct Op::Remove : public Op -{ - Remove(const std::string & path_, int32_t version_) : - path(path_), version(version_) - { - zoo_delete_op_init(data.get(), path.c_str(), version); - } - - OpPtr clone() const override - { - return std::make_shared(path, version); - } - - std::string getPath() const override { return path; } - - std::string describe() const override { return "command: remove, path: " + path; } - -private: - std::string path; - int32_t version; -}; - -struct Op::Create : public Op -{ - Create(const std::string & path_pattern_, const std::string & value_, ACLPtr acl_, int32_t flags_); - - OpPtr clone() const override - { - return std::make_shared(path_pattern, value, acl, flags); - } - - std::string getPathCreated() { return created_path.data(); } - - std::string getPath() const override { return path_pattern; } - - std::string describe() const override - { - return "command: create" - ", path: " + path_pattern + - ", value: " + value; - } - -private: - std::string path_pattern; - std::string value; - ACLPtr acl; - int32_t flags; - std::vector created_path; -}; - -struct Op::SetData : public Op -{ - SetData(const std::string & path_, const std::string & value_, int32_t version_) : - path(path_), value(value_), version(version_) - { - zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat); - } - - OpPtr clone() const override - { - return std::make_shared(path, value, version); - } - - std::string getPath() const override { return path; } - - std::string describe() const override - { - return - "command: set" - ", path: " + path + - ", value: " + value + - ", version: " + std::to_string(data->set_op.version); - } - -private: - std::string path; - std::string value; - int32_t version; - Stat stat; -}; - -struct Op::Check : public Op -{ - Check(const std::string & path_, int32_t version_) : - path(path_), version(version_) - { - zoo_check_op_init(data.get(), path.c_str(), version); - } - - OpPtr clone() const override - { - return std::make_shared(path, version); - } - - std::string getPath() const override { return path; } - - std::string describe() const override { return "command: check, path: " + path; } - -private: - std::string path; - int32_t version; -}; - -using Ops = std::vector; - - -/// C++ version of zoo_op_result_t -struct OpResult -{ - int err; - std::string value; - std::unique_ptr stat; - - /// ZooKeeper is required for correct chroot path prefixes handling - explicit OpResult(const zoo_op_result_t & op_result, const ZooKeeper * zookeeper = nullptr); -}; -using OpResults = std::vector; -using OpResultsPtr = std::shared_ptr; +using Stat = ZooKeeperImpl::ZooKeeper::Stat; using Strings = std::vector; -/// Simple structure to handle transaction execution results -struct MultiTransactionInfo -{ - Ops ops; - int32_t code = ZOK; - OpResultsPtr op_results; - - MultiTransactionInfo() = default; - - MultiTransactionInfo(int32_t code_, const Ops & ops_, const OpResultsPtr & op_results_) - : ops(ops_), code(code_), op_results(op_results_) {} - - bool empty() const - { - return ops.empty(); - } - - /// Returns failed op if zkutil::isUserError(code) is true - const Op & getFailedOp() const; -}; - - namespace CreateMode { extern const int Persistent; @@ -196,18 +29,6 @@ class ZooKeeper; /// Callback to call when the watch fires. /// Because callbacks are called in the single "completion" thread internal to libzookeeper, /// they must execute as quickly as possible (preferably just set some notification). -/// Parameters: -/// zookeeper - zookeeper session to which the fired watch belongs -/// type - event type, one of the *_EVENT constants from zookeeper.h -/// state - session connection state, one of the *_STATE constants from zookeeper.h -/// path - znode path to which the change happened. if event == ZOO_SESSION_EVENT it is either NULL or empty string. -using WatchCallback = std::function; - - -/// Returns first op which code != ZOK or throws an exception -/// ZooKeeper client sets correct OP codes if the transaction fails because of logical (user) errors like ZNODEEXISTS -/// If it is failed because of network error, for example, OP codes is not set. -/// Therefore you should make zkutil::isUserError() check before the function invocation. -size_t getFailedOpIndex(const OpResultsPtr & op_results, int32_t transaction_return_code); +using WatchCallback = ZooKeeperImpl::ZooKeeper::WatchCallback; } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 746ed4c609a..fcf5e3736ec 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -42,50 +42,22 @@ namespace zkutil { const int CreateMode::Persistent = 0; -const int CreateMode::Ephemeral = ZOO_EPHEMERAL; -const int CreateMode::EphemeralSequential = ZOO_EPHEMERAL | ZOO_SEQUENCE; -const int CreateMode::PersistentSequential = ZOO_SEQUENCE; +const int CreateMode::Ephemeral = 1; +const int CreateMode::PersistentSequential = 2; +const int CreateMode::EphemeralSequential = 3; static void check(int32_t code, const std::string & path) { - if (code != ZOK) + if (code) throw KeeperException(code, path); } -struct WatchContext -{ - /// ZooKeeper instance exists for the entire WatchContext lifetime. - ZooKeeper & zk; - WatchCallback callback; - CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperWatch}; - - WatchContext(ZooKeeper & zk_, WatchCallback callback_) : zk(zk_), callback(std::move(callback_)) {} - - void process(int32_t event_type, int32_t state, const char * path) - { - if (callback) - callback(zk, event_type, state, path); - } -}; - -void ZooKeeper::processCallback(zhandle_t *, int type, int state, const char * path, void * watcher_ctx) -{ - WatchContext * context = static_cast(watcher_ctx); - context->process(type, state, path); - - /// It is guaranteed that non-ZOO_SESSION_EVENT notification will be delivered only once - /// (https://issues.apache.org/jira/browse/ZOOKEEPER-890) - if (type != ZOO_SESSION_EVENT) - destroyContext(context); -} - void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_, const std::string & chroot_) { log = &Logger::get("ZooKeeper"); - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); hosts = hosts_; identity = identity_; session_timeout_ms = session_timeout_ms_; @@ -95,9 +67,6 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, impl = zookeeper_init(hosts_for_lib.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0); ProfileEvents::increment(ProfileEvents::ZooKeeperInit); - if (!impl) - throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts_for_lib); - if (!identity.empty()) { auto code = zoo_add_auth(impl, "digest", identity.c_str(), static_cast(identity.size()), nullptr, nullptr); @@ -187,82 +156,37 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std init(args.hosts, args.identity, args.session_timeout_ms, args.chroot); } -WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event) + +static WatchCallback callbackForEvent(const EventPtr & watch) { - WatchCallback callback; - if (event) - { - callback = [e=event](ZooKeeper &, int, int, const char *) mutable - { - if (e) - { - e->set(); - e.reset(); /// The event is set only once, even if the callback can fire multiple times due to session events. - } - }; - } - return callback; + return [watch](const ZooKeeperImpl::ZooKeeper::WatchResponse &) { watch->set(); }; } -WatchContext * ZooKeeper::createContext(WatchCallback && callback) -{ - if (callback) - { - WatchContext * res = new WatchContext(*this, std::move(callback)); - { - std::lock_guard lock(mutex); - watch_context_store.insert(res); - if (watch_context_store.size() % 10000 == 0) - { - LOG_ERROR(log, "There are " << watch_context_store.size() << " active watches. There must be a leak somewhere."); - } - } - return res; - } - else - return nullptr; -} - -void ZooKeeper::destroyContext(WatchContext * context) -{ - if (context) - { - std::lock_guard lock(context->zk.mutex); - context->zk.watch_context_store.erase(context); - } - delete context; -} int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res, - Stat * stat_, + Stat * stat, WatchCallback watch_callback) { - String_vector strings; - int code; - Stat stat; - watcher_fn watcher = watch_callback ? processCallback : nullptr; - WatchContext * context = createContext(std::move(watch_callback)); - code = zoo_wget_children2(impl, path.c_str(), watcher, context, &strings, &stat); - ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren); - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); + int32_t code = 0; + Poco::Event event; - if (code == ZOK) + auto callback = [&](const ZooKeeperImpl::ZooKeeper::ListResponse & response) { - if (stat_) - *stat_ = stat; - res.resize(strings.count); - for (int i = 0; i < strings.count; ++i) - res[i] = std::string(strings.data[i]); - deallocate_String_vector(&strings); - } - else - { - /// The call was unsuccessful, so the watch was not set. Destroy the context. - destroyContext(context); - } + code = response.error; + if (!code) + { + res = response.names; + if (stat) + *stat = response.stat; + } + event.set(); + }; + impl.list(path, callback, watch_callback); + event.wait(); return code; } + Strings ZooKeeper::getChildren( const std::string & path, Stat * stat, const EventPtr & watch) { @@ -272,9 +196,9 @@ Strings ZooKeeper::getChildren( } int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, - Stat * stat_, const EventPtr & watch) + Stat * stat, const EventPtr & watch) { - int32_t code = retry(std::bind(&ZooKeeper::getChildrenImpl, this, std::ref(path), std::ref(res), stat_, callbackForEvent(watch))); + int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch)); if (!(code == ZOK || code == ZNONODE)) throw KeeperException(code, path); @@ -284,21 +208,22 @@ int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) { - int code; - /// The name of the created node can be longer than path if the sequential node is created. - size_t name_buffer_size = path.size() + SEQUENTIAL_SUFFIX_SIZE; - std::string name_buffer(name_buffer_size, '\0'); + int32_t code = 0; + Poco::Event event; + + auto callback = [&](const ZooKeeperImpl::ZooKeeper::CreateResponse & response) + { + code = response.error; + if (!code) + path_created = response.path_created; + event.set(); + }; + + impl.create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode + event.wait(); - code = zoo_create(impl, path.c_str(), data.c_str(), data.size(), getDefaultACL(), mode, name_buffer.data(), name_buffer_size); ProfileEvents::increment(ProfileEvents::ZooKeeperCreate); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - - if (code == ZOK) - { - name_buffer.resize(strlen(name_buffer.data())); - path_created = std::move(name_buffer); - } - return code; } @@ -311,7 +236,7 @@ std::string ZooKeeper::create(const std::string & path, const std::string & data int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) { - int code = createImpl(path, data, mode, path_created); + int32_t code = createImpl(path, data, mode, path_created); if (!(code == ZOK || code == ZNONODE || @@ -328,16 +253,10 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, return tryCreate(path, data, mode, path_created); } -int32_t ZooKeeper::tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, std::string & path_created, size_t* attempt) -{ - return retry([&path, &data, mode, &path_created, this] { return tryCreate(path, data, mode, path_created); }, attempt); -} - - void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data) { std::string path_created; - int32_t code = retry(std::bind(&ZooKeeper::createImpl, this, std::ref(path), std::ref(data), zkutil::CreateMode::Persistent, std::ref(path_created))); + int32_t code = createImpl(path, data, zkutil::CreateMode::Persistent, path_created); if (code == ZOK || code == ZNODEEXISTS) return; @@ -360,7 +279,19 @@ void ZooKeeper::createAncestors(const std::string & path) int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version) { - int32_t code = zoo_delete(impl, path.c_str(), version); + int32_t code = 0; + Poco::Event event; + + auto callback = [&](const ZooKeeperImpl::ZooKeeper::RemoveResponse & response) + { + if (response.error) + code = response.error; + event.set(); + }; + + impl.remove(path, version, callback); + event.wait(); + ProfileEvents::increment(ProfileEvents::ZooKeeperRemove); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); return code; @@ -371,15 +302,6 @@ void ZooKeeper::remove(const std::string & path, int32_t version) check(tryRemove(path, version), path); } -void ZooKeeper::removeWithRetries(const std::string & path, int32_t version) -{ - size_t attempt; - int code = tryRemoveWithRetries(path, version, &attempt); - - if (!(code == ZOK || (code == ZNONODE && attempt > 0))) - throw KeeperException(code, path); -} - int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version) { int32_t code = removeImpl(path, version); @@ -391,68 +313,35 @@ int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version) return code; } -int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t version, size_t * attempt) +int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat, WatchCallback watch_callback) { - int32_t code = retry(std::bind(&ZooKeeper::removeImpl, this, std::ref(path), version), attempt); - if (!(code == ZOK || - code == ZNONODE || - code == ZBADVERSION || - code == ZNOTEMPTY)) + int32_t code = 0; + Poco::Event event; + + auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response) { - throw KeeperException(code, path); - } + code = response.error; + if (!code && stat) + *stat = response.stat; + event.set(); + }; - return code; -} + impl.exists(path, callback, watch_callback); + event.wait(); -int32_t ZooKeeper::tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version, size_t * attempt) -{ - try - { - return tryRemoveWithRetries(path, version, attempt); - } - catch (const KeeperException &) - { - /// Set the flag indicating that the session is better treated as expired so that someone - /// recreates it and the ephemeral nodes are indeed deleted. - is_dirty = true; - - throw; - } -} - -int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback) -{ - int32_t code; - Stat stat; - watcher_fn watcher = watch_callback ? processCallback : nullptr; - WatchContext * context = createContext(std::move(watch_callback)); - code = zoo_wexists(impl, path.c_str(), watcher, context, &stat); ProfileEvents::increment(ProfileEvents::ZooKeeperExists); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - - if (code == ZOK) - { - if (stat_) - *stat_ = stat; - } - if (code != ZOK && code != ZNONODE) - { - /// The call was unsuccessful, so the watch was not set. Destroy the context. - destroyContext(context); - } - return code; } -bool ZooKeeper::exists(const std::string & path, Stat * stat_, const EventPtr & watch) +bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & watch) { - return existsWatch(path, stat_, callbackForEvent(watch)); + return existsWatch(path, stat, callbackForEvent(watch)); } -bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchCallback & watch_callback) +bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback) { - int32_t code = retry(std::bind(&ZooKeeper::existsImpl, this, path, stat_, watch_callback)); + int32_t code = existsImpl(path, stat, watch_callback); if (!(code == ZOK || code == ZNONODE)) throw KeeperException(code, path); @@ -461,43 +350,35 @@ bool ZooKeeper::existsWatch(const std::string & path, Stat * stat_, const WatchC return true; } -int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat_, WatchCallback watch_callback) +int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback) { - DB::PODArray buffer; - buffer.resize(MAX_NODE_SIZE); - int buffer_len = MAX_NODE_SIZE; + int32_t code = 0; + Poco::Event event; - int32_t code; - Stat stat; - watcher_fn watcher = watch_callback ? processCallback : nullptr; - WatchContext * context = createContext(std::move(watch_callback)); + auto callback = [&](const ZooKeeperImpl::ZooKeeper::GetResponse & response) + { + code = response.error; + if (!code) + { + res = response.data; + if (stat) + *stat = response.stat; + } + event.set(); + }; + + impl.get(path, callback, watch_callback); + event.wait(); - code = zoo_wget(impl, path.c_str(), watcher, context, buffer.data(), &buffer_len, &stat); ProfileEvents::increment(ProfileEvents::ZooKeeperGet); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - - if (code == ZOK) - { - if (stat_) - *stat_ = stat; - - if (buffer_len < 0) /// This can happen if the node contains NULL. Do not distinguish it from the empty string. - res.clear(); - else - res.assign(buffer.data(), buffer_len); - } - else - { - /// The call was unsuccessful, so the watch was not set. Destroy the context. - destroyContext(context); - } return code; } std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr & watch) { - int code; + int32_t code = 0; std::string res; if (tryGet(path, res, stat, watch, &code)) return res; @@ -505,14 +386,14 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code); } -bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, const EventPtr & watch, int * return_code) +bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, const EventPtr & watch, int * return_code) { - return tryGetWatch(path, res, stat_, callbackForEvent(watch), return_code); + return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code); } -bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat_, const WatchCallback & watch_callback, int * return_code) +bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code) { - int32_t code = retry(std::bind(&ZooKeeper::getImpl, this, std::ref(path), std::ref(res), stat_, watch_callback)); + int32_t code = getImpl(path, res, stat, watch_callback); if (!(code == ZOK || code == ZNONODE)) throw KeeperException(code, path); @@ -524,18 +405,24 @@ bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * } int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data, - int32_t version, Stat * stat_) + int32_t version, Stat * stat) { - Stat stat; - int32_t code = zoo_set2(impl, path.c_str(), data.c_str(), data.length(), version, &stat); + int32_t code = 0; + Poco::Event event; + + auto callback = [&](const ZooKeeperImpl::ZooKeeper::SetResponse & response) + { + code = response.error; + if (!code && stat) + *stat = response.stat; + event.set(); + }; + + impl.set(path, data, version, callback); + event.wait(); + ProfileEvents::increment(ProfileEvents::ZooKeeperSet); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - - if (code == ZOK) - { - if (stat_) - *stat_ = stat; - } return code; } @@ -546,7 +433,7 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode) { - int code = trySet(path, data, -1); + int32_t code = trySet(path, data, -1); if (code == ZNONODE) { create(path, data, mode); @@ -556,9 +443,9 @@ void ZooKeeper::createOrUpdate(const std::string & path, const std::string & dat } int32_t ZooKeeper::trySet(const std::string & path, const std::string & data, - int32_t version, Stat * stat_) + int32_t version, Stat * stat) { - int32_t code = setImpl(path, data, version, stat_); + int32_t code = setImpl(path, data, version, stat); if (!(code == ZOK || code == ZNONODE || @@ -567,31 +454,12 @@ int32_t ZooKeeper::trySet(const std::string & path, const std::string & data, return code; } -/// Makes deep copy of zoo_op_result_t and removes chroot prefix from paths -static void convertOpResults(const std::vector & op_results_native, OpResultsPtr & out_op_results, - const ZooKeeper * zookeeper = nullptr) -{ - if (!out_op_results) - out_op_results = std::make_shared(); - - out_op_results->reserve(op_results_native.size()); - for (const zoo_op_result_t & res_native : op_results_native) - out_op_results->emplace_back(res_native, zookeeper); -} int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_op_results, MultiTransactionInfo * out_info) { if (ops_.empty()) return ZOK; - /// Workaround of the libzookeeper bug. If the session is expired, zoo_multi sometimes - /// segfaults. - /// Possibly, there is a race condition and a segfault is still possible if the session - /// expires between this check and zoo_multi call. - /// TODO: check if the bug is fixed in the latest version of libzookeeper. - if (expired()) - return ZINVALIDSTATE; - size_t count = ops_.size(); std::vector out_results_native(count); @@ -624,7 +492,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_op_results, Mu OpResultsPtr ZooKeeper::multi(const Ops & ops) { OpResultsPtr op_results; - int code = multiImpl(ops, &op_results); + int32_t code = multiImpl(ops, &op_results); KeeperMultiException::check(code, ops, op_results); return op_results; } @@ -734,7 +602,7 @@ ZooKeeper::~ZooKeeper() { LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session"); - int code = zookeeper_close(impl); + int32_t code = zookeeper_close(impl); if (code != ZOK) { LOG_ERROR(&Logger::get("~ZooKeeper"), "Failed to close ZooKeeper session: " << zerror(code)); @@ -766,12 +634,6 @@ ACLPtr ZooKeeper::getDefaultACL() return default_acl; } -void ZooKeeper::setDefaultACL(ACLPtr new_acl) -{ - std::lock_guard lock(mutex); - default_acl = new_acl; -} - std::string ZooKeeper::error2string(int32_t code) { return zerror(code); @@ -1097,7 +959,7 @@ KeeperMultiException::KeeperMultiException(const MultiTransactionInfo & info_, s info_.code), info(info_) {} -void KeeperMultiException::check(int code, const Ops & ops, const OpResultsPtr & op_results) +void KeeperMultiException::check(int32_t code, const Ops & ops, const OpResultsPtr & op_results) { if (code == ZOK) {} else if (zkutil::isUserError(code)) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index 52f1968eba6..0c52727cf33 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -29,8 +29,6 @@ namespace zkutil { const UInt32 DEFAULT_SESSION_TIMEOUT = 30000; -const UInt32 MEDIUM_SESSION_TIMEOUT = 120000; -const UInt32 BIG_SESSION_TIMEOUT = 600000; /// Preferred size of multi() command (in number of ops) constexpr size_t MULTI_BATCH_SIZE = 100; @@ -84,20 +82,10 @@ public: /// This object remains unchanged, and the new session is returned. Ptr startNewSession() const; - /// Returns true, if the session has expired forever. - /// This is possible only if the connection has been established, then lost and re-established - /// again, but too late. - /// In contrast, if, for instance, the server name or port is misconfigured, connection - /// attempts will continue indefinitely, expired() will return false and all method calls - /// will raise ConnectionLoss exception. - /// Also returns true if is_dirty flag is set - a request to close the session ASAP. + /// Returns true, if the session has expired. bool expired(); - ACLPtr getDefaultACL(); - - void setDefaultACL(ACLPtr new_acl); - - /// Create a znode. ACL set by setDefaultACL is used (full access to everybody by default). + /// Create a znode. /// Throw an exception if something went wrong. std::string create(const std::string & path, const std::string & data, int32_t mode); @@ -108,12 +96,9 @@ public: /// In case of other errors throws an exception. int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode); - int32_t tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, - std::string & path_created, size_t * attempt = nullptr); /// Create a Persistent node. /// Does nothing if the node already exists. - /// Retries on ConnectionLoss or OperationTimeout. void createIfNotExists(const std::string & path, const std::string & data); /// Creates all non-existent ancestors of the given path with empty contents. @@ -123,42 +108,11 @@ public: /// Remove the node if the version matches. (if version == -1, remove any version). void remove(const std::string & path, int32_t version = -1); - /// Removes the node. In case of network errors tries to remove again. - /// ZNONODE error for the second and the following tries is ignored. - void removeWithRetries(const std::string & path, int32_t version = -1); - /// Doesn't throw in the following cases: /// * The node doesn't exist /// * Versions don't match /// * The node has children. int32_t tryRemove(const std::string & path, int32_t version = -1); - /// Retries in case of network errors, returns ZNONODE if the node is already removed. - int32_t tryRemoveWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr); - - /// The same, but sets is_dirty flag if all removal attempts were unsuccessful. - /// This is needed because the session might still exist after all retries, - /// even if more time than session_timeout has passed. - /// So we do not rely on the ephemeral node being deleted and set is_dirty to - /// try and close the session ASAP. - /** Ridiculously Long Delay to Expire - When disconnects do happen, the common case should be a very* quick - reconnect to another server, but an extended network outage may - introduce a long delay before a client can reconnect to the ZooKeep‐ - er service. Some developers wonder why the ZooKeeper client li‐ - brary doesn’t simply decide at some point (perhaps twice the session - timeout) that enough is enough and kill the session itself. - There are two answers to this. First, ZooKeeper leaves this kind of - policy decision up to the developer. Developers can easily implement - such a policy by closing the handle themselves. Second, when a Zoo‐ - Keeper ensemble goes down, time freezes. Thus, when the ensemble is - brought back up, session timeouts are restarted. If processes using - ZooKeeper hang in there, they may find out that the long timeout was - due to an extended ensemble failure that has recovered and pick right - up where they left off without any additional startup delay. - - ZooKeeper: Distributed Process Coordination p118 - */ - int32_t tryRemoveEphemeralNodeWithRetries(const std::string & path, int32_t version = -1, size_t * attempt = nullptr); bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback); @@ -199,8 +153,6 @@ public: /// Throws only if some operation has returned an "unexpected" error /// - an error that would cause the corresponding try- method to throw. int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr); - /// Use only with read-only operations. - int32_t tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results = nullptr, size_t * attempt = nullptr); /// Throws nothing, just alias of multiImpl int32_t tryMultiNoThrow(const Ops & ops, OpResultsPtr * out_op_results = nullptr, MultiTransactionInfo * out_info = nullptr) { @@ -339,34 +291,15 @@ public: using TryRemoveFuture = Future; TryRemoveFuture asyncTryRemove(const std::string & path, int32_t version = -1); - struct OpResultsAndCode - { - OpResultsPtr results; - std::shared_ptr ops_ptr; - int code; - }; using MultiFuture = Future; MultiFuture asyncMulti(const Ops & ops); /// Like the previous one but don't throw any exceptions on future.get() MultiFuture tryAsyncMulti(const Ops & ops); - static std::string error2string(int32_t code); - /// Max size of node contents in bytes. - /// In 3.4.5 max node size is 1Mb. - static const size_t MAX_NODE_SIZE = 1048576; - - /// Length of the suffix that ZooKeeper adds to sequential nodes. - /// In fact it is smaller, but round it up for convenience. - static const size_t SEQUENTIAL_SUFFIX_SIZE = 64; - - - zhandle_t * getHandle() { return impl; } - private: - friend struct WatchContext; friend class EphemeralNodeHolder; friend struct OpResult; @@ -375,34 +308,6 @@ private: void removeChildrenRecursive(const std::string & path); void tryRemoveChildrenRecursive(const std::string & path); - static WatchCallback callbackForEvent(const EventPtr & event); - WatchContext * createContext(WatchCallback && callback); - static void destroyContext(WatchContext * context); - static void processCallback(zhandle_t * zh, int type, int state, const char * path, void * watcher_ctx); - - template - int32_t retry(T && operation, size_t * attempt = nullptr) - { - int32_t code = operation(); - if (attempt) - *attempt = 0; - for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++i) - { - if (attempt) - *attempt = i; - - /// If the connection has been lost, wait timeout/3 hoping for connection re-establishment. - static const int MAX_SLEEP_TIME = 10; - if (code == ZCONNECTIONLOSS) - usleep(std::min(session_timeout_ms * 1000u / 3, MAX_SLEEP_TIME * 1000u * 1000u)); - - LOG_WARNING(log, "Error on attempt " << i << ": " << error2string(code) << ". Retry"); - code = operation(); - } - - return code; - } - /// The following methods don't throw exceptions but return error codes. int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); int32_t removeImpl(const std::string & path, int32_t version = -1); @@ -414,25 +319,16 @@ private: MultiFuture asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception); + ZooKeeperImpl::ZooKeeper impl; + std::string hosts; std::string identity; int32_t session_timeout_ms; std::string chroot; std::mutex mutex; - ACLPtr default_acl; - zhandle_t * impl; - std::unordered_set watch_context_store; - - /// Retries number in case of OperationTimeout or ConnectionLoss errors. - static constexpr size_t retry_num = 3; Logger * log = nullptr; - - /// If true, there were unsuccessfull attempts to remove ephemeral nodes. - /// It is better to close the session to remove ephemeral nodes with certainty - /// instead of continuing to use re-established session. - bool is_dirty = false; }; @@ -476,11 +372,7 @@ public: { try { - /// Important: if the ZooKeeper is temporarily unavailable, repeated attempts to - /// delete the node are made. - /// Otherwise it is possible that EphemeralNodeHolder is destroyed, - /// but the session has recovered and the node in ZooKeeper remains for the long time. - zookeeper.tryRemoveEphemeralNodeWithRetries(path); + zookeeper.tryRemove(path); } catch (const KeeperException & e) { diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 6e4ea0b808d..48bd0f3a5e6 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -306,7 +306,7 @@ void read(String & s, ReadBuffer & in) int32_t size = 0; read(size, in); if (size < 0) - throw Exception("Negative size"); + throw Exception("Negative size"); /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string. if (size > max_string_size) throw Exception("Too large string size"); /// TODO error code s.resize(size); @@ -555,7 +555,6 @@ void ZooKeeper::receiveHandshake() int32_t handshake_length; int32_t protocol_version_read; int32_t timeout; - int64_t session_id; constexpr int32_t passwd_len = 16; std::array passwd; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 66ddf3a8a48..9fa4677ac15 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -342,8 +342,13 @@ public: ~ZooKeeper(); - /// If not valid, you can only destroy the object. All other methods will throw exception. - bool isValid() const { return !expired; } + + /// If expired, you can only destroy the object. All other methods will throw exception. + bool isExpired() const { return !expired; } + + /// Useful to check owner of ephemeral node. + int64_t getSessionID() const { return session_id; } + using CreateCallback = std::function; using RemoveCallback = std::function; @@ -476,6 +481,7 @@ private: std::optional in; std::optional out; + int64_t session_id = 0; std::atomic xid {1}; struct RequestInfo diff --git a/utils/zookeeper-dump-tree/main.cpp b/utils/zookeeper-dump-tree/main.cpp index 4c051273004..cabfe1c2068 100644 --- a/utils/zookeeper-dump-tree/main.cpp +++ b/utils/zookeeper-dump-tree/main.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include /** Outputs paths of all ZK nodes in arbitrary order. Possibly only in specified directory. @@ -106,7 +106,7 @@ try return 1; } - zkutil::ZooKeeper zookeeper_(options.at("address").as()); + ZooKeeperImpl::ZooKeeper zookeeper(options.at("address").as()); zookeeper = &zookeeper_; states.emplace_back();