Don't use a lot of stack for pipeline traverse.

This commit is contained in:
Nikolai Kochetov 2021-10-26 21:50:13 +03:00
parent 05f42e2d07
commit c7a07bafe0
5 changed files with 129 additions and 137 deletions

View File

@ -45,7 +45,7 @@ class ExecutionThreadContext
{
private:
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using atomic shard ptr.
/// This can be solved by using atomic shared ptr.
std::list<StoppingPipelineTask> task_list;
/// A queue of async tasks. Task is added to queue when waited.

View File

@ -125,155 +125,149 @@ bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
return true;
}
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, ExecutionThreadContext & thread_context)
bool PipelineExecutor::prepareProcessor(UInt64 pid, ExecutionThreadContext & thread_context, Queue & queue, Queue & async_queue)
{
/// In this method we have ownership on edge, but node can be concurrently accessed.
std::stack<ExecutingGraph::Edge *> updated_edges;
Stack updated_processors;
updated_processors.push(pid);
auto & node = *graph->nodes[edge.to];
std::unique_lock lock(node.status_mutex);
ExecutingGraph::ExecStatus status = node.status;
if (status == ExecutingGraph::ExecStatus::Finished)
return true;
if (edge.backward)
node.updated_output_ports.push_back(edge.output_port_number);
else
node.updated_input_ports.push_back(edge.input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
while (!updated_processors.empty() || !updated_edges.empty())
{
node.status = ExecutingGraph::ExecStatus::Preparing;
return prepareProcessor(edge.to, thread_context, queue, async_queue, std::move(lock));
}
else
graph->nodes[edge.to]->processor->onUpdatePorts();
std::optional<std::unique_lock<std::mutex>> stack_top_lock;
return true;
}
if (updated_processors.empty())
{
auto * edge = updated_edges.top();
updated_edges.pop();
bool PipelineExecutor::prepareProcessor(UInt64 pid, ExecutionThreadContext & thread_context, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock)
{
/// In this method we have ownership on node.
auto & node = *graph->nodes[pid];
/// Here we have ownership on edge, but node can be concurrently accessed.
bool need_expand_pipeline = false;
auto & node = *graph->nodes[edge->to];
std::vector<ExecutingGraph::Edge *> updated_back_edges;
std::vector<ExecutingGraph::Edge *> updated_direct_edges;
std::unique_lock lock(node.status_mutex);
{
ExecutingGraph::ExecStatus status = node.status;
if (status != ExecutingGraph::ExecStatus::Finished)
{
if (edge->backward)
node.updated_output_ports.push_back(edge->output_port_number);
else
node.updated_input_ports.push_back(edge->input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
updated_processors.push(edge->to);
stack_top_lock = std::move(lock);
}
else
graph->nodes[edge->to]->processor->onUpdatePorts();
}
}
else
{
pid = updated_processors.top();
updated_processors.pop();
/// In this method we have ownership on node.
auto & node = *graph->nodes[pid];
bool need_expand_pipeline = false;
if (!stack_top_lock)
stack_top_lock.emplace(node.status_mutex);
{
#ifndef NDEBUG
Stopwatch watch;
Stopwatch watch;
#endif
std::unique_lock<std::mutex> lock(std::move(node_lock));
std::unique_lock<std::mutex> lock(std::move(*stack_top_lock));
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
}
catch (...)
{
node.exception = std::current_exception();
return false;
}
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
}
catch (...)
{
node.exception = std::current_exception();
return false;
}
#ifndef NDEBUG
node.preparation_time_ns += watch.elapsed();
node.preparation_time_ns += watch.elapsed();
#endif
node.updated_input_ports.clear();
node.updated_output_ports.clear();
node.updated_input_ports.clear();
node.updated_output_ports.clear();
switch (node.last_processor_status)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
node.status = ExecutingGraph::ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
node.status = ExecutingGraph::ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
case IProcessor::Status::Async:
{
node.status = ExecutingGraph::ExecStatus::Executing;
async_queue.push(&node);
break;
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}
switch (node.last_processor_status)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
node.status = ExecutingGraph::ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
node.status = ExecutingGraph::ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
case IProcessor::Status::Async:
{
node.status = ExecutingGraph::ExecStatus::Executing;
async_queue.push(&node);
break;
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}
{
for (auto & edge_id : node.post_updated_input_ports)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_back_edges.emplace_back(edge);
edge->update_info.trigger();
if (!need_expand_pipeline)
{
for (auto & edge_id : node.post_updated_input_ports)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_edges.push(edge);
edge->update_info.trigger();
}
for (auto & edge_id : node.post_updated_output_ports)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_edges.push(edge);
edge->update_info.trigger();
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
}
for (auto & edge_id : node.post_updated_output_ports)
if (need_expand_pipeline)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_direct_edges.emplace_back(edge);
edge->update_info.trigger();
auto callback = [this, &updated_processors, pid = node.processors_id]()
{
return expandPipeline(updated_processors, pid);
};
if (!tasks.executeStoppingTask(thread_context, std::move(callback)))
return false;
/// Add itself back to be prepared again.
updated_processors.push(pid);
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
}
{
for (auto & edge : updated_direct_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_context))
return false;
}
for (auto & edge : updated_back_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_context))
return false;
}
}
if (need_expand_pipeline)
{
Stack stack;
auto callback = [this, &stack, pid = node.processors_id]() { return expandPipeline(stack, pid); };
if (!tasks.executeStoppingTask(thread_context, std::move(callback)))
return false;
/// Add itself back to be prepared again.
stack.push(pid);
while (!stack.empty())
{
auto item = stack.top();
auto lock = std::unique_lock<std::mutex>(graph->nodes[item]->status_mutex);
if (!prepareProcessor(item, thread_context, queue, async_queue, std::move(lock)))
return false;
stack.pop();
}
}
@ -426,11 +420,8 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
tasks.enterConcurrentReadSection();
/// Prepare processor after execution.
{
auto lock = context.lockStatus();
if (!prepareProcessor(context.getProcessorID(), context, queue, async_queue, std::move(lock)))
finish();
}
if (!prepareProcessor(context.getProcessorID(), context, queue, async_queue))
finish();
tasks.exitConcurrentReadSection();
@ -473,7 +464,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
UInt64 proc = stack.top();
stack.pop();
prepareProcessor(proc, context, queue, async_queue, std::unique_lock<std::mutex>(graph->nodes[proc]->status_mutex));
prepareProcessor(proc, context, queue, async_queue);
if (!async_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",

View File

@ -70,12 +70,11 @@ private:
/// Pipeline execution related methods.
void addChildlessProcessorsToStack(Stack & stack);
bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, ExecutionThreadContext & thread_context);
/// Prepare processor with pid number.
/// Check parents and children of current processor and push them to stacks if they also need to be prepared.
/// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
bool prepareProcessor(UInt64 pid, ExecutionThreadContext & thread_context, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock);
bool prepareProcessor(UInt64 pid, ExecutionThreadContext & thread_context, Queue & queue, Queue & async_queue);
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.

View File

@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <queue>
#include <Common/Exception.h>
namespace DB
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/Exception.h>
#include <base/defines.h>
namespace DB
{
namespace ErrorCodes