mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Add stream enumeration to Processors.
This commit is contained in:
parent
464595a548
commit
88044fdc16
@ -453,7 +453,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
|||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
using Queue = std::queue<ExecutionState *>;
|
using Queue = std::vector<ExecutionState *>;
|
||||||
|
|
||||||
auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
|
auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
|
||||||
{
|
{
|
||||||
@ -463,7 +463,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
|||||||
stack.pop();
|
stack.pop();
|
||||||
|
|
||||||
if (prepare_processor(current_processor, children, parents))
|
if (prepare_processor(current_processor, children, parents))
|
||||||
queue.push(graph[current_processor].execution_state.get());
|
queue.push_back(graph[current_processor].execution_state.get());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -541,12 +541,26 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
|||||||
/// Process all neighbours. Children will be on the top of stack, then parents.
|
/// Process all neighbours. Children will be on the top of stack, then parents.
|
||||||
prepare_all_processors(queue, children, children, parents);
|
prepare_all_processors(queue, children, children, parents);
|
||||||
|
|
||||||
if (!state && !queue.empty())
|
if (!state)
|
||||||
{
|
{
|
||||||
state = queue.front();
|
for (auto & exec_state : queue)
|
||||||
queue.pop();
|
{
|
||||||
|
auto stream = exec_state->processor->getStream();
|
||||||
|
|
||||||
|
if (stream == thread_num)
|
||||||
|
{
|
||||||
|
state = exec_state;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream == IProcessor::NO_STREAM)
|
||||||
|
state = exec_state;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!state && !queue.empty())
|
||||||
|
state = queue.back();
|
||||||
|
|
||||||
prepare_all_processors(queue, parents, parents, parents);
|
prepare_all_processors(queue, parents, parents, parents);
|
||||||
|
|
||||||
if (!queue.empty())
|
if (!queue.empty())
|
||||||
@ -555,8 +569,10 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
|||||||
|
|
||||||
while (!queue.empty() && !finished)
|
while (!queue.empty() && !finished)
|
||||||
{
|
{
|
||||||
task_queue.push(queue.front());
|
if (queue.back() != state)
|
||||||
queue.pop();
|
task_queue.push(queue.back());
|
||||||
|
|
||||||
|
queue.pop_back();
|
||||||
}
|
}
|
||||||
|
|
||||||
task_queue_condvar.notify_all();
|
task_queue_condvar.notify_all();
|
||||||
|
Loading…
Reference in New Issue
Block a user