diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 1b7a22a3ca2..c3a2b7989df 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -453,7 +453,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads return false; }; - using Queue = std::queue; + using Queue = std::vector; auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents) { @@ -463,7 +463,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads stack.pop(); if (prepare_processor(current_processor, children, parents)) - queue.push(graph[current_processor].execution_state.get()); + queue.push_back(graph[current_processor].execution_state.get()); } }; @@ -541,12 +541,26 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads /// Process all neighbours. Children will be on the top of stack, then parents. prepare_all_processors(queue, children, children, parents); - if (!state && !queue.empty()) + if (!state) { - state = queue.front(); - queue.pop(); + for (auto & exec_state : queue) + { + auto stream = exec_state->processor->getStream(); + + if (stream == thread_num) + { + state = exec_state; + break; + } + + if (stream == IProcessor::NO_STREAM) + state = exec_state; + } } + if (!state && !queue.empty()) + state = queue.back(); + prepare_all_processors(queue, parents, parents, parents); if (!queue.empty()) @@ -555,8 +569,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads while (!queue.empty() && !finished) { - task_queue.push(queue.front()); - queue.pop(); + if (queue.back() != state) + task_queue.push(queue.back()); + + queue.pop_back(); } task_queue_condvar.notify_all();