Fixed tests

This commit is contained in:
Maksim Kita 2021-10-08 16:32:02 +03:00
parent 04047f76c7
commit 46ba649821
2 changed files with 11 additions and 7 deletions

View File

@ -11,6 +11,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int SYSTEM_ERROR;
}
KeeperDispatcher::KeeperDispatcher()
@ -239,7 +240,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
if (request->getOpNum() == Coordination::OpNum::Close)
{
if (!requests_queue->push(std::move(request_info)))
throw Exception("Cannot push request to queue", ErrorCodes::LOGICAL_ERROR);
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
}
else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
{
@ -304,7 +305,9 @@ void KeeperDispatcher::shutdown()
if (requests_queue)
{
requests_queue->finish();
request_thread.join();
if (request_thread.joinable())
request_thread.join();
}
responses_queue.finish();
@ -380,7 +383,7 @@ void KeeperDispatcher::sessionCleanerTask()
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))
LOG_INFO(log, "Cannot push request to queue");
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
}
/// Remove session from registered sessions
@ -416,7 +419,7 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
response->zxid = 0;
response->error = error;
if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response}))
throw Exception(ErrorCodes::LOGICAL_ERROR,
throw Exception(ErrorCodes::SYSTEM_ERROR,
"Could not push error response xid {} zxid {} error message {} to responses queue",
response->xid,
response->zxid,

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYSTEM_ERROR;
}
namespace
@ -130,7 +131,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
for (auto & response_for_session : responses_for_sessions)
if (!responses_queue.push(response_for_session))
LOG_WARNING(log, "Could not push response {} into responses queue", response_for_session.session_id);
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id);
}
last_committed_idx = log_idx;
@ -221,7 +222,7 @@ void KeeperStateMachine::create_snapshot(
LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flash to disk", s.get_last_log_idx());
/// Flush snapshot to disk in a separate thread.
if (!snapshots_queue.push(std::move(snapshot_task)))
LOG_DEBUG(log, "Cannot push snapshot task into queue");
LOG_WARNING(log, "Cannot push snapshot task into queue");
}
void KeeperStateMachine::save_logical_snp_obj(
@ -308,7 +309,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Could not push response {} into responses queue", response.session_id);
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id);
}
std::vector<int64_t> KeeperStateMachine::getDeadSessions()