This commit is contained in:
flynn 2023-03-11 18:04:19 +00:00
parent 191bc9717e
commit 3190b89f9a
3 changed files with 11 additions and 122 deletions

View File

@ -12,15 +12,19 @@ ISourceStep::ISourceStep(DataStream output_stream_)
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
auto pipeline = std::make_unique<QueryPipelineBuilder>();
/// Why we need first initializePipeline first: since it's not
/// add new Processors to `pipeline->pipe`, but make an assign
/// with new created Pipe. And Processors for the Step is added here.
initializePipeline(*pipeline, settings);
QueryPipelineProcessorsCollector collector(*pipeline, this);
/// Properly collecting processors from Pipe.
/// At the creation time of a Pipe, since `collected_processors` is nullptr,
/// the processors can not be collected.
pipeline->collectProcessors();
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
collector.detachProcessors();
return pipeline;
}

View File

@ -1,116 +0,0 @@
digraph
{
rankdir="LR";
{ node [shape = rect]
subgraph cluster_0 {
label ="Expression";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n4 [label="ExpressionTransform"];
}
}
subgraph cluster_1 {
label ="ReadFromStorage";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n3 [label="AggregatingTransform"];
n2 [label="ExpressionTransform"];
n1 [label="SourceFromSingleChunk"];
}
}
}
n3 -> n4 [label=""];
n2 -> n3 [label=""];
n1 -> n2 [label=""];
}
digraph
{
rankdir="LR";
{ node [shape = rect]
subgraph cluster_0 {
label ="Expression";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n4 [label="ExpressionTransform"];
}
}
subgraph cluster_1 {
label ="ReadFromStorage";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n3 [label="AggregatingTransform"];
n2 [label="ExpressionTransform"];
n1 [label="SourceFromSingleChunk"];
}
}
}
n3 -> n4 [label=""];
n2 -> n3 [label=""];
n1 -> n2 [label=""];
}
digraph
{
rankdir="LR";
{ node [shape = rect]
subgraph cluster_0 {
label ="Expression";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n4 [label="ExpressionTransform"];
}
}
subgraph cluster_1 {
label ="ReadFromStorage";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n3 [label="AggregatingTransform"];
n2 [label="ExpressionTransform"];
n1 [label="SourceFromSingleChunk"];
}
}
}
n3 -> n4 [label=""];
n2 -> n3 [label=""];
n1 -> n2 [label=""];
}
digraph
{
rankdir="LR";
{ node [shape = rect]
subgraph cluster_0 {
label ="Expression";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n4 [label="ExpressionTransform"];
}
}
subgraph cluster_1 {
label ="ReadFromStorage";
style=filled;
color=lightgrey;
node [style=filled,color=white];
{ rank = same;
n3 [label="AggregatingTransform"];
n2 [label="ExpressionTransform"];
n1 [label="SourceFromSingleChunk"];
}
}
}
n3 -> n4 [label=""];
n2 -> n3 [label=""];
n1 -> n2 [label=""];
}

View File

@ -3,9 +3,10 @@ CREATE TABLE t1(ID UInt64, name String) engine=MergeTree order by ID;
insert into t1(ID, name) values (1, 'abc'), (2, 'bbb');
explain pipeline graph=1 select count(ID) from t1;
explain pipeline graph=1 select sum(1) from t1;
explain pipeline graph=1 select min(ID) from t1;
explain pipeline graph=1 select max(ID) from t1;
-- The returned node order is uncertain
explain pipeline graph=1 select count(ID) from t1 FORMAT Null;
explain pipeline graph=1 select sum(1) from t1 FORMAT Null;
explain pipeline graph=1 select min(ID) from t1 FORMAT Null;
explain pipeline graph=1 select max(ID) from t1 FORMAT Null;
DROP TABLE t1;