mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Pin sources to streams in PipelineExecutor.
This commit is contained in:
parent
3a849c3c5f
commit
818696a9a7
@ -479,11 +479,20 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
||||
while (!finished)
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(executor_contexts[thread_num]->mutex);
|
||||
std::unique_lock lock(executor_contexts[thread_num]->mutex);
|
||||
if (!executor_contexts[thread_num]->pinned_tasks.empty())
|
||||
{
|
||||
state = executor_contexts[thread_num]->pinned_tasks.front();
|
||||
executor_contexts[thread_num]->pinned_tasks.pop();
|
||||
lock.unlock();
|
||||
|
||||
if (!first)
|
||||
{
|
||||
std::lock_guard guard(task_queue_mutex);
|
||||
--num_waiting_threads;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -595,7 +604,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
||||
queue.pop();
|
||||
|
||||
auto stream = task->processor->getStream();
|
||||
if (stream != IProcessor::NO_STREAM && typeid_cast<const ISource *>(task->processor))
|
||||
if (stream != IProcessor::NO_STREAM && dynamic_cast<const ISource *>(task->processor))
|
||||
{
|
||||
auto thread_to_wake = stream % num_threads;
|
||||
std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
|
||||
|
Loading…
Reference in New Issue
Block a user