Better request/response logic

This commit is contained in:
alesapin 2021-02-19 14:06:17 +03:00
parent 6c9322bb2e
commit 3d954c4314

View File

@ -40,7 +40,7 @@ namespace ErrorCodes
struct PollResult
{
size_t ready_responses_count{0};
bool has_response{false};
bool has_requests{false};
bool error{false};
};
@ -92,8 +92,22 @@ struct SocketInterruptablePollWrapper
return pipe.fds_rw[1];
}
PollResult poll(Poco::Timespan remaining_time)
PollResult poll(Poco::Timespan remaining_time, const std::shared_ptr<ReadBufferFromPocoSocket> & in)
{
PollResult result{};
if (response_in.available() != 0)
{
UInt8 dummy;
readIntBinary(dummy, response_in);
result.has_response = true;
}
if (in->available() != 0)
result.has_requests = true;
if (result.has_response)
return result;
std::array<int, 2> outputs = {-1, -1};
#if defined(POCO_HAVE_FD_EPOLL)
int rc;
@ -148,7 +162,6 @@ struct SocketInterruptablePollWrapper
outputs[1] = pipe.fds_rw[0];
#endif
PollResult result{};
if (rc < 0)
{
result.error = true;
@ -169,16 +182,8 @@ struct SocketInterruptablePollWrapper
else
{
UInt8 dummy;
do
{
/// All ready responses stored in responses queue,
/// but we have to count amount of ready responses in pipe
/// and process them only. Otherwise states of response_in
/// and response queue will be inconsistent and race condition is possible.
readIntBinary(dummy, response_in);
result.ready_responses_count++;
}
while (response_in.available());
readIntBinary(dummy, response_in);
result.has_response = true;
}
}
}
@ -339,42 +344,32 @@ void NuKeeperTCPHandler::runImpl()
{
using namespace std::chrono_literals;
PollResult result = poll_wrapper->poll(session_timeout);
PollResult result = poll_wrapper->poll(session_timeout, in);
if (result.has_requests && !close_received)
{
size_t requests_read = 0;
do
auto [received_op, received_xid] = receiveRequest();
if (received_op == Coordination::OpNum::Close)
{
auto [received_op, received_xid] = receiveRequest();
if (received_op == Coordination::OpNum::Close)
{
LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id);
close_xid = received_xid;
close_received = true;
break;
}
else if (received_op == Coordination::OpNum::Heartbeat)
{
LOG_TRACE(log, "Received heartbeat for session #{}", session_id);
session_stopwatch.restart();
}
if (requests_read > 50)
break;
requests_read++;
LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id);
close_xid = received_xid;
close_received = true;
}
else if (received_op == Coordination::OpNum::Heartbeat)
{
LOG_TRACE(log, "Received heartbeat for session #{}", session_id);
session_stopwatch.restart();
}
while (in->available());
}
/// Process exact amount of responses from pipe
/// otherwise state of responses queue and signaling pipe
/// became inconsistent and race condition is possible.
while (result.ready_responses_count != 0)
if (result.has_response)
{
Coordination::ZooKeeperResponsePtr response;
if (!responses->tryPop(response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have at least {} ready responses, but queue is empty. It's a bug.", result.ready_responses_count);
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
if (response->xid == close_xid)
{
@ -388,7 +383,6 @@ void NuKeeperTCPHandler::runImpl()
nu_keeper_storage_dispatcher->finishSession(session_id);
return;
}
result.ready_responses_count--;
}
if (result.error)