Improve performance and fix condition variable wait

This commit is contained in:
alesapin 2021-02-24 14:54:14 +03:00
parent b0065334e2
commit a1901383ae
3 changed files with 81 additions and 89 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit c250d5ad58c82e751264df40a94da682a2fc3519
Subproject commit 3af9f53d9e2e36f0c975a4cd665b446b875c4dde

View File

@ -46,7 +46,7 @@ NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, co
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, responses_queue(responses_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuRaftStateMachine"))
, log(&Poco::Logger::get("NuKeeperStateMachine"))
{
LOG_DEBUG(log, "Created nukeeper state machine");
}

View File

@ -40,7 +40,7 @@ namespace ErrorCodes
struct PollResult
{
bool has_response{false};
size_t responses_count{0};
bool has_requests{false};
bool error{false};
};
@ -94,101 +94,90 @@ struct SocketInterruptablePollWrapper
PollResult poll(Poco::Timespan remaining_time, const std::shared_ptr<ReadBufferFromPocoSocket> & in)
{
PollResult result{};
if (response_in.available() != 0)
{
UInt8 dummy;
readIntBinary(dummy, response_in);
result.has_response = true;
}
if (in->available() != 0)
result.has_requests = true;
if (result.has_response)
return result;
bool socket_ready = false;
bool fd_ready = false;
#if defined(POCO_HAVE_FD_EPOLL)
int rc;
epoll_event evout[2];
evout[0].data.fd = evout[1].data.fd = -1;
do
{
Poco::Timestamp start;
rc = epoll_wait(epollfd, evout, 2, remaining_time.totalMilliseconds());
if (rc < 0 && errno == EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remaining_time)
remaining_time -= waited;
else
remaining_time = 0;
}
}
while (rc < 0 && errno == EINTR);
for (int i = 0; i < rc; ++i)
{
if (evout[i].data.fd == sockfd)
socket_ready = true;
if (evout[i].data.fd == pipe.fds_rw[0])
fd_ready = true;
}
#else
pollfd poll_buf[2];
poll_buf[0].fd = sockfd;
poll_buf[0].events = POLLIN;
poll_buf[1].fd = pipe.fds_rw[0];
poll_buf[1].events = POLLIN;
int rc;
do
{
Poco::Timestamp start;
rc = ::poll(poll_buf, 2, remaining_time.totalMilliseconds());
if (rc < 0 && errno == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remaining_time)
remaining_time -= waited;
else
remaining_time = 0;
}
}
while (rc < 0 && errno == POCO_EINTR);
if (rc >= 1 && poll_buf[0].revents & POLLIN)
if (in->available() != 0)
socket_ready = true;
if (rc == 2 && poll_buf[1].revents & POLLIN)
if (response_in.available() != 0)
fd_ready = true;
int rc = 0;
if (!fd_ready)
{
#if defined(POCO_HAVE_FD_EPOLL)
epoll_event evout[2];
evout[0].data.fd = evout[1].data.fd = -1;
do
{
Poco::Timestamp start;
rc = epoll_wait(epollfd, evout, 2, remaining_time.totalMilliseconds());
if (rc < 0 && errno == EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remaining_time)
remaining_time -= waited;
else
remaining_time = 0;
}
}
while (rc < 0 && errno == EINTR);
for (int i = 0; i < rc; ++i)
{
if (evout[i].data.fd == sockfd)
socket_ready = true;
if (evout[i].data.fd == pipe.fds_rw[0])
fd_ready = true;
}
#else
pollfd poll_buf[2];
poll_buf[0].fd = sockfd;
poll_buf[0].events = POLLIN;
poll_buf[1].fd = pipe.fds_rw[0];
poll_buf[1].events = POLLIN;
do
{
Poco::Timestamp start;
rc = ::poll(poll_buf, 2, remaining_time.totalMilliseconds());
if (rc < 0 && errno == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remaining_time)
remaining_time -= waited;
else
remaining_time = 0;
}
}
while (rc < 0 && errno == POCO_EINTR);
if (rc >= 1 && poll_buf[0].revents & POLLIN)
socket_ready = true;
if (rc == 2 && poll_buf[1].revents & POLLIN)
fd_ready = true;
#endif
}
PollResult result{};
result.has_requests = socket_ready;
if (fd_ready)
{
UInt8 dummy;
readIntBinary(dummy, response_in);
result.responses_count = 1;
auto available = response_in.available();
response_in.ignore(available);
result.responses_count += available;
}
if (rc < 0)
{
result.error = true;
return result;
}
else if (rc == 0)
{
return result;
}
else
{
if (socket_ready)
{
result.has_requests = true;
}
if (fd_ready)
{
UInt8 dummy;
readIntBinary(dummy, response_in);
result.has_response = true;
}
}
return result;
}
@ -366,7 +355,7 @@ void NuKeeperTCPHandler::runImpl()
/// Process exact amount of responses from pipe
/// otherwise state of responses queue and signaling pipe
/// became inconsistent and race condition is possible.
if (result.has_response)
while (result.responses_count != 0)
{
Coordination::ZooKeeperResponsePtr response;
@ -378,6 +367,7 @@ void NuKeeperTCPHandler::runImpl()
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
response->write(*out);
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
{
@ -385,6 +375,8 @@ void NuKeeperTCPHandler::runImpl()
nu_keeper_storage_dispatcher->finishSession(session_id);
return;
}
result.responses_count--;
}
if (result.error)