Merge branch 'fix_race_test_keeper' into in_memory_raft

This commit is contained in:
alesapin 2021-01-21 15:45:41 +03:00
commit b8cdd8e375
3 changed files with 15 additions and 1 deletions

View File

@ -49,7 +49,7 @@ void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordina
std::lock_guard lock(session_to_response_callback_mutex);
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id);
return;
session_writer->second(response);
/// Session closed, no more writes
@ -128,4 +128,12 @@ void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperR
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void TestKeeperStorageDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);
if (session_it != session_to_response_callback.end())
session_to_response_callback.erase(session_it);
}
}

View File

@ -53,6 +53,8 @@ public:
return storage.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call if we don't need any responses for this session no more (session was expired)
void finishSession(int64_t session_id);
};
}

View File

@ -390,7 +390,11 @@ void TestKeeperTCPHandler::finish()
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = close_xid;
/// Put close request (so storage will remove all info about session)
test_keeper_storage_dispatcher->putRequest(request, session_id);
/// We don't need any callbacks because session can be already dead and
/// nobody wait for response
test_keeper_storage_dispatcher->finishSession(session_id);
}
std::pair<Coordination::OpNum, Coordination::XID> TestKeeperTCPHandler::receiveRequest()