Try to fix race in PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2019-09-09 20:10:37 +03:00
parent 9df1235046
commit da798142a5
2 changed files with 27 additions and 25 deletions

View File

@ -472,8 +472,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
while (!finished)
{
bool first = true;
/// First, find any processor to execute.
/// Just travers graph and prepare any processor.
while (!finished)
@ -486,12 +484,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
executor_contexts[thread_num]->pinned_tasks.pop();
lock.unlock();
if (!first)
{
std::lock_guard guard(task_queue_mutex);
--num_waiting_threads;
}
break;
}
}
@ -499,11 +491,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
std::unique_lock lock(task_queue_mutex);
if (!first)
--num_waiting_threads;
first = false;
if (!task_queue.empty())
{
state = task_queue.front();
@ -513,6 +500,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
auto thread_to_wake = threads_queue.top();
threads_queue.pop();
--num_waiting_threads;
lock.unlock();
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
@ -587,13 +575,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Process all neighbours. Children will be on the top of stack, then parents.
prepare_all_processors(queue, children, children, parents);
// if (!state && !queue.empty())
// {
// state = queue.front();
// queue.pop();
// }
prepare_all_processors(queue, parents, parents, parents);
if (!state && !queue.empty())
@ -612,11 +593,28 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
auto stream = task->processor->getStream();
if (stream != IProcessor::NO_STREAM && dynamic_cast<const ISource *>(task->processor))
{
bool found_in_queue = false;
{
std::unique_lock lock(task_queue_mutex);
auto it = threads_queue.find(stream);
if (it != threads_queue.end())
{
threads_queue.erase(it);
--num_waiting_threads;
found_in_queue = true;
}
}
auto thread_to_wake = stream % num_threads;
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
executor_contexts[thread_to_wake]->pinned_tasks.push(task);
executor_contexts[thread_to_wake]->wake_flag = true;
executor_contexts[thread_to_wake]->condvar.notify_one();
if (found_in_queue)
{
executor_contexts[thread_to_wake]->wake_flag = true;
executor_contexts[thread_to_wake]->condvar.notify_one();
}
}
else
tmp_queue.push(task);
@ -637,8 +635,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!threads_queue.empty())
{
auto thread_to_wake = threads_queue.top();
threads_queue.pop();
auto it = threads_queue.begin();
threads_queue.erase(it);
--num_waiting_threads;
auto thread_to_wake = *it;
lock.unlock();
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);

View File

@ -10,6 +10,8 @@
#include <boost/lockfree/stack.hpp>
#include <map>
namespace DB
{
@ -122,7 +124,7 @@ private:
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
TaskQueue task_queue;
std::stack<size_t> threads_queue;
std::map<size_t> threads_queue;
std::mutex task_queue_mutex;
std::atomic_bool cancelled;