ZooKeeper: Fixed error [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-04-08 07:25:13 +03:00
parent 1e7d616a58
commit 86317fe0f9
3 changed files with 117 additions and 108 deletions

2
contrib/zstd vendored

@ -1 +1 @@
Subproject commit f4340f46b2387bc8de7d5320c0b83bb1499933ad
Subproject commit 255597502c3a4ef150abc964e376d4202a8c2929

View File

@ -759,41 +759,6 @@ void ZooKeeper::sendThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(true, false);
}
/// Drain queue
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
try
{
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
@ -1030,102 +995,139 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive)
{
bool expired_prev = false;
if (expired.compare_exchange_strong(expired_prev, true))
std::lock_guard lock(finalize_mutex);
if (expired)
return;
expired = true;
active_session_metric_increment.destroy();
try
{
active_session_metric_increment.destroy();
try
if (!error_send)
{
if (!error_send)
{
/// Send close event. This also signals sending thread to wakeup and then stop.
try
{
close();
}
catch (...)
{
/// This happens for example, when "Cannot push request to queue within operation timeout".
tryLogCurrentException(__PRETTY_FUNCTION__);
}
send_thread.join();
}
/// Send close event. This also signals sending thread to wakeup and then stop.
try
{
/// This will also wakeup the receiving thread.
socket.shutdown();
close();
}
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
/// This happens for example, when "Cannot push request to queue within operation timeout".
tryLogCurrentException(__PRETTY_FUNCTION__);
}
send_thread.join();
}
if (!error_receive)
receive_thread.join();
try
{
/// This will also wakeup the receiving thread.
socket.shutdown();
}
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!error_receive)
receive_thread.join();
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
try
{
request_info.callback(*response);
}
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
{
if (callback)
{
try
{
request_info.callback(*response);
callback(response);
}
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
/// Drain queue
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
try
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
{
if (callback)
{
try
{
callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -1329,10 +1331,6 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
void ZooKeeper::pushRequest(RequestInfo && info)
{
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid)
throw Exception("Session expired", ZSESSIONEXPIRED);
try
{
info.request->addRootPath(root_path);
@ -1346,7 +1344,15 @@ void ZooKeeper::pushRequest(RequestInfo && info)
throw Exception("XID overflow", ZSESSIONEXPIRED);
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(finalize_mutex);
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid)
throw Exception("Session expired", ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
@ -1356,6 +1362,8 @@ void ZooKeeper::pushRequest(RequestInfo && info)
finalize(false, false);
throw;
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
}

View File

@ -570,7 +570,10 @@ private:
std::optional<WriteBufferFromPocoSocket> out;
int64_t session_id = 0;
std::atomic<XID> xid {1};
std::atomic<bool> expired {false};
std::mutex finalize_mutex;
using clock = std::chrono::steady_clock;
@ -601,8 +604,6 @@ private:
std::thread send_thread;
std::thread receive_thread;
std::atomic<bool> expired {false};
void connect(
const Addresses & addresses,
Poco::Timespan connection_timeout);