ClickHouse/src/DataStreams/RemoteQueryExecutorReadContext.h

273 lines
8.0 KiB
C++
Raw Normal View History

2020-12-04 11:00:32 +00:00
#pragma once
2020-12-14 16:16:08 +00:00
#if defined(OS_LINUX)
2020-12-04 11:00:32 +00:00
#include <sys/epoll.h>
2020-12-10 09:30:43 +00:00
#include <Common/Fiber.h>
#include <Common/FiberStack.h>
#include <Common/TimerDescriptor.h>
2020-12-04 11:00:32 +00:00
namespace DB
{
/// Error codes used by this header. Only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_OPEN_FILE;
extern const int SOCKET_TIMEOUT;
}
class RemoteQueryExecutorReadContext
{
public:
using Self = RemoteQueryExecutorReadContext;
std::atomic_bool is_read_in_progress = false;
2020-12-04 11:00:32 +00:00
Packet packet;
std::exception_ptr exception;
2020-12-15 10:08:13 +00:00
FiberStack stack;
2020-12-04 11:00:32 +00:00
boost::context::fiber fiber;
2020-12-22 08:55:21 +00:00
/// This mutex for fiber is needed because fiber could be destroyed in cancel method from another thread.
2020-12-14 16:16:08 +00:00
std::mutex fiber_lock;
2020-12-04 11:00:32 +00:00
Poco::Timespan receive_timeout;
MultiplexedConnections & connections;
Poco::Net::Socket * last_used_socket = nullptr;
2020-12-04 11:00:32 +00:00
2020-12-22 08:55:21 +00:00
/// Here we have three descriptors we are going to wait:
/// * socket_fd is a descriptor of connection. It may be changed in case of reading from several replicas.
/// * timer is a timerfd descriptor to manually check socket timeout
/// * pipe_fd is a pipe we use to cancel query and socket polling by executor.
/// We put those descriptors into our own epoll_fd which is used by external executor.
2020-12-04 11:00:32 +00:00
TimerDescriptor timer{CLOCK_MONOTONIC, 0};
2020-12-04 13:35:24 +00:00
int socket_fd = -1;
2020-12-04 11:00:32 +00:00
int epoll_fd;
2020-12-10 12:20:18 +00:00
int pipe_fd[2];
2020-12-04 11:00:32 +00:00
explicit RemoteQueryExecutorReadContext(MultiplexedConnections & connections_) : connections(connections_)
{
2020-12-04 13:35:24 +00:00
epoll_fd = epoll_create(2);
if (-1 == epoll_fd)
2020-12-04 11:00:32 +00:00
throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE);
2020-12-10 12:20:18 +00:00
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);
{
epoll_event socket_event;
socket_event.events = EPOLLIN | EPOLLPRI;
socket_event.data.fd = pipe_fd[0];
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd[0], &socket_event))
throwFromErrno("Cannot add pipe descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
2020-12-04 11:00:32 +00:00
{
epoll_event timer_event;
timer_event.events = EPOLLIN | EPOLLPRI;
timer_event.data.fd = timer.getDescriptor();
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event))
throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
auto routine = Routine{connections, *this};
2020-12-14 16:16:08 +00:00
fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine));
2020-12-04 11:00:32 +00:00
}
2020-12-04 13:35:24 +00:00
void setSocket(Poco::Net::Socket & socket)
2020-12-04 11:00:32 +00:00
{
2020-12-04 13:35:24 +00:00
int fd = socket.impl()->sockfd();
if (fd == socket_fd)
return;
epoll_event socket_event;
socket_event.events = EPOLLIN | EPOLLPRI;
socket_event.data.fd = fd;
if (socket_fd != -1)
2020-12-04 11:00:32 +00:00
{
2020-12-04 13:35:24 +00:00
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, &socket_event))
throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
2020-12-04 11:00:32 +00:00
}
2020-12-04 13:35:24 +00:00
socket_fd = fd;
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &socket_event))
throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
receive_timeout = socket.impl()->getReceiveTimeout();
2020-12-04 11:00:32 +00:00
}
2020-12-10 12:20:18 +00:00
bool checkTimeout() const
2020-12-04 11:00:32 +00:00
{
try
{
2020-12-10 12:20:18 +00:00
return checkTimeoutImpl();
2020-12-04 11:00:32 +00:00
}
catch (DB::Exception & e)
{
if (last_used_socket)
e.addMessage(" while reading from socket ({})", last_used_socket->peerAddress().toString());
2020-12-04 13:35:24 +00:00
throw;
2020-12-04 11:00:32 +00:00
}
}
2020-12-10 12:20:18 +00:00
bool checkTimeoutImpl() const
2020-12-04 11:00:32 +00:00
{
2020-12-10 12:20:18 +00:00
epoll_event events[3];
events[0].data.fd = events[1].data.fd = events[2].data.fd = -1;
2020-12-04 11:00:32 +00:00
/// Wait for epoll_fd will not block if it was polled externally.
2020-12-10 12:20:18 +00:00
int num_events = epoll_wait(epoll_fd, events, 3, 0);
2020-12-04 11:00:32 +00:00
if (num_events == -1)
throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET);
bool is_socket_ready = false;
2020-12-10 12:20:18 +00:00
bool is_pipe_alarmed = false;
2020-12-04 11:00:32 +00:00
bool has_timer_alarm = false;
for (int i = 0; i < num_events; ++i)
{
if (events[i].data.fd == socket_fd)
is_socket_ready = true;
if (events[i].data.fd == timer.getDescriptor())
has_timer_alarm = true;
2020-12-10 12:20:18 +00:00
if (events[i].data.fd == pipe_fd[0])
is_pipe_alarmed = true;
2020-12-04 11:00:32 +00:00
}
2020-12-10 12:20:18 +00:00
if (is_pipe_alarmed)
return false;
2020-12-04 11:00:32 +00:00
if (has_timer_alarm && !is_socket_ready)
{
/// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception.
timer.drain();
throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT);
}
2020-12-10 12:20:18 +00:00
return true;
2020-12-04 11:00:32 +00:00
}
void setTimer() const
{
/// Did not get packet yet. Init timeout for the next async reading.
timer.reset();
if (receive_timeout.totalMicroseconds())
timer.setRelative(receive_timeout);
}
2020-12-14 16:16:08 +00:00
bool resumeRoutine()
2020-12-04 11:00:32 +00:00
{
if (is_read_in_progress.load(std::memory_order_relaxed) && !checkTimeout())
2020-12-10 12:20:18 +00:00
return false;
2020-12-04 13:35:24 +00:00
{
2020-12-14 16:16:08 +00:00
std::lock_guard guard(fiber_lock);
2020-12-04 13:35:24 +00:00
if (!fiber)
return false;
fiber = std::move(fiber).resume();
}
2020-12-04 11:00:32 +00:00
if (exception)
std::rethrow_exception(std::move(exception));
2020-12-04 13:35:24 +00:00
return true;
}
2020-12-14 16:16:08 +00:00
void cancel()
2020-12-04 13:35:24 +00:00
{
2020-12-14 16:16:08 +00:00
std::lock_guard guard(fiber_lock);
2020-12-22 08:55:21 +00:00
/// It is safe to just destroy fiber - we are not in the process of reading from socket.
2020-12-04 13:35:24 +00:00
boost::context::fiber to_destroy = std::move(fiber);
2020-12-10 12:20:18 +00:00
2020-12-22 08:55:21 +00:00
/// Send something to pipe to cancel executor waiting.
2020-12-10 12:20:18 +00:00
uint64_t buf = 0;
while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
{
if (errno == EAGAIN)
break;
if (errno != EINTR)
throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET);
}
2020-12-04 11:00:32 +00:00
}
~RemoteQueryExecutorReadContext()
{
/// socket_fd is closed by Poco::Net::Socket
/// timer_fd is closed by TimerDescriptor
close(epoll_fd);
}
struct Routine
{
MultiplexedConnections & connections;
Self & read_context;
struct ReadCallback
{
Self & read_context;
Fiber & fiber;
void operator()(Poco::Net::Socket & socket)
{
try
{
read_context.setSocket(socket);
}
catch (DB::Exception & e)
{
e.addMessage(" while reading from socket ({})", socket.peerAddress().toString());
throw;
}
read_context.is_read_in_progress.store(true, std::memory_order_relaxed);
fiber = std::move(fiber).resume();
read_context.is_read_in_progress.store(false, std::memory_order_relaxed);
}
};
2020-12-04 11:00:32 +00:00
2020-12-10 09:30:43 +00:00
Fiber operator()(Fiber && sink) const
2020-12-04 11:00:32 +00:00
{
try
{
while (true)
{
read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink});
2020-12-04 11:00:32 +00:00
sink = std::move(sink).resume();
}
}
2020-12-04 13:35:24 +00:00
catch (const boost::context::detail::forced_unwind &)
{
2020-12-18 15:12:31 +00:00
/// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't exited
/// It should not be caught or it will segfault.
/// Other exceptions must be caught
2020-12-04 13:35:24 +00:00
throw;
}
2020-12-04 11:00:32 +00:00
catch (...)
{
read_context.exception = std::current_exception();
}
return std::move(sink);
}
};
};
2020-12-14 16:16:08 +00:00
}
#else
namespace DB
{
/// Stub used on platforms other than Linux, where the epoll-based asynchronous reading is not available.
/// It only provides the part of the interface that callers use unconditionally.
class RemoteQueryExecutorReadContext
{
public:
    /// Nothing to cancel: no asynchronous read can be in progress on this platform.
    void cancel() {}
};
2020-12-04 11:00:32 +00:00
}
2020-12-14 16:16:08 +00:00
#endif