From 0d11b75defe7dd083b46ff2cd0b8bd795bda6339 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 19 Mar 2018 02:11:57 +0300 Subject: [PATCH] Rewriting ZooKeeper library [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 38 ++--- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 6 +- .../src/Common/ZooKeeper/tests/CMakeLists.txt | 3 + .../tests/zkutil_test_commands_new_lib.cpp | 130 ++++++++++++++++++ 4 files changed, 151 insertions(+), 26 deletions(-) create mode 100644 dbms/src/Common/ZooKeeper/tests/zkutil_test_commands_new_lib.cpp diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index f332f32c3da..c13f38b895c 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -259,7 +259,6 @@ void write(const String & s, WriteBuffer & out) template void write(std::array s, WriteBuffer & out) { - std::cerr << __PRETTY_FUNCTION__ << "\n"; write(int32_t(N), out); out.write(s.data(), N); } @@ -361,7 +360,6 @@ template void read(std::vector & arr, ReadBuffer & in) template void ZooKeeper::write(const T & x) { - std::cerr << __PRETTY_FUNCTION__ << "\n"; ZooKeeperImpl::write(x, *out); } @@ -746,25 +744,6 @@ void ZooKeeper::receiveEvent() read(zxid); read(err); - if (xid == ping_xid) - { - if (err) - throw Exception("Received error in heartbeat response: " + String(errorMessage(err))); - return; - } - - if (xid == watch_xid) - { - WatchResponse response; - if (err) - response.error = err; - else - { - response.readImpl(*in); - response.removeRootPath(root_path); - } - } - RequestInfo request_info; ResponsePtr response; @@ -774,6 +753,8 @@ void ZooKeeper::receiveEvent() throw Exception("Received error in heartbeat response: " + String(errorMessage(err))); response = std::make_shared(); + + std::cerr << "Received heartbeat\n"; } else if (xid == watch_xid) { @@ -794,6 +775,8 @@ void ZooKeeper::receiveEvent() watches.erase(it); }; + + std::cerr << "Received watch\n"; } else { @@ -808,6 +791,8 @@ void ZooKeeper::receiveEvent() operations.erase(it); } + std::cerr << "Received response: " << request_info.request->getOpNum() << "\n"; + response = request_info.request->makeResponse(); } @@ -821,7 +806,7 @@ void ZooKeeper::receiveEvent() int32_t actual_length = in->count() - count_before_event; if (length != actual_length) - throw Exception("Response length doesn't match"); + throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length)); if (request_info.callback) request_info.callback(*response); @@ -886,7 +871,7 @@ void ZooKeeper::MultiRequest::writeImpl(WriteBuffer & out) const for (const auto & request : requests) { bool done = false; - int32_t error = 0; + int32_t error = -1; ZooKeeperImpl::write(request->getOpNum(), out); ZooKeeperImpl::write(done, out); @@ -959,6 +944,8 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) ZooKeeperImpl::read(done, in); ZooKeeperImpl::read(error, in); + std::cerr << "Received result for multi: " << op_num << "\n"; + if (done) throw Exception("Not enough results received for multi transaction"); @@ -1126,6 +1113,11 @@ void ZooKeeper::multi( MultiRequest request; request.requests = requests; + for (auto & elem : request.requests) + if (CreateRequest * create = typeid_cast(elem.get())) + if (create->acls.empty()) + create->acls = default_acls; + RequestInfo request_info; request_info.request = std::make_shared(std::move(request)); request_info.callback = [callback](const Response & response) { callback(typeid_cast(response)); }; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 0811dae2b59..a7129351df2 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -174,7 +174,7 @@ public: struct RemoveRequest final : Request { String path; - int32_t version; + int32_t version = -1; OpNum getOpNum() const override { return 2; } void writeImpl(WriteBuffer &) const override; @@ -235,7 +235,7 @@ public: { String path; String data; - int32_t version; + int32_t version = -1; OpNum getOpNum() const override { return 5; } void writeImpl(WriteBuffer &) const override; @@ -277,7 +277,7 @@ public: struct CheckRequest final : Request { String path; - int32_t version; + int32_t version = -1; OpNum getOpNum() const override { return 13; } void writeImpl(WriteBuffer &) const override; diff --git a/dbms/src/Common/ZooKeeper/tests/CMakeLists.txt b/dbms/src/Common/ZooKeeper/tests/CMakeLists.txt index 53bfb75ee12..a24948240f6 100644 --- a/dbms/src/Common/ZooKeeper/tests/CMakeLists.txt +++ b/dbms/src/Common/ZooKeeper/tests/CMakeLists.txt @@ -1,6 +1,9 @@ add_executable(zkutil_test_commands zkutil_test_commands.cpp) target_link_libraries(zkutil_test_commands clickhouse_common_zookeeper) +add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp) +target_link_libraries(zkutil_test_commands_new_lib clickhouse_common_zookeeper) + add_executable(zkutil_test_lock zkutil_test_lock.cpp) target_link_libraries(zkutil_test_lock clickhouse_common_zookeeper) diff --git a/dbms/src/Common/ZooKeeper/tests/zkutil_test_commands_new_lib.cpp b/dbms/src/Common/ZooKeeper/tests/zkutil_test_commands_new_lib.cpp new file mode 100644 index 00000000000..3bb3f578fa8 --- /dev/null +++ b/dbms/src/Common/ZooKeeper/tests/zkutil_test_commands_new_lib.cpp @@ -0,0 +1,130 @@ +#include +#include +#include +#include +#include +#include + + +using namespace ZooKeeperImpl; + + +int main(int argc, char ** argv) +try +{ + if (argc < 2) + { + std::cerr << "Usage: ./zkutil_test_commands_new_lib host:port,host:port...\n"; + return 1; + } + + Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr)); + Poco::Logger::root().setChannel(channel); + Poco::Logger::root().setLevel("trace"); + + std::string addresses_arg = argv[1]; + std::vector addresses_strings; + boost::split(addresses_strings, addresses_arg, boost::is_any_of(",")); + ZooKeeper::Addresses addresses; + addresses.reserve(addresses_strings.size()); + for (const auto & address_string : addresses_strings) + addresses.emplace_back(address_string); + + ZooKeeper zk(addresses, {}, {}, {}, {5, 0}, {0, 50000}); + + Strings children; + + std::cout << "create path" << '\n'; + + zk.create("/test", "old", false, false, {}, [](const ZooKeeper::CreateResponse & response) + { + if (response.error) + std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + std::cerr << "Created path: " << response.path_created << '\n'; + }); + + std::cout << "get path" << '\n'; + + zk.get("/test", + [](const ZooKeeper::GetResponse & response) + { + if (response.error) + std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + std::cerr << "Value: " << response.data << '\n'; + }, +/* [](const ZooKeeper::WatchResponse & response) + { + if (response.error) + std::cerr << "Watch, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + std::cerr << "Watch, path: " << response.path << ", type: " << response.type << '\n'; + }*/ {}); + + std::cout << "set path" << '\n'; + + zk.set("/test", "new", -1, [](const ZooKeeper::SetResponse & response) + { + if (response.error) + std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + std::cerr << "Set\n"; + }); + + std::cout << "remove path" << '\n'; + + zk.remove("/test", -1, [](const ZooKeeper::RemoveResponse & response) + { + if (response.error) + std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + std::cerr << "Removed\n"; + }); + + std::cout << "multi" << '\n'; + + ZooKeeper::Requests ops; + + { + ZooKeeper::CreateRequest create_request; + create_request.path = "/test"; + create_request.data = "multi1"; + ops.emplace_back(std::make_shared(std::move(create_request))); + } + + { + ZooKeeper::SetRequest set_request; + set_request.path = "/test"; + set_request.data = "multi2"; + ops.emplace_back(std::make_shared(std::move(set_request))); + } + + { + ZooKeeper::RemoveRequest remove_request; + remove_request.path = "/test"; + ops.emplace_back(std::make_shared(std::move(remove_request))); + } + + zk.multi(ops, [](const ZooKeeper::MultiResponse & response) + { + if (response.error) + std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; + else + { + for (const auto & elem : response.responses) + if (elem->error) + std::cerr << "Error (elem) " << elem->error << ": " << ZooKeeper::errorMessage(elem->error) << '\n'; + + std::cerr << "Created path: " << typeid_cast(*response.responses[0]).path_created << '\n'; + } + }); + + sleep(5); + return 0; +} +catch (...) +{ + std::cerr << DB::getCurrentExceptionMessage(__PRETTY_FUNCTION__) << '\n'; + return 1; +}