From 92d60a7f38a359a9b717a6f2c5ee8c5a1613c7ea Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 3 Dec 2020 20:03:13 +0300 Subject: [PATCH] Add AsyncTaskQueue. --- src/DataStreams/RemoteQueryExecutor.cpp | 15 ++-- src/Processors/Executors/AsyncTaskQueue.cpp | 73 +++++++++++++++++++ src/Processors/Executors/AsyncTaskQueue.h | 35 +++++++++ src/Processors/Executors/PipelineExecutor.cpp | 4 - src/Processors/IProcessor.h | 9 +-- 5 files changed, 119 insertions(+), 17 deletions(-) create mode 100644 src/Processors/Executors/AsyncTaskQueue.cpp create mode 100644 src/Processors/Executors/AsyncTaskQueue.h diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index a902541b875..100f20b4123 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -39,7 +39,7 @@ struct RemoteQueryExecutor::ReadContext TimerDescriptor timer{CLOCK_MONOTONIC, 0}; int socket_fd; - int epoll_df; + int epoll_fd; explicit ReadContext(MultiplexedConnections & connections_) : connections(connections_) { @@ -47,7 +47,8 @@ struct RemoteQueryExecutor::ReadContext socket_fd = socket.impl()->sockfd(); receive_timeout = socket.impl()->getReceiveTimeout(); - if (-1 == epoll_create(2)) + socket_fd = epoll_create(2); + if (-1 == socket_fd) throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE); { @@ -55,7 +56,7 @@ struct RemoteQueryExecutor::ReadContext socket_event.events = EPOLLIN | EPOLLPRI; socket_event.data.fd = socket_fd; - if (-1 == epoll_ctl(epoll_df, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event)) + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event)) throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); } @@ -64,7 +65,7 @@ struct RemoteQueryExecutor::ReadContext timer_event.events = EPOLLIN | EPOLLPRI; timer_event.data.fd = timer.getDescriptor(); - if (-1 == epoll_ctl(epoll_df, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event)) + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event)) throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); } @@ -100,7 +101,7 @@ struct RemoteQueryExecutor::ReadContext epoll_event events[2]; /// Wait for epoll_fd will not block if it was polled externally. - int num_events = epoll_wait(epoll_df, events, 2, 0); + int num_events = epoll_wait(epoll_fd, events, 2, 0); if (num_events == -1) throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET); @@ -144,7 +145,7 @@ struct RemoteQueryExecutor::ReadContext { /// socket_fd is closed by Poco::Net::Socket /// timer_fd is closed by TimerDescriptor - close(epoll_df); + close(epoll_fd); } struct Routine @@ -388,7 +389,7 @@ std::variant RemoteQueryExecutor::read(std::unique_ptr if (read_context->is_read_in_progress) { read_context->setTimer(); - return read_context->epoll_df; + return read_context->epoll_fd; } else { diff --git a/src/Processors/Executors/AsyncTaskQueue.cpp b/src/Processors/Executors/AsyncTaskQueue.cpp new file mode 100644 index 00000000000..417b3ce25f1 --- /dev/null +++ b/src/Processors/Executors/AsyncTaskQueue.cpp @@ -0,0 +1,73 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_READ_FROM_SOCKET; +} + + +AsyncTaskQueue::AsyncTaskQueue() +{ + epoll_fd = epoll_create(1); + if (-1 == epoll_fd) + throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE); +} + +AsyncTaskQueue::~AsyncTaskQueue() +{ + close(epoll_fd); +} + +void AsyncTaskQueue::addTask(void * data, int fd) +{ + epoll_event socket_event; + socket_event.events = EPOLLIN | EPOLLPRI; + socket_event.data.fd = fd; + socket_event.data.ptr = data; + + if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event)) + throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE); + + ++num_tasks; + if (num_tasks == 1) + condvar.notify_one(); +} + +void * AsyncTaskQueue::wait(std::unique_lock lock) +{ + condvar.wait(lock, [&] { return !empty() || is_finished; }); + + if (is_finished) + return nullptr; + + lock.unlock(); + + epoll_event event; + int num_events = 0; + + while (num_events == 0) + { + num_events = epoll_wait(epoll_fd, &event, 1, 0); + if (num_events == -1) + throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET); + } + + lock.lock(); + --num_tasks; + return event.data.ptr; +} + +void AsyncTaskQueue::finish() +{ + is_finished = true; + condvar.notify_one(); +} + +} diff --git a/src/Processors/Executors/AsyncTaskQueue.h b/src/Processors/Executors/AsyncTaskQueue.h new file mode 100644 index 00000000000..b66fe932d46 --- /dev/null +++ b/src/Processors/Executors/AsyncTaskQueue.h @@ -0,0 +1,35 @@ +#include +#include +#include + +namespace DB +{ + +class AsyncTaskQueue +{ +private: + int epoll_fd; + size_t num_tasks; + std::atomic_bool is_finished = false; + std::condition_variable condvar; + +public: + AsyncTaskQueue(); + ~AsyncTaskQueue(); + + size_t size() const { return num_tasks; } + bool empty() const { return num_tasks == 0; } + + /// Add new task to queue. + void addTask(void * data, int fd); + + /// Wait for any descriptor. If no descriptors in queue, blocks. + /// Returns ptr which was inserted into queue or nullptr if finished was called. + /// Lock is used to wait on condvar. + void * wait(std::unique_lock lock); + + /// Interrupt waiting. + void finish(); +}; + +} diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index 517e07a3ba4..a9f6562693e 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -254,10 +254,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue // addAsyncJob(pid); // break; } - case IProcessor::Status::Wait: - { - throw Exception("Wait is temporary not supported.", ErrorCodes::LOGICAL_ERROR); - } case IProcessor::Status::ExpandPipeline: { need_expand_pipeline = true; diff --git a/src/Processors/IProcessor.h b/src/Processors/IProcessor.h index c774b43a9b2..08adc151424 100644 --- a/src/Processors/IProcessor.h +++ b/src/Processors/IProcessor.h @@ -146,13 +146,10 @@ public: /// You may call 'work' method and processor will do some work synchronously. Ready, - /// You may call 'schedule' method and processor will initiate some background work. + /// You may call 'schedule' method and processor will return descriptor. + /// You need to poll this descriptor and call work() afterwards. Async, - /// Processor is doing some work in background. - /// You may wait for next event or do something else and then you should call 'prepare' again. - Wait, - /// Processor wants to add other processors to pipeline. /// New processors must be obtained by expandPipeline() call. ExpandPipeline, @@ -207,7 +204,7 @@ public: * Note that it can fire many events in EventCounter while doing its job, * and you have to wait for next event (or do something else) every time when 'prepare' returned Wait. */ - virtual void schedule(EventCounter & /*watch*/) + virtual int schedule() { throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); }