diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h index 46312ae339d..2ab2b60570e 100644 --- a/src/Client/MultiplexedConnections.h +++ b/src/Client/MultiplexedConnections.h @@ -106,7 +106,7 @@ private: /// in separate thread. mutable std::mutex cancel_mutex; - friend class RemoteQueryExecutorReadContext; + friend struct RemoteQueryExecutorRoutine; }; } diff --git a/src/Common/FiberStack.h b/src/Common/FiberStack.h index b54b1a7de10..c509540cc9e 100644 --- a/src/Common/FiberStack.h +++ b/src/Common/FiberStack.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.cpp b/src/DataStreams/RemoteQueryExecutorReadContext.cpp new file mode 100644 index 00000000000..bc47b049407 --- /dev/null +++ b/src/DataStreams/RemoteQueryExecutorReadContext.cpp @@ -0,0 +1,238 @@ +#if defined(OS_LINUX) + +#include +#include +#include +#include +#include + +namespace DB +{ + +struct RemoteQueryExecutorRoutine +{ + MultiplexedConnections & connections; + RemoteQueryExecutorReadContext & read_context; + + struct ReadCallback + { + RemoteQueryExecutorReadContext & read_context; + Fiber & fiber; + + void operator()(Poco::Net::Socket & socket) + { + try + { + read_context.setSocket(socket); + } + catch (DB::Exception & e) + { + e.addMessage(" while reading from socket ({})", socket.peerAddress().toString()); + throw; + } + + read_context.is_read_in_progress.store(true, std::memory_order_relaxed); + fiber = std::move(fiber).resume(); + read_context.is_read_in_progress.store(false, std::memory_order_relaxed); + } + }; + + Fiber operator()(Fiber && sink) const + { + try + { + while (true) + { + read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}); + sink = std::move(sink).resume(); + } + } + catch (const boost::context::detail::forced_unwind &) + { + /// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't 
exited + /// It should not be caught or it will segfault. + /// Other exceptions must be caught + throw; + } + catch (...) + { + read_context.exception = std::current_exception(); + } + + return std::move(sink); + } +}; + +namespace ErrorCodes +{ + extern const int CANNOT_READ_FROM_SOCKET; + extern const int CANNOT_OPEN_FILE; + extern const int SOCKET_TIMEOUT; +} + +RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(MultiplexedConnections & connections_) + : connections(connections_) +{ + epoll_fd = epoll_create(2); + if (-1 == epoll_fd) + throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE); + + if (-1 == pipe2(pipe_fd, O_NONBLOCK)) + throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE); + + { + epoll_event socket_event; + socket_event.events = EPOLLIN | EPOLLPRI; + socket_event.data.fd = pipe_fd[0]; + + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd[0], &socket_event)) + throwFromErrno("Cannot add pipe descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + } + + { + epoll_event timer_event; + timer_event.events = EPOLLIN | EPOLLPRI; + timer_event.data.fd = timer.getDescriptor(); + + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event)) + throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + } + + auto routine = RemoteQueryExecutorRoutine{connections, *this}; + fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine)); +} + +void RemoteQueryExecutorReadContext::setSocket(Poco::Net::Socket & socket) +{ + int fd = socket.impl()->sockfd(); + if (fd == socket_fd) + return; + + epoll_event socket_event; + socket_event.events = EPOLLIN | EPOLLPRI; + socket_event.data.fd = fd; + + if (socket_fd != -1) + { + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, &socket_event)) + throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + } + + socket_fd = fd; + + if (-1 == epoll_ctl(epoll_fd, 
EPOLL_CTL_ADD, socket_fd, &socket_event)) + throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + + receive_timeout = socket.impl()->getReceiveTimeout(); +} + +bool RemoteQueryExecutorReadContext::checkTimeout() const +{ + try + { + return checkTimeoutImpl(); + } + catch (DB::Exception & e) + { + if (last_used_socket) + e.addMessage(" while reading from socket ({})", last_used_socket->peerAddress().toString()); + throw; + } +} + +bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const +{ + epoll_event events[3]; + events[0].data.fd = events[1].data.fd = events[2].data.fd = -1; + + /// Waiting on epoll_fd will not block if it was polled externally. + int num_events = epoll_wait(epoll_fd, events, 3, 0); + if (num_events == -1) + throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET); + + bool is_socket_ready = false; + bool is_pipe_alarmed = false; + bool has_timer_alarm = false; + + for (int i = 0; i < num_events; ++i) + { + if (events[i].data.fd == socket_fd) + is_socket_ready = true; + if (events[i].data.fd == timer.getDescriptor()) + has_timer_alarm = true; + if (events[i].data.fd == pipe_fd[0]) + is_pipe_alarmed = true; + } + + if (is_pipe_alarmed) + return false; + + if (has_timer_alarm && !is_socket_ready) + { + /// Socket receive timeout. Drain it in case of error, or it may be hidden by timeout exception. + timer.drain(); + throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT); + } + + return true; +} + +void RemoteQueryExecutorReadContext::setTimer() const +{ + /// Did not get packet yet. Init timeout for the next async reading. 
+ timer.reset(); + + if (receive_timeout.totalMicroseconds()) + timer.setRelative(receive_timeout); +} + +bool RemoteQueryExecutorReadContext::resumeRoutine() +{ + if (is_read_in_progress.load(std::memory_order_relaxed) && !checkTimeout()) + return false; + + { + std::lock_guard guard(fiber_lock); + if (!fiber) + return false; + + fiber = std::move(fiber).resume(); + } + + if (exception) + std::rethrow_exception(std::move(exception)); + + return true; +} + +void RemoteQueryExecutorReadContext::cancel() +{ + std::lock_guard guard(fiber_lock); + /// It is safe to just destroy fiber - we are not in the process of reading from socket. + boost::context::fiber to_destroy = std::move(fiber); + + /// Send something to pipe to cancel executor waiting. + uint64_t buf = 0; + while (-1 == write(pipe_fd[1], &buf, sizeof(buf))) + { + if (errno == EAGAIN) + break; + + if (errno != EINTR) + throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET); + } +} + +RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext() +{ + /// socket_fd is closed by Poco::Net::Socket + if (epoll_fd != -1) + close(epoll_fd); + if (pipe_fd[0] != -1) + close(pipe_fd[0]); + if (pipe_fd[1] != -1) + close(pipe_fd[1]); +} + +} +#endif diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.h b/src/DataStreams/RemoteQueryExecutorReadContext.h index 6d7099899ae..89dc2813a9a 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.h +++ b/src/DataStreams/RemoteQueryExecutorReadContext.h @@ -2,26 +2,27 @@ #if defined(OS_LINUX) -#include +#include +#include #include #include #include +#include +#include + +namespace Poco::Net +{ +class Socket; +} namespace DB { -namespace ErrorCodes -{ - extern const int CANNOT_READ_FROM_SOCKET; - extern const int CANNOT_OPEN_FILE; - extern const int SOCKET_TIMEOUT; -} +class MultiplexedConnections; class RemoteQueryExecutorReadContext { public: - using Self = RemoteQueryExecutorReadContext; - std::atomic_bool is_read_in_progress = false; Packet 
packet; @@ -42,224 +43,26 @@ public: /// We put those descriptors into our own epoll_fd which is used by external executor. TimerDescriptor timer{CLOCK_MONOTONIC, 0}; int socket_fd = -1; - int epoll_fd; - int pipe_fd[2]; + int epoll_fd = -1; + int pipe_fd[2] = { -1, -1 }; - explicit RemoteQueryExecutorReadContext(MultiplexedConnections & connections_) : connections(connections_) - { - epoll_fd = epoll_create(2); - if (-1 == epoll_fd) - throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE); + explicit RemoteQueryExecutorReadContext(MultiplexedConnections & connections_); + ~RemoteQueryExecutorReadContext(); - if (-1 == pipe2(pipe_fd, O_NONBLOCK)) - throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE); + bool checkTimeout() const; + bool checkTimeoutImpl() const; - { - epoll_event socket_event; - socket_event.events = EPOLLIN | EPOLLPRI; - socket_event.data.fd = pipe_fd[0]; + void setSocket(Poco::Net::Socket & socket); + void setTimer() const; - if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd[0], &socket_event)) - throwFromErrno("Cannot add pipe descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); - } - - { - epoll_event timer_event; - timer_event.events = EPOLLIN | EPOLLPRI; - timer_event.data.fd = timer.getDescriptor(); - - if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event)) - throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); - } - - auto routine = Routine{connections, *this}; - fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine)); - } - - void setSocket(Poco::Net::Socket & socket) - { - int fd = socket.impl()->sockfd(); - if (fd == socket_fd) - return; - - epoll_event socket_event; - socket_event.events = EPOLLIN | EPOLLPRI; - socket_event.data.fd = fd; - - if (socket_fd != -1) - { - if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, &socket_event)) - throwFromErrno("Cannot remove socket descriptor to epoll", 
ErrorCodes::CANNOT_OPEN_FILE); - } - - socket_fd = fd; - - if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &socket_event)) - throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); - - receive_timeout = socket.impl()->getReceiveTimeout(); - } - - bool checkTimeout() const - { - try - { - return checkTimeoutImpl(); - } - catch (DB::Exception & e) - { - if (last_used_socket) - e.addMessage(" while reading from socket ({})", last_used_socket->peerAddress().toString()); - throw; - } - } - - bool checkTimeoutImpl() const - { - epoll_event events[3]; - events[0].data.fd = events[1].data.fd = events[2].data.fd = -1; - - /// Wait for epoll_fd will not block if it was polled externally. - int num_events = epoll_wait(epoll_fd, events, 3, 0); - if (num_events == -1) - throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET); - - bool is_socket_ready = false; - bool is_pipe_alarmed = false; - bool has_timer_alarm = false; - - for (int i = 0; i < num_events; ++i) - { - if (events[i].data.fd == socket_fd) - is_socket_ready = true; - if (events[i].data.fd == timer.getDescriptor()) - has_timer_alarm = true; - if (events[i].data.fd == pipe_fd[0]) - is_pipe_alarmed = true; - } - - if (is_pipe_alarmed) - return false; - - if (has_timer_alarm && !is_socket_ready) - { - /// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception. - timer.drain(); - throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT); - } - - return true; - } - - void setTimer() const - { - /// Did not get packet yet. Init timeout for the next async reading. 
- timer.reset(); - - if (receive_timeout.totalMicroseconds()) - timer.setRelative(receive_timeout); - } - - bool resumeRoutine() - { - if (is_read_in_progress.load(std::memory_order_relaxed) && !checkTimeout()) - return false; - - { - std::lock_guard guard(fiber_lock); - if (!fiber) - return false; - - fiber = std::move(fiber).resume(); - } - - if (exception) - std::rethrow_exception(std::move(exception)); - - return true; - } - - void cancel() - { - std::lock_guard guard(fiber_lock); - /// It is safe to just destroy fiber - we are not in the process of reading from socket. - boost::context::fiber to_destroy = std::move(fiber); - - /// Send something to pipe to cancel executor waiting. - uint64_t buf = 0; - while (-1 == write(pipe_fd[1], &buf, sizeof(buf))) - { - if (errno == EAGAIN) - break; - - if (errno != EINTR) - throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET); - } - } - - ~RemoteQueryExecutorReadContext() - { - /// socket_fd is closed by Poco::Net::Socket - /// timer_fd is closed by TimerDescriptor - close(epoll_fd); - } - - struct Routine - { - MultiplexedConnections & connections; - Self & read_context; - - struct ReadCallback - { - Self & read_context; - Fiber & fiber; - - void operator()(Poco::Net::Socket & socket) - { - try - { - read_context.setSocket(socket); - } - catch (DB::Exception & e) - { - e.addMessage(" while reading from socket ({})", socket.peerAddress().toString()); - throw; - } - - read_context.is_read_in_progress.store(true, std::memory_order_relaxed); - fiber = std::move(fiber).resume(); - read_context.is_read_in_progress.store(false, std::memory_order_relaxed); - } - }; - - Fiber operator()(Fiber && sink) const - { - try - { - while (true) - { - read_context.packet = connections.receivePacketUnlocked(ReadCallback{read_context, sink}); - sink = std::move(sink).resume(); - } - } - catch (const boost::context::detail::forced_unwind &) - { - /// This exception is thrown by fiber implementation in case if fiber is 
being deleted but hasn't exited - /// It should not be caught or it will segfault. - /// Other exceptions must be caught - throw; - } - catch (...) - { - read_context.exception = std::current_exception(); - } - - return std::move(sink); - } - }; + bool resumeRoutine(); + void cancel(); }; + } + #else + namespace DB { class RemoteQueryExecutorReadContext diff --git a/src/DataStreams/ya.make b/src/DataStreams/ya.make index 8648f233f26..0af72f25d3d 100644 --- a/src/DataStreams/ya.make +++ b/src/DataStreams/ya.make @@ -39,6 +39,7 @@ SRCS( RemoteBlockInputStream.cpp RemoteBlockOutputStream.cpp RemoteQueryExecutor.cpp + RemoteQueryExecutorReadContext.cpp SizeLimits.cpp SquashingBlockInputStream.cpp SquashingBlockOutputStream.cpp