Add AsyncTaskQueue.

This commit is contained in:
Nikolai Kochetov 2020-12-03 20:03:13 +03:00
parent 082a496364
commit 92d60a7f38
5 changed files with 119 additions and 17 deletions

View File

@ -39,7 +39,7 @@ struct RemoteQueryExecutor::ReadContext
TimerDescriptor timer{CLOCK_MONOTONIC, 0};
int socket_fd;
int epoll_df;
int epoll_fd;
explicit ReadContext(MultiplexedConnections & connections_) : connections(connections_)
{
@ -47,7 +47,8 @@ struct RemoteQueryExecutor::ReadContext
socket_fd = socket.impl()->sockfd();
receive_timeout = socket.impl()->getReceiveTimeout();
if (-1 == epoll_create(2))
socket_fd = epoll_create(2);
if (-1 == socket_fd)
throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE);
{
@ -55,7 +56,7 @@ struct RemoteQueryExecutor::ReadContext
socket_event.events = EPOLLIN | EPOLLPRI;
socket_event.data.fd = socket_fd;
if (-1 == epoll_ctl(epoll_df, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event))
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event))
throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
@ -64,7 +65,7 @@ struct RemoteQueryExecutor::ReadContext
timer_event.events = EPOLLIN | EPOLLPRI;
timer_event.data.fd = timer.getDescriptor();
if (-1 == epoll_ctl(epoll_df, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event))
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_event.data.fd, &timer_event))
throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
@ -100,7 +101,7 @@ struct RemoteQueryExecutor::ReadContext
epoll_event events[2];
/// Wait for epoll_fd will not block if it was polled externally.
int num_events = epoll_wait(epoll_df, events, 2, 0);
int num_events = epoll_wait(epoll_fd, events, 2, 0);
if (num_events == -1)
throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET);
@ -144,7 +145,7 @@ struct RemoteQueryExecutor::ReadContext
{
/// socket_fd is closed by Poco::Net::Socket
/// timer_fd is closed by TimerDescriptor
close(epoll_df);
close(epoll_fd);
}
struct Routine
@ -388,7 +389,7 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
if (read_context->is_read_in_progress)
{
read_context->setTimer();
return read_context->epoll_df;
return read_context->epoll_fd;
}
else
{

View File

@ -0,0 +1,73 @@
#include <Processors/Executors/AsyncTaskQueue.h>
#include <Common/Exception.h>
#include <sys/epoll.h>
#include <unistd.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_READ_FROM_SOCKET;
}
AsyncTaskQueue::AsyncTaskQueue()
{
epoll_fd = epoll_create(1);
if (-1 == epoll_fd)
throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE);
}
AsyncTaskQueue::~AsyncTaskQueue()
{
close(epoll_fd);
}
void AsyncTaskQueue::addTask(void * data, int fd)
{
epoll_event socket_event;
socket_event.events = EPOLLIN | EPOLLPRI;
socket_event.data.fd = fd;
socket_event.data.ptr = data;
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_event.data.fd, &socket_event))
throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
++num_tasks;
if (num_tasks == 1)
condvar.notify_one();
}
void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> lock)
{
condvar.wait(lock, [&] { return !empty() || is_finished; });
if (is_finished)
return nullptr;
lock.unlock();
epoll_event event;
int num_events = 0;
while (num_events == 0)
{
num_events = epoll_wait(epoll_fd, &event, 1, 0);
if (num_events == -1)
throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET);
}
lock.lock();
--num_tasks;
return event.data.ptr;
}
void AsyncTaskQueue::finish()
{
is_finished = true;
condvar.notify_one();
}
}

View File

@ -0,0 +1,35 @@
#include <cstddef>
#include <mutex>
#include <atomic>
namespace DB
{
class AsyncTaskQueue
{
private:
int epoll_fd;
size_t num_tasks;
std::atomic_bool is_finished = false;
std::condition_variable condvar;
public:
AsyncTaskQueue();
~AsyncTaskQueue();
size_t size() const { return num_tasks; }
bool empty() const { return num_tasks == 0; }
/// Add new task to queue.
void addTask(void * data, int fd);
/// Wait for any descriptor. If no descriptors in queue, blocks.
/// Returns ptr which was inserted into queue or nullptr if finished was called.
/// Lock is used to wait on condvar.
void * wait(std::unique_lock<std::mutex> lock);
/// Interrupt waiting.
void finish();
};
}

View File

@ -254,10 +254,6 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
// addAsyncJob(pid);
// break;
}
case IProcessor::Status::Wait:
{
throw Exception("Wait is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;

View File

@ -146,13 +146,10 @@ public:
/// You may call 'work' method and processor will do some work synchronously.
Ready,
/// You may call 'schedule' method and processor will initiate some background work.
/// You may call 'schedule' method and processor will return descriptor.
/// You need to poll this descriptor and call work() afterwards.
Async,
/// Processor is doing some work in background.
/// You may wait for next event or do something else and then you should call 'prepare' again.
Wait,
/// Processor wants to add other processors to pipeline.
/// New processors must be obtained by expandPipeline() call.
ExpandPipeline,
@ -207,7 +204,7 @@ public:
* Note that it can fire many events in EventCounter while doing its job,
* and you have to wait for next event (or do something else) every time when 'prepare' returned Wait.
*/
virtual void schedule(EventCounter & /*watch*/)
virtual int schedule()
{
throw Exception("Method 'schedule' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
}