diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 7c9b3f2af7b..2300dacb92b 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -472,8 +472,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads while (!finished) { - bool first = true; - /// First, find any processor to execute. /// Just travers graph and prepare any processor. while (!finished) @@ -486,12 +484,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads executor_contexts[thread_num]->pinned_tasks.pop(); lock.unlock(); - if (!first) - { - std::lock_guard guard(task_queue_mutex); - --num_waiting_threads; - } - break; } } @@ -499,11 +491,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads { std::unique_lock lock(task_queue_mutex); - if (!first) - --num_waiting_threads; - - first = false; - if (!task_queue.empty()) { state = task_queue.front(); @@ -513,6 +500,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads { auto thread_to_wake = threads_queue.top(); threads_queue.pop(); + --num_waiting_threads; lock.unlock(); std::lock_guard guard(executor_contexts[thread_to_wake]->mutex); @@ -587,13 +575,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads /// Process all neighbours. Children will be on the top of stack, then parents. prepare_all_processors(queue, children, children, parents); - -// if (!state && !queue.empty()) -// { -// state = queue.front(); -// queue.pop(); -// } - prepare_all_processors(queue, parents, parents, parents); if (!state && !queue.empty()) @@ -612,11 +593,28 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads auto stream = task->processor->getStream(); if (stream != IProcessor::NO_STREAM && dynamic_cast(task->processor)) { + bool found_in_queue = false; + + { + std::unique_lock lock(task_queue_mutex); + auto it = threads_queue.find(stream); + if (it != threads_queue.end()) + { + threads_queue.erase(it); + --num_waiting_threads; + found_in_queue = true; + } + } + auto thread_to_wake = stream % num_threads; std::lock_guard guard(executor_contexts[thread_to_wake]->mutex); executor_contexts[thread_to_wake]->pinned_tasks.push(task); - executor_contexts[thread_to_wake]->wake_flag = true; - executor_contexts[thread_to_wake]->condvar.notify_one(); + + if (found_in_queue) + { + executor_contexts[thread_to_wake]->wake_flag = true; + executor_contexts[thread_to_wake]->condvar.notify_one(); + } } else tmp_queue.push(task); @@ -637,8 +635,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads if (!threads_queue.empty()) { - auto thread_to_wake = threads_queue.top(); - threads_queue.pop(); + auto it = threads_queue.begin(); + threads_queue.erase(it); + --num_waiting_threads; + auto thread_to_wake = *it; lock.unlock(); std::lock_guard guard(executor_contexts[thread_to_wake]->mutex); diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h index 22af088cde5..45503c4c06f 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -10,6 +10,8 @@ #include +#include + namespace DB { @@ -122,7 +124,7 @@ private: /// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set. /// Stores processors need to be prepared. Preparing status is already set for them. TaskQueue task_queue; - std::stack threads_queue; + std::map threads_queue; std::mutex task_queue_mutex; std::atomic_bool cancelled;