Add info about affected ports to IProcessor::prepare

This commit is contained in:
Nikolai Kochetov 2019-11-27 19:24:44 +03:00
parent ad2af03f98
commit 4f2f474fde
5 changed files with 333 additions and 133 deletions

View File

@ -7,7 +7,6 @@
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <boost/lockfree/queue.hpp>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
@ -52,26 +51,24 @@ bool PipelineExecutor::addEdges(UInt64 node)
const IProcessor * cur = graph[node].processor; const IProcessor * cur = graph[node].processor;
auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges) auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges,
bool is_backward, UInt64 input_port_number, UInt64 output_port_number)
{ {
auto it = processors_map.find(to_proc); auto it = processors_map.find(to_proc);
if (it == processors_map.end()) if (it == processors_map.end())
throwUnknownProcessor(to_proc, cur, true); throwUnknownProcessor(to_proc, cur, true);
UInt64 proc_num = it->second; UInt64 proc_num = it->second;
Edge * edge_ptr = nullptr;
for (auto & edge : edges) for (auto & edge : edges)
if (edge.to == proc_num)
edge_ptr = &edge;
if (!edge_ptr)
{ {
edge_ptr = &edges.emplace_back(); if (edge.to == proc_num)
edge_ptr->to = proc_num; throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR);
} }
from_port.setVersion(&edge_ptr->version); auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number);
from_port.setVersion(&edge.version);
}; };
bool was_edge_added = false; bool was_edge_added = false;
@ -83,10 +80,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
{ {
was_edge_added = true; was_edge_added = true;
for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it) for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input)
{ {
const IProcessor * proc = &it->getOutputPort().getProcessor(); const IProcessor * proc = &it->getOutputPort().getProcessor();
add_edge(*it, proc, graph[node].backEdges); auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort());
add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number);
} }
} }
@ -97,10 +95,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
{ {
was_edge_added = true; was_edge_added = true;
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it) for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output)
{ {
const IProcessor * proc = &it->getInputPort().getProcessor(); const IProcessor * proc = &it->getInputPort().getProcessor();
add_edge(*it, proc, graph[node].directEdges); auto input_port_number = proc->getInputPortNumber(&it->getInputPort());
add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output);
} }
} }
@ -131,6 +130,7 @@ void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
if (graph[proc].directEdges.empty()) if (graph[proc].directEdges.empty())
{ {
stack.push(proc); stack.push(proc);
/// Do not lock the mutex, as this function is executed in a single thread.
graph[proc].status = ExecStatus::Preparing; graph[proc].status = ExecStatus::Preparing;
} }
} }
@ -195,9 +195,20 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
UInt64 num_processors = processors.size(); UInt64 num_processors = processors.size();
for (UInt64 node = 0; node < num_processors; ++node) for (UInt64 node = 0; node < num_processors; ++node)
{ {
size_t num_direct_edges = graph[node].directEdges.size();
size_t num_back_edges = graph[node].backEdges.size();
if (addEdges(node)) if (addEdges(node))
{ {
if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New) std::lock_guard guard(graph[node].status_mutex);
for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges)
graph[node].updated_input_ports.emplace_back(num_back_edges);
for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges)
graph[node].updated_output_ports.emplace_back(num_direct_edges);
if (graph[node].status == ExecStatus::Idle)
{ {
graph[node].status = ExecStatus::Preparing; graph[node].status = ExecStatus::Preparing;
stack.push(node); stack.push(node);
@ -212,93 +223,65 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac
auto & node = graph[edge.to]; auto & node = graph[edge.to];
ExecStatus status = node.status.load(); std::lock_guard guard(node.status_mutex);
/// Don't add processor if nothing was read from port. ExecStatus status = node.status;
if (status != ExecStatus::New && edge.version == edge.prev_version)
return false;
if (status == ExecStatus::Finished) if (status == ExecStatus::Finished)
return false; return false;
/// Signal that node need to be prepared. if (edge.backward)
node.need_to_be_prepared = true; node.updated_output_ports.push_back(edge.output_port_number);
edge.prev_version = edge.version; else
node.updated_input_ports.push_back(edge.input_port_number);
/// Try to get ownership for node.
/// Assume that current status is New or Idle. Otherwise, can't prepare node.
if (status != ExecStatus::New)
status = ExecStatus::Idle;
/// Statuses but New and Idle are not interesting because they own node.
/// Prepare will be called in owning thread before changing status.
while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing))
if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared)
return false;
if (status == ExecStatus::Idle)
{
node.status = ExecStatus::Preparing;
stack.push(edge.to); stack.push(edge.to);
return true; return true;
}
return false;
} }
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async) bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
{ {
/// In this method we have ownership on node. /// In this method we have ownership on node.
auto & node = graph[pid]; auto & node = graph[pid];
bool need_traverse = false;
bool need_expand_pipeline = false;
std::vector<Edge *> updated_back_edges;
std::vector<Edge *> updated_direct_edges;
{ {
/// Stopwatch watch; /// Stopwatch watch;
/// Disable flag before prepare call. Otherwise, we can skip prepare request. std::lock_guard guard(node.status_mutex);
/// Prepare can be called more times than needed, but it's ok.
node.need_to_be_prepared = false;
auto status = node.processor->prepare(); auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
node.updated_input_ports.clear();
node.updated_output_ports.clear();
/// node.execution_state->preparation_time_ns += watch.elapsed(); /// node.execution_state->preparation_time_ns += watch.elapsed();
node.last_processor_status = status; node.last_processor_status = status;
}
auto add_neighbours_to_prepare_queue = [&] ()
{
for (auto & edge : node.backEdges)
tryAddProcessorToStackIfUpdated(edge, parents);
for (auto & edge : node.directEdges)
tryAddProcessorToStackIfUpdated(edge, children);
};
auto try_release_ownership = [&] ()
{
/// This function can be called after expand pipeline, where node from outer scope is not longer valid.
auto & node_ = graph[pid];
ExecStatus expected = ExecStatus::Idle;
node_.status = ExecStatus::Idle;
if (node_.need_to_be_prepared)
{
while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing))
if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared)
return;
children.push(pid);
}
};
switch (node.last_processor_status) switch (node.last_processor_status)
{ {
case IProcessor::Status::NeedData: case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull: case IProcessor::Status::PortFull:
{ {
add_neighbours_to_prepare_queue(); need_traverse = true;
try_release_ownership(); node.status = ExecStatus::Idle;
break; break;
} }
case IProcessor::Status::Finished: case IProcessor::Status::Finished:
{ {
add_neighbours_to_prepare_queue(); need_traverse = true;
node.status = ExecStatus::Finished; node.status = ExecStatus::Finished;
break; break;
} }
@ -322,6 +305,44 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
break; break;
} }
case IProcessor::Status::ExpandPipeline: case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}
if (need_traverse)
{
for (auto & edge : node.backEdges)
{
if (edge.version != edge.prev_version)
{
updated_back_edges.emplace_back(&edge);
edge.prev_version = edge.version;
}
}
for (auto & edge : node.directEdges)
{
if (edge.version != edge.prev_version)
{
updated_direct_edges.emplace_back(&edge);
edge.prev_version = edge.version;
}
}
}
}
if (need_traverse)
{
for (auto & edge : updated_back_edges)
tryAddProcessorToStackIfUpdated(*edge, parents);
for (auto & edge : updated_direct_edges)
tryAddProcessorToStackIfUpdated(*edge, children);
}
if (need_expand_pipeline)
{ {
executor_contexts[thread_number]->task_list.emplace_back( executor_contexts[thread_number]->task_list.emplace_back(
node.execution_state.get(), node.execution_state.get(),
@ -339,11 +360,8 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
doExpandPipeline(desired, true); doExpandPipeline(desired, true);
/// node is not longer valid after pipeline was expanded /// Add itself back to be prepared again.
graph[pid].need_to_be_prepared = true; children.push(pid);
try_release_ownership();
break;
}
} }
return false; return false;
@ -427,7 +445,7 @@ void PipelineExecutor::execute(size_t num_threads)
bool all_processors_finished = true; bool all_processors_finished = true;
for (auto & node : graph) for (auto & node : graph)
if (node.status != ExecStatus::Finished) if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex
all_processors_finished = false; all_processors_finished = false;
if (!all_processors_finished) if (!all_processors_finished)

View File

@ -43,12 +43,18 @@ private:
/// Connection from a graph node to a neighbouring processor's node.
/// Edges are created by the add_edge helper in PipelineExecutor::addEdges.
struct Edge struct Edge
{ {
/// Stores the target node and the port numbers on both ends of the connection.
Edge(UInt64 to_, bool backward_, UInt64 input_port_number_, UInt64 output_port_number_)
: to(to_), backward(backward_), input_port_number(input_port_number_), output_port_number(output_port_number_) {}
/// Index in `graph` of the node this edge leads to.
UInt64 to = std::numeric_limits<UInt64>::max(); UInt64 to = std::numeric_limits<UInt64>::max();
/// True for edges stored in backEdges (built from input ports), false for directEdges (built from output ports).
bool backward;
/// Number of the input port on the consuming side of the connection.
UInt64 input_port_number;
/// Number of the output port on the producing side of the connection.
UInt64 output_port_number;
/// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details. /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
/// To compare version with prev_version we can decide if neighbour processor need to be prepared. /// To compare version with prev_version we can decide if neighbour processor need to be prepared.
UInt64 version = 0; UInt64 version = 1;
UInt64 prev_version = 0; UInt64 prev_version = 0; /// prev_version is zero so we traverse all edges after the first prepare.
}; };
/// Use std::list because new ports can be added to processor during execution. /// Use std::list because new ports can be added to processor during execution.
@ -58,7 +64,6 @@ private:
/// Can be owning or not. Owning means that executor who set this status can change node's data and nobody else can. /// Can be owning or not. Owning means that executor who set this status can change node's data and nobody else can.
enum class ExecStatus enum class ExecStatus
{ {
New, /// prepare wasn't called yet. Initial state. Non-owning.
Idle, /// prepare returned NeedData or PortFull. Non-owning. Idle, /// prepare returned NeedData or PortFull. Non-owning.
Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning. Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning.
Executing, /// prepare returned Ready and task is executing. Owning. Executing, /// prepare returned Ready and task is executing. Owning.
@ -87,17 +92,19 @@ private:
Edges directEdges; Edges directEdges;
Edges backEdges; Edges backEdges;
std::atomic<ExecStatus> status; ExecStatus status;
/// This flag can be set by any executor. std::mutex status_mutex;
/// When enabled, any executor can try to atomically set Preparing state to status.
std::atomic_bool need_to_be_prepared;
/// Last state for profiling. /// Last state for profiling.
IProcessor::Status last_processor_status = IProcessor::Status::NeedData; IProcessor::Status last_processor_status = IProcessor::Status::NeedData;
std::unique_ptr<ExecutionState> execution_state; std::unique_ptr<ExecutionState> execution_state;
IProcessor::PortNumbers updated_input_ports;
IProcessor::PortNumbers updated_output_ports;
Node(IProcessor * processor_, UInt64 processor_id) Node(IProcessor * processor_, UInt64 processor_id)
: processor(processor_), status(ExecStatus::New), need_to_be_prepared(false) : processor(processor_), status(ExecStatus::Idle)
{ {
execution_state = std::make_unique<ExecutionState>(); execution_state = std::make_unique<ExecutionState>();
execution_state->processor = processor; execution_state->processor = processor;
@ -105,8 +112,8 @@ private:
} }
Node(Node && other) noexcept Node(Node && other) noexcept
: processor(other.processor), status(other.status.load()) : processor(other.processor), status(other.status)
, need_to_be_prepared(other.need_to_be_prepared.load()), execution_state(std::move(other.execution_state)) , execution_state(std::move(other.execution_state))
{ {
} }
}; };

View File

@ -171,7 +171,15 @@ public:
* - method 'prepare' cannot be executed in parallel even for different objects, * - method 'prepare' cannot be executed in parallel even for different objects,
* if they are connected (including indirectly) to each other by their ports; * if they are connected (including indirectly) to each other by their ports;
*/ */
virtual Status prepare() = 0; virtual Status prepare()
{
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
}
using PortNumbers = std::vector<UInt64>;
/// Optimization for prepare in case we know which ports were updated.
/// Receives the numbers of the input and output ports that changed since the previous call,
/// so an implementation may inspect only those ports.
/// The default implementation ignores both lists and forwards to prepare().
virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); }
/** You may call this method if 'prepare' returned Ready. /** You may call this method if 'prepare' returned Ready.
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
@ -183,11 +191,6 @@ public:
throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
} }
virtual void work(size_t /*thread_num*/)
{
work();
}
/** You may call this method if 'prepare' returned Async. /** You may call this method if 'prepare' returned Async.
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
* *
@ -226,6 +229,34 @@ public:
auto & getInputs() { return inputs; } auto & getInputs() { return inputs; }
auto & getOutputs() { return outputs; } auto & getOutputs() { return outputs; }
/// Returns the zero-based position of `input_port` inside this processor's `inputs`.
/// Ports are compared by address.
/// Throws Exception with ErrorCodes::LOGICAL_ERROR if the port does not belong to `inputs`.
UInt64 getInputPortNumber(const InputPort * input_port) const
{
    UInt64 position = 0;

    /// Linear scan: walk the container and count elements until the address matches.
    for (auto it = inputs.begin(); it != inputs.end(); ++it, ++position)
    {
        if (&*it == input_port)
            return position;
    }

    throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
}
/// Returns the zero-based position of `output_port` inside this processor's `outputs`.
/// Ports are compared by address.
/// Throws Exception with ErrorCodes::LOGICAL_ERROR if the port does not belong to `outputs`.
UInt64 getOutputPortNumber(const OutputPort * output_port) const
{
    UInt64 position = 0;

    /// Linear scan: walk the container and count elements until the address matches.
    for (auto it = outputs.begin(); it != outputs.end(); ++it, ++position)
    {
        if (&*it == output_port)
            return position;
    }

    throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
}
const auto & getInputs() const { return inputs; } const auto & getInputs() const { return inputs; }
const auto & getOutputs() const { return outputs; } const auto & getOutputs() const { return outputs; }

View File

@ -153,5 +153,112 @@ ResizeProcessor::Status ResizeProcessor::prepare()
return get_status_if_no_inputs(); return get_status_if_no_inputs();
} }
/// Port-aware variant of prepare(): only the ports whose numbers are listed in
/// `updated_inputs` / `updated_outputs` are re-examined.
///
/// Outputs that can accept data and inputs that hold data are remembered in FIFO queues
/// and then paired one-to-one, moving one chunk of data per pair.
///
/// Returns:
///  - Status::Finished when all outputs are finished (inputs are closed),
///    or when all inputs are finished (outputs are finished);
///  - Status::NeedData when some output is still waiting for data;
///  - Status::PortFull otherwise.
IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
    /// First call: request data from every input and build the number -> port tables.
    if (!initialized)
    {
        initialized = true;

        for (auto & in_port : inputs)
        {
            in_port.setNeeded();
            input_ports.push_back({.port = &in_port, .status = InputStatus::NotActive});
        }

        for (auto & out_port : outputs)
            output_ports.push_back({.port = &out_port, .status = OutputStatus::NotActive});
    }

    /// Refresh the state of the outputs that were reported as updated.
    for (const auto & out_number : updated_outputs)
    {
        auto & out_slot = output_ports[out_number];

        if (out_slot.port->isFinished())
        {
            /// Count every finished output exactly once.
            if (out_slot.status != OutputStatus::Finished)
            {
                out_slot.status = OutputStatus::Finished;
                ++num_finished_outputs;
            }
        }
        else if (out_slot.port->canPush() && out_slot.status != OutputStatus::NeedData)
        {
            /// The status check keeps the same output from being queued twice.
            out_slot.status = OutputStatus::NeedData;
            waiting_outputs.push(out_number);
        }
    }

    /// Nobody is left to consume data: close all inputs and stop.
    if (num_finished_outputs == outputs.size())
    {
        for (auto & in_port : inputs)
            in_port.close();

        return Status::Finished;
    }

    /// Refresh the state of the inputs that were reported as updated.
    for (const auto & in_number : updated_inputs)
    {
        auto & in_slot = input_ports[in_number];

        if (in_slot.port->isFinished())
        {
            /// Count every finished input exactly once.
            if (in_slot.status != InputStatus::Finished)
            {
                in_slot.status = InputStatus::Finished;
                ++num_finished_inputs;
            }
        }
        else if (in_slot.port->hasData() && in_slot.status != InputStatus::HasData)
        {
            /// The status check keeps the same input from being queued twice.
            in_slot.status = InputStatus::HasData;
            inputs_with_data.push(in_number);
        }
    }

    /// Pair waiting outputs with ready inputs in FIFO order.
    while (!waiting_outputs.empty() && !inputs_with_data.empty())
    {
        const auto out_number = waiting_outputs.front();
        waiting_outputs.pop();

        const auto in_number = inputs_with_data.front();
        inputs_with_data.pop();

        auto & sink = output_ports[out_number];
        auto & source = input_ports[in_number];

        sink.port->pushData(source.port->pullData());

        source.status = InputStatus::NotActive;
        sink.status = OutputStatus::NotActive;

        /// The input may have become finished by pulling its last chunk.
        /// Its status was just reset above, so it cannot have been counted as finished yet.
        if (source.port->isFinished())
        {
            source.status = InputStatus::Finished;
            ++num_finished_inputs;
        }
    }

    /// No more data will ever arrive: finish all outputs and stop.
    if (num_finished_inputs == inputs.size())
    {
        for (auto & out_port : outputs)
            out_port.finish();

        return Status::Finished;
    }

    /// Some output is still waiting, so more input data is needed; otherwise outputs are full.
    return waiting_outputs.empty() ? Status::PortFull : Status::NeedData;
}
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <queue>
namespace DB namespace DB
@ -31,10 +32,46 @@ public:
String getName() const override { return "Resize"; } String getName() const override { return "Resize"; }
Status prepare() override; Status prepare() override;
Status prepare(const PortNumbers &, const PortNumbers &) override;
private: private:
InputPorts::iterator current_input; InputPorts::iterator current_input;
OutputPorts::iterator current_output; OutputPorts::iterator current_output;
/// State used by prepare(const PortNumbers &, const PortNumbers &).

/// Number of input ports that have been marked InputStatus::Finished.
size_t num_finished_inputs = 0;
/// Number of output ports that have been marked OutputStatus::Finished.
size_t num_finished_outputs = 0;
/// Numbers of output ports that can accept data (status NeedData), in the order they were noticed.
std::queue<UInt64> waiting_outputs;
/// Numbers of input ports that hold data (status HasData), in the order they were noticed.
std::queue<UInt64> inputs_with_data;
/// Becomes true on the first port-aware prepare() call, which fills input_ports and output_ports.
bool initialized = false;
/// State of an output port as tracked by the port-aware prepare().
enum class OutputStatus
{
NotActive, /// Initial state; also set after data was pushed to the port.
NeedData, /// Port can accept data; its number is queued in waiting_outputs.
Finished, /// Port reported isFinished(); counted in num_finished_outputs.
};
/// State of an input port as tracked by the port-aware prepare().
enum class InputStatus
{
NotActive, /// Initial state; also set after data was pulled from the port.
HasData, /// Port has data; its number is queued in inputs_with_data.
Finished, /// Port reported isFinished(); counted in num_finished_inputs.
};
/// Non-owning pointer to one of the processor's input ports together with its tracked status.
struct InputPortWithStatus
{
InputPort * port;
InputStatus status;
};
/// Non-owning pointer to one of the processor's output ports together with its tracked status.
struct OutputPortWithStatus
{
OutputPort * port;
OutputStatus status;
};
/// Filled on the first port-aware prepare() call in the iteration order of `inputs` / `outputs`,
/// and indexed there by the port numbers passed in updated_inputs / updated_outputs.
std::vector<InputPortWithStatus> input_ports;
std::vector<OutputPortWithStatus> output_ports;
}; };
} }