diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index 3910376ebda..aa1747ca3e6 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -43,7 +43,7 @@ void NuKeeperServer::addServer(int server_id_, const std::string & server_uri_, } -void NuKeeperServer::startup() +void NuKeeperServer::startup(int64_t operation_timeout_ms) { nuraft::raft_params params; params.heart_beat_interval_ = 500; @@ -51,8 +51,10 @@ void NuKeeperServer::startup() params.election_timeout_upper_bound_ = 2000; params.reserved_log_items_ = 5000; params.snapshot_distance_ = 5000; - params.client_req_timeout_ = 10000; + params.client_req_timeout_ = operation_timeout_ms; params.auto_forwarding_ = true; + /// For some reason may lead to a very long timeouts + params.use_bg_thread_for_urgent_commit_ = false; params.return_method_ = nuraft::raft_params::blocking; nuraft::asio_service::options asio_opts{}; @@ -197,6 +199,7 @@ int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms) std::lock_guard lock(append_entries_mutex); auto result = raft_instance->append_entries({entry}); + if (!result->get_accepted()) throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT"); diff --git a/src/Coordination/NuKeeperServer.h b/src/Coordination/NuKeeperServer.h index 358a4212967..6151cd095e0 100644 --- a/src/Coordination/NuKeeperServer.h +++ b/src/Coordination/NuKeeperServer.h @@ -34,7 +34,7 @@ private: public: NuKeeperServer(int server_id_, const std::string & hostname_, int port_); - void startup(); + void startup(int64_t operation_timeout_ms); NuKeeperStorage::ResponsesForSessions putRequest(const NuKeeperStorage::RequestForSession & request); diff --git a/src/Coordination/NuKeeperStorageDispatcher.cpp b/src/Coordination/NuKeeperStorageDispatcher.cpp index fbf54106316..e327272cab1 100644 --- a/src/Coordination/NuKeeperStorageDispatcher.cpp +++ b/src/Coordination/NuKeeperStorageDispatcher.cpp @@ -111,6 +111,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati int myport; int32_t my_priority = 1; + operation_timeout = Poco::Timespan(0, config.getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000); Poco::Util::AbstractConfiguration::Keys keys; config.keys("test_keeper_server.raft_configuration", keys); bool my_can_become_leader = true; @@ -141,7 +142,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati server = std::make_unique(myid, myhostname, myport); try { - server->startup(); + server->startup(operation_timeout.totalMilliseconds()); if (shouldBuildQuorum(myid, my_priority, my_can_become_leader, server_configs)) { for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs) diff --git a/src/Server/NuKeeperTCPHandler.cpp b/src/Server/NuKeeperTCPHandler.cpp index 706b57ee71d..31ffc744aaa 100644 --- a/src/Server/NuKeeperTCPHandler.cpp +++ b/src/Server/NuKeeperTCPHandler.cpp @@ -331,7 +331,9 @@ void NuKeeperTCPHandler::runImpl() { try { + LOG_INFO(log, "Requesting session ID for the new client"); session_id = nu_keeper_storage_dispatcher->getSessionID(session_timeout.totalMilliseconds()); + LOG_INFO(log, "Received session ID {}", session_id); } catch (const Exception & e) {