From 46ba6498218af0fd5a9bd4bc121ac5ff85a603a4 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Fri, 8 Oct 2021 16:32:02 +0300 Subject: [PATCH] Fixed tests --- src/Coordination/KeeperDispatcher.cpp | 11 +++++++---- src/Coordination/KeeperStateMachine.cpp | 7 ++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 3823a4129e3..a28e8d96915 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -11,6 +11,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int TIMEOUT_EXCEEDED; + extern const int SYSTEM_ERROR; } KeeperDispatcher::KeeperDispatcher() @@ -239,7 +240,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ if (request->getOpNum() == Coordination::OpNum::Close) { if (!requests_queue->push(std::move(request_info))) - throw Exception("Cannot push request to queue", ErrorCodes::LOGICAL_ERROR); + throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); } else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds())) { @@ -304,7 +305,9 @@ void KeeperDispatcher::shutdown() if (requests_queue) { requests_queue->finish(); - request_thread.join(); + + if (request_thread.joinable()) + request_thread.join(); } responses_queue.finish(); @@ -380,7 +383,7 @@ void KeeperDispatcher::sessionCleanerTask() { std::lock_guard lock(push_request_mutex); if (!requests_queue->push(std::move(request_info))) - LOG_INFO(log, "Cannot push request to queue"); + LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions"); } /// Remove session from registered sessions @@ -416,7 +419,7 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession response->zxid = 0; response->error = error; if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response})) - throw Exception(ErrorCodes::LOGICAL_ERROR, + throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push error response xid {} zxid {} error message {} to responses queue", response->xid, response->zxid, diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index b1c8aab697e..0a15b504128 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -12,6 +12,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int SYSTEM_ERROR; } namespace @@ -130,7 +131,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); for (auto & response_for_session : responses_for_sessions) if (!responses_queue.push(response_for_session)) - LOG_WARNING(log, "Could not push response {} into responses queue", response_for_session.session_id); + throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); } last_committed_idx = log_idx; @@ -221,7 +222,7 @@ void KeeperStateMachine::create_snapshot( LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flash to disk", s.get_last_log_idx()); /// Flush snapshot to disk in a separate thread. if (!snapshots_queue.push(std::move(snapshot_task))) - LOG_DEBUG(log, "Cannot push snapshot task into queue"); + LOG_WARNING(log, "Cannot push snapshot task into queue"); } void KeeperStateMachine::save_logical_snp_obj( @@ -308,7 +309,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); for (const auto & response : responses) if (!responses_queue.push(response)) - LOG_WARNING(log, "Could not push response {} into responses queue", response.session_id); + throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); } std::vector KeeperStateMachine::getDeadSessions()