Revert to LFStack.

Nikolai Kochetov 2019-09-03 11:42:26 +03:00
parent 52ca3f2b4c
commit 702c1b03da
2 changed files with 51 additions and 78 deletions
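In short: the executor's TaskQueue goes back to a single lock-free stack (lfs::LFStack) paired with atomic num_pushed/num_popped counters, replacing the per-stream mutex-protected std::queues (with their init/empty/per-stream pop machinery) and the thread-local local_queue fast path.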

PipelineExecutor.cpp

@@ -437,7 +437,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
     Stopwatch total_time_watch;
     ExecutionState * state = nullptr;
-    std::queue<ExecutionState *> local_queue;

     auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
     {
@@ -454,7 +453,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
         return false;
     };

-    using Queue = std::vector<ExecutionState *>;
+    using Queue = std::queue<ExecutionState *>;

     auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
     {
@@ -464,7 +463,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
             stack.pop();

             if (prepare_processor(current_processor, children, parents))
-                queue.push_back(graph[current_processor].execution_state.get());
+                queue.push(graph[current_processor].execution_state.get());
         }
     };
@@ -475,19 +474,33 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
             /// Just travers graph and prepare any processor.
             while (!finished)
             {
-                if (!local_queue.empty())
+                auto pushed = task_queue.num_pushed.load();
+                bool found = false;
+
+                while (pushed > task_queue.num_popped.load())
                 {
-                    state = local_queue.front();
-                    local_queue.pop();
-                    break;
+                    /// Fast branch.
+                    if (task_queue.pop(state))
+                    {
+                        found = true;
+                        break;
+                    }
                 }

-                state = task_queue.pop(thread_num);
-
-                if (state)
+                if (found)
                     break;

                 std::unique_lock lock(task_queue_mutex);
+                auto popped = task_queue.num_popped.load();
+
+//                if (!task_queue.empty())
+//                {
+//                    state = task_queue.front();
+//                    task_queue.pop();
+//                    break;
+//                }

                 ++num_waiting_threads;
                 if (num_waiting_threads == num_threads)
@@ -500,7 +513,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
                 task_queue_condvar.wait(lock, [&]()
                 {
-                    return finished || !task_queue.empty(thread_num) || !task_queue.empty(num_threads);
+                    return finished || popped < task_queue.num_pushed.load();
                 });

                 --num_waiting_threads;
@@ -547,34 +560,26 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
                 /// Process all neighbours. Children will be on the top of stack, then parents.
                 prepare_all_processors(queue, children, children, parents);

-                for (auto & exec_state : queue)
+                if (!state && !queue.empty())
                 {
-                    auto stream = exec_state->processor->getStream();
-
-                    if (stream == thread_num)
-                        local_queue.push(exec_state);
-                    else
-                        task_queue.push(exec_state);
+                    state = queue.front();
+                    queue.pop();
                 }

-                queue.clear();
-
                 prepare_all_processors(queue, parents, parents, parents);

-                for (auto & exec_state : queue)
+                if (!queue.empty())
                 {
-                    auto stream = exec_state->processor->getStream();
-                    if (stream == thread_num)
-                        local_queue.push(exec_state);
-                    else
-                        task_queue.push(exec_state);
-                }
-
-                queue.clear();
-
-                task_queue_condvar.notify_all();
+                    /// std::lock_guard lock(task_queue_mutex);
+                    while (!queue.empty() && !finished)
+                    {
+                        task_queue.push(queue.front());
+                        queue.pop();
+                    }
+
+                    task_queue_condvar.notify_all();
+                }

                 --num_processing_executors;
                 while (auto task = expand_pipeline_task.load())
@@ -597,8 +602,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
 void PipelineExecutor::executeImpl(size_t num_threads)
 {
-    task_queue.init(num_threads);
-
     Stack stack;

     executor_contexts.reserve(num_threads);
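The wait loop above combines a lock-free fast path with a condition-variable slow path, using the num_pushed/num_popped counters as the wakeup predicate. Below is a minimal self-contained sketch of that protocol; Task, TaskQueueSketch, waitForTask and pushTask are illustrative names (not from the commit), and a mutex-guarded stack stands in for lfs::LFStack, since lock-freedom is orthogonal to the counter protocol being shown.

// Sketch only: the counter-based wakeup protocol, with a locked stack
// standing in for lfs::LFStack. All names here are illustrative.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <stack>

struct Task { int payload = 0; };

struct TaskQueueSketch
{
    bool pop(Task *& task)
    {
        {
            std::lock_guard lock(stack_mutex);
            if (stack.empty())
                return false;
            task = stack.top();
            stack.pop();
        }
        num_popped.fetch_add(1);  /// counted after a successful pop, as in the diff
        return true;
    }

    void push(Task * task)
    {
        {
            std::lock_guard lock(stack_mutex);
            stack.push(task);
        }
        num_pushed.fetch_add(1);
    }

    std::stack<Task *> stack;  /// lfs::LFStack<Task> in the real code
    std::mutex stack_mutex;
    std::atomic<uint64_t> num_pushed {0};
    std::atomic<uint64_t> num_popped {0};
};

/// Consumer: mirrors the shape of the new wait loop in executeSingleThread.
Task * waitForTask(TaskQueueSketch & queue, std::mutex & wait_mutex,
                   std::condition_variable & condvar, std::atomic<bool> & finished)
{
    while (!finished)
    {
        /// Fast branch: keep popping while the counters say work may exist.
        Task * task = nullptr;
        while (queue.num_pushed.load() > queue.num_popped.load())
            if (queue.pop(task))
                return task;

        /// Slow branch: snapshot num_popped under the mutex; any push that
        /// lands afterwards makes the predicate true, so the waiter cannot
        /// sleep through it indefinitely.
        std::unique_lock lock(wait_mutex);
        auto popped = queue.num_popped.load();
        condvar.wait(lock, [&] { return finished || popped < queue.num_pushed.load(); });
    }
    return nullptr;
}

/// Producer: briefly taking the mutex before notifying closes the window in
/// which a waiter has evaluated the predicate but not yet blocked.
void pushTask(TaskQueueSketch & queue, std::mutex & wait_mutex,
              std::condition_variable & condvar, Task * task)
{
    queue.push(task);
    { std::lock_guard lock(wait_mutex); }
    condvar.notify_all();
}

Note one difference: the diff's push path notifies without taking the mutex (the lock_guard there is commented out), trading the theoretical lost-wakeup window for less contention on the hot path.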

PipelineExecutor.h

@@ -120,56 +120,26 @@ private:
     struct TaskQueue
     {
+        bool pop(ExecutionState *& state)
+        {
+            if (stack.pop(state))
+            {
+                num_popped.fetch_add(1);
+                return true;
+            }
+
+            return false;
+        }
+
         void push(ExecutionState * state)
         {
-            auto stream = state->processor->getStream();
-            if (stream >= queues.size())
-                stream = queues.size() - 1;
-
-            std::lock_guard lg(*mutexes[stream]);
-            queues[stream].push(state);
+            stack.push(state);
+            num_pushed.fetch_add(1);
         }

-        ExecutionState * pop(size_t stream)
-        {
-            {
-                std::lock_guard lg(*mutexes[stream]);
-                if (!queues[stream].empty())
-                {
-                    auto res = queues[stream].front();
-                    queues[stream].pop();
-                    return res;
-                }
-            }
-
-            stream = queues.size() - 1;
-            if (!queues[stream].empty())
-            {
-                auto res = queues[stream].front();
-                queues[stream].pop();
-                return res;
-            }
-
-            return nullptr;
-        }
-
-        bool empty(size_t stream)
-        {
-            std::lock_guard lg(*mutexes[stream]);
-            return queues[stream].empty();
-        }
-
-        void init(size_t num_streams)
-        {
-            queues.resize(num_streams + 1);
-            for (size_t i = 0; i <= num_streams; ++i)
-                mutexes.emplace_back(std::make_unique<std::mutex>());
-        }
-
-        std::vector<std::queue<ExecutionState *>> queues;
-        std::vector<std::unique_ptr<std::mutex>> mutexes;
+        lfs::LFStack<ExecutionState> stack;
+        std::atomic<UInt64> num_pushed {0};
+        std::atomic<UInt64> num_popped {0};
     };

     /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
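lfs::LFStack itself is not part of this commit. As an assumption about the interface only, the push / bool pop(T *&) shape used by TaskQueue matches a classic Treiber stack; a sketch follows. It deliberately ignores the ABA and memory-reclamation problems a production lock-free stack must solve.

// Illustrative Treiber stack matching the interface the diff assumes for
// lfs::LFStack. Not the real implementation: it allocates a node per push
// and has no ABA protection or safe memory reclamation.
#include <atomic>

template <typename T>
class TreiberStackSketch
{
    struct Node
    {
        T * value;
        Node * next;
    };

    std::atomic<Node *> head {nullptr};

public:
    void push(T * value)
    {
        auto * node = new Node{value, head.load()};
        /// On failure the CAS refreshes node->next with the current head.
        while (!head.compare_exchange_weak(node->next, node))
            ;
    }

    bool pop(T *& value)
    {
        Node * node = head.load();
        /// On failure the CAS refreshes node with the current head.
        while (node && !head.compare_exchange_weak(node, node->next))
            ;
        if (!node)
            return false;
        value = node->value;
        delete node;  /// unsafe with concurrent poppers; real code needs
                      /// hazard pointers, epochs, or tagged pointers here
        return true;
    }
};

This is also likely why TaskQueue pairs the stack with monotonic counters: head == nullptr is only a racy instant check, while popped < num_pushed gives the condition variable a stable predicate to wait on.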