Rewriting ZooKeeper library [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-19 03:32:59 +03:00
parent 0d11b75def
commit 5146ab8acc
2 changed files with 170 additions and 55 deletions

View File

@ -374,9 +374,9 @@ static constexpr int32_t protocol_version = 0;
static constexpr ZooKeeper::XID watch_xid = -1;
static constexpr ZooKeeper::XID ping_xid = -2;
//static constexpr ZooKeeper::XID auth_xid = -4;
static constexpr ZooKeeper::XID auth_xid = -4;
static constexpr ZooKeeper::XID close_xid = -3;
static constexpr ZooKeeper::XID close_xid = 0x7FFFFFFF;
const char * ZooKeeper::errorMessage(int32_t code)
@ -417,16 +417,25 @@ const char * ZooKeeper::errorMessage(int32_t code)
ZooKeeper::~ZooKeeper()
{
stop = true;
try
{
stop = true;
if (send_thread.joinable())
send_thread.join();
if (send_thread.joinable())
send_thread.join();
if (receive_thread.joinable())
receive_thread.join();
if (receive_thread.joinable())
receive_thread.join();
if (!expired)
close();
if (!expired)
close();
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -564,10 +573,35 @@ void ZooKeeper::receiveHandshake()
}
/*void ZooKeeper::sendAuth(XID xid, const String & auth_scheme, const String & auth_data)
void ZooKeeper::sendAuth(const String & scheme, const String & data)
{
// TODO
}*/
AuthRequest request;
request.scheme = scheme;
request.data = data;
request.xid = auth_xid;
request.write(*out);
int32_t length;
int32_t xid;
int64_t zxid;
int32_t err;
read(length);
size_t count_before_event = in->count();
read(xid);
read(zxid);
read(err);
if (xid != auth_xid)
throw Exception("Unexpected event recievent in reply to auth request: " + toString(xid));
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length));
if (err)
throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)));
}
void ZooKeeper::close()
@ -581,7 +615,6 @@ void ZooKeeper::close()
void ZooKeeper::sendThread()
{
XID xid = 2; /// TODO deal with xid overflow /// NOTE xid = 1 is reserved for auth request.
auto prev_heartbeat_time = std::chrono::steady_clock::now();
try
@ -594,25 +627,10 @@ void ZooKeeper::sendThread()
if (next_heartbeat_time > now)
max_wait = std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count();
RequestInfo request_info;
if (requests.tryPop(request_info, max_wait))
RequestPtr request;
if (requests.tryPop(request, max_wait))
{
request_info.request->addRootPath(root_path);
request_info.request->xid = xid;
{
std::lock_guard lock(operations_mutex);
operations[xid] = request_info;
}
if (request_info.watch)
{
request_info.request->has_watch = true;
std::lock_guard lock(watches_mutex);
watches[request_info.request->getPath()].emplace_back(std::move(request_info.watch));
}
request_info.request->write(*out);
request->write(*out);
}
else
{
@ -622,8 +640,6 @@ void ZooKeeper::sendThread()
request.xid = ping_xid;
request.write(*out);
}
++xid;
}
}
catch (...)
@ -671,6 +687,7 @@ void ZooKeeper::Request::write(WriteBuffer & out) const
ZooKeeper::ResponsePtr ZooKeeper::HeartbeatRequest::makeResponse() const { return std::make_shared<HeartbeatResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::AuthRequest::makeResponse() const { return std::make_shared<AuthResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::CreateRequest::makeResponse() const { return std::make_shared<CreateResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::RemoveRequest::makeResponse() const { return std::make_shared<RemoveResponse>(); }
ZooKeeper::ResponsePtr ZooKeeper::ExistsRequest::makeResponse() const { return std::make_shared<ExistsResponse>(); }
@ -771,7 +788,8 @@ void ZooKeeper::receiveEvent()
throw Exception("Received event for unknown watch");
for (auto & callback : it->second)
callback(watch_response);
if (callback)
callback(watch_response);
watches.erase(it);
};
@ -813,6 +831,49 @@ void ZooKeeper::receiveEvent()
}
void ZooKeeper::finalize()
{
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZCONNECTIONLOSS;
if (request_info.callback)
request_info.callback(*response);
}
operations.clear();
}
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZCONNECTIONLOSS;
for (auto & callback : path_watches.second)
if (callback)
callback(response);
}
watches.clear();
}
}
void ZooKeeper::AuthRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(type, out);
ZooKeeperImpl::write(scheme, out);
ZooKeeperImpl::write(data, out);
}
void ZooKeeper::CreateRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperImpl::write(path, out);
@ -974,7 +1035,24 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
void ZooKeeper::pushRequest(RequestInfo && info)
{
if (!requests.tryPush(info, session_timeout.totalMilliseconds()))
if (expired)
throw Exception("Session expired");
info.request->addRootPath(root_path);
info.request->xid = xid.fetch_add(1);
{
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (info.watch)
{
info.request->has_watch = true;
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (!requests.tryPush(info.request, session_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within session timeout");
}

View File

@ -138,6 +138,23 @@ public:
using WatchCallback = std::function<void(const WatchResponse &)>;
struct AuthRequest final : Request
{
int32_t type = 0; /// ignored by the server
String scheme;
String data;
OpNum getOpNum() const override { return 100; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
String getPath() const override { return {}; }
};
struct AuthResponse final : Response
{
void readImpl(ReadBuffer &) override {};
};
struct CloseRequest final : Request
{
OpNum getOpNum() const override { return -11; }
@ -169,8 +186,6 @@ public:
void removeRootPath(const String & root_path) override;
};
using CreateCallback = std::function<void(const CreateResponse &)>;
struct RemoveRequest final : Request
{
String path;
@ -188,8 +203,6 @@ public:
void readImpl(ReadBuffer &) override {}
};
using RemoveCallback = std::function<void(const RemoveResponse &)>;
struct ExistsRequest final : Request
{
String path;
@ -208,8 +221,6 @@ public:
void readImpl(ReadBuffer &) override;
};
using ExistsCallback = std::function<void(const ExistsResponse &)>;
struct GetRequest final : Request
{
String path;
@ -229,8 +240,6 @@ public:
void readImpl(ReadBuffer &) override;
};
using GetCallback = std::function<void(const GetResponse &)>;
struct SetRequest final : Request
{
String path;
@ -251,8 +260,6 @@ public:
void readImpl(ReadBuffer &) override;
};
using SetCallback = std::function<void(const SetResponse &)>;
struct ListRequest final : Request
{
String path;
@ -272,8 +279,6 @@ public:
void readImpl(ReadBuffer &) override;
};
using ListCallback = std::function<void(const ListResponse &)>;
struct CheckRequest final : Request
{
String path;
@ -291,8 +296,6 @@ public:
void readImpl(ReadBuffer &) override {};
};
using CheckCallback = std::function<void(const CheckResponse &)>;
struct MultiRequest final : Request
{
Requests requests;
@ -314,8 +317,6 @@ public:
void removeRootPath(const String & root_path) override;
};
using MultiCallback = std::function<void(const MultiResponse &)>;
/// Connection to addresses is performed in order. If you want, shuffle them manually.
ZooKeeper(
@ -331,6 +332,16 @@ public:
/// If not valid, you can only destroy the object. All other methods will throw exception.
bool isValid() const { return !expired; }
using CreateCallback = std::function<void(const CreateResponse &)>;
using RemoveCallback = std::function<void(const RemoveResponse &)>;
using ExistsCallback = std::function<void(const ExistsResponse &)>;
using GetCallback = std::function<void(const GetResponse &)>;
using SetCallback = std::function<void(const SetResponse &)>;
using ListCallback = std::function<void(const ListResponse &)>;
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
void create(
const String & path,
const String & data,
@ -377,7 +388,7 @@ public:
void close();
enum ZOO_ERRORS
enum Error
{
ZOK = 0,
@ -421,6 +432,27 @@ public:
static const char * errorMessage(int32_t code);
/// For watches.
enum State
{
EXPIRED_SESSION = -112,
AUTH_FAILED = -113,
CONNECTING = 1,
ASSOCIATING = 2,
CONNECTED = 3,
NOTCONNECTED = 999
};
enum Event
{
CREATED = 1,
DELETED = 2,
CHANGED = 3,
CHILD = 4,
SESSION = -1,
NOTWATCHING = -2
};
private:
String root_path;
ACLs default_acls;
@ -431,6 +463,8 @@ private:
std::optional<ReadBufferFromPocoSocket> in;
std::optional<WriteBufferFromPocoSocket> out;
std::atomic<XID> xid {1}; /// TODO deal with xid overflow
struct RequestInfo
{
RequestPtr request;
@ -438,10 +472,10 @@ private:
WatchCallback watch;
};
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
using RequestsQueue = ConcurrentBoundedQueue<RequestPtr>;
RequestsQueue requests{1};
void pushRequest(RequestInfo && info);
void pushRequest(RequestInfo && request);
using Operations = std::map<XID, RequestInfo>;
@ -467,13 +501,16 @@ private:
void sendHandshake();
void receiveHandshake();
void sendAuth(const String & auth_scheme, const String & auth_data);
void sendAuth(const String & scheme, const String & data);
void receiveEvent();
void sendThread();
void receiveThread();
/// Call all remaining callbacks and watches, passing errors to them.
void finalize();
template <typename T>
void write(const T &);