diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 183ce194396..304a782a490 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -590,17 +590,17 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads if (stream != IProcessor::NO_STREAM && dynamic_cast(task->processor)) { bool found_in_queue = false; + auto thread_to_wake = stream % num_threads; { std::unique_lock lock(task_queue_mutex); - if (threads_queue.has(stream)) + if (threads_queue.has(thread_to_wake)) { - threads_queue.pop(stream); + threads_queue.pop(thread_to_wake); found_in_queue = true; } } - auto thread_to_wake = stream % num_threads; std::lock_guard guard(executor_contexts[thread_to_wake]->mutex); executor_contexts[thread_to_wake]->pinned_tasks.push(task);