From 530c9ae4905fb5f52b7ac460cf09fe0568cd72ea Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 15:43:19 +0000 Subject: [PATCH] revert --- src/Interpreters/InterpreterExplainQuery.cpp | 5 +- src/QueryPipeline/printPipeline.cpp | 173 ++++++++++++++++++ src/QueryPipeline/printPipeline.h | 5 + .../02678_explain_pipeline_graph.reference | 0 .../02678_explain_pipeline_graph.sql | 7 - ..._pipeline_graph_with_projection.reference} | 0 ...xplain_pipeline_graph_with_projection.sql} | 0 7 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 src/QueryPipeline/printPipeline.cpp delete mode 100644 tests/queries/0_stateless/02678_explain_pipeline_graph.reference delete mode 100644 tests/queries/0_stateless/02678_explain_pipeline_graph.sql rename tests/queries/0_stateless/{02678_explain_graph_with_projection.reference => 02678_explain_pipeline_graph_with_projection.reference} (100%) rename tests/queries/0_stateless/{02678_explain_graph_with_projection.sql => 02678_explain_pipeline_graph_with_projection.sql} (100%) diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index b2172a07e91..3c225522cc4 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -504,7 +504,10 @@ QueryPipeline InterpreterExplainQuery::executeImpl() auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources); const auto & processors = pipe.getProcessors(); - printPipeline(processors, buf); + if (settings.compact) + printPipelineCompact(processors, buf, settings.query_pipeline_options.header); + else + printPipeline(processors, buf); } else { diff --git a/src/QueryPipeline/printPipeline.cpp b/src/QueryPipeline/printPipeline.cpp new file mode 100644 index 00000000000..de978578618 --- /dev/null +++ b/src/QueryPipeline/printPipeline.cpp @@ -0,0 +1,173 @@ +#include +#include +#include +#include + +namespace DB +{ + +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header) +{ + struct Node; + + /// Group by processors name, QueryPlanStep and group in this step. + struct Key + { + size_t group; + IQueryPlanStep * step; + std::string name; + + auto getTuple() const { return std::forward_as_tuple(group, step, name); } + + bool operator<(const Key & other) const { return getTuple() < other.getTuple(); } + }; + + /// Group ports by header. + struct EdgeData + { + Block header; + size_t count; + }; + + using Edge = std::vector; + + struct Node + { + size_t id = 0; + std::map edges = {}; + std::vector agents = {}; + }; + + std::map graph; + + auto get_key = [](const IProcessor & processor) { + return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()}; + }; + + /// Fill nodes. + for (const auto & processor : processors) + { + auto res = graph.emplace(get_key(*processor), Node()); + auto & node = res.first->second; + node.agents.emplace_back(processor.get()); + + if (res.second) + node.id = graph.size(); + } + + Block empty_header; + + /// Fill edges. + for (const auto & processor : processors) + { + auto & from = graph[get_key(*processor)]; + + for (auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + + auto & to = graph[get_key(port.getInputPort().getProcessor())]; + auto & edge = from.edges[&to]; + + /// Use empty header for each edge if with_header is false. + const auto & header = with_header ? port.getHeader() : empty_header; + + /// Group by header. + bool found = false; + for (auto & item : edge) + { + if (blocksHaveEqualStructure(header, item.header)) + { + found = true; + ++item.count; + break; + } + } + + if (!found) + edge.emplace_back(EdgeData{header, 1}); + } + } + + /// Group processors by it's QueryPlanStep. + std::map> steps_map; + + for (const auto & item : graph) + steps_map[item.first.step].emplace_back(&item.second); + + out << "digraph\n{\n"; + out << " rankdir=\"LR\";\n"; + out << " { node [shape = rect]\n"; + + /// Nodes // TODO quoting and escaping + size_t next_step = 0; + for (const auto & item : steps_map) + { + /// Use separate clusters for each step. + if (item.first != nullptr) + { + out << " subgraph cluster_" << next_step << " {\n"; + out << " label =\"" << item.first->getName() << "\";\n"; + out << " style=filled;\n"; + out << " color=lightgrey;\n"; + out << " node [style=filled,color=white];\n"; + out << " { rank = same;\n"; + + ++next_step; + } + + for (const auto & node : item.second) + { + const auto & processor = node->agents.front(); + out << " n" << node->id << " [label=\"" << processor->getName(); + + if (node->agents.size() > 1) + out << " × " << node->agents.size(); + + const auto & description = processor->getDescription(); + if (!description.empty()) + out << ' ' << description; + + out << "\"];\n"; + } + + if (item.first != nullptr) + { + out << " }\n"; + out << " }\n"; + } + } + + out << " }\n"; + + /// Edges + for (const auto & item : graph) + { + for (const auto & edge : item.second.edges) + { + for (const auto & data : edge.second) + { + out << " n" << item.second.id << " -> " + << "n" << edge.first->id << " [label=\""; + + if (data.count > 1) + out << "× " << data.count; + + if (with_header) + { + for (const auto & elem : data.header) + { + out << "\n"; + elem.dumpStructure(out); + } + } + + out << "\"];\n"; + } + } + } + out << "}\n"; +} + +} diff --git a/src/QueryPipeline/printPipeline.h b/src/QueryPipeline/printPipeline.h index ff3b53300ce..e91909cb50b 100644 --- a/src/QueryPipeline/printPipeline.h +++ b/src/QueryPipeline/printPipeline.h @@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out) printPipeline(processors, std::vector(), out); } +/// Prints pipeline in compact representation. +/// Group processors by it's name, QueryPlanStep and QueryPlanStepGroup. +/// If QueryPlanStep wasn't set for processor, representation may be not correct. +/// If with_header is set, prints block header for each edge. +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header); } diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph.sql deleted file mode 100644 index 48cfbf2b349..00000000000 --- a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql +++ /dev/null @@ -1,7 +0,0 @@ --- The server does not crash after these queries: - -DROP TABLE IF EXISTS t1; -CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID; -insert into t1(ID, name) values (1, 'abc'), (2, 'bbb'); -explain pipeline graph=1 select count(ID) from t1 FORMAT Null; -DROP TABLE t1; diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference similarity index 100% rename from tests/queries/0_stateless/02678_explain_graph_with_projection.reference rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql similarity index 100% rename from tests/queries/0_stateless/02678_explain_graph_with_projection.sql rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql