From 22794bc93ea1fb0d3c678838391c5b0c3791cecd Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Mon, 12 Mar 2018 20:45:24 +0300 Subject: [PATCH] ZooKeeper multi op is transparent with chroot prefixes. [#CLICKHOUSE-3639] --- dbms/src/Common/ZooKeeper/Types.h | 13 ++- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 100 ++++++++++++------ dbms/src/Common/ZooKeeper/ZooKeeper.h | 6 +- .../gtest_zkutil_test_multi_exception.cpp | 12 ++- 4 files changed, 89 insertions(+), 42 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/Types.h b/dbms/src/Common/ZooKeeper/Types.h index 4097627f395..08183f8128d 100644 --- a/dbms/src/Common/ZooKeeper/Types.h +++ b/dbms/src/Common/ZooKeeper/Types.h @@ -12,6 +12,8 @@ namespace zkutil using ACLPtr = const ACL_vector *; using Stat = ::Stat; +class ZooKeeper; + struct Op { @@ -136,10 +138,15 @@ private: int32_t version; }; -struct OpResult : public zoo_op_result_t +/// C++ version of zoo_op_result_t +struct OpResult { - /// Pointers in this class point to fields of class Op. - /// Op instances have the same (or longer lifetime), therefore destructor is not required. + int err; + std::string value; + std::unique_ptr stat; + + /// ZooKeeper is required for correct chroot path prefixes handling + explicit OpResult(const zoo_op_result_t & op_result, const ZooKeeper * zookeeper = nullptr); }; using OpPtr = std::unique_ptr; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 85bbe140d39..105c275e711 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -82,19 +82,21 @@ void ZooKeeper::processCallback(zhandle_t *, int type, int state, const char * p } void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, - int32_t session_timeout_ms_, bool check_root_exists) + int32_t session_timeout_ms_, const std::string & chroot_) { log = &Logger::get("ZooKeeper"); - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); + zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG); hosts = hosts_; identity = identity_; session_timeout_ms = session_timeout_ms_; + chroot = chroot_; - impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0); + std::string hosts_for_lib = hosts + chroot; + impl = zookeeper_init(hosts_for_lib.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0); ProfileEvents::increment(ProfileEvents::ZooKeeperInit); if (!impl) - throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts); + throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts_for_lib); if (!identity.empty()) { @@ -107,16 +109,16 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, else default_acl = &ZOO_OPEN_ACL_UNSAFE; - LOG_TRACE(log, "initialized, hosts: " << hosts); + LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot)); - if (check_root_exists && !exists("/")) - throw KeeperException("Zookeeper root doesn't exist. You should create root node before start."); + if (!chroot.empty() && !exists("/")) + throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start."); } ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity, - int32_t session_timeout_ms, bool check_root_exists) + int32_t session_timeout_ms, const std::string & chroot) { - init(hosts, identity, session_timeout_ms, check_root_exists); + init(hosts, identity, session_timeout_ms, chroot); } struct ZooKeeperArgs @@ -127,10 +129,8 @@ struct ZooKeeperArgs config.keys(config_name, keys); std::vector hosts_strings; - std::string root; session_timeout_ms = DEFAULT_SESSION_TIMEOUT; - has_chroot = false; for (const auto & key : keys) { if (startsWith(key, "node")) @@ -150,7 +150,7 @@ struct ZooKeeperArgs } else if (key == "root") { - root = config.getString(config_name + "." + key); + chroot = config.getString(config_name + "." + key); } else throw KeeperException(std::string("Unknown key ") + key + " in config file"); } @@ -166,28 +166,25 @@ struct ZooKeeperArgs hosts += host; } - if (!root.empty()) + if (!chroot.empty()) { - if (root.front() != '/') - throw KeeperException(std::string("Root path in config file should start with '/', but got ") + root); - if (root.back() == '/') - root.pop_back(); - - hosts += root; - has_chroot = true; + if (chroot.front() != '/') + throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot); + if (chroot.back() == '/') + chroot.pop_back(); } } std::string hosts; std::string identity; int session_timeout_ms; - bool has_chroot; + std::string chroot; }; ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) { ZooKeeperArgs args(config, config_name); - init(args.hosts, args.identity, args.session_timeout_ms, args.has_chroot); + init(args.hosts, args.identity, args.session_timeout_ms, args.chroot); } WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event) @@ -290,19 +287,18 @@ int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data int code; /// The name of the created node can be longer than path if the sequential node is created. size_t name_buffer_size = path.size() + SEQUENTIAL_SUFFIX_SIZE; - char * name_buffer = new char[name_buffer_size]; + std::string name_buffer(name_buffer_size, '\0'); - code = zoo_create(impl, path.c_str(), data.c_str(), data.size(), getDefaultACL(), mode, name_buffer, name_buffer_size); + code = zoo_create(impl, path.c_str(), data.c_str(), data.size(), getDefaultACL(), mode, name_buffer.data(), name_buffer_size); ProfileEvents::increment(ProfileEvents::ZooKeeperCreate); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); if (code == ZOK) { - path_created = std::string(name_buffer); + name_buffer.resize(strlen(name_buffer.data())); + path_created = std::move(name_buffer); } - delete[] name_buffer; - return code; } @@ -571,6 +567,18 @@ int32_t ZooKeeper::trySet(const std::string & path, const std::string & data, return code; } + +static void convertOpResults(const std::vector & out_results_native, OpResultsPtr & out_results, + const ZooKeeper * zookeeper = nullptr) +{ + if (!out_results) + out_results = std::make_shared(); + + out_results->reserve(out_results_native.size()); + for (const zoo_op_result_t & res_native : out_results_native) + out_results->emplace_back(res_native, zookeeper); +} + int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_) { if (ops_.empty()) @@ -585,7 +593,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_) return ZINVALIDSTATE; size_t count = ops_.size(); - OpResultsPtr out_results(new OpResults(count)); + std::vector out_results_native(count); /// Copy the struct containing pointers with default copy-constructor. /// It is safe because it hasn't got a destructor. @@ -594,12 +602,12 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_) for (const auto & op : ops_) ops.push_back(*(op->data)); - int32_t code = zoo_multi(impl, static_cast(ops.size()), ops.data(), out_results->data()); + int32_t code = zoo_multi(impl, static_cast(ops.size()), ops.data(), out_results_native.data()); ProfileEvents::increment(ProfileEvents::ZooKeeperMulti); ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); if (out_results_) - *out_results_ = out_results; + convertOpResults(out_results_native, *out_results_, this); return code; } @@ -754,7 +762,7 @@ ZooKeeper::~ZooKeeper() ZooKeeperPtr ZooKeeper::startNewSession() const { - return std::make_shared(hosts, identity, session_timeout_ms); + return std::make_shared(hosts, identity, session_timeout_ms, chroot); } Op::Create::Create(const std::string & path_pattern_, const std::string & value_, ACLPtr acl_, int32_t flags_) @@ -982,7 +990,7 @@ ZooKeeper::TryRemoveFuture ZooKeeper::asyncTryRemove(const std::string & path, i ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception) { size_t count = ops_.size(); - OpResultsPtr results(new OpResults(count)); + auto results_native = std::make_shared>(count); /// We need to hold all references to ops data until the end of multi callback struct OpsHolder @@ -997,10 +1005,10 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data); } - MultiFuture future{ [throw_exception, results, holder] (int rc) { + MultiFuture future{ [throw_exception, results_native, holder, zookeeper=this] (int rc) { OpResultsAndCode res; res.code = rc; - res.results = results; + convertOpResults(*results_native, res.results, zookeeper); res.ops_ptr = holder.ops_ptr; if (throw_exception && rc != ZOK) throw zkutil::KeeperException(rc); @@ -1020,7 +1028,7 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool auto & ops = *holder.ops_raw_ptr; - int32_t code = zoo_amulti(impl, static_cast(ops.size()), ops.data(), results->data(), + int32_t code = zoo_amulti(impl, static_cast(ops.size()), ops.data(), results_native->data(), [] (int rc, const void * data) { MultiFuture::TaskPtr owned_task = @@ -1069,4 +1077,26 @@ size_t getFailedOpIndex(const OpResultsPtr & op_results, int32_t transaction_ret } +OpResult::OpResult(const zoo_op_result_t & op_result, const ZooKeeper * zookeeper) + : err(op_result.err) +{ + if (op_result.value) + { + value = std::string(op_result.value, op_result.value + op_result.valuelen); + + /// Current version of libzookeeper does not cut chroot path prefixes + /// We do it here manually + if (zookeeper && !zookeeper->chroot.empty()) + { + if (startsWith(value, zookeeper->chroot)) + value = value.substr(zookeeper->chroot.length()); + else + throw DB::Exception("Expected ZooKeeper path with chroot " + zookeeper->chroot + ", got " + value, + DB::ErrorCodes::LOGICAL_ERROR); + } + } + + if (op_result.stat) + stat = std::make_unique(*op_result.stat); +} } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index 8d35e37d27f..29b1d2fb69b 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -56,7 +56,7 @@ public: using Ptr = std::shared_ptr; ZooKeeper(const std::string & hosts, const std::string & identity = "", - int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT, bool check_root_exists = false); + int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT, const std::string & chroot = ""); /** Config of the form: @@ -366,9 +366,10 @@ public: private: friend struct WatchContext; friend class EphemeralNodeHolder; + friend class OpResult; void init(const std::string & hosts, const std::string & identity, - int32_t session_timeout_ms, bool check_root_exists); + int32_t session_timeout_ms, const std::string & chroot); void removeChildrenRecursive(const std::string & path); void tryRemoveChildrenRecursive(const std::string & path); @@ -414,6 +415,7 @@ private: std::string hosts; std::string identity; int32_t session_timeout_ms; + std::string chroot; std::mutex mutex; ACLPtr default_acl; diff --git a/dbms/src/Common/ZooKeeper/tests/gtest_zkutil_test_multi_exception.cpp b/dbms/src/Common/ZooKeeper/tests/gtest_zkutil_test_multi_exception.cpp index df71feab43e..4d1d05b54e1 100644 --- a/dbms/src/Common/ZooKeeper/tests/gtest_zkutil_test_multi_exception.cpp +++ b/dbms/src/Common/ZooKeeper/tests/gtest_zkutil_test_multi_exception.cpp @@ -134,10 +134,13 @@ TEST(zkutil, multi_create_sequential) try { auto zookeeper = std::make_unique("localhost:2181"); + zookeeper->createAncestors("/clickhouse_test/"); + + zookeeper = std::make_unique("localhost:2181", "", zkutil::DEFAULT_SESSION_TIMEOUT, "/clickhouse_test"); auto acl = zookeeper->getDefaultACL(); zkutil::Ops ops; - String base_path = "/clickhouse_test/zkutil/multi_create_sequential"; + String base_path = "/zkutil/multi_create_sequential"; zookeeper->tryRemoveRecursive(base_path); zookeeper->createAncestors(base_path + "/"); @@ -147,6 +150,10 @@ TEST(zkutil, multi_create_sequential) zkutil::OpResult & result = results->at(0); EXPECT_TRUE(result.value != nullptr); + + String path = result.value; + std::cout << path << "\n"; + EXPECT_TRUE(startsWith(result.value, entry_path)); } catch (...) @@ -154,5 +161,6 @@ TEST(zkutil, multi_create_sequential) std::cerr << getCurrentExceptionMessage(false); throw; } - } + +