Fix livelock

This commit is contained in:
alesapin 2020-11-12 17:43:16 +03:00
parent 5e1c84e04f
commit e19d1430db

View File

@ -1070,6 +1070,7 @@ void ZooKeeper::sendThread()
setThreadName("ZooKeeperSend"); setThreadName("ZooKeeperSend");
auto prev_heartbeat_time = clock::now(); auto prev_heartbeat_time = clock::now();
bool tried_to_send_close = false;
try try
{ {
@ -1099,6 +1100,15 @@ void ZooKeeper::sendThread()
std::lock_guard lock(operations_mutex); std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info; operations[info.request->xid] = info;
} }
else
{
/// We set this variable only once. If we will
/// successfully send close, than this thread will just
/// finish. If we will got an exception while sending
/// close, than thread will also finish and finalization
/// will be completed by some other thread.
tried_to_send_close = true;
}
if (info.watch) if (info.watch)
{ {
@ -1135,6 +1145,12 @@ void ZooKeeper::sendThread()
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
/// If we have tried to send close and got an exception than
/// finalization is already started by receiveThread and we cannot do
/// anything better than just exit.
///
/// Otherwise we should correctly finalize
if (!tried_to_send_close)
finalize(true, false); finalize(true, false);
} }
} }
@ -1346,6 +1362,16 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
if (expired) if (expired)
return; return;
auto expire_session = [&]
{
std::lock_guard lock(push_request_mutex);
if (!expired)
{
expired = true;
active_session_metric_increment.destroy();
}
};
try try
{ {
if (!error_send) if (!error_send)
@ -1359,8 +1385,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
{ {
/// This happens for example, when "Cannot push request to queue within operation timeout". /// This happens for example, when "Cannot push request to queue within operation timeout".
/// Just mark session expired in case of error on close request. /// Just mark session expired in case of error on close request.
std::lock_guard lock(push_request_mutex); expire_session();
expired = true;
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
@ -1369,12 +1394,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
} }
/// Set expired flag after we sent close event /// Set expired flag after we sent close event
{ expire_session();
std::lock_guard lock(push_request_mutex);
expired = true;
}
active_session_metric_increment.destroy();
try try
{ {