From 86317fe0f966fe8d2b96a355cc630c6c3f62fd04 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 8 Apr 2018 07:25:13 +0300 Subject: [PATCH] ZooKeeper: Fixed error [#CLICKHOUSE-2] --- contrib/zstd | 2 +- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 218 ++++++++++---------- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 5 +- 3 files changed, 117 insertions(+), 108 deletions(-) diff --git a/contrib/zstd b/contrib/zstd index f4340f46b23..255597502c3 160000 --- a/contrib/zstd +++ b/contrib/zstd @@ -1 +1 @@ -Subproject commit f4340f46b2387bc8de7d5320c0b83bb1499933ad +Subproject commit 255597502c3a4ef150abc964e376d4202a8c2929 diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 37ff3078d75..9684b4a8b3a 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -759,41 +759,6 @@ void ZooKeeper::sendThread() tryLogCurrentException(__PRETTY_FUNCTION__); finalize(true, false); } - - /// Drain queue - RequestInfo info; - while (requests_queue.tryPop(info)) - { - if (info.callback) - { - ResponsePtr response = info.request->makeResponse(); - response->error = ZSESSIONEXPIRED; - try - { - info.callback(*response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - - } - if (info.watch) - { - WatchResponse response; - response.type = SESSION; - response.state = EXPIRED_SESSION; - response.error = ZSESSIONEXPIRED; - try - { - info.watch(response); - } - catch (...) 
- { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } } @@ -1030,102 +995,139 @@ void ZooKeeper::receiveEvent() void ZooKeeper::finalize(bool error_send, bool error_receive) { - bool expired_prev = false; - if (expired.compare_exchange_strong(expired_prev, true)) + std::lock_guard lock(finalize_mutex); + + if (expired) + return; + expired = true; + + active_session_metric_increment.destroy(); + + try { - active_session_metric_increment.destroy(); - - try + if (!error_send) { - if (!error_send) - { - /// Send close event. This also signals sending thread to wakeup and then stop. - try - { - close(); - } - catch (...) - { - /// This happens for example, when "Cannot push request to queue within operation timeout". - tryLogCurrentException(__PRETTY_FUNCTION__); - } - send_thread.join(); - } - + /// Send close event. This also signals sending thread to wakeup and then stop. try { - /// This will also wakeup the receiving thread. - socket.shutdown(); + close(); } catch (...) { - /// We must continue to execute all callbacks, because the user is waiting for them. + /// This happens for example, when "Cannot push request to queue within operation timeout". tryLogCurrentException(__PRETTY_FUNCTION__); } + send_thread.join(); + } - if (!error_receive) - receive_thread.join(); + try + { + /// This will also wakeup the receiving thread. + socket.shutdown(); + } + catch (...) + { + /// We must continue to execute all callbacks, because the user is waiting for them. 
+ tryLogCurrentException(__PRETTY_FUNCTION__); + } + if (!error_receive) + receive_thread.join(); + + { + std::lock_guard lock(operations_mutex); + + for (auto & op : operations) { - std::lock_guard lock(operations_mutex); - - for (auto & op : operations) + RequestInfo & request_info = op.second; + ResponsePtr response = request_info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + if (request_info.callback) { - RequestInfo & request_info = op.second; - ResponsePtr response = request_info.request->makeResponse(); - response->error = ZSESSIONEXPIRED; - if (request_info.callback) + try + { + request_info.callback(*response); + } + catch (...) + { + /// We must continue to all other callbacks, because the user is waiting for them. + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + } + + CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size()); + operations.clear(); + } + + { + std::lock_guard lock(watches_mutex); + + for (auto & path_watches : watches) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + + for (auto & callback : path_watches.second) + { + if (callback) { try { - request_info.callback(*response); + callback(response); } catch (...) { - /// We must continue to all other callbacks, because the user is waiting for them. 
tryLogCurrentException(__PRETTY_FUNCTION__); } } } - - CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size()); - operations.clear(); } + CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size()); + watches.clear(); + } + + /// Drain queue + RequestInfo info; + while (requests_queue.tryPop(info)) + { + if (info.callback) { - std::lock_guard lock(watches_mutex); - - for (auto & path_watches : watches) + ResponsePtr response = info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + try { - WatchResponse response; - response.type = SESSION; - response.state = EXPIRED_SESSION; - response.error = ZSESSIONEXPIRED; - - for (auto & callback : path_watches.second) - { - if (callback) - { - try - { - callback(response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } + info.callback(*response); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); } - CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size()); - watches.clear(); + } + if (info.watch) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + try + { + info.watch(response); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } } } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); } } @@ -1329,10 +1331,6 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) void ZooKeeper::pushRequest(RequestInfo && info) { - /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. 
- if (expired && info.request->xid != close_xid) - throw Exception("Session expired", ZSESSIONEXPIRED); - try { info.request->addRootPath(root_path); @@ -1346,7 +1344,15 @@ void ZooKeeper::pushRequest(RequestInfo && info) throw Exception("XID overflow", ZSESSIONEXPIRED); } - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); + /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls + /// to avoid forgotten operations in the queue when session is expired. + /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest' + /// and the queue will be drained in 'finalize'. + std::lock_guard lock(finalize_mutex); + + /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. + if (expired && info.request->xid != close_xid) + throw Exception("Session expired", ZSESSIONEXPIRED); if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); @@ -1356,6 +1362,8 @@ void ZooKeeper::pushRequest(RequestInfo && info) finalize(false, false); throw; } + + ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index b86e2bbf1e2..8a65d09b529 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -570,7 +570,10 @@ private: std::optional out; int64_t session_id = 0; + std::atomic xid {1}; + std::atomic<bool> expired {false}; + std::mutex finalize_mutex; using clock = std::chrono::steady_clock; @@ -601,8 +604,6 @@ private: std::thread send_thread; std::thread receive_thread; - std::atomic<bool> expired {false}; - void connect( const Addresses & addresses, Poco::Timespan connection_timeout);