mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
More correct epoll usage
This commit is contained in:
parent
3d954c4314
commit
5c9420c077
@ -70,14 +70,14 @@ struct SocketInterruptablePollWrapper
|
||||
if (epollfd < 0)
|
||||
throwFromErrno("Cannot epoll_create", ErrorCodes::SYSTEM_ERROR);
|
||||
|
||||
socket_event.events = EPOLLIN | EPOLLERR;
|
||||
socket_event.events = EPOLLIN | EPOLLERR | EPOLLPRI;
|
||||
socket_event.data.fd = sockfd;
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &socket_event) < 0)
|
||||
{
|
||||
::close(epollfd);
|
||||
throwFromErrno("Cannot insert socket into epoll queue", ErrorCodes::SYSTEM_ERROR);
|
||||
}
|
||||
pipe_event.events = EPOLLIN | EPOLLERR;
|
||||
pipe_event.events = EPOLLIN | EPOLLERR | EPOLLPRI;
|
||||
pipe_event.data.fd = pipe.fds_rw[0];
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pipe.fds_rw[0], &pipe_event) < 0)
|
||||
{
|
||||
@ -108,11 +108,12 @@ struct SocketInterruptablePollWrapper
|
||||
if (result.has_response)
|
||||
return result;
|
||||
|
||||
std::array<int, 2> outputs = {-1, -1};
|
||||
bool socket_ready = false;
|
||||
bool fd_ready = false;
|
||||
#if defined(POCO_HAVE_FD_EPOLL)
|
||||
int rc;
|
||||
epoll_event evout[2];
|
||||
memset(evout, 0, sizeof(evout));
|
||||
evout[0].data.fd = evout[1].data.fd = -1;
|
||||
do
|
||||
{
|
||||
Poco::Timestamp start;
|
||||
@ -129,10 +130,13 @@ struct SocketInterruptablePollWrapper
|
||||
}
|
||||
while (rc < 0 && errno == EINTR);
|
||||
|
||||
if (rc >= 1 && evout[0].events & EPOLLIN)
|
||||
outputs[0] = evout[0].data.fd;
|
||||
if (rc == 2 && evout[1].events & EPOLLIN)
|
||||
outputs[1] = evout[1].data.fd;
|
||||
for (int i = 0; i < rc; ++i)
|
||||
{
|
||||
if (evout[i].data.fd == sockfd)
|
||||
socket_ready = true;
|
||||
if (evout[i].data.fd == pipe.fds_rw[0])
|
||||
fd_ready = true;
|
||||
}
|
||||
#else
|
||||
pollfd poll_buf[2];
|
||||
poll_buf[0].fd = sockfd;
|
||||
@ -156,10 +160,11 @@ struct SocketInterruptablePollWrapper
|
||||
}
|
||||
}
|
||||
while (rc < 0 && errno == POCO_EINTR);
|
||||
|
||||
if (rc >= 1 && poll_buf[0].revents & POLLIN)
|
||||
outputs[0] = sockfd;
|
||||
socket_ready = true;
|
||||
if (rc == 2 && poll_buf[1].revents & POLLIN)
|
||||
outputs[1] = pipe.fds_rw[0];
|
||||
fd_ready = true;
|
||||
#endif
|
||||
|
||||
if (rc < 0)
|
||||
@ -173,19 +178,15 @@ struct SocketInterruptablePollWrapper
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto fd : outputs)
|
||||
if (socket_ready)
|
||||
{
|
||||
if (fd != -1)
|
||||
{
|
||||
if (fd == sockfd)
|
||||
result.has_requests = true;
|
||||
else
|
||||
{
|
||||
UInt8 dummy;
|
||||
readIntBinary(dummy, response_in);
|
||||
result.has_response = true;
|
||||
}
|
||||
}
|
||||
result.has_requests = true;
|
||||
}
|
||||
if (fd_ready)
|
||||
{
|
||||
UInt8 dummy;
|
||||
readIntBinary(dummy, response_in);
|
||||
result.has_response = true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@ -368,6 +369,7 @@ void NuKeeperTCPHandler::runImpl()
|
||||
if (result.has_response)
|
||||
{
|
||||
Coordination::ZooKeeperResponsePtr response;
|
||||
|
||||
if (!responses->tryPop(response))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user