From f55f70d67bbef77af89f49fd27a2b7658a174cc5 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 6 Feb 2019 19:31:18 +0300
Subject: [PATCH] Added PipelineExecutor.

---
 .../Processors/Executors/PipelineExecutor.cpp | 196 ++++++++++++++++++
 .../Processors/Executors/PipelineExecutor.h   |  76 +++++++
 2 files changed, 272 insertions(+)
 create mode 100644 dbms/src/Processors/Executors/PipelineExecutor.cpp
 create mode 100644 dbms/src/Processors/Executors/PipelineExecutor.h

diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp
new file mode 100644
index 00000000000..437a308b5bc
--- /dev/null
+++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp
@@ -0,0 +1,196 @@
+#include <Processors/Executors/PipelineExecutor.h>
+#include <unordered_map>
+#include <queue>
+
+namespace DB
+{
+
+namespace
+{
+
+
+}
+
+/// Fills `graph` with one node per entry of `processors` and, for every node, records the
+/// deduplicated indices of the processors reported by its input ports (backEdges) and by
+/// its output ports (directEdges).
+/// Throws LOGICAL_ERROR if a port reports a processor that is missing from `processors`.
+void PipelineExecutor::buildGraph()
+{
+    std::unordered_map<const IProcessor *, size_t> proc_map;
+    size_t num_processors = processors.size();
+
+    auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port)
+    {
+        String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output")
+                     + " for processor " + parent->getName() + ", but not found in original list of all processors.";
+
+        throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
+    };
+
+    /// First pass: give every processor a node index so that ports can be resolved to nodes.
+    graph.resize(num_processors);
+    for (size_t node = 0; node < num_processors; ++node)
+    {
+        IProcessor * proc = processors[node].get();
+        proc_map[proc] = node;
+        graph[node].processor = proc;
+    }
+
+    /// Second pass: turn ports into edges, skipping an edge that is already recorded.
+    for (size_t node = 0; node < num_processors; ++node)
+    {
+        const IProcessor * cur = graph[node].processor;
+
+        for (const InputPort & input_port : processors[node]->getInputs())
+        {
+            const IProcessor * proc = &input_port.getProcessor();
+
+            auto it = proc_map.find(proc);
+            if (it == proc_map.end())
+                throwUnknownProcessor(proc, cur, true);
+
+            size_t proc_num = it->second;
+            bool new_edge = true;
+            for (size_t edge = 0; new_edge && edge < graph[node].backEdges.size(); ++edge)
+                if (graph[node].backEdges[edge].from == proc_num)
+                    new_edge = false;
+
+            if (new_edge)
+            {
+                graph[node].backEdges.emplace_back();
+                graph[node].backEdges.back().from = proc_num;
+            }
+        }
+
+        for (const OutputPort & output_port : processors[node]->getOutputs())
+        {
+            const IProcessor * proc = &output_port.getProcessor();
+
+            auto it = proc_map.find(proc);
+            if (it == proc_map.end())
+                throwUnknownProcessor(proc, cur, false); /// Found through an output port, not an input one.
+
+            size_t proc_num = it->second;
+            bool new_edge = true;
+            for (size_t edge = 0; new_edge && edge < graph[node].directEdges.size(); ++edge)
+                if (graph[node].directEdges[edge].to == proc_num)
+                    new_edge = false;
+
+            if (new_edge)
+            {
+                graph[node].directEdges.emplace_back();
+                graph[node].directEdges.back().to = proc_num;
+            }
+        }
+    }
+}
+
+/// Calls prepare() on the processor of node `pid` and reacts to the returned status:
+/// NeedData / PortFull / Unneeded mark the node Idle and remember the status in idle_status,
+/// Finished marks the node Finished, Ready pushes `pid` to `jobs`.
+/// Throws LOGICAL_ERROR for Async and Wait.
+void PipelineExecutor::prepareProcessor(std::queue<size_t> & jobs, size_t pid)
+{
+    auto & node = graph[pid];
+    auto status = node.processor->prepare();
+
+    switch (status)
+    {
+        case IProcessor::Status::NeedData:
+        case IProcessor::Status::PortFull:
+        case IProcessor::Status::Unneeded:
+        {
+            node.status = ExecStatus::Idle;
+            node.idle_status = status;
+            break;
+        }
+        case IProcessor::Status::Finished:
+        {
+            node.status = ExecStatus::Finished;
+            break;
+        }
+        case IProcessor::Status::Ready:
+        {
+            jobs.push(pid);
+            break;
+        }
+        case IProcessor::Status::Async:
+        {
+            throw Exception("Async is not supported for PipelineExecutor", ErrorCodes::LOGICAL_ERROR);
+        }
+        case IProcessor::Status::Wait:
+        {
+            throw Exception("Wait is not supported for PipelineExecutor", ErrorCodes::LOGICAL_ERROR);
+        }
+    }
+}
+
+/// Pushes every neighbour of node `pid` (over directEdges first, then over backEdges)
+/// whose status is Idle to `preparing` and marks it Preparing.
+void PipelineExecutor::traverse(std::queue<size_t> & preparing, size_t pid)
+{
+    for (const auto & edge : graph[pid].directEdges)
+    {
+        auto & node = graph[edge.to];
+        if (node.status == ExecStatus::Idle)
+        {
+            preparing.push(edge.to);
+            node.status = ExecStatus::Preparing;
+        }
+    }
+
+    for (const auto & edge : graph[pid].backEdges)
+    {
+        auto & node = graph[edge.from];
+        if (node.status == ExecStatus::Idle)
+        {
+            preparing.push(edge.from);
+            node.status = ExecStatus::Preparing;
+        }
+    }
+}
+
+/// Starts from the nodes without directEdges and then alternates between draining `jobs`
+/// (addJob) and draining `preparing` (prepareProcessor) until both queues are empty.
+/// Throws LOGICAL_ERROR if the graph has no node without directEdges.
+void PipelineExecutor::execute()
+{
+    std::queue<size_t> jobs;
+    std::queue<size_t> preparing;
+
+    size_t num_nodes = graph.size();
+    for (size_t i = 0; i < num_nodes; ++i)
+    {
+        if (graph[i].directEdges.empty())
+        {
+            preparing.push(i);
+            graph[i].status = ExecStatus::Preparing;
+        }
+    }
+
+    if (preparing.empty())
+        throw Exception("No sync processors were found.", ErrorCodes::LOGICAL_ERROR);
+
+    while (!jobs.empty() || !preparing.empty())
+    {
+        while (!jobs.empty())
+        {
+            size_t pid = jobs.front();
+            jobs.pop();
+            /// NOTE(review): addJob() is not declared in PipelineExecutor.h - confirm where it comes from.
+            addJob(pid);
+            /// Make a job
+        }
+
+        while (!preparing.empty())
+        {
+            size_t pid = preparing.front();
+            preparing.pop();
+            prepareProcessor(jobs, pid);
+        }
+    }
+
+}
+
+}
diff --git a/dbms/src/Processors/Executors/PipelineExecutor.h b/dbms/src/Processors/Executors/PipelineExecutor.h
new file mode 100644
index 00000000000..520bcf3d643
--- /dev/null
+++ b/dbms/src/Processors/Executors/PipelineExecutor.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <queue>
+#include <vector>
+#include <Processors/IProcessor.h>
+
+template <typename>
+class ThreadPoolImpl;
+class ThreadFromGlobalPool;
+using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
+
+namespace DB
+{
+
+/// Keeps a graph of processors (one node per processor, edges derived from their ports)
+/// and moves the nodes through prepare() in execute().
+class PipelineExecutor
+{
+private:
+    Processors processors;
+    ThreadPool & pool;
+
+    /// Edge to the node with index `to`; filled from output ports in buildGraph().
+    struct DirectEdge
+    {
+        size_t to;
+    };
+
+    using DirectEdges = std::vector<DirectEdge>;
+
+    /// Edge to the node with index `from`; filled from input ports in buildGraph().
+    struct BackEdge
+    {
+        size_t from;
+    };
+
+    using BackEdges = std::vector<BackEdge>;
+
+    enum class ExecStatus
+    {
+        None,
+        Preparing,
+        Executing,
+        Finished,
+        Idle, /// Prepare was called, but some error status was returned
+    };
+
+    struct Node
+    {
+        IProcessor * processor = nullptr;
+        DirectEdges directEdges;
+        BackEdges backEdges;
+
+        ExecStatus status = ExecStatus::None;
+        IProcessor::Status idle_status; /// What prepare() returned if status is Idle.
+    };
+
+    using Nodes = std::vector<Node>;
+
+    Nodes graph;
+
+public:
+    /// Copies the list of processors and builds the graph right away, because execute() works on the graph only.
+    /// Throws LOGICAL_ERROR (from buildGraph()) if a port refers to a processor outside of the list.
+    PipelineExecutor(const Processors & processors, ThreadPool & pool) : processors(processors), pool(pool) { buildGraph(); }
+    void execute();
+
+    String getName() const { return "PipelineExecutor"; }
+
+private:
+    void buildGraph();
+    void prepareProcessor(std::queue<size_t> & jobs, size_t pid);
+    void traverse(std::queue<size_t> & preparing, size_t pid);
+};
+
+}