diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index 9c699ee298a..278d36f9245 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -326,6 +326,12 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in) } } +bool ZooKeeperMultiRequest::isReadRequest() const +{ + /// Possibly we can do better + return false; +} + void ZooKeeperMultiResponse::readImpl(ReadBuffer & in) { for (auto & response : responses) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 9adb0c06e4c..b2c18c31798 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -60,6 +60,7 @@ struct ZooKeeperRequest : virtual Request static std::shared_ptr read(ReadBuffer & in); virtual ZooKeeperResponsePtr makeResponse() const = 0; + virtual bool isReadRequest() const = 0; }; using ZooKeeperRequestPtr = std::shared_ptr; @@ -71,6 +72,7 @@ struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest void writeImpl(WriteBuffer &) const override {} void readImpl(ReadBuffer &) override {} ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return true; } }; struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse @@ -104,6 +106,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } }; struct ZooKeeperAuthResponse final : ZooKeeperResponse @@ -122,6 +125,7 @@ struct ZooKeeperCloseRequest final : ZooKeeperRequest void readImpl(ReadBuffer &) override {} ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } }; struct ZooKeeperCloseResponse final : ZooKeeperResponse @@ -146,6 +150,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } }; struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse @@ -167,6 +172,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } }; struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse @@ -183,6 +189,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return !has_watch; } }; struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse @@ -199,6 +206,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return !has_watch; } }; struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse @@ -217,6 +225,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } }; struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse @@ -232,6 +241,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return !has_watch; } }; struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest @@ -261,6 +271,7 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return !has_watch; } }; struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse @@ -290,6 +301,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest void readImpl(ReadBuffer & in) override; ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override; }; struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index 6d70eff1121..8b8288424d9 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -146,34 +146,41 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(n TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests) { - std::vector> entries; - for (const auto & [session_id, request] : requests) + if (requests.size() == 1 && requests[0].request->isReadRequest()) { - ops_mapping[session_id][request->xid] = request->makeResponse(); - entries.push_back(getZooKeeperLogEntry(session_id, request)); + return state_machine->processReadRequest(requests[0]); } - - auto result = raft_instance->append_entries(entries); - if (!result->get_accepted()) - throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send requests to RAFT, mostly because we are not leader, code {}, message: '{}'", result->get_result_code(), result->get_result_str()); - - if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) + else { - TestKeeperStorage::ResponsesForSessions responses; + std::vector> entries; for (const auto & [session_id, request] : requests) { - auto response = request->makeResponse(); - response->xid = request->xid; - response->zxid = 0; /// FIXME what we can do with it? - response->error = Coordination::Error::ZOPERATIONTIMEOUT; - responses.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response}); + ops_mapping[session_id][request->xid] = request->makeResponse(); + entries.push_back(getZooKeeperLogEntry(session_id, request)); } - return responses; - } - else if (result->get_result_code() != nuraft::cmd_result_code::OK) - throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str()); - return readZooKeeperResponses(result->get()); + auto result = raft_instance->append_entries(entries); + if (!result->get_accepted()) + throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send requests to RAFT, mostly because we are not leader, code {}, message: '{}'", result->get_result_code(), result->get_result_str()); + + if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) + { + TestKeeperStorage::ResponsesForSessions responses; + for (const auto & [session_id, request] : requests) + { + auto response = request->makeResponse(); + response->xid = request->xid; + response->zxid = 0; /// FIXME what we can do with it? + response->error = Coordination::Error::ZOPERATIONTIMEOUT; + responses.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response}); + } + return responses; + } + else if (result->get_result_code() != nuraft::cmd_result_code::OK) + throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str()); + + return readZooKeeperResponses(result->get()); + } } diff --git a/src/Coordination/NuKeeperStateMachine.cpp b/src/Coordination/NuKeeperStateMachine.cpp index 52c82f44784..9f4572c02e0 100644 --- a/src/Coordination/NuKeeperStateMachine.cpp +++ b/src/Coordination/NuKeeperStateMachine.cpp @@ -223,4 +223,10 @@ int NuKeeperStateMachine::read_logical_snp_obj( return 0; } +TestKeeperStorage::ResponsesForSessions NuKeeperStateMachine::processReadRequest(const TestKeeperStorage::RequestForSession & request_for_session) +{ + std::lock_guard lock(storage_lock); + return storage.processRequest(request_for_session.request, request_for_session.session_id); +} + } diff --git a/src/Coordination/NuKeeperStateMachine.h b/src/Coordination/NuKeeperStateMachine.h index a120e3f1cf6..368e088a2f9 100644 --- a/src/Coordination/NuKeeperStateMachine.h +++ b/src/Coordination/NuKeeperStateMachine.h @@ -47,6 +47,8 @@ public: return storage; } + TestKeeperStorage::ResponsesForSessions processReadRequest(const TestKeeperStorage::RequestForSession & requests); + private: struct StorageSnapshot {