Rewriting ZooKeeper library [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-19 00:53:56 +03:00
parent c68af7f09a
commit 347a83029a
3 changed files with 315 additions and 31 deletions

View File

@ -132,6 +132,97 @@ state \x00\x00\x00\x03 CONNECTED_STATE_DEF 3
path \x00\x00\x00\x05
\x2f\x74\x65\x73\x74 /test
Example of multi request:
request length \x00\x00\x00\x82 130
xid \x5a\xae\xd6\x16
op_num \x00\x00\x00\x0e 14
for every command:
int32_t type; \x00\x00\x00\x01 create
bool done; \x00 false
int32_t err; \xff\xff\xff\xff -1
path \x00\x00\x00\x05
\x2f\x74\x65\x73\x74 /test
data \x00\x00\x00\x06
\x6d\x75\x6c\x74\x69\x31 multi1
acl \x00\x00\x00\x01
\x00\x00\x00\x1f
\x00\x00\x00\x05
\x77\x6f\x72\x6c\x64 world
\x00\x00\x00\x06
\x61\x6e\x79\x6f\x6e\x65 anyone
flags \x00\x00\x00\x00
int32_t type; \x00\x00\x00\x05 set
bool done \x00 false
int32_t err; \xff\xff\xff\xff -1
path \x00\x00\x00\x05
\x2f\x74\x65\x73\x74
data \x00\x00\x00\x06
\x6d\x75\x6c\x74\x69\x32 multi2
version \xff\xff\xff\xff
int32_t type \x00\x00\x00\x02 remove
bool done \x00
int32_t err \xff\xff\xff\xff -1
path \x00\x00\x00\x05
\x2f\x74\x65\x73\x74
version \xff\xff\xff\xff
after commands:
int32_t type \xff\xff\xff\xff -1
bool done \x01 true
int32_t err \xff\xff\xff\xff
Example of multi response:
response length \x00\x00\x00\x81 129
xid \x5a\xae\xd6\x16
zxid \x00\x00\x00\x00\x00\x01\x87\xe1
err \x00\x00\x00\x00
in a loop:
type \x00\x00\x00\x01 create
done \x00
err \x00\x00\x00\x00
path_created \x00\x00\x00\x05
\x2f\x74\x65\x73\x74
type \x00\x00\x00\x05 set
done \x00
err \x00\x00\x00\x00
stat \x00\x00\x00\x00\x00\x01\x87\xe1
\x00\x00\x00\x00\x00\x01\x87\xe1
\x00\x00\x01\x62\x3a\xf4\x35\x0c
\x00\x00\x01\x62\x3a\xf4\x35\x0c
\x00\x00\x00\x01
\x00\x00\x00\x00
\x00\x00\x00\x00
\x00\x00\x00\x00\x00\x00\x00\x00
\x00\x00\x00\x06
\x00\x00\x00\x00
\x00\x00\x00\x00\x00\x01\x87\xe1
type \x00\x00\x00\x02 remove
done \x00
err \x00\x00\x00\x00
after:
type \xff\xff\xff\xff
done \x01
err \xff\xff\xff\xff
*/
@ -155,6 +246,11 @@ void write(int32_t x, WriteBuffer & out)
writeBinary(x, out);
}
void write(bool x, WriteBuffer & out)
{
writeBinary(x, out);
}
void write(const String & s, WriteBuffer & out)
{
write(int32_t(s.size()), out);
@ -200,6 +296,11 @@ void read(int32_t & x, ReadBuffer & in)
x = __builtin_bswap32(x);
}
void read(bool & x, ReadBuffer & in)
{
readBinary(x, in);
}
void read(String & s, ReadBuffer & in)
{
static constexpr int32_t max_string_size = 1 << 20;
@ -508,6 +609,7 @@ void ZooKeeper::sendThread()
if (request_info.watch)
{
request_info.request->has_watch = true;
std::lock_guard lock(watches_mutex);
watches[request_info.request->getPath()].emplace_back(std::move(request_info.watch));
}
@ -577,6 +679,8 @@ ZooKeeper::ResponsePtr ZooKeeper::ExistsRequest::makeResponse() const { return s
ZooKeeper::ResponsePtr ZooKeeper::GetRequest::makeResponse() const { return std::make_shared<GetResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::SetRequest::makeResponse() const { return std::make_shared<SetResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::ListRequest::makeResponse() const { return std::make_shared<ListResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::CheckRequest::makeResponse() const { return std::make_shared<CheckResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::MultiRequest::makeResponse() const { return std::make_shared<MultiResponse>(requests); }
ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { throw Exception("Received response for close request"); }
void addRootPath(String & path, const String & root_path)
@ -611,10 +715,22 @@ void ZooKeeper::ExistsRequest::addRootPath(const String & root_path) { ZooKeeper
void ZooKeeper::GetRequest::addRootPath(const String & root_path) { ZooKeeperImpl::addRootPath(path, root_path); }
void ZooKeeper::SetRequest::addRootPath(const String & root_path) { ZooKeeperImpl::addRootPath(path, root_path); }
void ZooKeeper::ListRequest::addRootPath(const String & root_path) { ZooKeeperImpl::addRootPath(path, root_path); }
void ZooKeeper::CheckRequest::addRootPath(const String & root_path) { ZooKeeperImpl::addRootPath(path, root_path); }
void ZooKeeper::MultiRequest::addRootPath(const String & root_path)
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void ZooKeeper::CreateResponse::removeRootPath(const String & root_path) { ZooKeeperImpl::removeRootPath(path_created, root_path); }
void ZooKeeper::WatchResponse::removeRootPath(const String & root_path) { ZooKeeperImpl::removeRootPath(path, root_path); }
void ZooKeeper::MultiResponse::removeRootPath(const String & root_path)
{
for (auto & response : responses)
response->removeRootPath(root_path);
}
void ZooKeeper::receiveEvent()
@ -737,11 +853,13 @@ void ZooKeeper::RemoveRequest::writeImpl(WriteBuffer & out) const
void ZooKeeper::ExistsRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(path, out);
ZooKeeperImpl::write(has_watch, out);
}
void ZooKeeper::GetRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(path, out);
ZooKeeperImpl::write(has_watch, out);
}
void ZooKeeper::SetRequest::writeImpl(WriteBuffer & out) const
@ -754,6 +872,36 @@ void ZooKeeper::SetRequest::writeImpl(WriteBuffer & out) const
void ZooKeeper::ListRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(path, out);
ZooKeeperImpl::write(has_watch, out);
}
void ZooKeeper::CheckRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(path, out);
ZooKeeperImpl::write(version, out);
}
void ZooKeeper::MultiRequest::writeImpl(WriteBuffer & out) const
{
for (const auto & request : requests)
{
bool done = false;
int32_t error = 0;
ZooKeeperImpl::write(request->getOpNum(), out);
ZooKeeperImpl::write(done, out);
ZooKeeperImpl::write(error, out);
request->writeImpl(out);
}
OpNum op_num = -1;
bool done = true;
int32_t error = -1;
ZooKeeperImpl::write(op_num, out);
ZooKeeperImpl::write(done, out);
ZooKeeperImpl::write(error, out);
}
@ -791,6 +939,51 @@ void ZooKeeper::ListResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(names, in);
}
ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)
{
responses.reserve(requests.size());
for (const auto & request : requests)
responses.emplace_back(request->makeResponse());
}
void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
{
for (const auto & response : responses)
{
OpNum op_num;
bool done;
int32_t error;
ZooKeeperImpl::read(op_num, in);
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(error, in);
if (done)
throw Exception("Not enough results received for multi transaction");
if (error)
response->error = error;
else
response->readImpl(in);
}
OpNum op_num;
bool done;
int32_t error;
ZooKeeperImpl::read(op_num, in);
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(error, in);
if (!done)
throw Exception("Too many results received for multi transaction");
if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction");
if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction");
}
void ZooKeeper::pushRequest(RequestInfo && info)
{
@ -909,4 +1102,36 @@ void ZooKeeper::list(
}
void ZooKeeper::check(
const String & path,
int32_t version,
CheckCallback callback)
{
CheckRequest request;
request.path = path;
request.version = version;
RequestInfo request_info;
request_info.request = std::make_shared<CheckRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const CheckResponse &>(response)); };
pushRequest(std::move(request_info));
}
void ZooKeeper::multi(
const Requests & requests,
MultiCallback callback)
{
MultiRequest request;
request.requests = requests;
RequestInfo request_info;
request_info.request = std::make_shared<MultiRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const MultiResponse &>(response)); };
pushRequest(std::move(request_info));
}
}

View File

@ -78,6 +78,7 @@ public:
using XID = int32_t;
using OpNum = int32_t;
struct Response
{
int32_t error = 0;
@ -88,11 +89,13 @@ public:
};
using ResponsePtr = std::shared_ptr<Response>;
using Responses = std::vector<ResponsePtr>;
using ResponseCallback = std::function<void(const Response &)>;
struct Request
{
XID xid;
bool has_watch = false;
virtual ~Request() {};
virtual OpNum getOpNum() const = 0;
@ -271,6 +274,49 @@ public:
using ListCallback = std::function<void(const ListResponse &)>;
struct CheckRequest final : Request
{
String path;
int32_t version;
OpNum getOpNum() const override { return 13; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct CheckResponse final : Response
{
void readImpl(ReadBuffer &) override {};
};
using CheckCallback = std::function<void(const CheckResponse &)>;
struct MultiRequest final : Request
{
Requests requests;
OpNum getOpNum() const override { return 14; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
};
struct MultiResponse final : Response
{
Responses responses;
MultiResponse(const Requests & requests);
void readImpl(ReadBuffer &) override;
void removeRootPath(const String & root_path) override;
};
using MultiCallback = std::function<void(const MultiResponse &)>;
/// Connection to addresses is performed in order. If you want, shuffle them manually.
ZooKeeper(
const Addresses & addresses,
@ -319,7 +365,14 @@ public:
ListCallback callback,
WatchCallback watch);
void multi();
void check(
const String & path,
int32_t version,
CheckCallback callback);
void multi(
const Requests & requests,
MultiCallback callback);
void close();

View File

@ -4,11 +4,16 @@
using namespace zkutil;
int main()
{
int main(int argc, char ** argv)
try
{
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", "", 5000);
if (argc < 2)
{
std::cerr << "Usage: ./zkutil_test_commands host:port,host:port...\n";
return 1;
}
ZooKeeper zk(argv[1], "", 5000);
Strings children;
std::cout << "create path" << std::endl;
@ -32,10 +37,11 @@ int main()
std::cout << "multi" << std::endl;
OpResultsPtr res = zk.multi(ops);
std::cout << "path created: " << dynamic_cast<Op::Create &>(*ops[0]).getPathCreated() << std::endl;
return 0;
}
catch (KeeperException & e)
{
std::cerr << "KeeperException " << e.what() << " " << e.message() << std::endl;
}
return 0;
return 1;
}