diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index 8892418d0dc..3e2051df890 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -7,7 +7,6 @@ #include #include -#include #include #include #include @@ -52,26 +51,24 @@ bool PipelineExecutor::addEdges(UInt64 node) const IProcessor * cur = graph[node].processor; - auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges) + auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges, + bool is_backward, UInt64 input_port_number, UInt64 output_port_number) { auto it = processors_map.find(to_proc); if (it == processors_map.end()) throwUnknownProcessor(to_proc, cur, true); UInt64 proc_num = it->second; - Edge * edge_ptr = nullptr; for (auto & edge : edges) - if (edge.to == proc_num) - edge_ptr = &edge; - - if (!edge_ptr) { - edge_ptr = &edges.emplace_back(); - edge_ptr->to = proc_num; + if (edge.to == proc_num) + throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR); } - from_port.setVersion(&edge_ptr->version); + auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number); + + from_port.setVersion(&edge.version); }; bool was_edge_added = false; @@ -83,10 +80,11 @@ bool PipelineExecutor::addEdges(UInt64 node) { was_edge_added = true; - for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it) + for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input) { const IProcessor * proc = &it->getOutputPort().getProcessor(); - add_edge(*it, proc, graph[node].backEdges); + auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort()); + add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number); } } @@ -97,10 +95,11 @@ bool PipelineExecutor::addEdges(UInt64 node) { was_edge_added 
= true; - for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it) + for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output) { const IProcessor * proc = &it->getInputPort().getProcessor(); - add_edge(*it, proc, graph[node].directEdges); + auto input_port_number = proc->getInputPortNumber(&it->getInputPort()); + add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output); } } @@ -131,6 +130,7 @@ void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) if (graph[proc].directEdges.empty()) { stack.push(proc); + /// do not lock mutex, as this function is executed in a single thread graph[proc].status = ExecStatus::Preparing; } } @@ -195,9 +195,20 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid) UInt64 num_processors = processors.size(); for (UInt64 node = 0; node < num_processors; ++node) { + size_t num_direct_edges = graph[node].directEdges.size(); + size_t num_back_edges = graph[node].backEdges.size(); + if (addEdges(node)) { - if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New) + std::lock_guard guard(graph[node].status_mutex); + + for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges) + graph[node].updated_input_ports.emplace_back(num_back_edges); + + for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges) + graph[node].updated_output_ports.emplace_back(num_direct_edges); + + if (graph[node].status == ExecStatus::Idle) { graph[node].status = ExecStatus::Preparing; stack.push(node); @@ -212,140 +223,147 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac auto & node = graph[edge.to]; - ExecStatus status = node.status.load(); + std::lock_guard guard(node.status_mutex); - /// Don't add processor if nothing was read from port. 
- if (status != ExecStatus::New && edge.version == edge.prev_version) - return false; + ExecStatus status = node.status; if (status == ExecStatus::Finished) return false; - /// Signal that node need to be prepared. - node.need_to_be_prepared = true; - edge.prev_version = edge.version; + if (edge.backward) + node.updated_output_ports.push_back(edge.output_port_number); + else + node.updated_input_ports.push_back(edge.input_port_number); - /// Try to get ownership for node. - - /// Assume that current status is New or Idle. Otherwise, can't prepare node. - if (status != ExecStatus::New) - status = ExecStatus::Idle; - - /// Statuses but New and Idle are not interesting because they own node. - /// Prepare will be called in owning thread before changing status. - while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing)) - if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared) - return false; - - stack.push(edge.to); - return true; + if (status == ExecStatus::Idle) + { + node.status = ExecStatus::Preparing; + stack.push(edge.to); + return true; + } + return false; } bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async) { + + /// In this method we have ownership on node. auto & node = graph[pid]; + bool need_traverse = false; + bool need_expand_pipeline = false; + + std::vector updated_back_edges; + std::vector updated_direct_edges; + { /// Stopwatch watch; - /// Disable flag before prepare call. Otherwise, we can skip prepare request. - /// Prepare can be called more times than needed, but it's ok. 
- node.need_to_be_prepared = false; + std::lock_guard guard(node.status_mutex); - auto status = node.processor->prepare(); + auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports); + node.updated_input_ports.clear(); + node.updated_output_ports.clear(); /// node.execution_state->preparation_time_ns += watch.elapsed(); node.last_processor_status = status; - } - auto add_neighbours_to_prepare_queue = [&] () - { - for (auto & edge : node.backEdges) - tryAddProcessorToStackIfUpdated(edge, parents); - - for (auto & edge : node.directEdges) - tryAddProcessorToStackIfUpdated(edge, children); - }; - - auto try_release_ownership = [&] () - { - /// This function can be called after expand pipeline, where node from outer scope is not longer valid. - auto & node_ = graph[pid]; - ExecStatus expected = ExecStatus::Idle; - node_.status = ExecStatus::Idle; - - if (node_.need_to_be_prepared) + switch (node.last_processor_status) { - while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing)) - if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared) - return; - - children.push(pid); - } - }; - - switch (node.last_processor_status) - { - case IProcessor::Status::NeedData: - case IProcessor::Status::PortFull: - { - add_neighbours_to_prepare_queue(); - try_release_ownership(); - - break; - } - case IProcessor::Status::Finished: - { - add_neighbours_to_prepare_queue(); - node.status = ExecStatus::Finished; - break; - } - case IProcessor::Status::Ready: - { - node.status = ExecStatus::Executing; - return true; - } - case IProcessor::Status::Async: - { - throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR); + case IProcessor::Status::NeedData: + case IProcessor::Status::PortFull: + { + need_traverse = true; + node.status = ExecStatus::Idle; + break; + } + case IProcessor::Status::Finished: + { + need_traverse = true; + node.status = ExecStatus::Finished; + break; + } + case 
IProcessor::Status::Ready: + { + node.status = ExecStatus::Executing; + return true; + } + case IProcessor::Status::Async: + { + throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR); // node.status = ExecStatus::Executing; // addAsyncJob(pid); // break; - } - case IProcessor::Status::Wait: - { - if (!async) - throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR); - break; - } - case IProcessor::Status::ExpandPipeline: - { - executor_contexts[thread_number]->task_list.emplace_back( - node.execution_state.get(), - &parents - ); - - ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back(); - ExpandPipelineTask * expected = nullptr; - - while (!expand_pipeline_task.compare_exchange_strong(expected, desired)) + } + case IProcessor::Status::Wait: { - doExpandPipeline(expected, true); - expected = nullptr; + if (!async) + throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR); + break; + } + case IProcessor::Status::ExpandPipeline: + { + need_expand_pipeline = true; + break; + } + } + + if (need_traverse) + { + for (auto & edge : node.backEdges) + { + if (edge.version != edge.prev_version) + { + updated_back_edges.emplace_back(&edge); + edge.prev_version = edge.version; + } } - doExpandPipeline(desired, true); - - /// node is not longer valid after pipeline was expanded - graph[pid].need_to_be_prepared = true; - try_release_ownership(); - break; + for (auto & edge : node.directEdges) + { + if (edge.version != edge.prev_version) + { + updated_direct_edges.emplace_back(&edge); + edge.prev_version = edge.version; + } + } } } + if (need_traverse) + { + for (auto & edge : updated_back_edges) + tryAddProcessorToStackIfUpdated(*edge, parents); + + for (auto & edge : updated_direct_edges) + tryAddProcessorToStackIfUpdated(*edge, children); + } + + if (need_expand_pipeline) + { + executor_contexts[thread_number]->task_list.emplace_back( + 
node.execution_state.get(), + &parents + ); + + ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back(); + ExpandPipelineTask * expected = nullptr; + + while (!expand_pipeline_task.compare_exchange_strong(expected, desired)) + { + doExpandPipeline(expected, true); + expected = nullptr; + } + + doExpandPipeline(desired, true); + + /// Add itself back to be prepared again. + children.push(pid); + } + return false; } @@ -427,7 +445,7 @@ void PipelineExecutor::execute(size_t num_threads) bool all_processors_finished = true; for (auto & node : graph) - if (node.status != ExecStatus::Finished) + if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex all_processors_finished = false; if (!all_processors_finished) diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h index b5e3c7a0e1e..37a72c69991 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.h +++ b/dbms/src/Processors/Executors/PipelineExecutor.h @@ -43,12 +43,18 @@ private: struct Edge { + Edge(UInt64 to_, bool backward_, UInt64 input_port_number_, UInt64 output_port_number_) + : to(to_), backward(backward_), input_port_number(input_port_number_), output_port_number(output_port_number_) {} + UInt64 to = std::numeric_limits::max(); + bool backward; + UInt64 input_port_number; + UInt64 output_port_number; /// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details. /// To compare version with prev_version we can decide if neighbour processor need to be prepared. - UInt64 version = 0; - UInt64 prev_version = 0; + UInt64 version = 1; + UInt64 prev_version = 0; /// prev version is zero so we traverse all edges after the first prepare. }; /// Use std::list because new ports can be added to processor during execution. @@ -58,7 +64,6 @@ private: /// Can be owning or not. 
Owning means that executor who set this status can change node's data and nobody else can. enum class ExecStatus { - New, /// prepare wasn't called yet. Initial state. Non-owning. Idle, /// prepare returned NeedData or PortFull. Non-owning. Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning. Executing, /// prepare returned Ready and task is executing. Owning. @@ -87,17 +92,19 @@ private: Edges directEdges; Edges backEdges; - std::atomic status; - /// This flag can be set by any executor. - /// When enabled, any executor can try to atomically set Preparing state to status. - std::atomic_bool need_to_be_prepared; + ExecStatus status; + std::mutex status_mutex; + /// Last state for profiling. IProcessor::Status last_processor_status = IProcessor::Status::NeedData; std::unique_ptr execution_state; + IProcessor::PortNumbers updated_input_ports; + IProcessor::PortNumbers updated_output_ports; + Node(IProcessor * processor_, UInt64 processor_id) - : processor(processor_), status(ExecStatus::New), need_to_be_prepared(false) + : processor(processor_), status(ExecStatus::Idle) { execution_state = std::make_unique(); execution_state->processor = processor; @@ -105,8 +112,8 @@ private: } Node(Node && other) noexcept - : processor(other.processor), status(other.status.load()) - , need_to_be_prepared(other.need_to_be_prepared.load()), execution_state(std::move(other.execution_state)) + : processor(other.processor), status(other.status) + , execution_state(std::move(other.execution_state)) { } }; diff --git a/dbms/src/Processors/IProcessor.h b/dbms/src/Processors/IProcessor.h index ed59f4e591d..852bde2d467 100644 --- a/dbms/src/Processors/IProcessor.h +++ b/dbms/src/Processors/IProcessor.h @@ -171,7 +171,15 @@ public: * - method 'prepare' cannot be executed in parallel even for different objects, * if they are connected (including indirectly) to each other by their ports; */ - virtual Status prepare() = 0; + virtual Status prepare() + { + 
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); + } + + using PortNumbers = std::vector; + + /// Optimization for prepare in case we know ports were updated. + virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); } /** You may call this method if 'prepare' returned Ready. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. @@ -183,11 +191,6 @@ public: throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED); } - virtual void work(size_t /*thread_num*/) - { - work(); - } - /** You may call this method if 'prepare' returned Async. * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. * @@ -226,6 +229,34 @@ public: auto & getInputs() { return inputs; } auto & getOutputs() { return outputs; } + UInt64 getInputPortNumber(const InputPort * input_port) const + { + UInt64 number = 0; + for (auto & port : inputs) + { + if (&port == input_port) + return number; + + ++number; + } + + throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR); + } + + UInt64 getOutputPortNumber(const OutputPort * output_port) const + { + UInt64 number = 0; + for (auto & port : outputs) + { + if (&port == output_port) + return number; + + ++number; + } + + throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR); + } + const auto & getInputs() const { return inputs; } const auto & getOutputs() const { return outputs; } diff --git a/dbms/src/Processors/ResizeProcessor.cpp b/dbms/src/Processors/ResizeProcessor.cpp index b3cb3a1735d..2ba1dd56275 100644 --- a/dbms/src/Processors/ResizeProcessor.cpp +++ b/dbms/src/Processors/ResizeProcessor.cpp @@ -153,5 +153,112 @@ ResizeProcessor::Status ResizeProcessor::prepare() 
return get_status_if_no_inputs(); } +IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs) +{ + if (!initialized) + { + initialized = true; + + for (auto & input : inputs) + { + input.setNeeded(); + input_ports.push_back({.port = &input, .status = InputStatus::NotActive}); + } + + for (auto & output : outputs) + output_ports.push_back({.port = &output, .status = OutputStatus::NotActive}); + } + + for (auto & output_number : updated_outputs) + { + auto & output = output_ports[output_number]; + if (output.port->isFinished()) + { + if (output.status != OutputStatus::Finished) + { + ++num_finished_outputs; + output.status = OutputStatus::Finished; + } + + continue; + } + + if (output.port->canPush()) + { + if (output.status != OutputStatus::NeedData) + { + output.status = OutputStatus::NeedData; + waiting_outputs.push(output_number); + } + } + } + + if (num_finished_outputs == outputs.size()) + { + for (auto & input : inputs) + input.close(); + + return Status::Finished; + } + + for (auto & input_number : updated_inputs) + { + auto & input = input_ports[input_number]; + if (input.port->isFinished()) + { + if (input.status != InputStatus::Finished) + { + input.status = InputStatus::Finished; + ++num_finished_inputs; + } + continue; + } + + if (input.port->hasData()) + { + if (input.status != InputStatus::HasData) + { + input.status = InputStatus::HasData; + inputs_with_data.push(input_number); + } + } + } + + while (!waiting_outputs.empty() && !inputs_with_data.empty()) + { + auto & waiting_output = output_ports[waiting_outputs.front()]; + waiting_outputs.pop(); + + auto & input_with_data = input_ports[inputs_with_data.front()]; + inputs_with_data.pop(); + + waiting_output.port->pushData(input_with_data.port->pullData()); + input_with_data.status = InputStatus::NotActive; + waiting_output.status = OutputStatus::NotActive; + + if (input_with_data.port->isFinished()) + { + if (input_with_data.status != 
InputStatus::Finished) + { + input_with_data.status = InputStatus::Finished; + ++num_finished_inputs; + } + } + } + + if (num_finished_inputs == inputs.size()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + if (!waiting_outputs.empty()) + return Status::NeedData; + + return Status::PortFull; +} + } diff --git a/dbms/src/Processors/ResizeProcessor.h b/dbms/src/Processors/ResizeProcessor.h index 67574c384a1..3a9c906ecbd 100644 --- a/dbms/src/Processors/ResizeProcessor.h +++ b/dbms/src/Processors/ResizeProcessor.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -31,10 +32,46 @@ public: String getName() const override { return "Resize"; } Status prepare() override; + Status prepare(const PortNumbers &, const PortNumbers &) override; private: InputPorts::iterator current_input; OutputPorts::iterator current_output; + + size_t num_finished_inputs = 0; + size_t num_finished_outputs = 0; + std::queue waiting_outputs; + std::queue inputs_with_data; + bool initialized = false; + + enum class OutputStatus + { + NotActive, + NeedData, + Finished, + }; + + enum class InputStatus + { + NotActive, + HasData, + Finished, + }; + + struct InputPortWithStatus + { + InputPort * port; + InputStatus status; + }; + + struct OutputPortWithStatus + { + OutputPort * port; + OutputStatus status; + }; + + std::vector input_ports; + std::vector output_ports; }; }