From 5302b478a4b512d080068563d4b5b983e4b13d77 Mon Sep 17 00:00:00 2001 From: Mike Kot Date: Thu, 6 Jul 2023 17:12:24 +0000 Subject: [PATCH] proper reconfig batch handling --- src/Common/ZooKeeper/IKeeper.cpp | 1 - src/Coordination/KeeperDispatcher.cpp | 32 +++++++++++++++++---------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/Common/ZooKeeper/IKeeper.cpp b/src/Common/ZooKeeper/IKeeper.cpp index 50160279506..f0a07241735 100644 --- a/src/Common/ZooKeeper/IKeeper.cpp +++ b/src/Common/ZooKeeper/IKeeper.cpp @@ -110,7 +110,6 @@ const char * errorMessage(Error code) case Error::ZCLOSING: return "ZooKeeper is closing"; case Error::ZNOTHING: return "(not error) no server responses to process"; case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored"; - case Error::ZRECONFIGINPROGRESS: return "Another reconfiguration is progress"; } UNREACHABLE(); diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index b956bba4031..daa65de0d89 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -82,6 +82,7 @@ void KeeperDispatcher::requestThread() /// requests into a batch we must check that the new request is not read request. Otherwise we have to /// process all already accumulated write requests, wait them synchronously and only after that process /// read request. So reads are some kind of "separator" for writes. + /// Also there is a special reconfig request also being a separator. try { if (requests_queue->tryPop(request, max_wait)) @@ -90,20 +91,17 @@ void KeeperDispatcher::requestThread() if (shutdown_called) break; - if (request.request->getOpNum() == Coordination::OpNum::Reconfig) - { - server->getKeeperStateMachine()->reconfigure(request); - continue; - } - KeeperStorage::RequestsForSessions current_batch; size_t current_batch_bytes_size = 0; bool has_read_request = false; + bool has_reconfig_request = false; - /// If new request is not read request or we must to process it through quorum. + /// If new request is not read request or reconfig request we must process it through quorum. /// Otherwise we will process it locally. - if (coordination_settings->quorum_reads || !request.request->isReadRequest()) + if (request.request->getOpNum() == Coordination::OpNum::Reconfig) + has_reconfig_request = true; + else if (coordination_settings->quorum_reads || !request.request->isReadRequest()) { current_batch_bytes_size += request.request->bytesSize(); current_batch.emplace_back(request); @@ -122,7 +120,10 @@ void KeeperDispatcher::requestThread() read_request_queue[last_request.session_id][last_request.request->xid].push_back(request); } else if (request.request->getOpNum() == Coordination::OpNum::Reconfig) - server->getKeeperStateMachine()->reconfigure(request); + { + has_reconfig_request = true; + return false; + } else { current_batch_bytes_size += request.request->bytesSize(); @@ -138,6 +139,7 @@ void KeeperDispatcher::requestThread() /// TODO: Deprecate max_requests_quick_batch_size and use only max_requests_batch_size and max_requests_batch_bytes_size size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size; while (!shutdown_called && !has_read_request && + !has_reconfig_request && current_batch.size() < max_quick_batch_size && current_batch_bytes_size < max_batch_bytes_size && try_get_request()) ; @@ -150,8 +152,10 @@ void KeeperDispatcher::requestThread() }; /// Waiting until previous append will be successful, or batch is big enough - while (!shutdown_called && !has_read_request && !prev_result_done() && - current_batch.size() <= max_batch_size && current_batch_bytes_size < max_batch_bytes_size) + while (!shutdown_called && !has_read_request && + !has_reconfig_request && !prev_result_done() && + current_batch.size() <= max_batch_size + && current_batch_bytes_size < max_batch_bytes_size) { try_get_request(); } @@ -175,7 +179,8 @@ void KeeperDispatcher::requestThread() if (result) { - if (has_read_request) /// If we will execute read request next, than we have to process result now + /// If we will execute read or reconfig next, we have to process result now + if (has_read_request || has_reconfig_request) forceWaitAndProcessResult(result, current_batch); } else @@ -189,6 +194,9 @@ void KeeperDispatcher::requestThread() prev_result = result; } + if (has_reconfig_request) + server->getKeeperStateMachine()->reconfigure(request); + /// Read request always goes after write batch (last request) if (has_read_request) {