diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index bf407ba96b7..354540c797d 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -33,9 +33,9 @@ namespace ErrorCodes struct PollResult { - bool has_responses; - bool has_requests; - bool error; + size_t ready_responses_count{0}; + bool has_requests{false}; + bool error{false}; }; /// Queue with mutex. As simple as possible. @@ -191,10 +191,17 @@ struct SocketInterruptablePollWrapper result.has_requests = true; else { - /// Skip all of them, we are not interested in exact - /// amount because responses ordered in responses queue. - response_in.ignore(); - result.has_responses = true; + UInt8 dummy; + do + { + /// All ready responses stored in responses queue, + /// but we have to count amount of ready responses in pipe + /// and process them only. Otherwise states of response_in + /// and response queue will be inconsistent and race condition is possible. + readIntBinary(dummy, response_in); + result.ready_responses_count++; + } + while (response_in.available()); } } } @@ -349,23 +356,27 @@ void TestKeeperTCPHandler::runImpl() while (in->available()); } - if (result.has_responses) + /// Process exact amout of responses from pipe + /// otherwise state of responses queue and signaling pipe + /// became inconsistent and race condition is possible. + while (result.ready_responses_count != 0) { Coordination::ZooKeeperResponsePtr response; - while (responses->tryPop(response)) - { - if (response->xid == close_xid) - { - LOG_DEBUG(log, "Session #{} successfully closed", session_id); - return; - } + if (!responses->tryPop(response)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have at least {} ready responses, but queue is empty. It's a bug.", result.ready_responses_count); - if (response->error == Coordination::Error::ZOK) - response->write(*out); - else if (response->xid != Coordination::WATCH_XID) - response->write(*out); - /// skipping bad response for watch + if (response->xid == close_xid) + { + LOG_DEBUG(log, "Session #{} successfully closed", session_id); + return; } + + if (response->error == Coordination::Error::ZOK) + response->write(*out); + else if (response->xid != Coordination::WATCH_XID) + response->write(*out); + /// skipping bad response for watch + result.ready_responses_count--; } if (result.error)