Fixing tests.

This commit is contained in:
Nikolai Kochetov 2020-12-15 15:07:31 +03:00
parent 716e0b472b
commit 24c390eb49
2 changed files with 6 additions and 1 deletions

View File

@ -502,6 +502,9 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
node = context->async_tasks.front(); node = context->async_tasks.front();
context->async_tasks.pop(); context->async_tasks.pop();
--num_waiting_async_tasks; --num_waiting_async_tasks;
if (context->async_tasks.empty())
context->has_async_tasks = false;
} }
else if (!task_queue.empty()) else if (!task_queue.empty())
node = task_queue.pop(thread_num); node = task_queue.pop(thread_num);
@ -610,7 +613,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
node = nullptr; node = nullptr;
/// Take local task from queue if has one. /// Take local task from queue if has one.
if (!queue.empty()) if (!queue.empty() && !context->has_async_tasks)
{ {
node = queue.front(); node = queue.front();
queue.pop(); queue.pop();
@ -784,6 +787,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{ {
auto * node = static_cast<ExecutingGraph::Node *>(task.data); auto * node = static_cast<ExecutingGraph::Node *>(task.data);
executor_contexts[task.thread_num]->async_tasks.push(node); executor_contexts[task.thread_num]->async_tasks.push(node);
executor_contexts[task.thread_num]->has_async_tasks = true;
++num_waiting_async_tasks; ++num_waiting_async_tasks;
if (threads_queue.has(task.thread_num)) if (threads_queue.has(task.thread_num))

View File

@ -98,6 +98,7 @@ private:
std::list<ExpandPipelineTask> task_list; std::list<ExpandPipelineTask> task_list;
std::queue<ExecutingGraph::Node *> async_tasks; std::queue<ExecutingGraph::Node *> async_tasks;
std::atomic_bool has_async_tasks = false;
std::condition_variable condvar; std::condition_variable condvar;
std::mutex mutex; std::mutex mutex;