Fix obvious deadlock

This commit is contained in:
alesapin 2021-01-26 10:47:04 +03:00
parent 045935151f
commit 10cec45e53
2 changed files with 30 additions and 19 deletions

View File

@ -46,7 +46,7 @@ void NuKeeperServer::startup()
params.election_timeout_upper_bound_ = 400; params.election_timeout_upper_bound_ = 400;
params.reserved_log_items_ = 5000; params.reserved_log_items_ = 5000;
params.snapshot_distance_ = 5000; params.snapshot_distance_ = 5000;
params.client_req_timeout_ = 3000; params.client_req_timeout_ = 10000;
params.return_method_ = nuraft::raft_params::blocking; params.return_method_ = nuraft::raft_params::blocking;
raft_instance = launcher.init( raft_instance = launcher.init(
@ -145,10 +145,23 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKe
auto result = raft_instance->append_entries(entries); auto result = raft_instance->append_entries(entries);
if (!result->get_accepted()) if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send requests to RAFT, mostly because we are not leader"); throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send requests to RAFT, mostly because we are not leader, code {}, message: '{}'", result->get_result_code(), result->get_result_str());
if (result->get_result_code() != nuraft::cmd_result_code::OK) if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests failed"); {
TestKeeperStorage::ResponsesForSessions responses;
for (const auto & [session_id, request] : requests)
{
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0; /// FIXME what we can do with it?
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
}
return responses;
}
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
return readZooKeeperResponses(result->get()); return readZooKeeperResponses(result->get());
} }

View File

@ -14,30 +14,28 @@ namespace ErrorCodes
void TestKeeperStorageDispatcher::processingThread() void TestKeeperStorageDispatcher::processingThread()
{ {
setThreadName("TestKeeperSProc"); setThreadName("TestKeeperSProc");
try while (!shutdown)
{ {
while (!shutdown) TestKeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
if (requests_queue.tryPop(request, max_wait))
{ {
TestKeeperStorage::RequestForSession request; if (shutdown)
break;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); try
if (requests_queue.tryPop(request, max_wait))
{ {
if (shutdown)
break;
auto responses = server->putRequests({request}); auto responses = server->putRequests({request});
for (const auto & response_for_session : responses) for (const auto & response_for_session : responses)
setResponse(response_for_session.session_id, response_for_session.response); setResponse(response_for_session.session_id, response_for_session.response);
} }
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize();
}
} }
void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)