Rewriting ZooKeeper library [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-19 22:07:50 +03:00
parent 72ededeab5
commit 831d38fd59
3 changed files with 123 additions and 52 deletions

View File

@ -419,17 +419,20 @@ ZooKeeper::~ZooKeeper()
{
try
{
stop = true;
/// Send close event. This also signals sending thread to wakeup and then stop.
if (!expired)
close();
if (send_thread.joinable())
send_thread.join();
/// This will also wakeup receiving event.
socket.shutdown();
if (receive_thread.joinable())
receive_thread.join();
if (!expired)
close();
/// Fire all remaining callbacks and watches.
finalize();
}
catch (...)
@ -604,22 +607,13 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
}
void ZooKeeper::close()
{
CloseRequest request;
request.xid = close_xid;
request.write(*out);
expired = true;
}
void ZooKeeper::sendThread()
{
auto prev_heartbeat_time = std::chrono::steady_clock::now();
try
{
while (!stop)
while (!expired)
{
auto now = std::chrono::steady_clock::now();
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(session_timeout.totalMilliseconds() / 3);
@ -631,6 +625,9 @@ void ZooKeeper::sendThread()
if (requests.tryPop(request, max_wait))
{
request->write(*out);
if (request->xid == close_xid)
break;
}
else
{
@ -645,11 +642,14 @@ void ZooKeeper::sendThread()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
expired = true;
stop = true;
}
/// TODO drain queue
expired = true;
/// Drain queue
RequestPtr request;
while (requests.tryPop(request))
;
}
@ -657,20 +657,23 @@ void ZooKeeper::receiveThread()
{
try
{
while (!stop)
while (!expired)
{
if (!in->poll(session_timeout.totalMicroseconds()))
throw Exception("Nothing is received in session timeout");
if (expired)
break;
receiveEvent();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
expired = true;
stop = true;
}
expired = true;
}
@ -696,7 +699,7 @@ ZooKeeper::ResponsePtr ZooKeeper::SetRequest::makeResponse() const { return std:
ZooKeeper::ResponsePtr ZooKeeper::ListRequest::makeResponse() const { return std::make_shared<ListResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::CheckRequest::makeResponse() const { return std::make_shared<CheckResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::MultiRequest::makeResponse() const { return std::make_shared<MultiResponse>(requests); }
ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { throw Exception("Received response for close request"); }
ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { return std::make_shared<CloseResponse>(); }
void addRootPath(String & path, const String & root_path)
{
@ -996,6 +999,20 @@ void ZooKeeper::ListResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(stat, in);
}
void ZooKeeper::ErrorResponse::readImpl(ReadBuffer & in)
{
int32_t read_error;
ZooKeeperImpl::read(read_error, in);
if (read_error != error)
throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")");
}
void ZooKeeper::CloseResponse::readImpl(ReadBuffer &)
{
throw Exception("Received response for close request");
}
ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)
{
responses.reserve(requests.size());
@ -1006,7 +1023,43 @@ ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)
void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
{
for (const auto & response : responses)
for (auto & response : responses)
{
OpNum op_num;
bool done;
int32_t op_error;
ZooKeeperImpl::read(op_num, in);
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(op_error, in);
std::cerr << "Received result for multi: " << op_num << "\n";
if (done)
throw Exception("Not enough results received for multi transaction");
/// op_num == -1 is special for multi transaction.
/// For unknown reason, error code is duplicated in header and in response body.
if (op_num == -1)
response = std::make_shared<ErrorResponse>();
if (op_error)
{
response->error = op_error;
/// Set error for whole transaction.
/// If some operations fail, ZK send global error as zero and then send details about each operation.
/// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations.
if (!error && op_error != ZRUNTIMEINCONSISTENCY)
error = op_error;
}
if (!op_error || op_num == -1)
response->readImpl(in);
}
/// Footer.
{
OpNum op_num;
bool done;
@ -1016,31 +1069,13 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(error, in);
std::cerr << "Received result for multi: " << op_num << "\n";
if (done)
throw Exception("Not enough results received for multi transaction");
if (error)
response->error = error;
else
response->readImpl(in);
if (!done)
throw Exception("Too many results received for multi transaction");
if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction");
if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction");
}
OpNum op_num;
bool done;
int32_t error;
ZooKeeperImpl::read(op_num, in);
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(error, in);
if (!done)
throw Exception("Too many results received for multi transaction");
if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction");
if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction");
}
@ -1050,7 +1085,10 @@ void ZooKeeper::pushRequest(RequestInfo && info)
throw Exception("Session expired");
info.request->addRootPath(root_path);
info.request->xid = xid.fetch_add(1);
if (!info.request->xid)
info.request->xid = xid.fetch_add(1);
{
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
@ -1215,4 +1253,16 @@ void ZooKeeper::multi(
}
void ZooKeeper::close()
{
CloseRequest request;
request.xid = close_xid;
RequestInfo request_info;
request_info.request = std::make_shared<CloseRequest>(std::move(request));
pushRequest(std::move(request_info));
}
}

View File

@ -96,7 +96,7 @@ public:
struct Request
{
XID xid;
XID xid = 0;
bool has_watch = false;
virtual ~Request() {};
@ -165,6 +165,11 @@ public:
String getPath() const override { return {}; }
};
struct CloseResponse final : Response
{
void readImpl(ReadBuffer &) override;
};
struct CreateRequest final : Request
{
String path;
@ -319,6 +324,12 @@ public:
void removeRootPath(const String & root_path) override;
};
/// This response may be received only as an element of responses in MultiResponse.
struct ErrorResponse final : Response
{
void readImpl(ReadBuffer &) override;
};
/// Connection to addresses is performed in order. If you want, shuffle them manually.
ZooKeeper(
@ -493,7 +504,6 @@ private:
std::thread send_thread;
std::thread receive_thread;
std::atomic<bool> stop {false};
std::atomic<bool> expired {false};
void connect(

View File

@ -1,5 +1,6 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/Event.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/typeid_cast.h>
#include <iostream>
@ -114,14 +115,22 @@ try
std::cout << "remove\n";
zk.remove("/test", -1, [](const ZooKeeper::RemoveResponse & response)
Poco::Event event(true);
zk.remove("/test", -1, [&](const ZooKeeper::RemoveResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Removed\n";
event.set();
});
event.wait();
/// Surprising enough, ZooKeeper can execute multi transaction out of order. So, we must to wait for "remove" to execute before sending "multi".
std::cout << "multi\n";
ZooKeeper::Requests ops;
@ -146,7 +155,7 @@ try
ops.emplace_back(std::make_shared<ZooKeeper::RemoveRequest>(std::move(remove_request)));
}
zk.multi(ops, [](const ZooKeeper::MultiResponse & response)
zk.multi(ops, [&](const ZooKeeper::MultiResponse & response)
{
if (response.error)
std::cerr << "Error (multi) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
@ -158,9 +167,11 @@ try
std::cerr << "Created path: " << typeid_cast<const ZooKeeper::CreateResponse &>(*response.responses[0]).path_created << '\n';
}
event.set();
});
sleep(5);
event.wait();
return 0;
}
catch (...)