Fix one more race in TestKeeper

This commit is contained in:
alesapin 2021-01-22 13:02:48 +03:00
parent eb4163c1fa
commit 83862799cd

View File

@ -33,9 +33,9 @@ namespace ErrorCodes
struct PollResult
{
bool has_responses;
bool has_requests;
bool error;
size_t ready_responses_count{0};
bool has_requests{false};
bool error{false};
};
/// Queue with mutex. As simple as possible.
@ -191,10 +191,17 @@ struct SocketInterruptablePollWrapper
result.has_requests = true;
else
{
/// Skip all of them, we are not interested in exact
/// amount because responses ordered in responses queue.
response_in.ignore();
result.has_responses = true;
UInt8 dummy;
do
{
/// All ready responses stored in responses queue,
/// but we have to count amount of ready responses in pipe
/// and process them only. Otherwise states of response_in
/// and response queue will be inconsistent and race condition is possible.
readIntBinary(dummy, response_in);
result.ready_responses_count++;
}
while (response_in.available());
}
}
}
@ -349,23 +356,27 @@ void TestKeeperTCPHandler::runImpl()
while (in->available());
}
if (result.has_responses)
/// Process exact amout of responses from pipe
/// otherwise state of responses queue and signaling pipe
/// became inconsistent and race condition is possible.
while (result.ready_responses_count != 0)
{
Coordination::ZooKeeperResponsePtr response;
while (responses->tryPop(response))
{
if (response->xid == close_xid)
{
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
if (!responses->tryPop(response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have at least {} ready responses, but queue is empty. It's a bug.", result.ready_responses_count);
if (response->error == Coordination::Error::ZOK)
response->write(*out);
else if (response->xid != Coordination::WATCH_XID)
response->write(*out);
/// skipping bad response for watch
if (response->xid == close_xid)
{
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
if (response->error == Coordination::Error::ZOK)
response->write(*out);
else if (response->xid != Coordination::WATCH_XID)
response->write(*out);
/// skipping bad response for watch
result.ready_responses_count--;
}
if (result.error)