Use ThreadPool in PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2019-06-26 18:57:40 +03:00
parent f808d71aba
commit e8b4362ed2
2 changed files with 88 additions and 206 deletions

View File

@ -28,8 +28,15 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
PipelineExecutor::PipelineExecutor(Processors processors)
: processors(std::move(processors)), num_waited_tasks(0), num_tasks_to_wait(0), cancelled(false), finished(false), main_executor_flag(false), num_waiting_threads(0)
PipelineExecutor::PipelineExecutor(Processors & processors)
: processors(processors)
, num_task_queue_pulls(0)
, num_task_queue_pushes(0)
, cancelled(false)
, finished(false)
, num_waiting_threads(0)
, num_preparing_threads(0)
, node_to_expand(nullptr)
{
buildGraph();
}
@ -117,7 +124,7 @@ void PipelineExecutor::buildGraph()
addEdges(node);
}
void PipelineExecutor::addChildlessProcessorsToQueue(Stack & stack)
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{
UInt64 num_processors = processors.size();
for (UInt64 proc = 0; proc < num_processors; ++proc)
@ -145,82 +152,25 @@ static void executeJob(IProcessor * processor)
}
}
//bool PipelineExecutor::tryAssignJob(ExecutionState * state)
//{
// auto current_stream = state->current_stream;
// for (auto & executor_context : executor_contexts)
// {
// if (executor_context->current_stream == current_stream)
// {
// ExecutionState * expected = nullptr;
// if (executor_context->next_task_to_execute.compare_exchange_strong(expected, state))
// {
// ++num_tasks_to_wait;
// return true;
// }
// }
// }
//
// return false;
//}
void PipelineExecutor::addJob(ExecutionState * execution_state)
{
/// if (!threads.empty())
auto job = [execution_state]()
{
auto job = [execution_state]()
try
{
// SCOPE_EXIT(
/// while (!finished_execution_queue.push(pid));
/// event_counter.notify()
// );
Stopwatch watch;
executeJob(execution_state->processor);
execution_state->execution_time_ns += watch.elapsed();
try
{
Stopwatch watch;
executeJob(execution_state->processor);
execution_state->execution_time_ns += watch.elapsed();
++execution_state->num_executed_jobs;
}
catch (...)
{
execution_state->exception = std::current_exception();
}
};
++execution_state->num_executed_jobs;
}
catch (...)
{
/// Note: It's important to save exception before pushing pid to finished_execution_queue
execution_state->exception = std::current_exception();
}
};
execution_state->job = std::move(job);
/// auto * state = graph[pid].execution_state.get();
// bool is_stream_updated = false;
// if (state->need_update_stream)
// {
// is_stream_updated = true;
// state->current_stream = next_stream;
// ++next_stream;
// }
/// Try assign job to executor right now.
// if (is_stream_updated || !tryAssignJob(state))
// execution_states_queue.emplace_back(state);
/// while (!task_queue.push(graph[pid].execution_state.get()))
/// sleep(0);
}
// else
// {
// /// Execute task in main thread.
// executeJob(graph[pid].processor);
// while (!finished_execution_queue.push(pid));
// }
}
void PipelineExecutor::addAsyncJob(UInt64 pid)
{
graph[pid].processor->schedule(event_counter);
graph[pid].status = ExecStatus::Async;
++num_tasks_to_wait;
execution_state->job = std::move(job);
}
void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
@ -254,7 +204,7 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
}
}
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack)
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack)
{
/// In this method we have ownership on edge, but node can be concurrently accessed.
@ -308,24 +258,26 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & stack, bool async)
node.last_processor_status = status;
}
auto add_neighbours_to_prepare_queue = [&, this] ()
auto add_neighbours_to_prepare_queue = [&] ()
{
for (auto & edge : node.backEdges)
addProcessorToPrepareQueueIfUpdated(edge, stack);
tryAddProcessorToStackIfUpdated(edge, stack);
for (auto & edge : node.directEdges)
addProcessorToPrepareQueueIfUpdated(edge, stack);
tryAddProcessorToStackIfUpdated(edge, stack);
};
auto try_release_ownership = [&] ()
{
        /// This function can be called after expand pipeline, where node from outer scope is no longer valid.
auto & node_ = graph[pid];
ExecStatus expected = ExecStatus::Idle;
node.status = ExecStatus::Idle;
node_.status = ExecStatus::Idle;
if (node.need_to_be_prepared)
if (node_.need_to_be_prepared)
{
while (!node.status.compare_exchange_weak(expected, ExecStatus::Preparing))
if (!(expected == ExecStatus::Idle) || !node.need_to_be_prepared)
while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing))
if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared)
return;
stack.push(pid);
@ -387,7 +339,8 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & stack, bool async)
doExpandPipeline(stack);
node.need_to_be_prepared = true;
            /// node is no longer valid after pipeline was expanded
graph[pid].need_to_be_prepared = true;
try_release_ownership();
break;
}
@ -411,76 +364,42 @@ void PipelineExecutor::doExpandPipeline(Stack & stack)
if (node_to_expand)
{
expandPipeline(stack, node_to_expand.load()->processors_id);
if (graph.size() > task_queue_reserved_size)
{
task_queue.reserve(graph.size() - task_queue_reserved_size);
task_queue_reserved_size = graph.size();
}
node_to_expand = nullptr;
lock.unlock();
condvar_to_expand_pipeline.notify_all();
}
}
//void PipelineExecutor::assignJobs()
//{
// for (auto * state : execution_states_queue)
// {
// if (!tryAssignJob(state))
// {
// while (!task_queue.push(state))
// sleep(0);
//
// task_condvar.notify_one();
// ++num_tasks_to_wait;
// }
// }
//
// execution_states_queue.clear();
//}
//void PipelineExecutor::processPrepareQueue()
//{
// while (!prepare_stack.empty())
// {
// UInt64 proc = prepare_stack.top();
// prepare_stack.pop();
//
// prepareProcessor(proc, false);
// }
//
// assignJobs();
//}
//
//void PipelineExecutor::processAsyncQueue()
//{
// UInt64 num_processors = processors.size();
// for (UInt64 node = 0; node < num_processors; ++node)
// if (graph[node].status == ExecStatus::Async)
// prepareProcessor(node, true);
//
// assignJobs();
//}
void PipelineExecutor::finish()
{
finished = true;
finish_condvar.notify_one();
bool expected = false;
if (finished.compare_exchange_strong(expected, true))
{
finish_condvar.notify_one();
for (auto & context : executor_contexts)
context->condvar.notify_one();
for (auto & context : executor_contexts)
context->condvar.notify_one();
}
}
void PipelineExecutor::execute(size_t num_threads)
{
try
{
/// Wait for all tasks to finish in case of exception.
SCOPE_EXIT(
finished = true;
task_condvar.notify_all();
for (auto & thread : threads)
thread.join();
);
executeImpl(num_threads);
/// Execution can be stopped because of exception. Check and rethrow if any.
for (auto & node : graph)
if (node.execution_state->exception)
std::rethrow_exception(node.execution_state->exception);
}
catch (Exception & exception)
{
@ -538,11 +457,11 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
while (!finished)
{
while (num_waited_tasks < num_tasks_to_wait)
while (num_task_queue_pulls < num_task_queue_pushes)
{
if (task_queue.pop(state))
{
++num_waited_tasks;
++num_task_queue_pulls;
break;
}
else
@ -556,11 +475,11 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
++num_waiting_threads;
if (num_waiting_threads == num_threads && num_waited_tasks == num_tasks_to_wait)
if (num_waiting_threads == num_threads && num_task_queue_pulls == num_task_queue_pushes)
finish();
executor_contexts[thread_num]->is_waiting = true;
executor_contexts[thread_num]->condvar.wait(lock, [&]() { return finished || num_waited_tasks < num_tasks_to_wait; });
executor_contexts[thread_num]->condvar.wait(lock, [&]() { return finished || num_task_queue_pulls < num_task_queue_pushes; });
executor_contexts[thread_num]->is_waiting = false;
--num_waiting_threads;
@ -609,7 +528,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
auto cur_state = graph[stack.top()].execution_state.get();
stack.pop();
++num_tasks_to_wait;
++num_task_queue_pushes;
while (!task_queue.push(cur_state));
}
@ -626,13 +545,8 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
--num_preparing_threads;
}
/// Let another thread to continue.
/// main_executor_condvar.notify_all();
processing_time_ns += processing_time_watch.elapsed();
if (!state)
continue;
@ -664,13 +578,12 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
void PipelineExecutor::executeImpl(size_t num_threads)
{
/// No need to make task_queue longer than num_threads.
/// Therefore, finished_execution_queue can't be longer than num_threads too.
task_queue.reserve_unsafe(8192);
task_queue_reserved_size = std::max<size_t>(min_task_queue_size, graph.size());
task_queue.reserve_unsafe(task_queue_reserved_size);
Stack stack;
addChildlessProcessorsToQueue(stack);
addChildlessProcessorsToStack(stack);
while (!stack.empty())
{
@ -678,29 +591,31 @@ void PipelineExecutor::executeImpl(size_t num_threads)
stack.pop();
auto cur_state = graph[proc].execution_state.get();
++num_tasks_to_wait;
++num_task_queue_pushes;
while (!task_queue.push(cur_state));
}
/// background_executor_flag = false;
num_preparing_threads = 0;
node_to_expand = nullptr;
ThreadPool pool(num_threads);
SCOPE_EXIT(
finish();
pool.wait()
);
threads.reserve(num_threads);
executor_contexts.reserve(num_threads);
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i)
{
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
auto * executor_context = executor_contexts.back().get();
executor_context->is_waiting = false;
}
// executor_context->executor_number = i;
// executor_context->next_task_to_execute = nullptr;
auto thread_group = CurrentThread::getGroup();
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
for (size_t i = 0; i < num_threads; ++i)
{
pool.schedule([this, thread_group, thread_num = i, num_threads]
{
ThreadStatus thread_status;
@ -716,19 +631,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
});
}
{
std::mutex finish_mutex;
std::unique_lock lock(finish_mutex);
finish_condvar.wait(lock, [&]() -> bool { return finished; });
}
{
std::lock_guard lock(main_executor_mutex);
for (auto & node : graph)
if (node.execution_state->exception)
std::rethrow_exception(node.execution_state->exception);
}
pool.wait();
}
String PipelineExecutor::dumpPipeline() const

View File

@ -16,7 +16,7 @@ namespace DB
class PipelineExecutor
{
private:
Processors processors;
Processors & processors;
struct Edge
{
@ -82,43 +82,28 @@ private:
using Stack = std::stack<UInt64>;
using TaskQueue = boost::lockfree::queue<ExecutionState *>;
using FinishedJobsQueue = boost::lockfree::queue<UInt64>;
/// Queue of processes which we want to call prepare. Is used only in main thread.
Stack prepare_stack;
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
TaskQueue task_queue;
static constexpr size_t min_task_queue_size = 8192;
size_t task_queue_reserved_size = 0;
EventCounter event_counter;
std::atomic<UInt64> num_waited_tasks;
std::atomic<UInt64> num_tasks_to_wait;
std::atomic<UInt64> num_task_queue_pulls;
std::atomic<UInt64> num_task_queue_pushes;
std::atomic_bool cancelled;
std::atomic_bool finished;
std::vector<std::thread> threads;
std::mutex task_mutex;
std::condition_variable task_condvar;
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");
struct ExecutorContext
{
// size_t executor_number;
// std::atomic<ExecutionState *> next_task_to_execute;
std::atomic_bool is_waiting;
std::condition_variable condvar;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
std::mutex main_executor_mutex;
std::atomic_bool main_executor_flag;
/// std::atomic_bool background_executor_flag;
std::condition_variable main_executor_condvar;
std::atomic<size_t> num_waiting_threads;
std::condition_variable finish_condvar;
@ -129,7 +114,7 @@ private:
size_t num_waiting_threads_to_expand_pipeline = 0;
public:
explicit PipelineExecutor(Processors processors);
explicit PipelineExecutor(Processors & processors);
void execute(size_t num_threads);
String getName() const { return "PipelineExecutor"; }
@ -144,27 +129,21 @@ public:
}
private:
/// Graph related methods.
using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
ProcessorsMap processors_map;
/// Graph related methods.
bool addEdges(UInt64 node);
void buildGraph();
void expandPipeline(Stack & stack, UInt64 pid);
void doExpandPipeline(Stack & stack);
/// Pipeline execution related methods.
void addChildlessProcessorsToQueue(Stack & stack);
bool addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack);
// void processPrepareQueue();
// void processAsyncQueue();
void addJob(ExecutionState * execution_state);
void addAsyncJob(UInt64 pid);
// bool tryAssignJob(ExecutionState * state);
// void assignJobs();
void addChildlessProcessorsToStack(Stack & stack);
bool tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack);
static void addJob(ExecutionState * execution_state);
// TODO: void addAsyncJob(UInt64 pid);
bool prepareProcessor(size_t pid, Stack & stack, bool async);
void doExpandPipeline(Stack & stack);
void executeImpl(size_t num_threads);
void executeSingleThread(size_t thread_num, size_t num_threads);