diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 517c3476f71..7f0a3449501 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -28,8 +28,15 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED; } -PipelineExecutor::PipelineExecutor(Processors processors) - : processors(std::move(processors)), num_waited_tasks(0), num_tasks_to_wait(0), cancelled(false), finished(false), main_executor_flag(false), num_waiting_threads(0) +PipelineExecutor::PipelineExecutor(Processors & processors) + : processors(processors) + , num_task_queue_pulls(0) + , num_task_queue_pushes(0) + , cancelled(false) + , finished(false) + , num_waiting_threads(0) + , num_preparing_threads(0) + , node_to_expand(nullptr) { buildGraph(); } @@ -117,7 +124,7 @@ void PipelineExecutor::buildGraph() addEdges(node); } -void PipelineExecutor::addChildlessProcessorsToQueue(Stack & stack) +void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) { UInt64 num_processors = processors.size(); for (UInt64 proc = 0; proc < num_processors; ++proc) @@ -145,82 +152,25 @@ static void executeJob(IProcessor * processor) } } -//bool PipelineExecutor::tryAssignJob(ExecutionState * state) -//{ -// auto current_stream = state->current_stream; -// for (auto & executor_context : executor_contexts) -// { -// if (executor_context->current_stream == current_stream) -// { -// ExecutionState * expected = nullptr; -// if (executor_context->next_task_to_execute.compare_exchange_strong(expected, state)) -// { -// ++num_tasks_to_wait; -// return true; -// } -// } -// } -// -// return false; -//} - void PipelineExecutor::addJob(ExecutionState * execution_state) { -/// if (!threads.empty()) + auto job = [execution_state]() { - auto job = [execution_state]() + try { - // SCOPE_EXIT( - /// while 
(!finished_execution_queue.push(pid)); - /// event_counter.notify() - // ); + Stopwatch watch; + executeJob(execution_state->processor); + execution_state->execution_time_ns += watch.elapsed(); - try - { - Stopwatch watch; - executeJob(execution_state->processor); - execution_state->execution_time_ns += watch.elapsed(); + ++execution_state->num_executed_jobs; + } + catch (...) + { + execution_state->exception = std::current_exception(); + } + }; - ++execution_state->num_executed_jobs; - } - catch (...) - { - /// Note: It's important to save exception before pushing pid to finished_execution_queue - execution_state->exception = std::current_exception(); - } - }; - - execution_state->job = std::move(job); - /// auto * state = graph[pid].execution_state.get(); - -// bool is_stream_updated = false; -// if (state->need_update_stream) -// { -// is_stream_updated = true; -// state->current_stream = next_stream; -// ++next_stream; -// } - - /// Try assign job to executor right now. -// if (is_stream_updated || !tryAssignJob(state)) -// execution_states_queue.emplace_back(state); - - /// while (!task_queue.push(graph[pid].execution_state.get())) - /// sleep(0); - } -// else -// { -// /// Execute task in main thread. -// executeJob(graph[pid].processor); -// while (!finished_execution_queue.push(pid)); -// } -} - -void PipelineExecutor::addAsyncJob(UInt64 pid) -{ - graph[pid].processor->schedule(event_counter); - graph[pid].status = ExecStatus::Async; - ++num_tasks_to_wait; + execution_state->job = std::move(job); } void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) @@ -254,7 +204,7 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) } } -bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack) +bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack) { /// In this method we have ownership on edge, but node can be concurrently accessed. 
@@ -308,24 +258,26 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & stack, bool async) node.last_processor_status = status; } - auto add_neighbours_to_prepare_queue = [&, this] () + auto add_neighbours_to_prepare_queue = [&] () { for (auto & edge : node.backEdges) - addProcessorToPrepareQueueIfUpdated(edge, stack); + tryAddProcessorToStackIfUpdated(edge, stack); for (auto & edge : node.directEdges) - addProcessorToPrepareQueueIfUpdated(edge, stack); + tryAddProcessorToStackIfUpdated(edge, stack); }; auto try_release_ownership = [&] () { + /// This function can be called after the pipeline was expanded, where node from outer scope is no longer valid. + auto & node_ = graph[pid]; ExecStatus expected = ExecStatus::Idle; - node.status = ExecStatus::Idle; + node_.status = ExecStatus::Idle; - if (node.need_to_be_prepared) + if (node_.need_to_be_prepared) { - while (!node.status.compare_exchange_weak(expected, ExecStatus::Preparing)) - if (!(expected == ExecStatus::Idle) || !node.need_to_be_prepared) + while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing)) + if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared) return; stack.push(pid); @@ -387,7 +339,8 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & stack, bool async) doExpandPipeline(stack); - node.need_to_be_prepared = true; + /// node is no longer valid after pipeline was expanded + graph[pid].need_to_be_prepared = true; try_release_ownership(); break; } @@ -411,76 +364,42 @@ void PipelineExecutor::doExpandPipeline(Stack & stack) if (node_to_expand) { expandPipeline(stack, node_to_expand.load()->processors_id); + + if (graph.size() > task_queue_reserved_size) + { + task_queue.reserve(graph.size() - task_queue_reserved_size); + task_queue_reserved_size = graph.size(); + } + node_to_expand = nullptr; + lock.unlock(); condvar_to_expand_pipeline.notify_all(); } } -//void PipelineExecutor::assignJobs() -//{ -// for (auto * state : execution_states_queue) -// { -// if 
(!tryAssignJob(state)) -// { -// while (!task_queue.push(state)) -// sleep(0); -// -// task_condvar.notify_one(); -// ++num_tasks_to_wait; -// } -// } -// -// execution_states_queue.clear(); -//} - -//void PipelineExecutor::processPrepareQueue() -//{ -// while (!prepare_stack.empty()) -// { -// UInt64 proc = prepare_stack.top(); -// prepare_stack.pop(); -// -// prepareProcessor(proc, false); -// } -// -// assignJobs(); -//} -// -//void PipelineExecutor::processAsyncQueue() -//{ -// UInt64 num_processors = processors.size(); -// for (UInt64 node = 0; node < num_processors; ++node) -// if (graph[node].status == ExecStatus::Async) -// prepareProcessor(node, true); -// -// assignJobs(); -//} - void PipelineExecutor::finish() { - finished = true; - finish_condvar.notify_one(); + bool expected = false; + if (finished.compare_exchange_strong(expected, true)) + { + finish_condvar.notify_one(); - for (auto & context : executor_contexts) - context->condvar.notify_one(); + for (auto & context : executor_contexts) + context->condvar.notify_one(); + } } void PipelineExecutor::execute(size_t num_threads) { try { - /// Wait for all tasks to finish in case of exception. - SCOPE_EXIT( - finished = true; - - task_condvar.notify_all(); - - for (auto & thread : threads) - thread.join(); - ); - executeImpl(num_threads); + + /// Execution can be stopped because of exception. Check and rethrow if any. 
+ for (auto & node : graph) + if (node.execution_state->exception) + std::rethrow_exception(node.execution_state->exception); } catch (Exception & exception) { @@ -538,11 +457,11 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads while (!finished) { - while (num_waited_tasks < num_tasks_to_wait) + while (num_task_queue_pulls < num_task_queue_pushes) { if (task_queue.pop(state)) { - ++num_waited_tasks; + ++num_task_queue_pulls; break; } else @@ -556,11 +475,11 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads ++num_waiting_threads; - if (num_waiting_threads == num_threads && num_waited_tasks == num_tasks_to_wait) + if (num_waiting_threads == num_threads && num_task_queue_pulls == num_task_queue_pushes) finish(); executor_contexts[thread_num]->is_waiting = true; - executor_contexts[thread_num]->condvar.wait(lock, [&]() { return finished || num_waited_tasks < num_tasks_to_wait; }); + executor_contexts[thread_num]->condvar.wait(lock, [&]() { return finished || num_task_queue_pulls < num_task_queue_pushes; }); executor_contexts[thread_num]->is_waiting = false; --num_waiting_threads; @@ -609,7 +528,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads auto cur_state = graph[stack.top()].execution_state.get(); stack.pop(); - ++num_tasks_to_wait; + ++num_task_queue_pushes; while (!task_queue.push(cur_state)); } @@ -626,13 +545,8 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads --num_preparing_threads; } - - /// Let another thread to continue. - /// main_executor_condvar.notify_all(); - processing_time_ns += processing_time_watch.elapsed(); - if (!state) continue; @@ -664,13 +578,12 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads void PipelineExecutor::executeImpl(size_t num_threads) { - /// No need to make task_queue longer than num_threads. 
- /// Therefore, finished_execution_queue can't be longer than num_threads too. - task_queue.reserve_unsafe(8192); + task_queue_reserved_size = std::max(min_task_queue_size, graph.size()); + task_queue.reserve_unsafe(task_queue_reserved_size); Stack stack; - addChildlessProcessorsToQueue(stack); + addChildlessProcessorsToStack(stack); while (!stack.empty()) { @@ -678,29 +591,31 @@ void PipelineExecutor::executeImpl(size_t num_threads) stack.pop(); auto cur_state = graph[proc].execution_state.get(); - ++num_tasks_to_wait; + ++num_task_queue_pushes; while (!task_queue.push(cur_state)); } - /// background_executor_flag = false; - num_preparing_threads = 0; - node_to_expand = nullptr; + ThreadPool pool(num_threads); + + SCOPE_EXIT( + finish(); + pool.wait() + ); - threads.reserve(num_threads); executor_contexts.reserve(num_threads); - auto thread_group = CurrentThread::getGroup(); - for (size_t i = 0; i < num_threads; ++i) { executor_contexts.emplace_back(std::make_unique()); auto * executor_context = executor_contexts.back().get(); executor_context->is_waiting = false; + } -// executor_context->executor_number = i; -// executor_context->next_task_to_execute = nullptr; + auto thread_group = CurrentThread::getGroup(); - threads.emplace_back([this, thread_group, thread_num = i, num_threads] + for (size_t i = 0; i < num_threads; ++i) + { + pool.schedule([this, thread_group, thread_num = i, num_threads] { ThreadStatus thread_status; @@ -716,19 +631,7 @@ void PipelineExecutor::executeImpl(size_t num_threads) }); } - { - std::mutex finish_mutex; - std::unique_lock lock(finish_mutex); - finish_condvar.wait(lock, [&]() -> bool { return finished; }); - } - - { - std::lock_guard lock(main_executor_mutex); - - for (auto & node : graph) - if (node.execution_state->exception) - std::rethrow_exception(node.execution_state->exception); - } + pool.wait(); } String PipelineExecutor::dumpPipeline() const diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h 
b/dbms/src/Processors/Executors/PipelineExecutor.h index 41d52f6ab9c..05f8a8443b8 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -16,7 +16,7 @@ namespace DB class PipelineExecutor { private: - Processors processors; + Processors & processors; struct Edge { @@ -82,43 +82,28 @@ private: using Stack = std::stack; using TaskQueue = boost::lockfree::queue; - using FinishedJobsQueue = boost::lockfree::queue; - /// Queue of processes which we want to call prepare. Is used only in main thread. - Stack prepare_stack; /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set. TaskQueue task_queue; + static constexpr size_t min_task_queue_size = 8192; + size_t task_queue_reserved_size = 0; - EventCounter event_counter; - - std::atomic num_waited_tasks; - std::atomic num_tasks_to_wait; + std::atomic num_task_queue_pulls; + std::atomic num_task_queue_pushes; std::atomic_bool cancelled; std::atomic_bool finished; - std::vector threads; - - std::mutex task_mutex; - std::condition_variable task_condvar; - Poco::Logger * log = &Poco::Logger::get("PipelineExecutor"); struct ExecutorContext { -// size_t executor_number; -// std::atomic next_task_to_execute; std::atomic_bool is_waiting; std::condition_variable condvar; }; std::vector> executor_contexts; - std::mutex main_executor_mutex; - std::atomic_bool main_executor_flag; - /// std::atomic_bool background_executor_flag; - std::condition_variable main_executor_condvar; - std::atomic num_waiting_threads; std::condition_variable finish_condvar; @@ -129,7 +114,7 @@ private: size_t num_waiting_threads_to_expand_pipeline = 0; public: - explicit PipelineExecutor(Processors processors); + explicit PipelineExecutor(Processors & processors); void execute(size_t num_threads); String getName() const { return "PipelineExecutor"; } @@ -144,27 +129,21 @@ public: } private: - /// Graph related methods. 
using ProcessorsMap = std::unordered_map; ProcessorsMap processors_map; + /// Graph related methods. bool addEdges(UInt64 node); void buildGraph(); void expandPipeline(Stack & stack, UInt64 pid); - void doExpandPipeline(Stack & stack); /// Pipeline execution related methods. - void addChildlessProcessorsToQueue(Stack & stack); - bool addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack); - // void processPrepareQueue(); - // void processAsyncQueue(); - - void addJob(ExecutionState * execution_state); - void addAsyncJob(UInt64 pid); - // bool tryAssignJob(ExecutionState * state); - // void assignJobs(); - + void addChildlessProcessorsToStack(Stack & stack); + bool tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack); + static void addJob(ExecutionState * execution_state); + // TODO: void addAsyncJob(UInt64 pid); bool prepareProcessor(size_t pid, Stack & stack, bool async); + void doExpandPipeline(Stack & stack); void executeImpl(size_t num_threads); void executeSingleThread(size_t thread_num, size_t num_threads);