Revert PipelineExecutor changes.

This commit is contained in:
Nikolai Kochetov 2019-09-03 21:05:44 +03:00
parent 16284b98b5
commit 73c052f576
2 changed files with 18 additions and 68 deletions

View File

@ -437,7 +437,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
Stopwatch total_time_watch;
ExecutionState * state = nullptr;
auto & context = executor_contexts[thread_num];
auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
{
@ -475,47 +474,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Just travers graph and prepare any processor.
while (!finished)
{
/// Fast way. Get task from local queue.
if (!context->task_queue.empty())
{
state = context->task_queue.front();
context->task_queue.pop();
break;
}
/// Steal from current thread if can.
{
auto expected_state = context->state_to_steal.load();
if (expected_state
&& context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
{
state = expected_state;
break;
}
}
/// Steal from any other thread.
{
bool found = false;
for (auto & exec_context : executor_contexts)
{
auto expected_state = exec_context->state_to_steal.load();
if (expected_state
&& exec_context->state_to_steal.compare_exchange_strong(expected_state, nullptr))
{
state = expected_state;
found = true;
break;
}
}
if (found)
break;
}
std::unique_lock lock(task_queue_mutex);
if (!task_queue.empty())
{
state = task_queue.front();
task_queue.pop();
break;
}
++num_waiting_threads;
if (num_waiting_threads == num_threads)
@ -526,9 +493,9 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
break;
}
task_queue_condvar.wait_for(lock, std::chrono::milliseconds(1), [&]()
task_queue_condvar.wait(lock, [&]()
{
return finished.load();
return finished || !task_queue.empty();
});
--num_waiting_threads;
@ -562,7 +529,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
Stack children;
Stack parents;
auto & queue = context->task_queue;
Queue queue;
++num_processing_executors;
while (auto task = expand_pipeline_task.load())
@ -585,25 +552,16 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
if (!queue.empty())
{
auto task_to_still = queue.front();
ExecutionState * expected = nullptr;
std::lock_guard lock(task_queue_mutex);
if (context->state_to_steal.compare_exchange_strong(expected, task_to_still))
while (!queue.empty() && !finished)
{
task_queue.push(queue.front());
queue.pop();
}
// if (!queue.empty())
// {
// /// std::lock_guard lock(task_queue_mutex);
//
// while (!queue.empty() && !finished)
// {
// local_queue.push(queue.front());
// queue.pop();
// }
//
// /// task_queue_condvar.notify_all();
// }
task_queue_condvar.notify_all();
}
--num_processing_executors;
while (auto task = expand_pipeline_task.load())
@ -654,7 +612,6 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{
std::lock_guard lock(task_queue_mutex);
size_t cur_thread = 0;
while (!stack.empty())
{
@ -664,11 +621,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (prepareProcessor(proc, stack, stack, 0, false))
{
auto cur_state = graph[proc].execution_state.get();
executor_contexts[cur_thread]->task_queue.push(cur_state);
++cur_thread;
if (cur_thread == num_threads)
cur_thread = 0;
task_queue.push(cur_state);
}
}
}

View File

@ -3,7 +3,6 @@
#include <queue>
#include <stack>
#include <Processors/IProcessor.h>
#include <Processors/Executors/LFStack.h>
#include <mutex>
#include <Common/ThreadPool.h>
#include <Common/EventCounter.h>
@ -119,9 +118,10 @@ private:
using Stack = std::stack<UInt64>;
using TaskQueue = std::queue<ExecutionState *>;
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
/// TaskQueue task_queue;
TaskQueue task_queue;
std::mutex task_queue_mutex;
std::condition_variable task_queue_condvar;
@ -155,9 +155,6 @@ private:
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using atomic shard ptr.
std::list<ExpandPipelineTask> task_list;
std::atomic<ExecutionState *> state_to_steal { nullptr };
std::queue<ExecutionState *> task_queue;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;