Merge pull request #47473 from ucasfl/explain-graph

Fix explain graph with projection
This commit is contained in:
robot-ch-test-poll1 2023-03-12 01:31:04 +01:00 committed by GitHub
commit bc39f3e4d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 205 additions and 6 deletions

View File

@ -504,7 +504,10 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
auto pipe = QueryPipelineBuilder::getPipe(std::move(*pipeline), resources);
const auto & processors = pipe.getProcessors();
printPipeline(processors, buf);
if (settings.compact)
printPipelineCompact(processors, buf, settings.query_pipeline_options.header);
else
printPipeline(processors, buf);
}
else
{

View File

@ -12,10 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_)
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
auto pipeline = std::make_unique<QueryPipelineBuilder>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
/// A `Source` step does not add new processors to `pipeline->pipe` inside
/// `initializePipeline`; it assigns a newly created Pipe instead, and the
/// processors for this step are added right here. So the
/// `QueryPipelineProcessorsCollector` is not needed to collect them.
/// NOTE(review): the collector created above appears to contradict this
/// comment — verify against the upstream diff whether those lines were
/// meant to be removed by this change.
initializePipeline(*pipeline, settings);
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
/// The QueryPlanStep must still be set manually on the processors; it is
/// read later when rendering `EXPLAIN PIPELINE`.
for (auto & processor : processors)
{
processor->setQueryPlanStep(this);
}
return pipeline;
}

View File

@ -0,0 +1,177 @@
#include <QueryPipeline/printPipeline.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <set>
#include <map>
namespace DB
{
/// Prints the pipeline as a Graphviz digraph in compact form: processors are
/// merged into one graph node when they share the same (group, step, name)
/// key, and parallel edges with an equal block structure are collapsed into a
/// single labelled edge with a multiplier.
///
/// @param processors  all processors of the pipeline
/// @param out         buffer receiving the DOT text
/// @param with_header if true, the block header structure is printed on each
///                    edge and edges are grouped by header; otherwise all
///                    edges between two nodes share one empty-header group
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header)
{
    /// Grouping key: processors with equal name, QueryPlanStep and group
    /// within the step collapse into a single graph node.
    struct Key
    {
        size_t group;
        IQueryPlanStep * step;
        std::string name;

        auto getTuple() const { return std::forward_as_tuple(group, step, name); }

        bool operator<(const Key & other) const
        {
            return getTuple() < other.getTuple();
        }
    };

    /// Ports between two nodes are grouped by header; count is the number of
    /// connected ports carrying that header.
    struct EdgeData
    {
        Block header;
        size_t count;
    };

    using Edge = std::vector<EdgeData>;

    struct Node
    {
        size_t id = 0;
        std::map<Node *, Edge> edges = {};
        /// All processors collapsed into this node; the first one supplies
        /// the label and description.
        std::vector<const IProcessor *> agents = {};
    };

    std::map<Key, Node> graph;

    auto get_key = [](const IProcessor & processor)
    {
        return Key{processor.getQueryPlanStepGroup(), processor.getQueryPlanStep(), processor.getName()};
    };

    /// Fill nodes.
    for (const auto & processor : processors)
    {
        auto res = graph.emplace(get_key(*processor), Node());
        auto & node = res.first->second;
        node.agents.emplace_back(processor.get());

        /// Assign ids in insertion order (starting from 1).
        if (res.second)
            node.id = graph.size();
    }

    Block empty_header;

    /// Fill edges.
    for (const auto & processor : processors)
    {
        auto & from = graph[get_key(*processor)];

        for (auto & port : processor->getOutputs())
        {
            if (!port.isConnected())
                continue;

            auto & to = graph[get_key(port.getInputPort().getProcessor())];
            auto & edge = from.edges[&to];

            /// Use empty header for each edge if with_header is false.
            const auto & header = with_header ? port.getHeader()
                                              : empty_header;

            /// Group by header.
            bool found = false;
            for (auto & item : edge)
            {
                if (blocksHaveEqualStructure(header, item.header))
                {
                    found = true;
                    ++item.count;
                    break;
                }
            }

            if (!found)
                edge.emplace_back(EdgeData{header, 1});
        }
    }

    /// Group processors by their QueryPlanStep.
    std::map<IQueryPlanStep *, std::vector<const Node *>> steps_map;
    for (const auto & item : graph)
        steps_map[item.first.step].emplace_back(&item.second);

    out << "digraph\n{\n";
    out << " rankdir=\"LR\";\n";
    out << " { node [shape = rect]\n";

    /// Nodes // TODO quoting and escaping
    size_t next_step = 0;
    for (const auto & item : steps_map)
    {
        /// Use separate clusters for each step.
        if (item.first != nullptr)
        {
            out << " subgraph cluster_" << next_step << " {\n";
            out << " label =\"" << item.first->getName() << "\";\n";
            out << " style=filled;\n";
            out << " color=lightgrey;\n";
            out << " node [style=filled,color=white];\n";
            out << " { rank = same;\n";

            ++next_step;
        }

        for (const auto & node : item.second)
        {
            const auto & processor = node->agents.front();
            out << " n" << node->id << " [label=\"" << processor->getName();

            if (node->agents.size() > 1)
                out << " × " << node->agents.size();

            const auto & description = processor->getDescription();
            if (!description.empty())
                out << ' ' << description;

            out << "\"];\n";
        }

        if (item.first != nullptr)
        {
            out << " }\n";
            out << " }\n";
        }
    }

    out << " }\n";

    /// Edges
    for (const auto & item : graph)
    {
        for (const auto & edge : item.second.edges)
        {
            for (const auto & data : edge.second)
            {
                out << " n" << item.second.id << " -> " << "n" << edge.first->id << " [label=\"";

                if (data.count > 1)
                    out << "× " << data.count;

                if (with_header)
                {
                    for (const auto & elem : data.header)
                    {
                        out << "\n";
                        elem.dumpStructure(out);
                    }
                }

                out << "\"];\n";
            }
        }
    }

    out << "}\n";
}
}

View File

@ -64,4 +64,9 @@ void printPipeline(const Processors & processors, WriteBuffer & out)
printPipeline(processors, std::vector<IProcessor::Status>(), out);
}
/// Prints the pipeline in a compact representation.
/// Groups processors by name, QueryPlanStep and QueryPlanStepGroup.
/// If the QueryPlanStep wasn't set for a processor, the representation may be incorrect.
/// If with_header is set, prints the block header for each edge.
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header);
}

View File

@ -1,7 +1,12 @@
-- The server does not crash after these queries:
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID;
insert into t1(ID, name) values (1, 'abc'), (2, 'bbb');
-- Output goes to FORMAT Null because the returned node order of the
-- EXPLAIN PIPELINE graph is not deterministic; only absence of a crash
-- is being checked here.
explain pipeline graph=1 select count(ID) from t1 FORMAT Null;
explain pipeline graph=1 select sum(1) from t1 FORMAT Null;
explain pipeline graph=1 select min(ID) from t1 FORMAT Null;
explain pipeline graph=1 select max(ID) from t1 FORMAT Null;
DROP TABLE t1;