2019-02-06 16:31:18 +00:00
|
|
|
#include <Processors/Executors/PipelineExecutor.h>
|
|
|
|
#include <unordered_map>
|
|
|
|
#include <queue>
|
2019-02-08 16:10:57 +00:00
|
|
|
#include <IO/WriteBufferFromString.h>
|
|
|
|
#include <Processors/printPipeline.h>
|
|
|
|
#include <Common/EventCounter.h>
|
2019-04-05 10:52:07 +00:00
|
|
|
#include <ext/scope_guard.h>
|
2019-02-06 16:31:18 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
/// Creates an executor for the given set of processors.
/// If `pool` is not nullptr, ready processors are executed as thread-pool jobs;
/// otherwise all work runs in the calling thread.
PipelineExecutor::PipelineExecutor(Processors processors, ThreadPool * pool)
    : processors(std::move(processors)), pool(pool), cancelled(false)
{
    buildGraph();
}
|
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
void PipelineExecutor::addEdges(const ProcessorsMap & processors_map, UInt64 node, UInt64 from_input, UInt64 from_output)
|
2019-02-06 16:31:18 +00:00
|
|
|
{
|
|
|
|
auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port)
|
|
|
|
{
|
|
|
|
String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output")
|
2019-02-27 12:51:27 +00:00
|
|
|
+ " for processor " + parent->getName() + ", but not found in list of processors.";
|
2019-02-06 16:31:18 +00:00
|
|
|
|
|
|
|
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
|
|
|
|
};
|
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
const IProcessor * cur = graph[node].processor;
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges)
|
2019-02-06 16:31:18 +00:00
|
|
|
{
|
2019-02-27 12:51:27 +00:00
|
|
|
auto it = processors_map.find(to_proc);
|
|
|
|
if (it == processors_map.end())
|
|
|
|
throwUnknownProcessor(to_proc, cur, true);
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
UInt64 proc_num = it->second;
|
|
|
|
Edge * edge_ptr = nullptr;
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
for (auto & edge : edges)
|
|
|
|
if (edge.to == proc_num)
|
|
|
|
edge_ptr = &edge;
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
if (!edge_ptr)
|
2019-02-06 16:31:18 +00:00
|
|
|
{
|
2019-02-27 12:51:27 +00:00
|
|
|
edge_ptr = &edges.emplace_back();
|
|
|
|
edge_ptr->to = proc_num;
|
|
|
|
}
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
from_port.setVersion(&edge_ptr->version);
|
|
|
|
};
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
auto & inputs = processors[node]->getInputs();
|
|
|
|
for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it)
|
|
|
|
{
|
|
|
|
const IProcessor * proc = &it->getOutputPort().getProcessor();
|
|
|
|
add_edge(*it, proc, graph[node].backEdges);
|
|
|
|
}
|
2019-02-08 16:10:57 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
auto & outputs = processors[node]->getOutputs();
|
|
|
|
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it)
|
|
|
|
{
|
|
|
|
const IProcessor * proc = &it->getInputPort().getProcessor();
|
|
|
|
add_edge(*it, proc, graph[node].directEdges);
|
|
|
|
}
|
|
|
|
}
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
void PipelineExecutor::buildGraph()
|
|
|
|
{
|
|
|
|
ProcessorsMap processors_map;
|
|
|
|
UInt64 num_processors = processors.size();
|
2019-02-08 16:10:57 +00:00
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
graph.resize(num_processors);
|
|
|
|
for (UInt64 node = 0; node < num_processors; ++node)
|
|
|
|
{
|
|
|
|
IProcessor * proc = processors[node].get();
|
|
|
|
processors_map[proc] = node;
|
|
|
|
graph[node].processor = proc;
|
2019-02-06 16:31:18 +00:00
|
|
|
}
|
2019-02-27 12:51:27 +00:00
|
|
|
|
|
|
|
for (UInt64 node = 0; node < num_processors; ++node)
|
|
|
|
addEdges(processors_map, node, 0, 0);
|
2019-02-06 16:31:18 +00:00
|
|
|
}
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
void PipelineExecutor::addChildlessProcessorsToQueue()
|
|
|
|
{
|
|
|
|
UInt64 num_processors = processors.size();
|
|
|
|
for (UInt64 proc = 0; proc < num_processors; ++proc)
|
|
|
|
{
|
|
|
|
if (graph[proc].directEdges.empty())
|
|
|
|
{
|
|
|
|
prepare_queue.push(proc);
|
|
|
|
graph[proc].status = ExecStatus::Preparing;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void PipelineExecutor::processFinishedExecutionQueue()
|
|
|
|
{
|
|
|
|
while (!finished_execution_queue.empty())
|
|
|
|
{
|
|
|
|
UInt64 proc = finished_execution_queue.front();
|
|
|
|
finished_execution_queue.pop();
|
|
|
|
|
|
|
|
graph[proc].status = ExecStatus::Preparing;
|
|
|
|
prepare_queue.push(proc);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void PipelineExecutor::processFinishedExecutionQueueSafe()
|
|
|
|
{
|
|
|
|
if (pool)
|
|
|
|
{
|
|
|
|
exception_handler.throwIfException();
|
|
|
|
std::lock_guard lock(finished_execution_mutex);
|
|
|
|
processFinishedExecutionQueue();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
processFinishedExecutionQueue();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Schedules the processor on the far side of `edge` for preparation, but only if
/// the edge's port was updated since the last check (or the node was never visited).
/// Returns true if the processor was actually queued.
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge)
{
    auto & target = graph[edge.to];

    const bool is_new = target.status == ExecStatus::New;
    const bool port_updated = edge.version != edge.prev_version;

    /// Don't add processor if nothing was read from port.
    if (!is_new && !port_updated)
        return false;

    edge.prev_version = edge.version;

    /// Only idle (or brand-new) processors may be moved to Preparing.
    if (!is_new && target.status != ExecStatus::Idle)
        return false;

    target.status = ExecStatus::Preparing;
    prepare_queue.push(edge.to);
    return true;
}
|
|
|
|
|
2019-04-12 11:18:18 +00:00
|
|
|
/// Runs the processor's work() once, decorating any DB::Exception with the
/// processor's name and address so a failure can be attributed to a pipeline node.
static void executeJob(IProcessor * processor)
{
    try
    {
        processor->work();
    }
    catch (Exception & e)
    {
        /// The address disambiguates processors that share the same name.
        e.addMessage("While executing " + processor->getName() + " ("
                     + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") ");
        throw;
    }
}
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
/// Schedules execution of processor `pid`.
/// With a thread pool, work() runs as a pool job: on completion the pid is pushed to
/// finished_execution_queue under the mutex and event_counter is notified so the main
/// loop can wake up. Without a pool, work() runs inline in the calling thread.
void PipelineExecutor::addJob(UInt64 pid)
{
    if (pool)
    {
        auto job = [this, pid]()
        {
            /// Ensure the pid is reported finished even if executeJob() throws;
            /// the exception itself is captured by createExceptionHandledJob() below.
            SCOPE_EXIT(
                {
                    std::lock_guard lock(finished_execution_mutex);
                    finished_execution_queue.push(pid);
                }
                event_counter.notify()
            );

            executeJob(graph[pid].processor);
        };

        pool->schedule(createExceptionHandledJob(std::move(job), exception_handler));
        ++num_tasks_to_wait;
    }
    else
    {
        /// Execute task in main thread.
        executeJob(graph[pid].processor);
        /// No pool threads exist, so no lock is needed here.
        finished_execution_queue.push(pid);
    }
}
|
|
|
|
|
|
|
|
/// Schedules asynchronous execution of processor `pid`: the processor itself will
/// notify `event_counter` when it has made progress.
void PipelineExecutor::addAsyncJob(UInt64 pid)
{
    graph[pid].processor->schedule(event_counter);
    graph[pid].status = ExecStatus::Async;
    ++num_tasks_to_wait;
}
|
|
|
|
|
2019-02-27 12:51:27 +00:00
|
|
|
/// Handles IProcessor::Status::ExpandPipeline: asks the processor at `pid` for new
/// processors, appends them to the graph, wires their edges and queues them for
/// preparation. Finally re-wires the freshly added ports of the expanding processor.
void PipelineExecutor::expendPipeline(UInt64 pid)
{
    IProcessor * expanding = graph[pid].processor;

    /// Capture where the old ports/processors end before expansion, so that only
    /// the newly created ones are wired below.
    const UInt64 old_inputs = expanding->getInputs().size();
    const UInt64 old_outputs = expanding->getOutputs().size();
    const UInt64 first_new_node = processors.size();

    auto added = expanding->expandPipeline();

    ProcessorsMap index_by_processor;
    index_by_processor[expanding] = pid;

    for (const auto & processor : added)
    {
        index_by_processor[processor.get()] = graph.size();
        graph.emplace_back();
        graph.back().processor = processor.get();
    }

    processors.insert(processors.end(), added.begin(), added.end());

    const UInt64 total = processors.size();
    for (UInt64 node = first_new_node; node < total; ++node)
    {
        addEdges(index_by_processor, node, 0, 0);
        prepare_queue.push(node);
        graph[node].status = ExecStatus::Preparing;
    }

    /// The expanding processor may have gained new ports: wire only those.
    addEdges(index_by_processor, pid, old_inputs, old_outputs);
}
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
/// Calls prepare() on processor `pid` and reacts to the returned status: schedules
/// neighbours for re-preparation, queues execution jobs, or expands the pipeline.
/// `async` must be true only when called from the async-queue pass — status Wait
/// is legal only there.
void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
{
    auto & node = graph[pid];
    auto status = node.processor->prepare();
    node.last_processor_status = status;

    /// Ports of this node may have changed; re-check all adjacent processors.
    auto notify_neighbours = [&, this]
    {
        for (auto & edge : node.directEdges)
            addProcessorToPrepareQueueIfUpdated(edge);

        for (auto & edge : node.backEdges)
            addProcessorToPrepareQueueIfUpdated(edge);
    };

    switch (status)
    {
        /// NeedData and PortFull are handled identically: the node waits for
        /// its neighbours to move data, so it becomes Idle.
        case IProcessor::Status::NeedData:
        case IProcessor::Status::PortFull:
        {
            notify_neighbours();
            node.status = ExecStatus::Idle;
            break;
        }
        case IProcessor::Status::Finished:
        {
            notify_neighbours();
            node.status = ExecStatus::Finished;
            break;
        }
        case IProcessor::Status::Ready:
        {
            node.status = ExecStatus::Executing;
            addJob(pid);
            break;
        }
        case IProcessor::Status::Async:
        {
            node.status = ExecStatus::Executing;
            addAsyncJob(pid);
            break;
        }
        case IProcessor::Status::Wait:
        {
            if (!async)
                throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
            break;
        }
        case IProcessor::Status::ExpandPipeline:
        {
            expendPipeline(pid);

            /// Add node to queue again.
            prepare_queue.push(pid);

            /// expendPipeline() may reallocate the graph, so `node` is dangling here.
            graph[pid].status = ExecStatus::Preparing;
            break;
        }
    }
}
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
void PipelineExecutor::processPrepareQueue()
|
2019-02-06 16:31:18 +00:00
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
while (!prepare_queue.empty())
|
2019-02-06 16:31:18 +00:00
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
UInt64 proc = prepare_queue.front();
|
|
|
|
prepare_queue.pop();
|
|
|
|
|
|
|
|
prepareProcessor(proc, false);
|
2019-02-06 16:31:18 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
void PipelineExecutor::processAsyncQueue()
|
|
|
|
{
|
|
|
|
UInt64 num_processors = processors.size();
|
|
|
|
for (UInt64 node = 0; node < num_processors; ++node)
|
|
|
|
if (graph[node].status == ExecStatus::Async)
|
|
|
|
prepareProcessor(node, true);
|
|
|
|
}
|
|
|
|
|
2019-02-06 16:31:18 +00:00
|
|
|
void PipelineExecutor::execute()
|
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
addChildlessProcessorsToQueue();
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-04-12 09:57:13 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
executeImpl();
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
e.addMessage("\nCurrent state:\n" + dumpPipeline());
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool all_processors_finished = true;
|
|
|
|
for (auto & node : graph)
|
|
|
|
if (node.status != ExecStatus::Finished)
|
|
|
|
all_processors_finished = false;
|
|
|
|
|
|
|
|
if (!all_processors_finished)
|
|
|
|
/// It seems that pipeline has stuck.
|
|
|
|
throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Main scheduling loop. Repeats until cancelled, or until there is neither queued
/// work nor any in-flight (pool/async) task left to wait for.
void PipelineExecutor::executeImpl()
{
    while (!cancelled)
    {
        processFinishedExecutionQueueSafe();
        processPrepareQueue();
        processAsyncQueue();

        if (prepare_queue.empty())
        {
            /// For single-thread executor.
            if (!pool && !finished_execution_queue.empty())
                continue;

            /// num_tasks_to_wait counts scheduled pool/async tasks; if some are
            /// still outstanding, block until one signals progress.
            if (num_tasks_to_wait > num_waited_tasks)
            {
                /// Try wait anything.
                event_counter.wait();
                ++num_waited_tasks;
            }
            else
            {
                /// Here prepare_queue is empty and we have nobody to wait for. Exiting.
                break;
            }
        }
    }
}
|
2019-02-06 16:31:18 +00:00
|
|
|
|
2019-04-12 09:57:13 +00:00
|
|
|
/// Renders the current pipeline (processors plus the last prepare() status of each)
/// into a human-readable string; used in error messages and diagnostics.
String PipelineExecutor::dumpPipeline() const
{
    /// Collect the last known status of every node, in the same order as `processors`
    /// (graph nodes and processors share indices).
    std::vector<IProcessor::Status> statuses;
    statuses.reserve(graph.size());

    for (const auto & node : graph)
        statuses.emplace_back(node.last_processor_status);

    WriteBufferFromOwnString out;
    printPipeline(processors, statuses, out);
    out.finish();

    return out.str();
}
|
|
|
|
|
|
|
|
}
|