Rewriting ZooKeeper library [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-19 02:11:57 +03:00
parent 347a83029a
commit 0d11b75def
4 changed files with 151 additions and 26 deletions

View File

@ -259,7 +259,6 @@ void write(const String & s, WriteBuffer & out)
template <size_t N> void write(std::array<char, N> s, WriteBuffer & out)
{
std::cerr << __PRETTY_FUNCTION__ << "\n";
write(int32_t(N), out);
out.write(s.data(), N);
}
@ -361,7 +360,6 @@ template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
template <typename T>
void ZooKeeper::write(const T & x)
{
std::cerr << __PRETTY_FUNCTION__ << "\n";
ZooKeeperImpl::write(x, *out);
}
@ -746,25 +744,6 @@ void ZooKeeper::receiveEvent()
read(zxid);
read(err);
if (xid == ping_xid)
{
if (err)
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)));
return;
}
if (xid == watch_xid)
{
WatchResponse response;
if (err)
response.error = err;
else
{
response.readImpl(*in);
response.removeRootPath(root_path);
}
}
RequestInfo request_info;
ResponsePtr response;
@ -774,6 +753,8 @@ void ZooKeeper::receiveEvent()
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)));
response = std::make_shared<HeartbeatResponse>();
std::cerr << "Received heartbeat\n";
}
else if (xid == watch_xid)
{
@ -794,6 +775,8 @@ void ZooKeeper::receiveEvent()
watches.erase(it);
};
std::cerr << "Received watch\n";
}
else
{
@ -808,6 +791,8 @@ void ZooKeeper::receiveEvent()
operations.erase(it);
}
std::cerr << "Received response: " << request_info.request->getOpNum() << "\n";
response = request_info.request->makeResponse();
}
@ -821,7 +806,7 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match");
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length));
if (request_info.callback)
request_info.callback(*response);
@ -886,7 +871,7 @@ void ZooKeeper::MultiRequest::writeImpl(WriteBuffer & out) const
for (const auto & request : requests)
{
bool done = false;
int32_t error = 0;
int32_t error = -1;
ZooKeeperImpl::write(request->getOpNum(), out);
ZooKeeperImpl::write(done, out);
@ -959,6 +944,8 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(error, in);
std::cerr << "Received result for multi: " << op_num << "\n";
if (done)
throw Exception("Not enough results received for multi transaction");
@ -1126,6 +1113,11 @@ void ZooKeeper::multi(
MultiRequest request;
request.requests = requests;
for (auto & elem : request.requests)
if (CreateRequest * create = typeid_cast<CreateRequest *>(elem.get()))
if (create->acls.empty())
create->acls = default_acls;
RequestInfo request_info;
request_info.request = std::make_shared<MultiRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const MultiResponse &>(response)); };

View File

@ -174,7 +174,7 @@ public:
struct RemoveRequest final : Request
{
String path;
int32_t version;
int32_t version = -1;
OpNum getOpNum() const override { return 2; }
void writeImpl(WriteBuffer &) const override;
@ -235,7 +235,7 @@ public:
{
String path;
String data;
int32_t version;
int32_t version = -1;
OpNum getOpNum() const override { return 5; }
void writeImpl(WriteBuffer &) const override;
@ -277,7 +277,7 @@ public:
struct CheckRequest final : Request
{
String path;
int32_t version;
int32_t version = -1;
OpNum getOpNum() const override { return 13; }
void writeImpl(WriteBuffer &) const override;

View File

@ -1,6 +1,9 @@
add_executable(zkutil_test_commands zkutil_test_commands.cpp)
target_link_libraries(zkutil_test_commands clickhouse_common_zookeeper)
add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp)
target_link_libraries(zkutil_test_commands_new_lib clickhouse_common_zookeeper)
add_executable(zkutil_test_lock zkutil_test_lock.cpp)
target_link_libraries(zkutil_test_lock clickhouse_common_zookeeper)

View File

@ -0,0 +1,130 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/typeid_cast.h>
#include <iostream>
#include <boost/algorithm/string.hpp>
using namespace ZooKeeperImpl;
int main(int argc, char ** argv)
try
{
if (argc < 2)
{
std::cerr << "Usage: ./zkutil_test_commands_new_lib host:port,host:port...\n";
return 1;
}
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
std::string addresses_arg = argv[1];
std::vector<std::string> addresses_strings;
boost::split(addresses_strings, addresses_arg, boost::is_any_of(","));
ZooKeeper::Addresses addresses;
addresses.reserve(addresses_strings.size());
for (const auto & address_string : addresses_strings)
addresses.emplace_back(address_string);
ZooKeeper zk(addresses, {}, {}, {}, {5, 0}, {0, 50000});
Strings children;
std::cout << "create path" << '\n';
zk.create("/test", "old", false, false, {}, [](const ZooKeeper::CreateResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Created path: " << response.path_created << '\n';
});
std::cout << "get path" << '\n';
zk.get("/test",
[](const ZooKeeper::GetResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Value: " << response.data << '\n';
},
/* [](const ZooKeeper::WatchResponse & response)
{
if (response.error)
std::cerr << "Watch, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Watch, path: " << response.path << ", type: " << response.type << '\n';
}*/ {});
std::cout << "set path" << '\n';
zk.set("/test", "new", -1, [](const ZooKeeper::SetResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Set\n";
});
std::cout << "remove path" << '\n';
zk.remove("/test", -1, [](const ZooKeeper::RemoveResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Removed\n";
});
std::cout << "multi" << '\n';
ZooKeeper::Requests ops;
{
ZooKeeper::CreateRequest create_request;
create_request.path = "/test";
create_request.data = "multi1";
ops.emplace_back(std::make_shared<ZooKeeper::CreateRequest>(std::move(create_request)));
}
{
ZooKeeper::SetRequest set_request;
set_request.path = "/test";
set_request.data = "multi2";
ops.emplace_back(std::make_shared<ZooKeeper::SetRequest>(std::move(set_request)));
}
{
ZooKeeper::RemoveRequest remove_request;
remove_request.path = "/test";
ops.emplace_back(std::make_shared<ZooKeeper::RemoveRequest>(std::move(remove_request)));
}
zk.multi(ops, [](const ZooKeeper::MultiResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
{
for (const auto & elem : response.responses)
if (elem->error)
std::cerr << "Error (elem) " << elem->error << ": " << ZooKeeper::errorMessage(elem->error) << '\n';
std::cerr << "Created path: " << typeid_cast<const ZooKeeper::CreateResponse &>(*response.responses[0]).path_created << '\n';
}
});
sleep(5);
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(__PRETTY_FUNCTION__) << '\n';
return 1;
}