From fc0bca1ff25b699cb3561ab45e801e357ee8bfdf Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 26 Nov 2020 15:26:41 +0300 Subject: [PATCH] Fix bug with buffered read --- src/Server/TestKeeperTCPHandler.cpp | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index 5f36decad12..b9bf549fa7b 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -89,6 +89,7 @@ struct SocketInterruptablePollWrapper #if defined(POCO_HAVE_FD_EPOLL) int rc; epoll_event evout[2]; + memset(evout, 0, sizeof(evout)); do { Poco::Timestamp start; @@ -284,16 +285,20 @@ void TestKeeperTCPHandler::runImpl() auto state = poll_wrapper->poll(session_timeout); if (state & SocketInterruptablePollWrapper::HAS_REQUEST) { - auto received_op = receiveRequest(); - if (received_op == Coordination::OpNum::Close) + do { - LOG_DEBUG(log, "Received close request for session #{}", session_id); - break; - } - else if (received_op == Coordination::OpNum::Heartbeat) - { - session_stopwatch.restart(); - } + auto received_op = receiveRequest(); + if (received_op == Coordination::OpNum::Close) + { + LOG_DEBUG(log, "Received close request for session #{}", session_id); + break; + } + else if (received_op == Coordination::OpNum::Heartbeat) + { + LOG_TRACE(log, "Received heartbeat for session #{}", session_id); + session_stopwatch.restart(); + } + } while (in->available()); } if (state & SocketInterruptablePollWrapper::HAS_RESPONSE) @@ -360,7 +365,7 @@ Coordination::OpNum TestKeeperTCPHandler::receiveRequest() Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); request->xid = xid; request->readImpl(*in); - int response_fd = poll_wrapper->getResponseFD(); + int response_fd = poll_wrapper->getResponseFD(); if (opnum != Coordination::OpNum::Close) { auto promise = std::make_shared>();