From 00492eec67f115cbbef24729622f82a5234931cd Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Fri, 4 Dec 2020 12:55:27 +0300
Subject: [PATCH] Support of Async status for PipelineExecutor.

---
 src/Processors/Executors/AsyncTaskQueue.cpp   |  2 +-
 src/Processors/Executors/AsyncTaskQueue.h     |  2 +-
 src/Processors/Executors/PipelineExecutor.cpp | 70 ++++++++++++++-----
 src/Processors/Executors/PipelineExecutor.h   | 10 ++-
 4 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/src/Processors/Executors/AsyncTaskQueue.cpp b/src/Processors/Executors/AsyncTaskQueue.cpp
index 417b3ce25f1..d0a63a87165 100644
--- a/src/Processors/Executors/AsyncTaskQueue.cpp
+++ b/src/Processors/Executors/AsyncTaskQueue.cpp
@@ -40,7 +40,7 @@ void AsyncTaskQueue::addTask(void * data, int fd)
     condvar.notify_one();
 }
 
-void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> lock)
+void * AsyncTaskQueue::wait(std::unique_lock<std::mutex> & lock)
 {
     condvar.wait(lock, [&] { return !empty() || is_finished; });
 
diff --git a/src/Processors/Executors/AsyncTaskQueue.h b/src/Processors/Executors/AsyncTaskQueue.h
index b66fe932d46..2f3cdf8ab82 100644
--- a/src/Processors/Executors/AsyncTaskQueue.h
+++ b/src/Processors/Executors/AsyncTaskQueue.h
@@ -26,7 +26,7 @@ public:
     /// Wait for any descriptor. If no descriptors in queue, blocks.
     /// Returns ptr which was inserted into queue or nullptr if finished was called.
     /// Lock is used to wait on condvar.
-    void * wait(std::unique_lock<std::mutex> lock);
+    void * wait(std::unique_lock<std::mutex> & lock);
 
     /// Interrupt waiting.
     void finish();
diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp
index a9f6562693e..dce21067557 100644
--- a/src/Processors/Executors/PipelineExecutor.cpp
+++ b/src/Processors/Executors/PipelineExecutor.cpp
@@ -164,7 +164,7 @@ bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
     return true;
 }
 
-bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, size_t thread_number)
+bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number)
 {
     /// In this method we have ownership on edge, but node can be concurrently accessed.
 
@@ -185,7 +185,7 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & ed
         if (status == ExecutingGraph::ExecStatus::Idle)
         {
             node.status = ExecutingGraph::ExecStatus::Preparing;
-            return prepareProcessor(edge.to, thread_number, queue, std::move(lock));
+            return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock));
         }
         else
             graph->nodes[edge.to]->processor->onUpdatePorts();
@@ -193,7 +193,7 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & ed
     return true;
 }
 
-bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock)
+bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock)
 {
     /// In this method we have ownership on node.
     auto & node = *graph->nodes[pid];
@@ -248,11 +248,9 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
             }
             case IProcessor::Status::Async:
             {
-                throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
-
-//                node.status = ExecStatus::Executing;
-//                addAsyncJob(pid);
-//                break;
+                node.status = ExecutingGraph::ExecStatus::Executing;
+                async_queue.push(&node);
+                break;
             }
             case IProcessor::Status::ExpandPipeline:
             {
@@ -284,13 +282,13 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
     {
         for (auto & edge : updated_direct_edges)
         {
-            if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
+            if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
                 return false;
         }
 
         for (auto & edge : updated_back_edges)
        {
-            if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
+            if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
                 return false;
         }
     }
@@ -321,7 +319,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue
         while (!stack.empty())
         {
             auto item = stack.top();
-            if (!prepareProcessor(item, thread_number, queue, std::unique_lock(graph->nodes[item]->status_mutex)))
+            if (!prepareProcessor(item, thread_number, queue, async_queue, std::unique_lock(graph->nodes[item]->status_mutex)))
                 return false;
 
             stack.pop();
@@ -374,6 +372,7 @@ void PipelineExecutor::finish()
     {
         std::lock_guard lock(task_queue_mutex);
         finished = true;
+        async_task_queue.finish();
     }
 
     std::lock_guard guard(executor_contexts_mutex);
@@ -518,13 +517,24 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
                     break;
                 }
 
-                if (threads_queue.size() + 1 == num_threads)
+                if (threads_queue.size() + 1 == num_threads && async_task_queue.empty())
                 {
                     lock.unlock();
                     finish();
                     break;
                 }
 
+                if (num_threads == 1)
+                {
+                    /// If we execute in single thread, wait for async tasks here.
+                    void * res = async_task_queue.wait(lock);
+                    if (!res)
+                        throw Exception("Empty task was returned from async task queue", ErrorCodes::LOGICAL_ERROR);
+
+                    node = static_cast<ExecutingGraph::Node *>(res);
+                    continue;
+                }
+
                 threads_queue.push(thread_num);
             }
 
@@ -575,6 +585,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
             /// Try to execute neighbour processor.
             {
                 Queue queue;
+                Queue async_queue;
 
                 ++num_processing_executors;
                 while (auto * task = expand_pipeline_task.load())
@@ -583,7 +594,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
                 /// Prepare processor after execution.
                 {
                     auto lock = std::unique_lock(node->status_mutex);
-                    if (!prepareProcessor(node->processors_id, thread_num, queue, std::move(lock)))
+                    if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
                         finish();
                 }
 
@@ -597,17 +608,23 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
                 }
 
                 /// Push other tasks to global queue.
-                if (!queue.empty())
+                if (!queue.empty() || !async_queue.empty())
                 {
                     std::unique_lock lock(task_queue_mutex);
 
+                    while (!async_queue.empty() && !finished)
+                    {
+                        async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
+                        async_queue.pop();
+                    }
+
                     while (!queue.empty() && !finished)
                     {
                         task_queue.push(queue.front(), thread_num);
                         queue.pop();
                     }
 
-                    if (!threads_queue.empty() && !finished /* && task_queue.quota() > threads_queue.size()*/)
+                    if (!threads_queue.empty() && !finished)
                     {
                         auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ?
                                                                                0 : (thread_num + 1));
@@ -665,6 +682,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
         std::lock_guard lock(task_queue_mutex);
 
         Queue queue;
+        Queue async_queue;
         size_t next_thread = 0;
 
         while (!stack.empty())
@@ -672,7 +690,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
             UInt64 proc = stack.top();
             stack.pop();
 
-            prepareProcessor(proc, 0, queue, std::unique_lock(graph->nodes[proc]->status_mutex));
+            prepareProcessor(proc, 0, queue, async_queue, std::unique_lock(graph->nodes[proc]->status_mutex));
 
             while (!queue.empty())
             {
@@ -683,6 +701,12 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
                 if (next_thread >= num_threads)
                     next_thread = 0;
             }
+
+            while (!async_queue.empty())
+            {
+                async_task_queue.addTask(async_queue.front(), async_queue.front()->processor->schedule());
+                async_queue.pop();
+            }
         }
     }
 }
@@ -743,6 +767,20 @@ void PipelineExecutor::executeImpl(size_t num_threads)
             });
         }
 
+        {
+            /// Wait for async tasks.
+            std::unique_lock lock(task_queue_mutex);
+            size_t next_thread = 0;
+            while (void * task = async_task_queue.wait(lock))
+            {
+                task_queue.push(static_cast<ExecutingGraph::Node *>(task), next_thread);
+
+                ++next_thread;
+                if (next_thread >= num_threads)
+                    next_thread = 0;
+            }
+        }
+
         for (auto & thread : threads)
             if (thread.joinable())
                 thread.join();
diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h
index b457cca34b1..34339a34608 100644
--- a/src/Processors/Executors/PipelineExecutor.h
+++ b/src/Processors/Executors/PipelineExecutor.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include <Processors/Executors/AsyncTaskQueue.h>
 #include
 #include
 #include
@@ -57,6 +58,11 @@ private:
     /// Stores processors need to be prepared. Preparing status is already set for them.
     TaskQueue task_queue;
 
+    /// Queue which stores tasks where processors returned Async status after prepare.
+    /// If multiple threads are using, main thread will wait for async tasks.
+    /// For single thread, will wait for async tasks only when task_queue is empty.
+    AsyncTaskQueue async_task_queue;
+
     ThreadsQueue threads_queue;
 
     std::mutex task_queue_mutex;
@@ -126,14 +132,14 @@ private:
 
     /// Pipeline execution related methods.
     void addChildlessProcessorsToStack(Stack & stack);
-    bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, size_t thread_number);
+    bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number);
     static void addJob(ExecutingGraph::Node * execution_state);
     // TODO: void addAsyncJob(UInt64 pid);
 
     /// Prepare processor with pid number.
     /// Check parents and children of current processor and push them to stacks if they also need to be prepared.
     /// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
-    bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock);
+    bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock);
     bool doExpandPipeline(ExpandPipelineTask * task, bool processing);
 
     /// Continue executor (in case there are tasks in queue).
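
Note (illustration, not part of the patch): the flow visible in the diff is that a processor whose prepare() returns IProcessor::Status::Async is pushed into async_queue, its schedule() result is handed to AsyncTaskQueue::addTask(void * data, int fd) as a descriptor to watch, and AsyncTaskQueue::wait() later returns the same data pointer once that descriptor is ready, at which point the node goes back into task_queue. The standalone sketch below reduces that register/wait/resume cycle to plain Linux primitives. It assumes an epoll-style readiness mechanism behind AsyncTaskQueue, which the diff does not show, and every name in it (Task, the pipe standing in for an asynchronous operation) is invented for the example.

// async_resume_sketch.cpp -- illustrative only; not ClickHouse code.
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>
#include <stdexcept>

struct Task
{
    int fd = -1;               // becomes readable when the task may continue
    const char * name = "";
};

int main()
{
    int epfd = epoll_create1(0);
    if (epfd < 0)
        throw std::runtime_error("epoll_create1 failed");

    // Stand-in for an async processor: a pipe whose read end plays the role of schedule()'s fd.
    int fds[2];
    if (pipe(fds) < 0)
        throw std::runtime_error("pipe failed");

    Task task{fds[0], "async source"};

    // addTask(data, fd): register the descriptor and remember which task owns it.
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.ptr = &task;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, task.fd, &ev) < 0)
        throw std::runtime_error("epoll_ctl failed");

    // Something completes asynchronously and makes the descriptor readable.
    char byte = 1;
    if (write(fds[1], &byte, 1) != 1)
        return 1;

    // wait(): block until any registered descriptor is ready, then hand the task
    // back to the executor (the patch pushes it into task_queue at this point).
    epoll_event ready{};
    if (epoll_wait(epfd, &ready, 1, -1) == 1)
    {
        auto * resumed = static_cast<Task *>(ready.data.ptr);
        std::printf("resuming task: %s\n", resumed->name);
    }

    close(fds[0]);
    close(fds[1]);
    close(epfd);
    return 0;
}

In the patch itself, the multi-threaded case does this waiting on the main thread in executeImpl(), round-robining resumed nodes across task_queue, while the single-threaded case waits inline in executeStepImpl(); the sketch shows only the register/wait/resume cycle.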