From 3687698b155f5dca42a4a1266b8027128e519dae Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 15:03:10 +0000 Subject: [PATCH 1/7] Fix explain graph with projection --- src/Processors/QueryPlan/ISourceStep.cpp | 7 +- src/Processors/QueryPlan/ISourceStep.h | 2 + src/QueryPipeline/Pipe.cpp | 10 ++ src/QueryPipeline/Pipe.h | 2 + src/QueryPipeline/QueryPipelineBuilder.h | 2 + ... => 02677_grace_hash_limit_race.reference} | 0 ...ce.sql => 02677_grace_hash_limit_race.sql} | 0 ...78_explain_graph_with_projection.reference | 116 ++++++++++++++++++ .../02678_explain_graph_with_projection.sql | 11 ++ 9 files changed, 149 insertions(+), 1 deletion(-) rename tests/queries/0_stateless/{25340_grace_hash_limit_race.reference => 02677_grace_hash_limit_race.reference} (100%) rename tests/queries/0_stateless/{25340_grace_hash_limit_race.sql => 02677_grace_hash_limit_race.sql} (100%) create mode 100644 tests/queries/0_stateless/02678_explain_graph_with_projection.reference create mode 100644 tests/queries/0_stateless/02678_explain_graph_with_projection.sql diff --git a/src/Processors/QueryPlan/ISourceStep.cpp b/src/Processors/QueryPlan/ISourceStep.cpp index 0644d9b44eb..562f92ff63b 100644 --- a/src/Processors/QueryPlan/ISourceStep.cpp +++ b/src/Processors/QueryPlan/ISourceStep.cpp @@ -12,8 +12,13 @@ ISourceStep::ISourceStep(DataStream output_stream_) QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings) { auto pipeline = std::make_unique(); - QueryPipelineProcessorsCollector collector(*pipeline, this); initializePipeline(*pipeline, settings); + QueryPipelineProcessorsCollector collector(*pipeline, this); + + /// Properly collecting processors from Pipe. + /// At the creation time of a Pipe, since `collected_processors` is nullptr, + /// the processors can not be collected. + pipeline->collectProcessors(); auto added_processors = collector.detachProcessors(); processors.insert(processors.end(), added_processors.begin(), added_processors.end()); return pipeline; diff --git a/src/Processors/QueryPlan/ISourceStep.h b/src/Processors/QueryPlan/ISourceStep.h index 744b6f9b5c4..1327444697e 100644 --- a/src/Processors/QueryPlan/ISourceStep.h +++ b/src/Processors/QueryPlan/ISourceStep.h @@ -10,6 +10,8 @@ class ISourceStep : public IQueryPlanStep public: explicit ISourceStep(DataStream output_stream_); + String getName() const override { return "ISourceStep"; } + QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; virtual void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) = 0; diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 91ba01c479f..a20b154054a 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -867,4 +867,14 @@ void Pipe::transform(const Transformer & transformer, bool check_ports) max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } +void Pipe::collectProcessors() +{ + if (collected_processors) + { + for (const auto & processor : *processors) + { + collected_processors->emplace_back(processor); + } + } +} } diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 09931e38578..a19364f7907 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -104,6 +104,8 @@ public: std::shared_ptr getProcessorsPtr() { return processors; } + void collectProcessors(); + private: /// Header is common for all output below. Block header; diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 4f984680c75..51a1aa69563 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -185,6 +185,8 @@ public: static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources); static QueryPipeline getPipeline(QueryPipelineBuilder builder); + void collectProcessors() { pipe.collectProcessors(); } + private: /// Destruction order: processors, header, locks, temporary storages, local contexts diff --git a/tests/queries/0_stateless/25340_grace_hash_limit_race.reference b/tests/queries/0_stateless/02677_grace_hash_limit_race.reference similarity index 100% rename from tests/queries/0_stateless/25340_grace_hash_limit_race.reference rename to tests/queries/0_stateless/02677_grace_hash_limit_race.reference diff --git a/tests/queries/0_stateless/25340_grace_hash_limit_race.sql b/tests/queries/0_stateless/02677_grace_hash_limit_race.sql similarity index 100% rename from tests/queries/0_stateless/25340_grace_hash_limit_race.sql rename to tests/queries/0_stateless/02677_grace_hash_limit_race.sql diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.reference b/tests/queries/0_stateless/02678_explain_graph_with_projection.reference new file mode 100644 index 00000000000..0c513f735cc --- /dev/null +++ b/tests/queries/0_stateless/02678_explain_graph_with_projection.reference @@ -0,0 +1,116 @@ +digraph +{ + rankdir="LR"; + { node [shape = rect] + subgraph cluster_0 { + label ="Expression"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n4 [label="ExpressionTransform"]; + } + } + subgraph cluster_1 { + label ="ReadFromStorage"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n3 [label="AggregatingTransform"]; + n2 [label="ExpressionTransform"]; + n1 [label="SourceFromSingleChunk"]; + } + } + } + n3 -> n4 [label=""]; + n2 -> n3 [label=""]; + n1 -> n2 [label=""]; +} +digraph +{ + rankdir="LR"; + { node [shape = rect] + subgraph cluster_0 { + label ="Expression"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n4 [label="ExpressionTransform"]; + } + } + subgraph cluster_1 { + label ="ReadFromStorage"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n3 [label="AggregatingTransform"]; + n2 [label="ExpressionTransform"]; + n1 [label="SourceFromSingleChunk"]; + } + } + } + n3 -> n4 [label=""]; + n2 -> n3 [label=""]; + n1 -> n2 [label=""]; +} +digraph +{ + rankdir="LR"; + { node [shape = rect] + subgraph cluster_0 { + label ="Expression"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n4 [label="ExpressionTransform"]; + } + } + subgraph cluster_1 { + label ="ReadFromStorage"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n3 [label="AggregatingTransform"]; + n2 [label="ExpressionTransform"]; + n1 [label="SourceFromSingleChunk"]; + } + } + } + n3 -> n4 [label=""]; + n2 -> n3 [label=""]; + n1 -> n2 [label=""]; +} +digraph +{ + rankdir="LR"; + { node [shape = rect] + subgraph cluster_0 { + label ="Expression"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n4 [label="ExpressionTransform"]; + } + } + subgraph cluster_1 { + label ="ReadFromStorage"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + { rank = same; + n3 [label="AggregatingTransform"]; + n2 [label="ExpressionTransform"]; + n1 [label="SourceFromSingleChunk"]; + } + } + } + n3 -> n4 [label=""]; + n2 -> n3 [label=""]; + n1 -> n2 [label=""]; +} diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.sql b/tests/queries/0_stateless/02678_explain_graph_with_projection.sql new file mode 100644 index 00000000000..06ee1078e74 --- /dev/null +++ b/tests/queries/0_stateless/02678_explain_graph_with_projection.sql @@ -0,0 +1,11 @@ +DROP TABLE IF EXISTS t1; +CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID; + +insert into t1(ID, name) values (1, 'abc'), (2, 'bbb'); + +explain pipeline graph=1 select count(ID) from t1; +explain pipeline graph=1 select sum(1) from t1; +explain pipeline graph=1 select min(ID) from t1; +explain pipeline graph=1 select max(ID) from t1; + +DROP TABLE t1; From 3e554c2925c0a8604d53bcb665ce8f1f5d2ebc9c Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 15:24:19 +0000 Subject: [PATCH 2/7] fix --- src/Processors/QueryPlan/ISourceStep.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Processors/QueryPlan/ISourceStep.h b/src/Processors/QueryPlan/ISourceStep.h index 1327444697e..744b6f9b5c4 100644 --- a/src/Processors/QueryPlan/ISourceStep.h +++ b/src/Processors/QueryPlan/ISourceStep.h @@ -10,8 +10,6 @@ class ISourceStep : public IQueryPlanStep public: explicit ISourceStep(DataStream output_stream_); - String getName() const override { return "ISourceStep"; } - QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; virtual void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) = 0; From 530c9ae4905fb5f52b7ac460cf09fe0568cd72ea Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 15:43:19 +0000 Subject: [PATCH 3/7] revert --- src/Interpreters/InterpreterExplainQuery.cpp | 5 +- src/QueryPipeline/printPipeline.cpp | 173 ++++++++++++++++++ src/QueryPipeline/printPipeline.h | 5 + .../02678_explain_pipeline_graph.reference | 0 .../02678_explain_pipeline_graph.sql | 7 - ..._pipeline_graph_with_projection.reference} | 0 ...xplain_pipeline_graph_with_projection.sql} | 0 7 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 src/QueryPipeline/printPipeline.cpp delete mode 100644 tests/queries/0_stateless/02678_explain_pipeline_graph.reference delete mode 100644 tests/queries/0_stateless/02678_explain_pipeline_graph.sql rename tests/queries/0_stateless/{02678_explain_graph_with_projection.reference => 02678_explain_pipeline_graph_with_projection.reference} (100%) rename tests/queries/0_stateless/{02678_explain_graph_with_projection.sql => 02678_explain_pipeline_graph_with_projection.sql} (100%) diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp index b2172a07e91..3c225522cc4 100644 --- a/src/Interpreters/InterpreterExplainQuery.cpp +++ b/src/Interpreters/InterpreterExplainQuery.cpp @@ -504,7 +504,10 @@ QueryPipeline InterpreterExplainQuery::executeImpl() auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources); const auto & processors = pipe.getProcessors(); - printPipeline(processors, buf); + if (settings.compact) + printPipelineCompact(processors, buf, settings.query_pipeline_options.header); + else + printPipeline(processors, buf); } else { diff --git a/src/QueryPipeline/printPipeline.cpp b/src/QueryPipeline/printPipeline.cpp new file mode 100644 index 00000000000..de978578618 --- /dev/null +++ b/src/QueryPipeline/printPipeline.cpp @@ -0,0 +1,173 @@ +#include +#include +#include +#include + +namespace DB +{ + +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header) +{ + struct Node; + + /// Group by processors name, QueryPlanStep and group in this step. + struct Key + { + size_t group; + IQueryPlanStep * step; + std::string name; + + auto getTuple() const { return std::forward_as_tuple(group, step, name); } + + bool operator<(const Key & other) const { return getTuple() < other.getTuple(); } + }; + + /// Group ports by header. + struct EdgeData + { + Block header; + size_t count; + }; + + using Edge = std::vector; + + struct Node + { + size_t id = 0; + std::map edges = {}; + std::vector agents = {}; + }; + + std::map graph; + + auto get_key = [](const IProcessor & processor) { + return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()}; + }; + + /// Fill nodes. + for (const auto & processor : processors) + { + auto res = graph.emplace(get_key(*processor), Node()); + auto & node = res.first->second; + node.agents.emplace_back(processor.get()); + + if (res.second) + node.id = graph.size(); + } + + Block empty_header; + + /// Fill edges. + for (const auto & processor : processors) + { + auto & from = graph[get_key(*processor)]; + + for (auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + + auto & to = graph[get_key(port.getInputPort().getProcessor())]; + auto & edge = from.edges[&to]; + + /// Use empty header for each edge if with_header is false. + const auto & header = with_header ? port.getHeader() : empty_header; + + /// Group by header. + bool found = false; + for (auto & item : edge) + { + if (blocksHaveEqualStructure(header, item.header)) + { + found = true; + ++item.count; + break; + } + } + + if (!found) + edge.emplace_back(EdgeData{header, 1}); + } + } + + /// Group processors by it's QueryPlanStep. + std::map> steps_map; + + for (const auto & item : graph) + steps_map[item.first.step].emplace_back(&item.second); + + out << "digraph\n{\n"; + out << " rankdir=\"LR\";\n"; + out << " { node [shape = rect]\n"; + + /// Nodes // TODO quoting and escaping + size_t next_step = 0; + for (const auto & item : steps_map) + { + /// Use separate clusters for each step. + if (item.first != nullptr) + { + out << " subgraph cluster_" << next_step << " {\n"; + out << " label =\"" << item.first->getName() << "\";\n"; + out << " style=filled;\n"; + out << " color=lightgrey;\n"; + out << " node [style=filled,color=white];\n"; + out << " { rank = same;\n"; + + ++next_step; + } + + for (const auto & node : item.second) + { + const auto & processor = node->agents.front(); + out << " n" << node->id << " [label=\"" << processor->getName(); + + if (node->agents.size() > 1) + out << " × " << node->agents.size(); + + const auto & description = processor->getDescription(); + if (!description.empty()) + out << ' ' << description; + + out << "\"];\n"; + } + + if (item.first != nullptr) + { + out << " }\n"; + out << " }\n"; + } + } + + out << " }\n"; + + /// Edges + for (const auto & item : graph) + { + for (const auto & edge : item.second.edges) + { + for (const auto & data : edge.second) + { + out << " n" << item.second.id << " -> " + << "n" << edge.first->id << " [label=\""; + + if (data.count > 1) + out << "× " << data.count; + + if (with_header) + { + for (const auto & elem : data.header) + { + out << "\n"; + elem.dumpStructure(out); + } + } + + out << "\"];\n"; + } + } + } + out << "}\n"; +} + +} diff --git a/src/QueryPipeline/printPipeline.h b/src/QueryPipeline/printPipeline.h index ff3b53300ce..e91909cb50b 100644 --- a/src/QueryPipeline/printPipeline.h +++ b/src/QueryPipeline/printPipeline.h @@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out) printPipeline(processors, std::vector(), out); } +/// Prints pipeline in compact representation. +/// Group processors by it's name, QueryPlanStep and QueryPlanStepGroup. +/// If QueryPlanStep wasn't set for processor, representation may be not correct. +/// If with_header is set, prints block header for each edge. +void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header); } diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph.reference deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph.sql deleted file mode 100644 index 48cfbf2b349..00000000000 --- a/tests/queries/0_stateless/02678_explain_pipeline_graph.sql +++ /dev/null @@ -1,7 +0,0 @@ --- The server does not crash after these queries: - -DROP TABLE IF EXISTS t1; -CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID; -insert into t1(ID, name) values (1, 'abc'), (2, 'bbb'); -explain pipeline graph=1 select count(ID) from t1 FORMAT Null; -DROP TABLE t1; diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference similarity index 100% rename from tests/queries/0_stateless/02678_explain_graph_with_projection.reference rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference diff --git a/tests/queries/0_stateless/02678_explain_graph_with_projection.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql similarity index 100% rename from tests/queries/0_stateless/02678_explain_graph_with_projection.sql rename to tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql From 191bc9717e5f6dd7e6e3065e1588d98a3e0fa110 Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 15:58:16 +0000 Subject: [PATCH 4/7] fix style --- src/QueryPipeline/printPipeline.cpp | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/QueryPipeline/printPipeline.cpp b/src/QueryPipeline/printPipeline.cpp index de978578618..40c88502ed0 100644 --- a/src/QueryPipeline/printPipeline.cpp +++ b/src/QueryPipeline/printPipeline.cpp @@ -1,7 +1,7 @@ -#include -#include -#include #include +#include +#include +#include namespace DB { @@ -19,7 +19,10 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool auto getTuple() const { return std::forward_as_tuple(group, step, name); } - bool operator<(const Key & other) const { return getTuple() < other.getTuple(); } + bool operator<(const Key & other) const + { + return getTuple() < other.getTuple(); + } }; /// Group ports by header. @@ -40,7 +43,8 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool std::map graph; - auto get_key = [](const IProcessor & processor) { + auto get_key = [](const IProcessor & processor) + { return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()}; }; @@ -60,7 +64,7 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool /// Fill edges. for (const auto & processor : processors) { - auto & from = graph[get_key(*processor)]; + auto & from = graph[get_key(*processor)]; for (auto & port : processor->getOutputs()) { @@ -71,7 +75,8 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool auto & edge = from.edges[&to]; /// Use empty header for each edge if with_header is false. - const auto & header = with_header ? port.getHeader() : empty_header; + const auto & header = with_header ? port.getHeader() + : empty_header; /// Group by header. bool found = false; @@ -148,8 +153,7 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool { for (const auto & data : edge.second) { - out << " n" << item.second.id << " -> " - << "n" << edge.first->id << " [label=\""; + out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\""; if (data.count > 1) out << "× " << data.count; From 3190b89f9aff8fc906e36634ef002bfc125feaa6 Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 18:04:19 +0000 Subject: [PATCH 5/7] Fix --- src/Processors/QueryPlan/ISourceStep.cpp | 8 +- ...n_pipeline_graph_with_projection.reference | 116 ------------------ ...explain_pipeline_graph_with_projection.sql | 9 +- 3 files changed, 11 insertions(+), 122 deletions(-) diff --git a/src/Processors/QueryPlan/ISourceStep.cpp b/src/Processors/QueryPlan/ISourceStep.cpp index 562f92ff63b..062f6d85bac 100644 --- a/src/Processors/QueryPlan/ISourceStep.cpp +++ b/src/Processors/QueryPlan/ISourceStep.cpp @@ -12,15 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_) QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings) { auto pipeline = std::make_unique(); + + /// Why we need first initializePipeline first: since it's not + /// add new Processors to `pipeline->pipe`, but make an assign + /// with new created Pipe. And Processors for the Step is added here. initializePipeline(*pipeline, settings); + QueryPipelineProcessorsCollector collector(*pipeline, this); /// Properly collecting processors from Pipe. /// At the creation time of a Pipe, since `collected_processors` is nullptr, /// the processors can not be collected. pipeline->collectProcessors(); - auto added_processors = collector.detachProcessors(); - processors.insert(processors.end(), added_processors.begin(), added_processors.end()); + collector.detachProcessors(); return pipeline; } diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference index 0c513f735cc..e69de29bb2d 100644 --- a/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference +++ b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.reference @@ -1,116 +0,0 @@ -digraph -{ - rankdir="LR"; - { node [shape = rect] - subgraph cluster_0 { - label ="Expression"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n4 [label="ExpressionTransform"]; - } - } - subgraph cluster_1 { - label ="ReadFromStorage"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n3 [label="AggregatingTransform"]; - n2 [label="ExpressionTransform"]; - n1 [label="SourceFromSingleChunk"]; - } - } - } - n3 -> n4 [label=""]; - n2 -> n3 [label=""]; - n1 -> n2 [label=""]; -} -digraph -{ - rankdir="LR"; - { node [shape = rect] - subgraph cluster_0 { - label ="Expression"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n4 [label="ExpressionTransform"]; - } - } - subgraph cluster_1 { - label ="ReadFromStorage"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n3 [label="AggregatingTransform"]; - n2 [label="ExpressionTransform"]; - n1 [label="SourceFromSingleChunk"]; - } - } - } - n3 -> n4 [label=""]; - n2 -> n3 [label=""]; - n1 -> n2 [label=""]; -} -digraph -{ - rankdir="LR"; - { node [shape = rect] - subgraph cluster_0 { - label ="Expression"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n4 [label="ExpressionTransform"]; - } - } - subgraph cluster_1 { - label ="ReadFromStorage"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n3 [label="AggregatingTransform"]; - n2 [label="ExpressionTransform"]; - n1 [label="SourceFromSingleChunk"]; - } - } - } - n3 -> n4 [label=""]; - n2 -> n3 [label=""]; - n1 -> n2 [label=""]; -} -digraph -{ - rankdir="LR"; - { node [shape = rect] - subgraph cluster_0 { - label ="Expression"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n4 [label="ExpressionTransform"]; - } - } - subgraph cluster_1 { - label ="ReadFromStorage"; - style=filled; - color=lightgrey; - node [style=filled,color=white]; - { rank = same; - n3 [label="AggregatingTransform"]; - n2 [label="ExpressionTransform"]; - n1 [label="SourceFromSingleChunk"]; - } - } - } - n3 -> n4 [label=""]; - n2 -> n3 [label=""]; - n1 -> n2 [label=""]; -} diff --git a/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql index 06ee1078e74..e8b7405d602 100644 --- a/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql +++ b/tests/queries/0_stateless/02678_explain_pipeline_graph_with_projection.sql @@ -3,9 +3,10 @@ CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID; insert into t1(ID, name) values (1, 'abc'), (2, 'bbb'); -explain pipeline graph=1 select count(ID) from t1; -explain pipeline graph=1 select sum(1) from t1; -explain pipeline graph=1 select min(ID) from t1; -explain pipeline graph=1 select max(ID) from t1; +-- The returned node order is uncertain +explain pipeline graph=1 select count(ID) from t1 FORMAT Null; +explain pipeline graph=1 select sum(1) from t1 FORMAT Null; +explain pipeline graph=1 select min(ID) from t1 FORMAT Null; +explain pipeline graph=1 select max(ID) from t1 FORMAT Null; DROP TABLE t1; From 26828449518ace4433b001243052af249ea1c1fc Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 18:06:50 +0000 Subject: [PATCH 6/7] fix --- src/Processors/QueryPlan/ISourceStep.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Processors/QueryPlan/ISourceStep.cpp b/src/Processors/QueryPlan/ISourceStep.cpp index 062f6d85bac..0ea386518b7 100644 --- a/src/Processors/QueryPlan/ISourceStep.cpp +++ b/src/Processors/QueryPlan/ISourceStep.cpp @@ -13,8 +13,8 @@ QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const { auto pipeline = std::make_unique(); - /// Why we need first initializePipeline first: since it's not - /// add new Processors to `pipeline->pipe`, but make an assign + /// Why we need initializePipeline first: since it's not add + /// new Processors to `pipeline->pipe`, but make an assign /// with new created Pipe. And Processors for the Step is added here. initializePipeline(*pipeline, settings); From e56eeac05fe4afcf3c2f73c1e28d8a5f39d09ab3 Mon Sep 17 00:00:00 2001 From: flynn Date: Sat, 11 Mar 2023 18:48:36 +0000 Subject: [PATCH 7/7] fix --- src/Processors/QueryPlan/ISourceStep.cpp | 20 ++++++++++---------- src/QueryPipeline/Pipe.cpp | 10 ---------- src/QueryPipeline/Pipe.h | 2 -- src/QueryPipeline/QueryPipelineBuilder.h | 2 -- 4 files changed, 10 insertions(+), 24 deletions(-) diff --git a/src/Processors/QueryPlan/ISourceStep.cpp b/src/Processors/QueryPlan/ISourceStep.cpp index 0ea386518b7..37f56bc7a43 100644 --- a/src/Processors/QueryPlan/ISourceStep.cpp +++ b/src/Processors/QueryPlan/ISourceStep.cpp @@ -13,18 +13,18 @@ QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const { auto pipeline = std::make_unique(); - /// Why we need initializePipeline first: since it's not add - /// new Processors to `pipeline->pipe`, but make an assign - /// with new created Pipe. And Processors for the Step is added here. + /// For `Source` step, since it's not add new Processors to `pipeline->pipe` + /// in `initializePipeline`, but make an assign with new created Pipe. + /// And Processors for the Step is added here. So we do not need to use + /// `QueryPipelineProcessorsCollector` to collect Processors. initializePipeline(*pipeline, settings); - QueryPipelineProcessorsCollector collector(*pipeline, this); - - /// Properly collecting processors from Pipe. - /// At the creation time of a Pipe, since `collected_processors` is nullptr, - /// the processors can not be collected. - pipeline->collectProcessors(); - collector.detachProcessors(); + /// But we need to set QueryPlanStep manually for the Processors, which + /// will be used in `EXPLAIN PIPELINE` + for (auto & processor : processors) + { + processor->setQueryPlanStep(this); + } return pipeline; } diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index a20b154054a..91ba01c479f 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -867,14 +867,4 @@ void Pipe::transform(const Transformer & transformer, bool check_ports) max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } -void Pipe::collectProcessors() -{ - if (collected_processors) - { - for (const auto & processor : *processors) - { - collected_processors->emplace_back(processor); - } - } -} } diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index a19364f7907..09931e38578 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -104,8 +104,6 @@ public: std::shared_ptr getProcessorsPtr() { return processors; } - void collectProcessors(); - private: /// Header is common for all output below. Block header; diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 51a1aa69563..4f984680c75 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -185,8 +185,6 @@ public: static Pipe getPipe(QueryPipelineBuilder pipeline, QueryPlanResourceHolder & resources); static QueryPipeline getPipeline(QueryPipelineBuilder builder); - void collectProcessors() { pipe.collectProcessors(); } - private: /// Destruction order: processors, header, locks, temporary storages, local contexts