From 690045c67fa033a6b039571e95805fc10f630545 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 16 Apr 2021 21:31:23 +0300 Subject: [PATCH] Fix nasty bug --- contrib/NuRaft | 2 +- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 11 +++++ src/Common/ZooKeeper/ZooKeeperCommon.h | 6 +++ src/Coordination/KeeperServer.h | 4 +- src/Coordination/KeeperStateMachine.cpp | 2 +- src/Coordination/KeeperStorageDispatcher.cpp | 33 +++++++++++---- .../src/jepsen/clickhouse_keeper/db.clj | 40 ++++++++++--------- 7 files changed, 68 insertions(+), 30 deletions(-) diff --git a/contrib/NuRaft b/contrib/NuRaft index d2feb5978b9..377f8e77491 160000 --- a/contrib/NuRaft +++ b/contrib/NuRaft @@ -1 +1 @@ -Subproject commit d2feb5978b979729a07c3ca76eaa4ab94cef4ceb +Subproject commit 377f8e77491d9f66ce8e32e88aae19dffe8dc4d7 diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index df1841d97e3..5e4125d5b36 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -205,6 +205,13 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +String ZooKeeperSetRequest::toString() const +{ + WriteBufferFromOwnString result; + result << "type: Set " << "xid: " << xid << " path: " << path << "data: " << data << '\n'; + return result.str(); +} + void ZooKeeperSetResponse::readImpl(ReadBuffer & in) { Coordination::read(stat, in); @@ -459,12 +466,14 @@ void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const { Coordination::write(internal_id, out); Coordination::write(session_timeout_ms, out); + Coordination::write(server_id, out); } void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in) { Coordination::read(internal_id, in); Coordination::read(session_timeout_ms, in); + Coordination::read(server_id, in); } Coordination::ZooKeeperResponsePtr ZooKeeperSessionIDRequest::makeResponse() const @@ -476,12 +485,14 @@ void ZooKeeperSessionIDResponse::readImpl(ReadBuffer & in) { Coordination::read(internal_id, in); Coordination::read(session_id, in); + Coordination::read(server_id, in); } void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const { Coordination::write(internal_id, out); Coordination::write(session_id, out); + Coordination::write(server_id, out); } void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index af66c9de7fc..af88a11b0f1 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -61,6 +61,7 @@ struct ZooKeeperRequest : virtual Request virtual ZooKeeperResponsePtr makeResponse() const = 0; virtual bool isReadRequest() const = 0; + virtual String toString() const { return "not implemented"; } }; using ZooKeeperRequestPtr = std::shared_ptr; @@ -274,6 +275,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest bool isReadRequest() const override { return false; } size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); } + String toString() const override; }; struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse @@ -396,6 +398,8 @@ struct ZooKeeperSessionIDRequest final : ZooKeeperRequest { int64_t internal_id; int64_t session_timeout_ms; + /// Who requested this session + int32_t server_id; Coordination::OpNum getOpNum() const override { return OpNum::SessionID; } String getPath() const override { return {}; } @@ -412,6 +416,8 @@ struct ZooKeeperSessionIDResponse final : ZooKeeperResponse { int64_t internal_id; int64_t session_id; + /// Who requested this session + int32_t server_id; void readImpl(ReadBuffer & in) override; diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 1e38ef8f2c0..11900ebb213 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -17,7 +17,7 @@ using RaftAppendResult = nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n std::shared_ptr response = std::make_shared(); response->internal_id = session_id_request.internal_id; response->session_id = session_id; + response->server_id = session_id_request.server_id; KeeperStorage::ResponseForSession response_for_session; response_for_session.session_id = -1; @@ -121,7 +122,6 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n } } - last_committed_idx = log_idx; return nullptr; } diff --git a/src/Coordination/KeeperStorageDispatcher.cpp b/src/Coordination/KeeperStorageDispatcher.cpp index 88f9b76aca9..0abf45b688d 100644 --- a/src/Coordination/KeeperStorageDispatcher.cpp +++ b/src/Coordination/KeeperStorageDispatcher.cpp @@ -63,6 +63,8 @@ void KeeperStorageDispatcher::requestThread() current_batch.emplace_back(request); /// Waiting until previous append will be successful, or batch is big enough + /// has_result == false && get_result_code == OK means that our request still not processed. + /// Sometimes NuRaft set errorcode without setting result, so we check both here. while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size) { /// Trying to get batch requests as fast as possible @@ -74,8 +76,11 @@ void KeeperStorageDispatcher::requestThread() has_read_request = true; break; } + else + { - current_batch.emplace_back(request); + current_batch.emplace_back(request); + } } if (shutdown_called) @@ -95,12 +100,12 @@ void KeeperStorageDispatcher::requestThread() /// Process collected write requests batch if (!current_batch.empty()) { - prev_result = server->putRequestBatch(current_batch); + auto result = server->putRequestBatch(current_batch); - if (prev_result) + if (result) { if (has_read_request) /// If we will execute read request next, than we have to process result now - forceWaitAndProcessResult(std::move(prev_result), std::move(current_batch)); + forceWaitAndProcessResult(std::move(result), std::move(current_batch)); } else { @@ -108,6 +113,7 @@ void KeeperStorageDispatcher::requestThread() } prev_batch = current_batch; + prev_result = result; } /// Read request always goes after write batch (last request) @@ -181,8 +187,9 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID) { const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast(*response); + /// Nobody waits for this session id - if (!new_session_id_response_callback.count(session_id_resp.internal_id)) + if (session_id_resp.server_id != server->getServerID() || !new_session_id_response_callback.count(session_id_resp.internal_id)) return; auto callback = new_session_id_response_callback[session_id_resp.internal_id]; @@ -196,6 +203,7 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination return; session_writer->second(response); + /// Session closed, no more writes if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) { @@ -394,7 +402,7 @@ void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult && resu result->get(); /// If we get some errors, than send them to clients - if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) + if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZOPERATIONTIMEOUT); else if (result->get_result_code() != nuraft::cmd_result_code::OK) addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZRUNTIMEINCONSISTENCY); @@ -408,6 +416,8 @@ int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms) std::shared_ptr request = std::make_shared(); request->internal_id = internal_session_id_counter.fetch_add(1); request->session_timeout_ms = session_timeout_ms; + request->server_id = server->getServerID(); + request_info.request = request; request_info.session_id = -1; @@ -415,16 +425,23 @@ int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms) auto future = promise->get_future(); { std::lock_guard lock(session_to_response_callback_mutex); - new_session_id_response_callback[request->internal_id] = [promise] (const Coordination::ZooKeeperResponsePtr & response) + new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response) { if (response->getOpNum() != Coordination::OpNum::SessionID) promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum())))); + auto session_id_response = dynamic_cast(*response); + if (session_id_response.internal_id != internal_id) + { + promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR, + "Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id))); + } + if (response->error != Coordination::Error::ZOK) promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error))); - promise->set_value(dynamic_cast(*response).session_id); + promise->set_value(session_id_response.session_id); }; } diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj index 2dfcbf23035..fdb6b233fec 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj @@ -128,22 +128,24 @@ (c/exec :rm :-rf pid-file-path) (c/exec :rm :-rf data-dir) (c/exec :rm :-rf logs-dir) - (c/exec :rm :-rf configs-dir)))));db/LogFiles - ;(log-files [_ test node] - ; (c/su - ; (if (cu/exists? pid-file-path) - ; (do - ; (info node "Collecting traces") - ; (collect-traces test node)) - ; (info node "Pid files doesn't exists")) - ; (kill-clickhouse! node test) - ; (if (cu/exists? coordination-data-dir) - ; (do - ; (info node "Coordination files exists, going to compress") - ; (c/cd data-dir - ; (c/exec :tar :czf "coordination.tar.gz" "coordination"))))) - ; (let [common-logs [stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")] - ; gdb-log (str logs-dir "/gdb.log")] - ; (if (cu/exists? (str logs-dir "/gdb.log")) - ; (conj common-logs gdb-log) - ; common-logs))))) + (c/exec :rm :-rf configs-dir))) + + db/LogFiles + (log-files [_ test node] + (c/su + ;(if (cu/exists? pid-file-path) + ;(do + ; (info node "Collecting traces") + ; (collect-traces test node)) + ;(info node "Pid files doesn't exists")) + (kill-clickhouse! node test) + (if (cu/exists? coordination-data-dir) + (do + (info node "Coordination files exists, going to compress") + (c/cd data-dir + (c/exec :tar :czf "coordination.tar.gz" "coordination"))))) + (let [common-logs [stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")] + gdb-log (str logs-dir "/gdb.log")] + (if (cu/exists? (str logs-dir "/gdb.log")) + (conj common-logs gdb-log) + common-logs)))))