Use same thread for async task continuation.

Nikolai Kochetov 2020-12-07 14:16:23 +03:00
parent 156f44808f
commit 6a8384e6a4
4 changed files with 66 additions and 46 deletions
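In short: previously, when an async task's descriptor became ready, its continuation could resume on any executor thread (the waiting thread distributed ready tasks round-robin over the shared task_queue). Now AsyncTaskQueue records, for every registered descriptor, the number of the thread that scheduled it, and the waiting thread pushes the finished task back to that thread's own async_tasks queue, waking it if it is parked. A new num_waiting_async_tasks counter keeps such parked continuations visible to the shutdown check, and scheduling async work before the first work() call becomes an explicit logical error.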


@@ -25,33 +25,28 @@ AsyncTaskQueue::~AsyncTaskQueue()
     close(epoll_fd);
 }
 
-void AsyncTaskQueue::addTask(void * data, int fd)
+void AsyncTaskQueue::addTask(size_t thread_number, void * data, int fd)
 {
+    auto it = tasks.insert(tasks.end(), TaskData{thread_number, data, fd, {}});
+    it->self = it;
+
     epoll_event socket_event;
     socket_event.events = EPOLLIN | EPOLLPRI;
-    socket_event.data.ptr = data;
+    socket_event.data.ptr = &(*it);
 
     if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &socket_event))
         throwFromErrno("Cannot add socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
 
-    ++num_tasks;
-    if (num_tasks == 1)
+    if (size() == 1)
         condvar.notify_one();
 }
 
-void AsyncTaskQueue::removeTask(int fd) const
-{
-    epoll_event event;
-    if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &event))
-        throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
-}
-
-void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> & lock)
+AsyncTaskQueue::TaskData AsyncTaskQueue::wait(std::unique_lock<std::mutex> & lock)
 {
     condvar.wait(lock, [&] { return !empty() || is_finished; });
 
     if (is_finished)
-        return nullptr;
+        return {};
 
     lock.unlock();
@@ -63,18 +58,24 @@ void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> & lock)
         num_events = epoll_wait(epoll_fd, &event, 1, 0);
         if (num_events == -1)
             throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET);
     }
 
     lock.lock();
-    --num_tasks;
-    return event.data.ptr;
+
+    auto it = static_cast<TaskData *>(event.data.ptr)->self;
+
+    if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_DEL, it->fd, &event))
+        throwFromErrno("Cannot remove socket descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
+
+    auto res = *it;
+    tasks.erase(it);
+
+    return res;
 }
 
 void AsyncTaskQueue::finish()
 {
     is_finished = true;
-    num_tasks = 0;
+    tasks.clear();
     condvar.notify_one();
 }
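The mechanics above are compact but dense, so here is a minimal standalone sketch of the same pattern with made-up names (EpollList and Task are illustrations, not ClickHouse identifiers): each entry lives in a std::list, and epoll's user pointer refers to the list node itself, so the ready entry can be found and unregistered in O(1) when its descriptor fires.

    #include <sys/epoll.h>
    #include <unistd.h>
    #include <cerrno>
    #include <cstddef>
    #include <list>
    #include <stdexcept>

    struct Task
    {
        size_t thread_num = 0;              /// Thread that scheduled the task.
        void * data = nullptr;              /// Opaque payload.
        int fd = -1;                        /// Descriptor to poll.
        std::list<Task>::iterator self;     /// Back-reference for O(1) erase.
    };

    class EpollList
    {
        int epoll_fd = epoll_create1(0);
        std::list<Task> tasks;

    public:
        ~EpollList() { close(epoll_fd); }

        void add(size_t thread_num, void * data, int fd)
        {
            auto it = tasks.insert(tasks.end(), Task{thread_num, data, fd, {}});
            it->self = it;                  /// The node remembers its own position.

            epoll_event event{};
            event.events = EPOLLIN | EPOLLPRI;
            event.data.ptr = &*it;          /// Point at the node, not the payload.
            if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1)
                throw std::runtime_error("epoll_ctl(ADD) failed");
        }

        Task waitOne()                      /// Blocks until one descriptor is ready.
        {
            epoll_event event{};
            int num_events;
            do
                num_events = epoll_wait(epoll_fd, &event, 1, -1);
            while (num_events == -1 && errno == EINTR);
            if (num_events == -1)
                throw std::runtime_error("epoll_wait failed");

            auto it = static_cast<Task *>(event.data.ptr)->self;
            if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, it->fd, &event) == -1)
                throw std::runtime_error("epoll_ctl(DEL) failed");

            Task res = *it;                 /// Copy out before the node disappears.
            tasks.erase(it);
            return res;
        }
    };

std::list is the container that makes this safe: inserting and erasing never invalidates pointers or iterators to other nodes, which is exactly what storing &(*it) in epoll and the self iterator relies on.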


@@ -1,35 +1,46 @@
 #include <cstddef>
 #include <mutex>
 #include <atomic>
+#include <list>
 
 namespace DB
 {
 
 class AsyncTaskQueue
 {
+public:
+    struct TaskData
+    {
+        size_t thread_num;
+        void * data = nullptr;
+        int fd = -1;
+        std::list<TaskData>::iterator self;
+
+        explicit operator bool() const { return data; }
+    };
+
+private:
     int epoll_fd;
-    size_t num_tasks;
     std::atomic_bool is_finished = false;
     std::condition_variable condvar;
+    std::list<TaskData> tasks;
 
 public:
     AsyncTaskQueue();
     ~AsyncTaskQueue();
 
-    size_t size() const { return num_tasks; }
-    bool empty() const { return num_tasks == 0; }
+    size_t size() const { return tasks.size(); }
+    bool empty() const { return tasks.empty(); }
 
     /// Add new task to queue.
-    void addTask(void * data, int fd);
-
-    /// Remove task.
-    void removeTask(int fd) const;
+    void addTask(size_t thread_number, void * data, int fd);
 
     /// Wait for any descriptor. If no descriptors in queue, blocks.
     /// Returns the task data which was added to the queue, or empty data if finish() was called.
     /// Lock is used to wait on condvar.
-    void * wait(std::unique_lock<std::mutex> & lock);
+    TaskData wait(std::unique_lock<std::mutex> & lock);
 
     /// Interrupt waiting.
     void finish();
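Put together, a hypothetical consumer loop against this interface looks as follows (the mutex and the dispatchToThread helper are stand-ins, not ClickHouse code). The explicit operator bool(), which tests data, is what lets both the `if (!res)` check and the `while (auto task = ...)` loop elsewhere in this commit tell a real task apart from the empty TaskData returned once finish() is called:

    #include <mutex>

    std::mutex mutex;
    DB::AsyncTaskQueue queue;

    /// Hypothetical helper that hands the payload to a particular thread.
    void dispatchToThread(size_t thread_num, void * data);

    void consumeLoop()
    {
        std::unique_lock lock(mutex);

        /// Empty TaskData converts to false, so the loop exits after finish().
        while (auto task = queue.wait(lock))
            dispatchToThread(task.thread_num, task.data);
    }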


@@ -497,11 +497,18 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
         {
             std::unique_lock lock(task_queue_mutex);
 
-            if (!task_queue.empty())
+            if (!context->async_tasks.empty())
+            {
+                node = context->async_tasks.front();
+                context->async_tasks.pop();
+                --num_waiting_async_tasks;
+            }
+            else if (!task_queue.empty())
                 node = task_queue.pop(thread_num);
 
-            if (!task_queue.empty() && !threads_queue.empty() /*&& task_queue.quota() > threads_queue.size()*/)
+            if (node)
             {
+                if (!task_queue.empty() && !threads_queue.empty())
+                {
                 auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
@@ -517,7 +524,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
                 break;
             }
 
-            if (threads_queue.size() + 1 == num_threads && async_task_queue.empty())
+            if (threads_queue.size() + 1 == num_threads && async_task_queue.empty() && num_waiting_async_tasks == 0)
             {
                 lock.unlock();
                 finish();
@@ -527,13 +534,12 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
             if (num_threads == 1)
             {
                 /// If we execute in single thread, wait for async tasks here.
-                void * res = async_task_queue.wait(lock);
+                auto res = async_task_queue.wait(lock);
                 if (!res)
                     throw Exception("Empty task was returned from async task queue", ErrorCodes::LOGICAL_ERROR);
 
-                node = static_cast<ExecutingGraph::Node *>(res);
-                async_task_queue.removeTask(node->processor->schedule());
-                continue;
+                node = static_cast<ExecutingGraph::Node *>(res.data);
+                break;
             }
 
             threads_queue.push(thread_num);
@@ -615,7 +621,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
             while (!async_queue.empty() && !finished)
             {
-                async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
+                async_task_queue.addTask(thread_num, async_queue.front(), async_queue.front()->processor->schedule());
                 async_queue.pop();
             }
@@ -625,7 +631,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
                 queue.pop();
             }
 
-            if (!threads_queue.empty() && !finished)
+            if (!threads_queue.empty() && !task_queue.empty() && !finished)
             {
                 auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
@@ -704,10 +710,8 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
             }
 
             while (!async_queue.empty())
-            {
-                async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
-                async_queue.pop();
-            }
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",
+                                async_queue.front()->processor->getName());
         }
     }
 }
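The initializeExecution change is consistent with the new affinity model: during initialization no executor thread has run yet, so there is no thread number to pin a continuation to, and instead of silently registering such tasks the executor now rejects them as a logical error ("Async is only possible after work() call").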
@@ -771,16 +775,17 @@ void PipelineExecutor::executeImpl(size_t num_threads)
         {
             /// Wait for async tasks.
             std::unique_lock lock(task_queue_mutex);
-            size_t next_thread = 0;
-            while (void * task = async_task_queue.wait(lock))
+            while (auto task = async_task_queue.wait(lock))
             {
-                auto * node = static_cast<ExecutingGraph::Node *>(task);
-                async_task_queue.removeTask(node->processor->schedule());
-                task_queue.push(node, next_thread);
+                auto * node = static_cast<ExecutingGraph::Node *>(task.data);
+                executor_contexts[task.thread_num]->async_tasks.push(node);
+                ++num_waiting_async_tasks;
 
-                ++next_thread;
-                if (next_thread >= num_threads)
-                    next_thread = 0;
+                if (threads_queue.has(task.thread_num))
+                {
+                    threads_queue.pop(task.thread_num);
+                    wakeUpExecutor(task.thread_num);
+                }
             }
         }
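This is the core of the commit: executeImpl's waiting thread no longer spreads ready tasks round-robin over next_thread, but pushes each one to executor_contexts[task.thread_num]->async_tasks and, if that exact thread is parked in threads_queue, wakes it. The num_waiting_async_tasks counter incremented here (and decremented in executeStepImpl when a task is consumed) is what the extended shutdown condition checks, so the pipeline cannot finish while continuations are still parked in per-thread queues.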


@@ -62,6 +62,7 @@ private:
     /// If multiple threads are used, the main thread will wait for async tasks.
     /// For a single thread, async tasks are waited for only when task_queue is empty.
     AsyncTaskQueue async_task_queue;
+    size_t num_waiting_async_tasks = 0;
 
     ThreadsQueue threads_queue;
     std::mutex task_queue_mutex;
@@ -96,6 +97,8 @@ private:
         /// This can be solved by using atomic shared ptr.
         std::list<ExpandPipelineTask> task_list;
 
+        std::queue<ExecutingGraph::Node *> async_tasks;
+
         std::condition_variable condvar;
         std::mutex mutex;
         bool wake_flag = false;
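The pre-existing condvar/mutex/wake_flag trio in ExecutorContext is presumably what wakeUpExecutor (called from the executeImpl hunk above) drives; its body is not part of this diff, so the following park/wake handshake is only a sketch of the usual pattern behind such members, not the ClickHouse implementation:

    #include <condition_variable>
    #include <mutex>

    struct ExecutorContextSketch        /// Stand-in for the per-thread context above.
    {
        std::condition_variable condvar;
        std::mutex mutex;
        bool wake_flag = false;
    };

    /// Waiting side: wake one parked executor thread.
    void wakeUp(ExecutorContextSketch & ctx)
    {
        std::lock_guard guard(ctx.mutex);
        ctx.wake_flag = true;           /// The flag guards against lost wake-ups.
        ctx.condvar.notify_one();
    }

    /// Worker side: park until someone hands us work.
    void park(ExecutorContextSketch & ctx)
    {
        std::unique_lock lock(ctx.mutex);
        ctx.condvar.wait(lock, [&] { return ctx.wake_flag; });
        ctx.wake_flag = false;          /// Consume the wake-up.
    }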