From ac64a1339290be826e10bfa454897f825c87457c Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 17:22:28 +0300 Subject: [PATCH 1/9] Split storage and requests processing --- src/Common/ZooKeeper/TestKeeperStorage.cpp | 325 +++++++----------- src/Common/ZooKeeper/TestKeeperStorage.h | 70 ++-- .../ZooKeeper/TestKeeperStorageDispatcher.cpp | 131 +++++++ .../ZooKeeper/TestKeeperStorageDispatcher.h | 58 ++++ src/Interpreters/Context.cpp | 8 +- src/Interpreters/Context.h | 4 +- src/Server/TestKeeperTCPHandler.cpp | 166 ++++----- src/Server/TestKeeperTCPHandler.h | 14 +- 8 files changed, 410 insertions(+), 366 deletions(-) create mode 100644 src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp create mode 100644 src/Common/ZooKeeper/TestKeeperStorageDispatcher.h diff --git a/src/Common/ZooKeeper/TestKeeperStorage.cpp b/src/Common/ZooKeeper/TestKeeperStorage.cpp index daadba6519e..4f1300cde8c 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.cpp +++ b/src/Common/ZooKeeper/TestKeeperStorage.cpp @@ -39,8 +39,9 @@ static String baseName(const String & path) return path.substr(rslash_pos + 1); } -static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type) +static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches, Coordination::Event event_type) { + TestKeeperStorage::ResponsesForSessions result; auto it = watches.find(path); if (it != watches.end()) { @@ -50,9 +51,8 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & watch_response->zxid = -1; watch_response->type = event_type; watch_response->state = Coordination::State::CONNECTED; - for (auto & watcher : it->second) - if (watcher.watch_callback) - watcher.watch_callback(watch_response); + for (auto watcher_session : it->second) + result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_response}); watches.erase(it); } @@ -67,19 +67,17 @@ static void processWatchesImpl(const String & path, TestKeeperStorage::Watches & watch_list_response->zxid = -1; watch_list_response->type = Coordination::Event::CHILD; watch_list_response->state = Coordination::State::CONNECTED; - for (auto & watcher : it->second) - if (watcher.watch_callback) - watcher.watch_callback(watch_list_response); + for (auto watcher_session : it->second) + result.push_back(TestKeeperStorage::ResponseForSession{watcher_session, watch_list_response}); list_watches.erase(it); } + return result; } TestKeeperStorage::TestKeeperStorage() { container.emplace("/", Node()); - - processing_thread = ThreadFromGlobalPool([this] { processingThread(); }); } using Undo = std::function; @@ -92,7 +90,7 @@ struct TestKeeperStorageRequest : zk_request(zk_request_) {} virtual std::pair process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0; - virtual void processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const {} + virtual TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & /*watches*/, TestKeeperStorage::Watches & /*list_watches*/) const { return {}; } virtual ~TestKeeperStorageRequest() = default; }; @@ -111,9 +109,9 @@ struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest { using TestKeeperStorageRequest::TestKeeperStorageRequest; - void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override + TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override { - processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); + return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); } std::pair process(TestKeeperStorage::Container & container, TestKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override @@ -271,9 +269,9 @@ struct TestKeeperStorageRemoveRequest final : public TestKeeperStorageRequest return { response_ptr, undo }; } - void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override + TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override { - processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); + return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } }; @@ -344,9 +342,9 @@ struct TestKeeperStorageSetRequest final : public TestKeeperStorageRequest return { response_ptr, undo }; } - void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override + TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override { - processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); + return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); } }; @@ -502,10 +500,15 @@ struct TestKeeperStorageMultiRequest final : public TestKeeperStorageRequest } } - void processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override + TestKeeperStorage::ResponsesForSessions processWatches(TestKeeperStorage::Watches & watches, TestKeeperStorage::Watches & list_watches) const override { + TestKeeperStorage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) - generic_request->processWatches(watches, list_watches); + { + auto responses = generic_request->processWatches(watches, list_watches); + result.insert(result.end(), responses.begin(), responses.end()); + } + return result; } }; @@ -518,160 +521,49 @@ struct TestKeeperStorageCloseRequest final : public TestKeeperStorageRequest } }; -void TestKeeperStorage::processingThread() +TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const RequestsForSessions & expired_requests) { - setThreadName("TestKeeperSProc"); + if (finalized) + throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR); - try + finalized = true; + + ResponsesForSessions finalize_results; + auto finish_watch = [] (const auto & watch_pair) -> ResponsesForSessions { - while (!shutdown) - { - RequestInfo info; + ResponsesForSessions results; + std::shared_ptr response = std::make_shared(); + response->type = Coordination::SESSION; + response->state = Coordination::EXPIRED_SESSION; + response->error = Coordination::Error::ZSESSIONEXPIRED; - UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); + for (auto & watcher_session : watch_pair.second) + results.push_back(ResponseForSession{watcher_session, response}); + return results; + }; - if (requests_queue.tryPop(info, max_wait)) - { - if (shutdown) - break; - - auto zk_request = info.request->zk_request; - if (zk_request->getOpNum() == Coordination::OpNum::Close) - { - auto it = ephemerals.find(info.session_id); - if (it != ephemerals.end()) - { - for (const auto & ephemeral_path : it->second) - { - container.erase(ephemeral_path); - processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED); - } - ephemerals.erase(it); - } - clearDeadWatches(info.session_id); - - /// Finish connection - auto response = std::make_shared(); - response->xid = zk_request->xid; - response->zxid = getZXID(); - info.response_callback(response); - } - else - { - auto [response, _] = info.request->process(container, ephemerals, zxid, info.session_id); - - if (info.watch_callback) - { - if (response->error == Coordination::Error::ZOK) - { - auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList - ? list_watches - : watches; - - watches_type[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback}); - sessions_and_watchers[info.session_id].emplace(zk_request->getPath()); - } - else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) - { - watches[zk_request->getPath()].emplace_back(Watcher{info.session_id, info.watch_callback}); - sessions_and_watchers[info.session_id].emplace(zk_request->getPath()); - } - else - { - std::shared_ptr watch_response = std::make_shared(); - watch_response->path = zk_request->getPath(); - watch_response->xid = -1; - watch_response->error = response->error; - watch_response->type = Coordination::Event::NOTWATCHING; - info.watch_callback(watch_response); - } - } - - if (response->error == Coordination::Error::ZOK) - info.request->processWatches(watches, list_watches); - - response->xid = zk_request->xid; - response->zxid = getZXID(); - - info.response_callback(response); - } - } - } - } - catch (...) + for (auto & path_watch : watches) { - tryLogCurrentException(__PRETTY_FUNCTION__); - finalize(); - } -} - - -void TestKeeperStorage::finalize() -{ - { - std::lock_guard lock(push_request_mutex); - - if (shutdown) - return; - - shutdown = true; - - if (processing_thread.joinable()) - processing_thread.join(); + auto watch_responses = finish_watch(path_watch); + finalize_results.insert(finalize_results.end(), watch_responses.begin(), watch_responses.end()); } - try + watches.clear(); + for (auto & path_watch : list_watches) { - { - auto finish_watch = [] (const auto & watch_pair) - { - Coordination::ZooKeeperWatchResponse response; - response.type = Coordination::SESSION; - response.state = Coordination::EXPIRED_SESSION; - response.error = Coordination::Error::ZSESSIONEXPIRED; + auto list_watch_responses = finish_watch(path_watch); + finalize_results.insert(finalize_results.end(), list_watch_responses.begin(), list_watch_responses.end()); + } + list_watches.clear(); + sessions_and_watchers.clear(); - for (auto & watcher : watch_pair.second) - { - if (watcher.watch_callback) - { - try - { - watcher.watch_callback(std::make_shared(response)); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } - }; - for (auto & path_watch : watches) - finish_watch(path_watch); - watches.clear(); - for (auto & path_watch : list_watches) - finish_watch(path_watch); - list_watches.clear(); - sessions_and_watchers.clear(); - } - RequestInfo info; - while (requests_queue.tryPop(info)) - { - auto response = info.request->zk_request->makeResponse(); - response->error = Coordination::Error::ZSESSIONEXPIRED; - try - { - info.response_callback(response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } - catch (...) + for (const auto & [session_id, zk_request] : expired_requests) { - tryLogCurrentException(__PRETTY_FUNCTION__); + auto response = zk_request->makeResponse(); + response->error = Coordination::Error::ZSESSIONEXPIRED; + finalize_results.push_back(ResponseForSession{session_id, response}); } + return finalize_results; } @@ -731,55 +623,80 @@ TestKeeperWrapperFactory::TestKeeperWrapperFactory() registerTestKeeperRequestWrapper(*this); } -void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback) + +TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id) { - TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request); - RequestInfo request_info; - request_info.time = clock::now(); - request_info.request = storage_request; - request_info.session_id = session_id; - request_info.response_callback = callback; - - std::lock_guard lock(push_request_mutex); - /// Put close requests without timeouts - if (request->getOpNum() == Coordination::OpNum::Close) - requests_queue.push(std::move(request_info)); - else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) - throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); - -} - -void TestKeeperStorage::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback) -{ - TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(request); - RequestInfo request_info; - request_info.time = clock::now(); - request_info.request = storage_request; - request_info.session_id = session_id; - request_info.response_callback = callback; - if (request->has_watch) - request_info.watch_callback = watch_callback; - - std::lock_guard lock(push_request_mutex); - /// Put close requests without timeouts - if (request->getOpNum() == Coordination::OpNum::Close) - requests_queue.push(std::move(request_info)); - else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) - throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); -} - -TestKeeperStorage::~TestKeeperStorage() -{ - try + TestKeeperStorage::ResponsesForSessions results; + if (zk_request->getOpNum() == Coordination::OpNum::Close) { - finalize(); + auto it = ephemerals.find(session_id); + if (it != ephemerals.end()) + { + for (const auto & ephemeral_path : it->second) + { + container.erase(ephemeral_path); + auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED); + results.insert(results.end(), responses.begin(), responses.end()); + } + ephemerals.erase(it); + } + clearDeadWatches(session_id); + + /// Finish connection + auto response = std::make_shared(); + response->xid = zk_request->xid; + response->zxid = getZXID(); + results.push_front(ResponseForSession{session_id, response}); } - catch (...) + else { - tryLogCurrentException(__PRETTY_FUNCTION__); + + TestKeeperStorageRequestPtr storage_request = TestKeeperWrapperFactory::instance().get(zk_request); + auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id); + + if (zk_request->has_watch) + { + if (response->error == Coordination::Error::ZOK) + { + auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList + ? list_watches + : watches; + + watches_type[zk_request->getPath()].emplace_back(session_id); + sessions_and_watchers[session_id].emplace(zk_request->getPath()); + } + else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) + { + watches[zk_request->getPath()].emplace_back(session_id); + sessions_and_watchers[session_id].emplace(zk_request->getPath()); + } + else + { + std::shared_ptr watch_response = std::make_shared(); + watch_response->path = zk_request->getPath(); + watch_response->xid = -1; + watch_response->error = response->error; + watch_response->type = Coordination::Event::NOTWATCHING; + results.push_back(ResponseForSession{session_id, watch_response}); + } + } + + if (response->error == Coordination::Error::ZOK) + { + auto watch_responses = storage_request->processWatches(watches, list_watches); + results.insert(results.end(), watch_responses.begin(), watch_responses.end()); + } + + response->xid = zk_request->xid; + response->zxid = getZXID(); + + results.push_front(ResponseForSession{session_id, response}); } + + return results; } + void TestKeeperStorage::clearDeadWatches(int64_t session_id) { auto watches_it = sessions_and_watchers.find(session_id); @@ -793,7 +710,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id) auto & watches_for_path = watch->second; for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();) { - if (w_it->session_id == session_id) + if (*w_it == session_id) w_it = watches_for_path.erase(w_it); else ++w_it; @@ -808,7 +725,7 @@ void TestKeeperStorage::clearDeadWatches(int64_t session_id) auto & list_watches_for_path = list_watch->second; for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();) { - if (w_it->session_id == session_id) + if (*w_it == session_id) w_it = list_watches_for_path.erase(w_it); else ++w_it; diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h index afb0a7add82..5afa5032bcf 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.h +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -4,9 +4,9 @@ #include #include #include -#include #include #include +#include namespace zkutil { @@ -18,10 +18,7 @@ using ResponseCallback = std::function session_id_counter{0}; struct Node @@ -34,71 +31,58 @@ public: int32_t seq_num = 0; }; - struct Watcher + struct ResponseForSession { int64_t session_id; - ResponseCallback watch_callback; + Coordination::ZooKeeperResponsePtr response; }; + using ResponsesForSessions = std::deque; + + struct RequestForSession + { + int64_t session_id; + Coordination::ZooKeeperRequestPtr request; + }; + + using RequestsForSessions = std::deque; + using Container = std::map; using Ephemerals = std::unordered_map>; using SessionAndWatcher = std::unordered_map>; + using SessionIDs = std::vector; - using WatchCallbacks = std::vector; - using Watches = std::map; + using Watches = std::map; Container container; Ephemerals ephemerals; SessionAndWatcher sessions_and_watchers; std::atomic zxid{0}; - std::atomic shutdown{false}; + std::atomic finalized{false}; Watches watches; Watches list_watches; /// Watches for 'list' request (watches on children). - using clock = std::chrono::steady_clock; - - struct RequestInfo - { - TestKeeperStorageRequestPtr request; - ResponseCallback response_callback; - ResponseCallback watch_callback; - clock::time_point time; - int64_t session_id; - }; - - std::mutex push_request_mutex; - using RequestsQueue = ConcurrentBoundedQueue; - RequestsQueue requests_queue{1}; - - void finalize(); - - ThreadFromGlobalPool processing_thread; - - void processingThread(); void clearDeadWatches(int64_t session_id); -public: - using AsyncResponse = std::future; - TestKeeperStorage(); - ~TestKeeperStorage(); - struct ResponsePair + int64_t getZXID() { - AsyncResponse response; - std::optional watch_response; - }; - void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback); - void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, ResponseCallback callback, ResponseCallback watch_callback); + return zxid.fetch_add(1); + } + +public: + TestKeeperStorage(); + + ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + ResponsesForSessions finalize(const RequestsForSessions & expired_requests); int64_t getSessionID() { return session_id_counter.fetch_add(1); } - int64_t getZXID() - { - return zxid.fetch_add(1); - } + + }; } diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp new file mode 100644 index 00000000000..b1233fc47e3 --- /dev/null +++ b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp @@ -0,0 +1,131 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + + extern const int LOGICAL_ERROR; + extern const int TIMEOUT_EXCEEDED; +} + +} +namespace zkutil +{ + +void TestKeeperStorageDispatcher::processingThread() +{ + setThreadName("TestKeeperSProc"); + try + { + while (!shutdown) + { + RequestInfo info; + + UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); + + if (requests_queue.tryPop(info, max_wait)) + { + if (shutdown) + break; + + auto responses = storage.processRequest(info.request, info.session_id); + for (const auto & response_for_session : responses) + setResponse(response_for_session.session_id, response_for_session.response); + } + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + finalize(); + } +} + +void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) +{ + std::lock_guard lock(session_to_response_callback_mutex); + auto session_writer = session_to_response_callback.find(session_id); + if (session_writer == session_to_response_callback.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id); + + session_writer->second(response); + /// Session closed, no more writes + if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) + session_to_response_callback.erase(session_writer); +} + +void TestKeeperStorageDispatcher::finalize() +{ + { + std::lock_guard lock(push_request_mutex); + + if (shutdown) + return; + + shutdown = true; + + if (processing_thread.joinable()) + processing_thread.join(); + } + + RequestInfo info; + TestKeeperStorage::RequestsForSessions expired_requests; + while (requests_queue.tryPop(info)) + expired_requests.push_back(TestKeeperStorage::RequestForSession{info.session_id, info.request}); + + auto expired_responses = storage.finalize(expired_requests); + + for (const auto & response_for_session : expired_responses) + setResponse(response_for_session.session_id, response_for_session.response); +} + +void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) +{ + + { + std::lock_guard lock(session_to_response_callback_mutex); + if (session_to_response_callback.count(session_id) == 0) + throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id); + } + + RequestInfo request_info; + request_info.time = clock::now(); + request_info.request = request; + request_info.session_id = session_id; + + std::lock_guard lock(push_request_mutex); + /// Put close requests without timeouts + if (request->getOpNum() == Coordination::OpNum::Close) + requests_queue.push(std::move(request_info)); + else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) + throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); +} + +TestKeeperStorageDispatcher::TestKeeperStorageDispatcher() +{ + processing_thread = ThreadFromGlobalPool([this] { processingThread(); }); +} + +TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher() +{ + try + { + finalize(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback) +{ + std::lock_guard lock(session_to_response_callback_mutex); + if (!session_to_response_callback.try_emplace(session_id, callback).second) + throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id); +} + +} diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h new file mode 100644 index 00000000000..27abf17ac73 --- /dev/null +++ b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include +#include + +namespace zkutil +{ + +using ZooKeeperResponseCallback = std::function; + +class TestKeeperStorageDispatcher +{ +private: + Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000}; + + using clock = std::chrono::steady_clock; + + struct RequestInfo + { + Coordination::ZooKeeperRequestPtr request; + clock::time_point time; + int64_t session_id; + }; + + std::mutex push_request_mutex; + + using RequestsQueue = ConcurrentBoundedQueue; + RequestsQueue requests_queue{1}; + std::atomic shutdown{false}; + using SessionToResponseCallback = std::unordered_map; + + std::mutex session_to_response_callback_mutex; + SessionToResponseCallback session_to_response_callback; + + ThreadFromGlobalPool processing_thread; + + TestKeeperStorage storage; + +private: + void processingThread(); + void finalize(); + void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); + +public: + TestKeeperStorageDispatcher(); + ~TestKeeperStorageDispatcher(); + + void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + int64_t getSessionID() + { + return storage.getSessionID(); + } + void registerSession(int64_t session_id, ZooKeeperResponseCallback callback); +}; + +} diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 2a8fdce869b..ea10024b3cb 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include @@ -306,7 +306,7 @@ struct ContextShared ConfigurationPtr zookeeper_config; /// Stores zookeeper configs mutable std::mutex test_keeper_storage_mutex; - mutable std::shared_ptr test_keeper_storage; + mutable std::shared_ptr test_keeper_storage; mutable std::mutex auxiliary_zookeepers_mutex; mutable std::map auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs @@ -1531,11 +1531,11 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const return shared->zookeeper; } -std::shared_ptr & Context::getTestKeeperStorage() const +std::shared_ptr & Context::getTestKeeperStorage() const { std::lock_guard lock(shared->test_keeper_storage_mutex); if (!shared->test_keeper_storage) - shared->test_keeper_storage = std::make_shared(); + shared->test_keeper_storage = std::make_shared(); return shared->test_keeper_storage; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 79140f0d209..dc8efb058e7 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -40,7 +40,7 @@ namespace Poco namespace zkutil { class ZooKeeper; - class TestKeeperStorage; + class TestKeeperStorageDispatcher; } @@ -513,7 +513,7 @@ public: std::shared_ptr getAuxiliaryZooKeeper(const String & name) const; - std::shared_ptr & getTestKeeperStorage() const; + std::shared_ptr & getTestKeeperStorage() const; /// Set auxiliary zookeepers configuration at server starting or configuration reloading. void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config); diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index aeb7da038b7..e81a2e9ef99 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -32,7 +32,7 @@ namespace ErrorCodes struct PollResult { - std::vector ready_responses; + bool has_responses; bool has_requests; bool error; }; @@ -162,10 +162,10 @@ struct SocketInterruptablePollWrapper { do { - size_t response_position; - readIntBinary(response_position, response_in); - result.ready_responses.push_back(response_position); - } while (response_in.available()); + UInt8 response_byte; + readIntBinary(response_byte, response_in); + result.has_responses = true; + } while (response_in.available()); /// Just to drain all of them } } } @@ -186,11 +186,12 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S , server(server_) , log(&Poco::Logger::get("TestKeeperTCPHandler")) , global_context(server.context()) - , test_keeper_storage(global_context.getTestKeeperStorage()) + , test_keeper_storage_dispatcher(global_context.getTestKeeperStorage()) , operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) - , session_id(test_keeper_storage->getSessionID()) + , session_id(test_keeper_storage_dispatcher->getSessionID()) , poll_wrapper(std::make_unique(socket_)) + , responses(1000) { } @@ -278,6 +279,16 @@ void TestKeeperTCPHandler::runImpl() } sendHandshake(); + + auto response_fd = poll_wrapper->getResponseFD(); + auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) + { + responses.push(response); + UInt8 single_byte = 1; + [[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte)); + }; + test_keeper_storage_dispatcher->registerSession(session_id, response_callback); + session_stopwatch.start(); bool close_received = false; try @@ -291,27 +302,12 @@ void TestKeeperTCPHandler::runImpl() { do { - Coordination::OpNum received_op = receiveRequest(); + auto [received_op, received_xid] = receiveRequest(); if (received_op == Coordination::OpNum::Close) { - auto last_response = responses.find(response_id_counter - 1); - if (last_response == responses.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Just inserted response #{} not found in responses", response_id_counter - 1); - LOG_DEBUG(log, "Received close request for session #{}", session_id); - if (last_response->second.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready) - { - LOG_DEBUG(log, "Cannot sent close for session #{}", session_id); - } - else - { - LOG_DEBUG(log, "Sent close for session #{}", session_id); - last_response->second.get()->write(*out); - } - - close_received = true; - - break; + LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id); + close_xid = received_xid; } else if (received_op == Coordination::OpNum::Heartbeat) { @@ -322,44 +318,36 @@ void TestKeeperTCPHandler::runImpl() while (in->available()); } + if (result.has_responses) + { + Coordination::ZooKeeperResponsePtr response; + while (responses.tryPop(response)) + { + if (response->xid == close_xid) + { + close_received = true; + break; + } + + if (response->error == Coordination::Error::ZOK) + response->write(*out); + else if (response->xid != Coordination::WATCH_XID) + response->write(*out); + /// skipping bad response for watch + } + } + if (close_received) break; - for (size_t response_id : result.ready_responses) - { - auto response_future = responses.find(response_id); - if (response_future == responses.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get unknown response #{}", response_id); - - if (response_future->second.wait_for(0s) != std::future_status::ready) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Response #{} was market as ready but corresponding future not ready yet", response_id); - - auto response = response_future->second.get(); - if (response->error == Coordination::Error::ZOK) - { - response->write(*out); - } - else - { - /// TODO Get rid of this - if (!dynamic_cast(response.get())) - response->write(*out); - } - responses.erase(response_future); - } - if (result.error) throw Exception("Exception happened while reading from socket", ErrorCodes::SYSTEM_ERROR); if (session_stopwatch.elapsedMicroseconds() > static_cast(session_timeout.totalMicroseconds())) { LOG_DEBUG(log, "Session #{} expired", session_id); - auto response = putCloseRequest(); - if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready) + if (!finish()) LOG_DEBUG(log, "Cannot sent close for expired session #{}", session_id); - else - response.get()->write(*out); - break; } } @@ -367,29 +355,33 @@ void TestKeeperTCPHandler::runImpl() catch (const Exception & ex) { LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true)); - auto response = putCloseRequest(); - if (response.wait_for(std::chrono::microseconds(operation_timeout.totalMicroseconds())) != std::future_status::ready) + if (!finish()) LOG_DEBUG(log, "Cannot sent close for session #{}", session_id); - else - response.get()->write(*out); } } -zkutil::TestKeeperStorage::AsyncResponse TestKeeperTCPHandler::putCloseRequest() +bool TestKeeperTCPHandler::finish() { Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); - request->xid = Coordination::CLOSE_XID; - auto promise = std::make_shared>(); - zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response) + request->xid = close_xid; + test_keeper_storage_dispatcher->putRequest(request, session_id); + + Coordination::ZooKeeperResponsePtr response; + bool finished = false; + while (responses.tryPop(response, operation_timeout.totalMilliseconds())) { - promise->set_value(response); - }; - test_keeper_storage->putRequest(request, session_id, callback); - return promise->get_future(); + if (response->xid == close_xid) + { + finished = true; + response->write(*out); + break; + } + } + return finished; } -Coordination::OpNum TestKeeperTCPHandler::receiveRequest() +std::pair TestKeeperTCPHandler::receiveRequest() { int32_t length; Coordination::read(length, *in); @@ -402,47 +394,9 @@ Coordination::OpNum TestKeeperTCPHandler::receiveRequest() Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); request->xid = xid; request->readImpl(*in); - auto promise = std::make_shared>(); - if (opnum != Coordination::OpNum::Close) - { - int response_fd = poll_wrapper->getResponseFD(); - size_t response_num = response_id_counter++; - zkutil::ResponseCallback callback = [response_fd, promise, response_num] (const Coordination::ZooKeeperResponsePtr & response) - { - promise->set_value(response); - [[maybe_unused]] int result = write(response_fd, &response_num, sizeof(response_num)); - }; - if (request->has_watch) - { - auto watch_promise = std::make_shared>(); - size_t watch_response_num = response_id_counter++; - zkutil::ResponseCallback watch_callback = [response_fd, watch_promise, watch_response_num] (const Coordination::ZooKeeperResponsePtr & response) - { - watch_promise->set_value(response); - [[maybe_unused]] int result = write(response_fd, &watch_response_num, sizeof(watch_response_num)); - }; - test_keeper_storage->putRequest(request, session_id, callback, watch_callback); - responses.try_emplace(response_num, promise->get_future()); - responses.try_emplace(watch_response_num, watch_promise->get_future()); - } - else - { - test_keeper_storage->putRequest(request, session_id, callback); - responses.try_emplace(response_num, promise->get_future()); - } - } - else - { - zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response) - { - promise->set_value(response); - }; - test_keeper_storage->putRequest(request, session_id, callback); - responses.try_emplace(response_id_counter++, promise->get_future()); - } - - return opnum; + test_keeper_storage_dispatcher->putRequest(request, session_id); + return std::make_pair(opnum, xid); } } diff --git a/src/Server/TestKeeperTCPHandler.h b/src/Server/TestKeeperTCPHandler.h index 14e38ae6bd5..e2de33a5156 100644 --- a/src/Server/TestKeeperTCPHandler.h +++ b/src/Server/TestKeeperTCPHandler.h @@ -3,10 +3,11 @@ #include #include "IServer.h" #include +#include #include #include #include -#include +#include #include #include #include @@ -27,15 +28,14 @@ private: IServer & server; Poco::Logger * log; Context global_context; - std::shared_ptr test_keeper_storage; + std::shared_ptr test_keeper_storage_dispatcher; Poco::Timespan operation_timeout; Poco::Timespan session_timeout; int64_t session_id; Stopwatch session_stopwatch; SocketInterruptablePollWrapperPtr poll_wrapper; - - size_t response_id_counter = 0; - std::unordered_map responses; + ConcurrentBoundedQueue responses; + Coordination::XID close_xid = Coordination::CLOSE_XID; /// Streams for reading/writing from/to client connection socket. std::shared_ptr in; @@ -46,8 +46,8 @@ private: void sendHandshake(); void receiveHandshake(); - Coordination::OpNum receiveRequest(); - zkutil::TestKeeperStorage::AsyncResponse putCloseRequest(); + std::pair receiveRequest(); + bool finish(); }; } From ace6d906b0b7a2a8b0f45b95021a618f44b9d21e Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 17:45:45 +0300 Subject: [PATCH 2/9] Fix normal close scenario --- src/Common/ZooKeeper/TestKeeperStorage.cpp | 4 ++-- src/Common/ZooKeeper/TestKeeperStorage.h | 4 ++-- src/Server/TestKeeperTCPHandler.cpp | 11 +++++------ 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorage.cpp b/src/Common/ZooKeeper/TestKeeperStorage.cpp index 4f1300cde8c..e7300939821 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.cpp +++ b/src/Common/ZooKeeper/TestKeeperStorage.cpp @@ -646,7 +646,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const auto response = std::make_shared(); response->xid = zk_request->xid; response->zxid = getZXID(); - results.push_front(ResponseForSession{session_id, response}); + results.push_back(ResponseForSession{session_id, response}); } else { @@ -690,7 +690,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::processRequest(const response->xid = zk_request->xid; response->zxid = getZXID(); - results.push_front(ResponseForSession{session_id, response}); + results.push_back(ResponseForSession{session_id, response}); } return results; diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h index 5afa5032bcf..2196273b3ba 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.h +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -37,7 +37,7 @@ public: Coordination::ZooKeeperResponsePtr response; }; - using ResponsesForSessions = std::deque; + using ResponsesForSessions = std::vector; struct RequestForSession { @@ -45,7 +45,7 @@ public: Coordination::ZooKeeperRequestPtr request; }; - using RequestsForSessions = std::deque; + using RequestsForSessions = std::vector; using Container = std::map; using Ephemerals = std::unordered_map>; diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index e81a2e9ef99..90aec9ce66f 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -298,7 +298,7 @@ void TestKeeperTCPHandler::runImpl() using namespace std::chrono_literals; PollResult result = poll_wrapper->poll(session_timeout); - if (result.has_requests) + if (result.has_requests && !close_received) { do { @@ -308,6 +308,8 @@ void TestKeeperTCPHandler::runImpl() { LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id); close_xid = received_xid; + close_received = true; + break; } else if (received_op == Coordination::OpNum::Heartbeat) { @@ -325,8 +327,8 @@ void TestKeeperTCPHandler::runImpl() { if (response->xid == close_xid) { - close_received = true; - break; + LOG_DEBUG(log, "Session #{} successfuly closed", session_id); + return; } if (response->error == Coordination::Error::ZOK) @@ -337,9 +339,6 @@ void TestKeeperTCPHandler::runImpl() } } - if (close_received) - break; - if (result.error) throw Exception("Exception happened while reading from socket", ErrorCodes::SYSTEM_ERROR); From 0ee5629527dc35ffb707bc2ddececfcc082f04bb Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 18:10:49 +0300 Subject: [PATCH 3/9] Fix style --- src/Common/ZooKeeper/TestKeeperStorage.cpp | 1 - src/Common/ya.make | 1 + src/Server/TestKeeperTCPHandler.cpp | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorage.cpp b/src/Common/ZooKeeper/TestKeeperStorage.cpp index e7300939821..e364b0efca9 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.cpp +++ b/src/Common/ZooKeeper/TestKeeperStorage.cpp @@ -14,7 +14,6 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; - extern const int TIMEOUT_EXCEEDED; extern const int BAD_ARGUMENTS; } diff --git a/src/Common/ya.make b/src/Common/ya.make index 5b5da618bbe..4f2f1892a88 100644 --- a/src/Common/ya.make +++ b/src/Common/ya.make @@ -85,6 +85,7 @@ SRCS( ZooKeeper/IKeeper.cpp ZooKeeper/TestKeeper.cpp ZooKeeper/TestKeeperStorage.cpp + ZooKeeper/TestKeeperStorageDispatcher.cpp ZooKeeper/ZooKeeper.cpp ZooKeeper/ZooKeeperCommon.cpp ZooKeeper/ZooKeeperConstants.cpp diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index 90aec9ce66f..f928c10c856 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -27,7 +27,6 @@ namespace ErrorCodes { extern const int SYSTEM_ERROR; extern const int UNEXPECTED_PACKET_FROM_CLIENT; - extern const int LOGICAL_ERROR; } struct PollResult From 6c6bf60937e76fcf27433a796b3d4be0c51ecd32 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 18:23:32 +0300 Subject: [PATCH 4/9] Rename function --- src/Interpreters/Context.cpp | 16 ++++++++-------- src/Interpreters/Context.h | 2 +- src/Server/TestKeeperTCPHandler.cpp | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index ea10024b3cb..6cf977dac34 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -305,8 +305,8 @@ struct ContextShared mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper. ConfigurationPtr zookeeper_config; /// Stores zookeeper configs - mutable std::mutex test_keeper_storage_mutex; - mutable std::shared_ptr test_keeper_storage; + mutable std::mutex test_keeper_storage_dispatcher_mutex; + mutable std::shared_ptr test_keeper_storage_dispatcher; mutable std::mutex auxiliary_zookeepers_mutex; mutable std::map auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs @@ -447,7 +447,7 @@ struct ContextShared /// Stop zookeeper connection zookeeper.reset(); /// Stop test_keeper storage - test_keeper_storage.reset(); + test_keeper_storage_dispatcher.reset(); } bool hasTraceCollector() const @@ -1531,13 +1531,13 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const return shared->zookeeper; } -std::shared_ptr & Context::getTestKeeperStorage() const +std::shared_ptr & Context::getTestKeeperStorageDispatcher() const { - std::lock_guard lock(shared->test_keeper_storage_mutex); - if (!shared->test_keeper_storage) - shared->test_keeper_storage = std::make_shared(); + std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex); + if (!shared->test_keeper_storage_dispatcher) + shared->test_keeper_storage_dispatcher = std::make_shared(); - return shared->test_keeper_storage; + return shared->test_keeper_storage_dispatcher; } zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index dc8efb058e7..9c8d5252373 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -513,7 +513,7 @@ public: std::shared_ptr getAuxiliaryZooKeeper(const String & name) const; - std::shared_ptr & getTestKeeperStorage() const; + std::shared_ptr & getTestKeeperStorageDispatcher() const; /// Set auxiliary zookeepers configuration at server starting or configuration reloading. void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config); diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index f928c10c856..b4192b6c9fb 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -185,7 +185,7 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S , server(server_) , log(&Poco::Logger::get("TestKeeperTCPHandler")) , global_context(server.context()) - , test_keeper_storage_dispatcher(global_context.getTestKeeperStorage()) + , test_keeper_storage_dispatcher(global_context.getTestKeeperStorageDispatcher()) , operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , session_id(test_keeper_storage_dispatcher->getSessionID()) From 7e71a5b1b0c68f8ededa72c51351440472ca8ad8 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 18:25:08 +0300 Subject: [PATCH 5/9] Fix new lines --- src/Common/ZooKeeper/TestKeeperStorage.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h index 2196273b3ba..2df505d3e34 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.h +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -81,8 +81,6 @@ public: { return session_id_counter.fetch_add(1); } - - }; } From 8cdfbd996bbb0c06e72f1c2aec9a20c179b2dd48 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 18:51:52 +0300 Subject: [PATCH 6/9] Fix header --- src/Common/ZooKeeper/TestKeeperStorage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h index 2df505d3e34..21b1ce16c32 100644 --- a/src/Common/ZooKeeper/TestKeeperStorage.h +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include namespace zkutil { From 56f19e4790b235337611456cc44e4486df47e970 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 18:52:28 +0300 Subject: [PATCH 7/9] Remove unused headers --- src/Server/TestKeeperTCPHandler.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Server/TestKeeperTCPHandler.h b/src/Server/TestKeeperTCPHandler.h index e2de33a5156..2115f1cf11f 100644 --- a/src/Server/TestKeeperTCPHandler.h +++ b/src/Server/TestKeeperTCPHandler.h @@ -11,7 +11,6 @@ #include #include #include -#include namespace DB { From 374cee47e040e977e67b255d1685186cd285ef19 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 19:22:40 +0300 Subject: [PATCH 8/9] Fix typo --- src/Server/TestKeeperTCPHandler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index b4192b6c9fb..8b9047c531c 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -326,7 +326,7 @@ void TestKeeperTCPHandler::runImpl() { if (response->xid == close_xid) { - LOG_DEBUG(log, "Session #{} successfuly closed", session_id); + LOG_DEBUG(log, "Session #{} successfully closed", session_id); return; } From 70679e4ee1964693d419a35d23bc2bdbdf8fcbb6 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 20 Jan 2021 15:11:26 +0300 Subject: [PATCH 9/9] Fix test keeper handler --- src/Server/TestKeeperTCPHandler.cpp | 70 +++++++++++++++++------------ src/Server/TestKeeperTCPHandler.h | 9 ++-- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index 8b9047c531c..7b02996019e 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include #ifdef POCO_HAVE_FD_EPOLL #include @@ -36,6 +38,36 @@ struct PollResult bool error; }; +/// Queue with mutex. As simple as possible. +class ThreadSafeResponseQueue +{ +private: + mutable std::mutex queue_mutex; + std::queue queue; +public: + void push(const Coordination::ZooKeeperResponsePtr & response) + { + std::lock_guard lock(queue_mutex); + queue.push(response); + } + bool tryPop(Coordination::ZooKeeperResponsePtr & response) + { + std::lock_guard lock(queue_mutex); + if (!queue.empty()) + { + response = queue.front(); + queue.pop(); + return true; + } + return false; + } + size_t size() const + { + std::lock_guard lock(queue_mutex); + return queue.size(); + } +}; + struct SocketInterruptablePollWrapper { int sockfd; @@ -159,12 +191,10 @@ struct SocketInterruptablePollWrapper result.has_requests = true; else { - do - { - UInt8 response_byte; - readIntBinary(response_byte, response_in); - result.has_responses = true; - } while (response_in.available()); /// Just to drain all of them + /// Skip all of them, we are not interested in exact + /// amount because responses ordered in responses queue. + response_in.ignore(); + result.has_responses = true; } } } @@ -190,7 +220,7 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S , session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , session_id(test_keeper_storage_dispatcher->getSessionID()) , poll_wrapper(std::make_unique(socket_)) - , responses(1000) + , responses(std::make_unique()) { } @@ -282,7 +312,7 @@ void TestKeeperTCPHandler::runImpl() auto response_fd = poll_wrapper->getResponseFD(); auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) { - responses.push(response); + responses->push(response); UInt8 single_byte = 1; [[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte)); }; @@ -322,7 +352,7 @@ void TestKeeperTCPHandler::runImpl() if (result.has_responses) { Coordination::ZooKeeperResponsePtr response; - while (responses.tryPop(response)) + while (responses->tryPop(response)) { if (response->xid == close_xid) { @@ -344,8 +374,7 @@ void TestKeeperTCPHandler::runImpl() if (session_stopwatch.elapsedMicroseconds() > static_cast(session_timeout.totalMicroseconds())) { LOG_DEBUG(log, "Session #{} expired", session_id); - if (!finish()) - LOG_DEBUG(log, "Cannot sent close for expired session #{}", session_id); + finish(); break; } } @@ -353,30 +382,15 @@ void TestKeeperTCPHandler::runImpl() catch (const Exception & ex) { LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true)); - if (!finish()) - LOG_DEBUG(log, "Cannot sent close for session #{}", session_id); + finish(); } - } -bool TestKeeperTCPHandler::finish() +void TestKeeperTCPHandler::finish() { Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = close_xid; test_keeper_storage_dispatcher->putRequest(request, session_id); - - Coordination::ZooKeeperResponsePtr response; - bool finished = false; - while (responses.tryPop(response, operation_timeout.totalMilliseconds())) - { - if (response->xid == close_xid) - { - finished = true; - response->write(*out); - break; - } - } - return finished; } std::pair TestKeeperTCPHandler::receiveRequest() diff --git a/src/Server/TestKeeperTCPHandler.h b/src/Server/TestKeeperTCPHandler.h index 2115f1cf11f..46b4454b319 100644 --- a/src/Server/TestKeeperTCPHandler.h +++ b/src/Server/TestKeeperTCPHandler.h @@ -3,7 +3,6 @@ #include #include "IServer.h" #include -#include #include #include #include @@ -17,6 +16,8 @@ namespace DB struct SocketInterruptablePollWrapper; using SocketInterruptablePollWrapperPtr = std::unique_ptr; +class ThreadSafeResponseQueue; +using ThreadSafeResponseQueuePtr = std::unique_ptr; class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection { @@ -33,7 +34,9 @@ private: int64_t session_id; Stopwatch session_stopwatch; SocketInterruptablePollWrapperPtr poll_wrapper; - ConcurrentBoundedQueue responses; + + ThreadSafeResponseQueuePtr responses; + Coordination::XID close_xid = Coordination::CLOSE_XID; /// Streams for reading/writing from/to client connection socket. @@ -46,7 +49,7 @@ private: void receiveHandshake(); std::pair receiveRequest(); - bool finish(); + void finish(); }; }