Support of Async status for PipelineExecutor.

Nikolai Kochetov 2020-12-04 12:55:27 +03:00
parent 92d60a7f38
commit 00492eec67
4 changed files with 64 additions and 20 deletions

Processors/Executors/AsyncTaskQueue.cpp

@@ -40,7 +40,7 @@ void AsyncTaskQueue::addTask(void * data, int fd)
condvar.notify_one();
}
void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> lock)
void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> & lock)
{
condvar.wait(lock, [&] { return !empty() || is_finished; });

Processors/Executors/AsyncTaskQueue.h

@@ -26,7 +26,7 @@ public:
/// Wait for any descriptor. If there are no descriptors in the queue, blocks.
/// Returns the ptr which was inserted into the queue, or nullptr if finish() was called.
/// Lock is used to wait on condvar.
void * wait(std::unique_lock<std::mutex> lock);
void * wait(std::unique_lock<std::mutex> & lock);
/// Interrupt waiting.
void finish();
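
The signature change above matters because wait() blocks on the executor's own task_queue_mutex: taking the std::unique_lock by value would move the lock into the callee and release it on return, while a reference lets the caller keep holding the mutex after a task is handed back. Below is a condvar-only model of this interface — a minimal sketch, not the real class (the fd argument of addTask, presumably used for descriptor polling, is ignored here). It demonstrates both documented contracts: the lock is still held when wait() returns, and finish() interrupts a blocked wait(), which then returns nullptr.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

/// Simplified stand-in for AsyncTaskQueue; illustration only.
class AsyncTaskQueueModel
{
public:
    /// The real addTask also records the fd to poll; omitted in this model.
    void addTask(void * data, int /*fd*/)
    {
        {
            std::lock_guard guard(mutex);
            tasks.push(data);
        }
        condvar.notify_one();
    }

    /// Blocks until a task arrives or finish() is called.
    /// The lock is borrowed by reference: the caller still holds it on return.
    void * wait(std::unique_lock<std::mutex> & lock)
    {
        condvar.wait(lock, [&] { return !tasks.empty() || is_finished; });
        if (tasks.empty())
            return nullptr;                    /// finish() interrupted the wait
        void * res = tasks.front();
        tasks.pop();
        return res;
    }

    /// In this model, called under the same mutex the waiter uses
    /// (as PipelineExecutor::finish does with task_queue_mutex below).
    void finish()
    {
        is_finished = true;
        condvar.notify_all();
    }

    std::mutex mutex;   /// plays the role of PipelineExecutor::task_queue_mutex

private:
    std::queue<void *> tasks;
    std::condition_variable condvar;
    bool is_finished = false;
};

int main()
{
    AsyncTaskQueueModel queue;

    std::thread waiter([&]
    {
        std::unique_lock lock(queue.mutex);
        void * task = queue.wait(lock);            /// blocks until finish()
        std::printf("wait() returned %p\n", task); /// nullptr: interrupted
        /// The lock is still held here, so shared state can be read safely.
    });

    {
        std::lock_guard guard(queue.mutex);
        queue.finish();
    }
    waiter.join();
}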

Processors/Executors/PipelineExecutor.cpp

@@ -164,7 +164,7 @@ bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
return true;
}
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, size_t thread_number)
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number)
{
/// In this method we have ownership of the edge, but the node can be accessed concurrently.
@@ -185,7 +185,7 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & ed
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
return prepareProcessor(edge.to, thread_number, queue, std::move(lock));
return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock));
}
else
graph->nodes[edge.to]->processor->onUpdatePorts();
@@ -193,7 +193,7 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & ed
return true;
}
bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock)
bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock)
{
/// In this method we have ownership of the node.
auto & node = *graph->nodes[pid];
@@ -248,11 +248,9 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
}
case IProcessor::Status::Async:
{
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
// node.status = ExecStatus::Executing;
// addAsyncJob(pid);
// break;
node.status = ExecutingGraph::ExecStatus::Executing;
async_queue.push(&node);
break;
}
case IProcessor::Status::ExpandPipeline:
{
@@ -284,13 +282,13 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
{
for (auto & edge : updated_direct_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
for (auto & edge : updated_back_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
}
@@ -321,7 +319,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
while (!stack.empty())
{
auto item = stack.top();
if (!prepareProcessor(item, thread_number, queue, std::unique_lock<std::mutex>(graph->nodes[item]->status_mutex)))
if (!prepareProcessor(item, thread_number, queue, async_queue, std::unique_lock<std::mutex>(graph->nodes[item]->status_mutex)))
return false;
stack.pop();
@@ -374,6 +372,7 @@ void PipelineExecutor::finish()
{
std::lock_guard lock(task_queue_mutex);
finished = true;
async_task_queue.finish();
}
std::lock_guard guard(executor_contexts_mutex);
@@ -518,13 +517,24 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
break;
}
if (threads_queue.size() + 1 == num_threads)
if (threads_queue.size() + 1 == num_threads && async_task_queue.empty())
{
lock.unlock();
finish();
break;
}
if (num_threads == 1)
{
/// If we execute in a single thread, wait for async tasks here.
void * res = async_task_queue.wait(lock);
if (!res)
throw Exception("Empty task was returned from async task queue", ErrorCodes::LOGICAL_ERROR);
node = static_cast<ExecutingGraph::Node *>(res);
continue;
}
threads_queue.push(thread_num);
}
@@ -575,6 +585,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
/// Try to execute neighbour processor.
{
Queue queue;
Queue async_queue;
++num_processing_executors;
while (auto * task = expand_pipeline_task.load())
@@ -583,7 +594,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
/// Prepare processor after execution.
{
auto lock = std::unique_lock<std::mutex>(node->status_mutex);
if (!prepareProcessor(node->processors_id, thread_num, queue, std::move(lock)))
if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
finish();
}
@@ -597,17 +608,23 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
}
/// Push other tasks to global queue.
if (!queue.empty())
if (!queue.empty() || !async_queue.empty())
{
std::unique_lock lock(task_queue_mutex);
while (!async_queue.empty() && !finished)
{
async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
async_queue.pop();
}
while (!queue.empty() && !finished)
{
task_queue.push(queue.front(), thread_num);
queue.pop();
}
if (!threads_queue.empty() && !finished /* && task_queue.quota() > threads_queue.size()*/)
if (!threads_queue.empty() && !finished)
{
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
@@ -665,6 +682,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
std::lock_guard lock(task_queue_mutex);
Queue queue;
Queue async_queue;
size_t next_thread = 0;
while (!stack.empty())
@@ -672,7 +690,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
UInt64 proc = stack.top();
stack.pop();
prepareProcessor(proc, 0, queue, std::unique_lock<std::mutex>(graph->nodes[proc]->status_mutex));
prepareProcessor(proc, 0, queue, async_queue, std::unique_lock<std::mutex>(graph->nodes[proc]->status_mutex));
while (!queue.empty())
{
@@ -683,6 +701,12 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
if (next_thread >= num_threads)
next_thread = 0;
}
while (!async_queue.empty())
{
async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
async_queue.pop();
}
}
}
}
@@ -743,6 +767,20 @@ void PipelineExecutor::executeImpl(size_t num_threads)
});
}
{
/// Wait for async tasks.
std::unique_lock lock(task_queue_mutex);
size_t next_thread = 0;
while (void * task = async_task_queue.wait(lock))
{
task_queue.push(static_cast<ExecutingGraph::Node *>(task), next_thread);
++next_thread;
if (next_thread >= num_threads)
next_thread = 0;
}
}
for (auto & thread : threads)
if (thread.joinable())
thread.join();
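
The block added above turns the main thread into a dispatcher: after spawning the workers it parks in async_task_queue.wait() and, each time a descriptor becomes ready, pushes the corresponding node back into task_queue, rotating the target thread. The standalone toy below (all names hypothetical, Linux-only because of eventfd) sketches that readiness loop with plain poll(): tasks that went asynchronous are parked together with an fd, and are re-queued for execution only once their fd signals completion.

#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <queue>
#include <thread>
#include <vector>

struct Task { int id; int fd; };

int main()
{
    std::queue<Task> ready;          /// analogue of task_queue
    std::vector<Task> parked;        /// analogue of async_task_queue

    /// Two tasks reported Async; each owns an eventfd that signals completion.
    for (int id = 0; id < 2; ++id)
        parked.push_back({id, eventfd(0, 0)});

    /// Copy the fds up front so the completer never touches `parked` itself.
    std::vector<int> fds_to_signal;
    for (auto & task : parked)
        fds_to_signal.push_back(task.fd);

    std::thread completer([fds_to_signal]
    {
        uint64_t one = 1;
        for (int fd : fds_to_signal)
            write(fd, &one, sizeof(one));   /// the async work finished
    });

    /// Dispatcher side: poll the parked fds, re-queue tasks as they become ready.
    while (!parked.empty())
    {
        std::vector<pollfd> fds;
        for (auto & task : parked)
            fds.push_back({task.fd, POLLIN, 0});
        poll(fds.data(), fds.size(), -1);

        for (size_t i = fds.size(); i-- > 0;)
        {
            if (fds[i].revents & POLLIN)
            {
                ready.push(parked[i]);            /// back into the task queue
                parked.erase(parked.begin() + i);
            }
        }
    }

    completer.join();
    while (!ready.empty())
    {
        std::printf("task %d resumed\n", ready.front().id);
        close(ready.front().fd);
        ready.pop();
    }
}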

Processors/Executors/PipelineExecutor.h

@@ -1,6 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/AsyncTaskQueue.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Processors/Executors/TasksQueue.h>
#include <Processors/Executors/ExecutingGraph.h>
@@ -57,6 +58,11 @@ private:
/// Stores processors that need to be prepared. Preparing status is already set for them.
TaskQueue<ExecutingGraph::Node> task_queue;
/// Queue which stores tasks for processors that returned Async status after prepare.
/// If multiple threads are used, the main thread will wait for async tasks.
/// With a single thread, async tasks are waited for only when task_queue is empty.
AsyncTaskQueue async_task_queue;
ThreadsQueue threads_queue;
std::mutex task_queue_mutex;
@@ -126,14 +132,14 @@ private:
/// Pipeline execution related methods.
void addChildlessProcessorsToStack(Stack & stack);
bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, size_t thread_number);
bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number);
static void addJob(ExecutingGraph::Node * execution_state);
// TODO: void addAsyncJob(UInt64 pid);
/// Prepare processor with pid number.
/// Check parents and children of current processor and push them to stacks if they also need to be prepared.
/// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock);
bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock);
bool doExpandPipeline(ExpandPipelineTask * task, bool processing);
/// Continue executor (in case there are tasks in queue).
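
For context, the processor-facing contract this diff implies: prepare() may now return IProcessor::Status::Async, after which the executor calls schedule() on the processor to obtain the descriptor it hands to async_task_queue.addTask (see the addTask calls above, where schedule() supplies the fd). The hypothetical, self-contained model below — not ClickHouse's IProcessor, whose full interface is not shown in this diff — illustrates that handshake with an eventfd.

#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

/// Miniature processor interface; names mimic IProcessor but this is an
/// illustration, not the real class.
enum class Status { NeedData, Async, Finished };

struct Processor
{
    virtual Status prepare() = 0;
    virtual int schedule() = 0;   /// fd that becomes readable when async work is done
    virtual void work() = 0;
    virtual ~Processor() = default;
};

struct AsyncSource : Processor
{
    int fd = eventfd(0, 0);       /// Linux-only readiness mechanism for the sketch
    bool done = false;

    Status prepare() override { return done ? Status::Finished : Status::Async; }

    /// The executor would pass this fd to the async task queue (cf. addTask above).
    int schedule() override { return fd; }

    void work() override
    {
        uint64_t value;
        read(fd, &value, sizeof(value));   /// consume the readiness signal
        done = true;
    }

    ~AsyncSource() override { close(fd); }
};

int main()
{
    AsyncSource source;
    if (source.prepare() == Status::Async)
    {
        int fd = source.schedule();        /// executor: watch this descriptor
        uint64_t one = 1;
        write(fd, &one, sizeof(one));      /// pretend the external I/O completed
        source.work();                     /// executor resumes the processor
    }
    std::printf("finished: %d\n", source.prepare() == Status::Finished);
}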