mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 05:00:47 +00:00
Refactor of asyncMulti. [#CLICKHOUSE-2]
This commit is contained in:
parent
8028ba0de9
commit
f38dc334d6
@ -978,26 +978,27 @@ ZooKeeper::TryRemoveFuture ZooKeeper::asyncTryRemove(const std::string & path, i
|
|||||||
|
|
||||||
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
|
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
|
||||||
{
|
{
|
||||||
size_t count = ops_.size();
|
|
||||||
auto results_native = std::make_shared<std::vector<zoo_op_result_t>>(count);
|
|
||||||
|
|
||||||
/// We need to hold all references to ops data until the end of multi callback
|
/// We need to hold all references to ops data until the end of multi callback
|
||||||
struct OpsHolder
|
struct OpsHolder
|
||||||
{
|
{
|
||||||
std::shared_ptr<zkutil::Ops> ops_ptr = std::make_shared<zkutil::Ops>();
|
std::shared_ptr<zkutil::Ops> ops_ptr;
|
||||||
std::shared_ptr<std::vector<zoo_op_t>> ops_raw_ptr = std::make_shared<std::vector<zoo_op_t>>();
|
std::shared_ptr<std::vector<zoo_op_t>> ops_native;
|
||||||
|
std::shared_ptr<std::vector<zoo_op_result_t>> op_results_native;
|
||||||
} holder;
|
} holder;
|
||||||
|
|
||||||
for (const auto & op : ops_)
|
/// Copy ops (swallow copy)
|
||||||
{
|
holder.ops_ptr = std::make_shared<zkutil::Ops>(ops_);
|
||||||
holder.ops_ptr->emplace_back(op->clone());
|
/// Copy native ops to contiguous vector
|
||||||
holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data);
|
holder.ops_native = std::make_shared<std::vector<zoo_op_t>>();
|
||||||
}
|
for (const OpPtr & op : *holder.ops_ptr)
|
||||||
|
holder.ops_native->push_back(*op->data);
|
||||||
|
/// Allocate native result holders
|
||||||
|
holder.op_results_native = std::make_shared<std::vector<zoo_op_result_t>>(holder.ops_ptr->size());
|
||||||
|
|
||||||
MultiFuture future{ [throw_exception, results_native, holder, zookeeper=this] (int rc) {
|
MultiFuture future{ [throw_exception, holder, zookeeper=this] (int rc) {
|
||||||
OpResultsAndCode res;
|
OpResultsAndCode res;
|
||||||
res.code = rc;
|
res.code = rc;
|
||||||
convertOpResults(*results_native, res.results, zookeeper);
|
convertOpResults(*holder.op_results_native, res.results, zookeeper);
|
||||||
res.ops_ptr = holder.ops_ptr;
|
res.ops_ptr = holder.ops_ptr;
|
||||||
if (throw_exception && rc != ZOK)
|
if (throw_exception && rc != ZOK)
|
||||||
throw zkutil::KeeperException(rc);
|
throw zkutil::KeeperException(rc);
|
||||||
@ -1015,9 +1016,9 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool
|
|||||||
if (expired())
|
if (expired())
|
||||||
throw KeeperException(ZINVALIDSTATE);
|
throw KeeperException(ZINVALIDSTATE);
|
||||||
|
|
||||||
auto & ops = *holder.ops_raw_ptr;
|
int32_t code = zoo_amulti(impl, static_cast<int>(holder.ops_native->size()),
|
||||||
|
holder.ops_native->data(),
|
||||||
int32_t code = zoo_amulti(impl, static_cast<int>(ops.size()), ops.data(), results_native->data(),
|
holder.op_results_native->data(),
|
||||||
[] (int rc, const void * data)
|
[] (int rc, const void * data)
|
||||||
{
|
{
|
||||||
MultiFuture::TaskPtr owned_task =
|
MultiFuture::TaskPtr owned_task =
|
||||||
|
@ -6,6 +6,9 @@
|
|||||||
#pragma GCC diagnostic push
|
#pragma GCC diagnostic push
|
||||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include <Common/ShellCommand.h>
|
||||||
|
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
||||||
using namespace DB;
|
using namespace DB;
|
||||||
@ -32,32 +35,32 @@ TEST(zkutil, multi_nice_exception_msg)
|
|||||||
zkutil::Ops ops;
|
zkutil::Ops ops;
|
||||||
|
|
||||||
ASSERT_NO_THROW(
|
ASSERT_NO_THROW(
|
||||||
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
|
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
|
||||||
|
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
zookeeper->multi(ops);
|
zookeeper->multi(ops);
|
||||||
);
|
);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ops.clear();
|
ops.clear();
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/c", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/c", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi/c", -1));
|
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test/zkutil_multi/c", -1));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "BadBoy", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "BadBoy", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/b", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/b", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
|
|
||||||
zookeeper->multi(ops);
|
zookeeper->multi(ops);
|
||||||
FAIL();
|
FAIL();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
|
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
|
||||||
|
|
||||||
String msg = getCurrentExceptionMessage(false);
|
String msg = getCurrentExceptionMessage(false);
|
||||||
|
|
||||||
bool msg_has_reqired_patterns = msg.find("/clickhouse_test_zkutil_multi/a") != std::string::npos && msg.find("#2") != std::string::npos;
|
bool msg_has_reqired_patterns = msg.find("/clickhouse_test/zkutil_multi/a") != std::string::npos && msg.find("#2") != std::string::npos;
|
||||||
EXPECT_TRUE(msg_has_reqired_patterns) << msg;
|
EXPECT_TRUE(msg_has_reqired_patterns) << msg;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -69,7 +72,7 @@ TEST(zkutil, multi_async)
|
|||||||
auto acl = zookeeper->getDefaultACL();
|
auto acl = zookeeper->getDefaultACL();
|
||||||
zkutil::Ops ops;
|
zkutil::Ops ops;
|
||||||
|
|
||||||
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
|
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
|
||||||
|
|
||||||
{
|
{
|
||||||
ops.clear();
|
ops.clear();
|
||||||
@ -78,8 +81,8 @@ TEST(zkutil, multi_async)
|
|||||||
|
|
||||||
{
|
{
|
||||||
ops.clear();
|
ops.clear();
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "", acl, zkutil::CreateMode::Persistent));
|
||||||
|
|
||||||
auto fut = zookeeper->tryAsyncMulti(ops);
|
auto fut = zookeeper->tryAsyncMulti(ops);
|
||||||
ops.clear();
|
ops.clear();
|
||||||
@ -97,11 +100,11 @@ TEST(zkutil, multi_async)
|
|||||||
for (size_t i = 0; i < 10000; ++i)
|
for (size_t i = 0; i < 10000; ++i)
|
||||||
{
|
{
|
||||||
ops.clear();
|
ops.clear();
|
||||||
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi", -1));
|
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test/zkutil_multi", -1));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test_zkutil_multi", -1));
|
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test/zkutil_multi", -1));
|
||||||
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test_zkutil_multi", "xxx", 42));
|
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test/zkutil_multi", "xxx", 42));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
|
|
||||||
futures.emplace_back(zookeeper->asyncMulti(ops));
|
futures.emplace_back(zookeeper->asyncMulti(ops));
|
||||||
}
|
}
|
||||||
@ -115,8 +118,8 @@ TEST(zkutil, multi_async)
|
|||||||
|
|
||||||
{
|
{
|
||||||
ops.clear();
|
ops.clear();
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
|
||||||
|
|
||||||
auto fut = zookeeper->tryAsyncMulti(ops);
|
auto fut = zookeeper->tryAsyncMulti(ops);
|
||||||
ops.clear();
|
ops.clear();
|
||||||
@ -128,11 +131,30 @@ TEST(zkutil, multi_async)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run this test under sudo
|
||||||
|
TEST(zkutil, multi_async_libzookeeper_segfault)
|
||||||
|
{
|
||||||
|
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181", "", 1000);
|
||||||
|
zkutil::Ops ops;
|
||||||
|
|
||||||
|
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test/zkutil_multi", 0));
|
||||||
|
|
||||||
|
/// Uncomment to test
|
||||||
|
//auto cmd = ShellCommand::execute("sudo service zookeeper restart");
|
||||||
|
//cmd->wait();
|
||||||
|
|
||||||
|
auto future = zookeeper->asyncMulti(ops);
|
||||||
|
auto res = future.get();
|
||||||
|
|
||||||
|
EXPECT_TRUE(zkutil::isUnrecoverableErrorCode(res.code));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST(zkutil, multi_create_sequential)
|
TEST(zkutil, multi_create_sequential)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
/// Create chroot node firstly
|
||||||
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
|
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
|
||||||
zookeeper->createAncestors("/clickhouse_test/");
|
zookeeper->createAncestors("/clickhouse_test/");
|
||||||
|
|
||||||
@ -144,17 +166,14 @@ TEST(zkutil, multi_create_sequential)
|
|||||||
zookeeper->tryRemoveRecursive(base_path);
|
zookeeper->tryRemoveRecursive(base_path);
|
||||||
zookeeper->createAncestors(base_path + "/");
|
zookeeper->createAncestors(base_path + "/");
|
||||||
|
|
||||||
String entry_path = base_path + "/queue-";
|
String sequential_node_prefix = base_path + "/queue-";
|
||||||
ops.emplace_back(new zkutil::Op::Create(entry_path, "", acl, zkutil::CreateMode::EphemeralSequential));
|
ops.emplace_back(new zkutil::Op::Create(sequential_node_prefix, "", acl, zkutil::CreateMode::EphemeralSequential));
|
||||||
zkutil::OpResultsPtr results = zookeeper->multi(ops);
|
zkutil::OpResultsPtr results = zookeeper->multi(ops);
|
||||||
zkutil::OpResult & result = results->at(0);
|
zkutil::OpResult & sequential_node_result_op = results->at(0);
|
||||||
|
|
||||||
EXPECT_TRUE(result.value != nullptr);
|
EXPECT_FALSE(sequential_node_result_op.value.empty());
|
||||||
|
EXPECT_GT(sequential_node_result_op.value.length(), sequential_node_prefix.length());
|
||||||
String path = result.value;
|
EXPECT_EQ(sequential_node_result_op.value.substr(0, sequential_node_prefix.length()), sequential_node_prefix);
|
||||||
std::cout << path << "\n";
|
|
||||||
|
|
||||||
EXPECT_TRUE(startsWith(result.value, entry_path));
|
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user