diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index b2172a07e91..3c225522cc4 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -504,7 +504,10 @@ QueryPipeline InterpreterExplainQuery::executeImpl() auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources); const auto & processors = pipe.getProcessors(); - printPipeline(processors, buf); + if (settings.compact) + printPipelineCompact(processors, buf, settings.query_pipeline_options.header); + else + printPipeline(processors, buf); } else { diff --git a/src/Processors/QueryPlan/ISourceStep.cpp b/src/Processors/QueryPlan/ISourceStep.cpp index 0644d9b44eb..37f56bc7a43 100644 --- a/src/Processors/QueryPlan/ISourceStep.cpp +++ b/src/Processors/QueryPlan/ISourceStep.cpp @@ -12,10 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_) QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings) { auto pipeline = std::make_unique(); - QueryPipelineProcessorsCollector collector(*pipeline, this); + + /// For `Source` step, since it's not add new Processors to `pipeline->pipe` + /// in `initializePipeline`, but make an assign with new created Pipe. + /// And Processors for the Step is added here. So we do not need to use + /// `QueryPipelineProcessorsCollector` to collect Processors. initializePipeline(*pipeline, settings); - auto added_processors = collector.detachProcessors(); - processors.insert(processors.end(), added_processors.begin(), added_processors.end()); + + /// But we need to set QueryPlanStep manually for the Processors, which + /// will be used in `EXPLAIN PIPELINE` + for (auto & processor : processors) + { + processor->setQueryPlanStep(this); + } return pipeline; } diff --git a/src/QueryPipeline/printPipeline.cpp b/src/QueryPipeline/printPipeline.cpp new file mode 100644 index 00000000000..40c88502ed0 --- /dev/null +++ b/src/QueryPipeline/printPipeline.cpp @@ -0,0 +1,177 @@ +#include +#include +#include +#include + +namespace DB +{ + +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header) +{ + struct Node; + + /// Group by processors name, QueryPlanStep and group in this step. + struct Key + { + size_t group; + IQueryPlanStep * step; + std::string name; + + auto getTuple() const { return std::forward_as_tuple(group, step, name); } + + bool operator<(const Key & other) const + { + return getTuple() < other.getTuple(); + } + }; + + /// Group ports by header. + struct EdgeData + { + Block header; + size_t count; + }; + + using Edge = std::vector; + + struct Node + { + size_t id = 0; + std::map edges = {}; + std::vector agents = {}; + }; + + std::map graph; + + auto get_key = [](const IProcessor & processor) + { + return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()}; + }; + + /// Fill nodes. + for (const auto & processor : processors) + { + auto res = graph.emplace(get_key(*processor), Node()); + auto & node = res.first->second; + node.agents.emplace_back(processor.get()); + + if (res.second) + node.id = graph.size(); + } + + Block empty_header; + + /// Fill edges. + for (const auto & processor : processors) + { + auto & from = graph[get_key(*processor)]; + + for (auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + + auto & to = graph[get_key(port.getInputPort().getProcessor())]; + auto & edge = from.edges[&to]; + + /// Use empty header for each edge if with_header is false. + const auto & header = with_header ? port.getHeader() + : empty_header; + + /// Group by header. + bool found = false; + for (auto & item : edge) + { + if (blocksHaveEqualStructure(header, item.header)) + { + found = true; + ++item.count; + break; + } + } + + if (!found) + edge.emplace_back(EdgeData{header, 1}); + } + } + + /// Group processors by it's QueryPlanStep. + std::map> steps_map; + + for (const auto & item : graph) + steps_map[item.first.step].emplace_back(&item.second); + + out << "digraph\n{\n"; + out << " rankdir=\"LR\";\n"; + out << " { node [shape = rect]\n"; + + /// Nodes // TODO quoting and escaping + size_t next_step = 0; + for (const auto & item : steps_map) + { + /// Use separate clusters for each step. + if (item.first != nullptr) + { + out << " subgraph cluster_" << next_step << " {\n"; + out << " label =\"" << item.first->getName() << "\";\n"; + out << " style=filled;\n"; + out << " color=lightgrey;\n"; + out << " node [style=filled,color=white];\n"; + out << " { rank = same;\n"; + + ++next_step; + } + + for (const auto & node : item.second) + { + const auto & processor = node->agents.front(); + out << " n" << node->id << " [label=\"" << processor->getName(); + + if (node->agents.size() > 1) + out << " × " << node->agents.size(); + + const auto & description = processor->getDescription(); + if (!description.empty()) + out << ' ' << description; + + out << "\"];\n"; + } + + if (item.first != nullptr) + { + out << " }\n"; + out << " }\n"; + } + } + + out << " }\n"; + + /// Edges + for (const auto & item : graph) + { + for (const auto & edge : item.second.edges) + { + for (const auto & data : edge.second) + { + out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\""; + + if (data.count > 1) + out << "× " << data.count; + + if (with_header) + { + for (const auto & elem : data.header) + { + out << "\n"; + elem.dumpStructure(out); + } + } + + out << "\"];\n"; + } + } + } + out << "}\n"; +} + +} diff --git a/src/QueryPipeline/printPipeline.h b/src/QueryPipeline/printPipeline.h index ff3b53300ce..e91909cb50b 100644 --- a/src/QueryPipeline/printPipeline.h +++ b/src/QueryPipeline/printPipeline.h @@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out) printPipeline(processors, std::vector(), out); } +/// Prints pipeline in compact representation. +/// Group processors by it's name, QueryPlanStep and QueryPlanStepGroup. +/// If QueryPlanStep wasn't set for processor, representation may be not correct. +/// If with_header is set, prints block header for each edge. +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header); } diff --git a/tests/queries/0_stateless/25340_grace_hash_limit_race.reference b/tests/queries/0_stateless/02677_grace_hash_limit_race.reference similarity index 100% rename from tests/queries/0_stateless/25340_grace_hash_limit_race.reference rename to tests/queries/0_stateless/02677_grace_hash_limit_race.reference diff --git a/tests/queries/0_stateless/25340_grace_hash_limit_race.sql b/tests/queries/0_stateless/02677_grace_hash_limit_race.sql similarity index 100% rename from tests/queries/0_stateless/25340_grace_hash_limit_race.sql rename to tests/queries/0_stateless/02677_grace_hash_limit_race.sql diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference similarity index 100% rename from tests/queries/0_stateless/02678_explain_pipeline_graph.reference rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql similarity index 50% rename from tests/queries/0_stateless/02678_explain_pipeline_graph.sql rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql index 48cfbf2b349..e8b7405d602 100644 --- a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql +++ b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql @@ -1,7 +1,12 @@ --- The server does not crash after these queries: - DROP TABLE IF EXISTS t1; CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID; + insert into t1(ID, name) values (1, 'abc'), (2, 'bbb'); + +-- The returned node order is uncertain explain pipeline graph=1 select count(ID) from t1 FORMAT Null; +explain pipeline graph=1 select sum(1) from t1 FORMAT Null; +explain pipeline graph=1 select min(ID) from t1 FORMAT Null; +explain pipeline graph=1 select max(ID) from t1 FORMAT Null; + DROP TABLE t1;