Add task stilling to PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2019-09-03 14:15:37 +03:00
parent e55647eee9
commit 99476d0039
2 changed files with 57 additions and 55 deletions

View File

@ -437,6 +437,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
Stopwatch total_time_watch;
ExecutionState * state = nullptr;
auto & context = executor_contexts[thread_num];
auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
{
@ -474,33 +475,25 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Just travers graph and prepare any processor.
while (!finished)
{
auto pushed = task_queue.num_pushed.load();
bool found = false;
while (pushed > task_queue.num_popped.load())
if (!context->task_queue.empty())
{
/// Fast branch.
if (task_queue.pop(state))
state = context->task_queue.front();
context->task_queue.pop();
break;
}
{
auto expected_state = context->state_to_steal.load();
if (expected_state
&& context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
{
found = true;
state = expected_state;
break;
}
}
if (found)
break;
std::unique_lock lock(task_queue_mutex);
auto popped = task_queue.num_popped.load();
// if (!task_queue.empty())
// {
// state = task_queue.front();
// task_queue.pop();
// break;
// }
++num_waiting_threads;
if (num_waiting_threads == num_threads)
@ -511,10 +504,30 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
break;
}
task_queue_condvar.wait(lock, [&]()
while (!finished)
{
return finished || popped < task_queue.num_pushed.load();
});
bool found = false;
for (auto & exec_context : executor_contexts)
{
auto expected_state = exec_context->state_to_steal.load();
if (expected_state
&& exec_context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
{
state = expected_state;
found = true;
break;
}
}
if (found)
break;
task_queue_condvar.wait_for(lock, std::chrono::milliseconds(1), [&]()
{
return finished.load();
});
}
--num_waiting_threads;
}
@ -547,7 +560,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
Stack children;
Stack parents;
Queue queue;
auto & queue = context->task_queue;
++num_processing_executors;
while (auto task = expand_pipeline_task.load())
@ -570,17 +583,26 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!queue.empty())
{
/// std::lock_guard lock(task_queue_mutex);
auto task_to_still = queue.front();
ExecutionState * expected = nullptr;
while (!queue.empty() && !finished)
{
task_queue.push(queue.front());
if (context->state_to_steal.compare_exchange_strong(expected, task_to_still))
queue.pop();
}
task_queue_condvar.notify_all();
}
// if (!queue.empty())
// {
// /// std::lock_guard lock(task_queue_mutex);
//
// while (!queue.empty() && !finished)
// {
// local_queue.push(queue.front());
// queue.pop();
// }
//
// /// task_queue_condvar.notify_all();
// }
--num_processing_executors;
while (auto task = expand_pipeline_task.load())
doExpandPipeline(task, false);

View File

@ -118,33 +118,10 @@ private:
using Stack = std::stack<UInt64>;
struct TaskQueue
{
bool pop(ExecutionState *& state)
{
if (stack.pop(state))
{
num_popped.fetch_add(1);
return true;
}
return false;
}
void push(ExecutionState * state)
{
stack.push(state);
num_pushed.fetch_add(1);
}
lfs::LFStack<ExecutionState> stack;
std::atomic<UInt64> num_pushed {0};
std::atomic<UInt64> num_popped {0};
};
using TaskQueue = std::queue<ExecutionState *>;
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
TaskQueue task_queue;
/// TaskQueue task_queue;
std::mutex task_queue_mutex;
std::condition_variable task_queue_condvar;
@ -178,6 +155,9 @@ private:
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using atomic shard ptr.
std::list<ExpandPipelineTask> task_list;
std::atomic<ExecutionState *> state_to_steal { nullptr };
std::queue<ExecutionState *> task_queue;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;