diff --git a/src/Processors/Executors/ExecutionThreadContext.h b/src/Processors/Executors/ExecutionThreadContext.h index ecc9714e01e..deacd0380fa 100644 --- a/src/Processors/Executors/ExecutionThreadContext.h +++ b/src/Processors/Executors/ExecutionThreadContext.h @@ -43,8 +43,13 @@ public: const bool profile_processors; const bool trace_processors; - constexpr static size_t max_consequintely_scheduled_local_tasks = 128; - size_t num_consequintely_scheduled_local_tasks = 0; + /// There is a performance optimization that schedules a task to the current thread, avoiding global task queue. + /// Optimization decreases contention on global task queue but may cause starvation. + /// See 01104_distributed_numbers_test.sql + /// This constant tells us that we should skip the optimization + /// if it was applied more than `max_scheduled_local_tasks` in a row. + constexpr static size_t max_scheduled_local_tasks = 128; + size_t num_scheduled_local_tasks = 0; void wait(std::atomic_bool & finished); void wakeUp(); diff --git a/src/Processors/Executors/ExecutorTasks.cpp b/src/Processors/Executors/ExecutorTasks.cpp index 4569858276a..ec1fc539884 100644 --- a/src/Processors/Executors/ExecutorTasks.cpp +++ b/src/Processors/Executors/ExecutorTasks.cpp @@ -121,14 +121,14 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea /// Take local task from queue if has one. if (!queue.empty() && !context.hasAsyncTasks() - && context.num_consequintely_scheduled_local_tasks < context.max_consequintely_scheduled_local_tasks) + && context.num_scheduled_local_tasks < context.max_scheduled_local_tasks) { - ++context.num_consequintely_scheduled_local_tasks; + ++context.num_scheduled_local_tasks; context.setTask(queue.front()); queue.pop(); } else - context.num_consequintely_scheduled_local_tasks = 0; + context.num_scheduled_local_tasks = 0; if (!queue.empty() || !async_queue.empty()) {