From 23af9ddd5e37a7160e1d0015643443395abf5076 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Thu, 10 Aug 2017 02:07:56 +0300 Subject: [PATCH] Fixed segfault: the future owns source ops. [#CLICKHOUSE-3207] --- dbms/src/Common/ZooKeeper/Types.h | 41 +++++++++++++++---- dbms/src/Common/ZooKeeper/ZooKeeper.cpp | 25 +++++++---- dbms/src/Common/ZooKeeper/ZooKeeper.h | 6 +-- .../tests/zkutil_test_multi_exception.cpp | 29 ++++++++++++- 4 files changed, 83 insertions(+), 18 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/Types.h b/dbms/src/Common/ZooKeeper/Types.h index 72b6e2852a4..a30e4715bc5 100644 --- a/dbms/src/Common/ZooKeeper/Types.h +++ b/dbms/src/Common/ZooKeeper/Types.h @@ -19,6 +19,8 @@ public: Op() : data(new zoo_op_t) {} virtual ~Op() {} + virtual std::unique_ptr clone() const = 0; + virtual std::string describe() = 0; std::unique_ptr data; @@ -31,21 +33,32 @@ public: struct Op::Remove : public Op { - Remove(const std::string & path_, int32_t version) : - path(path_) + Remove(const std::string & path_, int32_t version_) : + path(path_), version(version_) { zoo_delete_op_init(data.get(), path.c_str(), version); } + std::unique_ptr clone() const override + { + return std::unique_ptr(new Remove(path, version)); + } + std::string describe() override { return "command: remove, path: " + path; } private: std::string path; + int32_t version; }; struct Op::Create : public Op { - Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags); + Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_); + + std::unique_ptr clone() const override + { + return std::unique_ptr(new Create(path, value, acl, flags)); + } std::string getPathCreated() { @@ -62,17 +75,24 @@ struct Op::Create : public Op private: std::string path; std::string value; + ACLPtr acl; + int32_t flags; std::vector created_path; }; struct Op::SetData : public Op { - SetData(const std::string & path_, const std::string & value_, int32_t version) : - path(path_), value(value_) + SetData(const std::string & path_, const std::string & value_, int32_t version_) : + path(path_), value(value_), version(version_) { zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat); } + std::unique_ptr clone() const override + { + return std::unique_ptr(new SetData(path, value, version)); + } + std::string describe() override { return @@ -85,21 +105,28 @@ struct Op::SetData : public Op private: std::string path; std::string value; + int32_t version; Stat stat; }; struct Op::Check : public Op { - Check(const std::string & path_, int32_t version) : - path(path_) + Check(const std::string & path_, int32_t version_) : + path(path_), version(version_) { zoo_check_op_init(data.get(), path.c_str(), version); } + std::unique_ptr clone() const override + { + return std::unique_ptr(new Check(path, version)); + } + std::string describe() override { return "command: check, path: " + path; } private: std::string path; + int32_t version; }; struct OpResult : public zoo_op_result_t diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 4b9ca2f6805..e03b7ab7182 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -710,8 +710,8 @@ ZooKeeperPtr ZooKeeper::startNewSession() const return std::make_shared(hosts, session_timeout_ms); } -Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags) - : path(path_), value(value_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE) +Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_) + : path(path_), value(value_), acl(acl_), flags(flags_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE) { zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size()); } @@ -907,10 +907,24 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool size_t count = ops_.size(); OpResultsPtr results(new OpResults(count)); - MultiFuture future{ [throw_exception, results] (int rc) { + /// We need to hold all references to ops data until the end of multi callback + struct OpsHolder + { + std::shared_ptr ops_ptr = std::make_shared(); + std::shared_ptr> ops_raw_ptr = std::make_shared>();; + } holder; + + for (const auto & op : ops_) + { + holder.ops_ptr->emplace_back(op->clone()); + holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data); + } + + MultiFuture future{ [throw_exception, results, holder] (int rc) { OpResultsAndCode res; res.code = rc; res.results = results; + res.ops_ptr = holder.ops_ptr; if (throw_exception && rc != ZOK) throw zkutil::KeeperException(rc); return res; @@ -927,10 +941,7 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool if (expired()) throw KeeperException(ZINVALIDSTATE); - /// There is no need to hold these ops until the end of the passed callback - std::vector ops; - for (const auto & op : ops_) - ops.push_back(*(op->data)); + auto & ops = *holder.ops_raw_ptr; int32_t code = zoo_amulti(impl, static_cast(ops.size()), ops.data(), results->data(), [] (int rc, const void * data) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index ba75e372bc8..197cb8083b2 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -326,14 +326,14 @@ public: struct OpResultsAndCode { OpResultsPtr results; - Ops ops; + std::shared_ptr ops_ptr; int code; }; using MultiFuture = Future; - MultiFuture asyncMulti(const zkutil::Ops & ops); + MultiFuture asyncMulti(const Ops & ops); /// Like the previous one but don't throw any exceptions on future.get() - MultiFuture tryAsyncMulti(const zkutil::Ops & ops); + MultiFuture tryAsyncMulti(const Ops & ops); static std::string error2string(int32_t code); diff --git a/dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp b/dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp index 77af1db15d8..d728d9f0ca6 100644 --- a/dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp +++ b/dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp @@ -1,6 +1,7 @@ -#include #include #include +#include +#include #include using namespace DB; @@ -67,8 +68,33 @@ TEST(zkutil, multi_async) auto res = fut.get(); ASSERT_TRUE(res.code == ZOK); + ASSERT_EQ(res.results->size(), 2); + ASSERT_EQ(res.ops_ptr->size(), 2); } + EXPECT_ANY_THROW + ( + std::vector futures; + + for (size_t i = 0; i < 10000; ++i) + { + ops.clear(); + ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi", -1)); + ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(new zkutil::Op::Check("/clickhouse_test_zkutil_multi", -1)); + ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test_zkutil_multi", "xxx", 42)); + ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent)); + + futures.emplace_back(zookeeper->asyncMulti(ops)); + } + + futures[0].get(); + ); + + /// Check there are no segfaults for remaining 999 futures + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + { ops.clear(); ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent)); @@ -80,5 +106,6 @@ TEST(zkutil, multi_async) auto res = fut.get(); ASSERT_TRUE(res.code == ZNODEEXISTS); ASSERT_EQ(res.results->size(), 2); + ASSERT_EQ(res.ops_ptr->size(), 2); } } \ No newline at end of file