Improve the pipeline description for JOIN (#35612)

Improve the pipeline description for JOIN
This commit is contained in:
何李夫 2022-04-04 19:56:41 +08:00 committed by GitHub
parent e6c9a36ac7
commit 09c04e4993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 51 additions and 1 deletions

View File

@ -70,4 +70,9 @@ void ITransformingStep::describePipeline(FormatSettings & settings) const
IQueryPlanStep::describePipeline(processors, settings);
}
void ITransformingStep::appendExtraProcessors(const Processors & extra_processors)
{
processors.insert(processors.end(), extra_processors.begin(), extra_processors.end());
}
}

View File

@ -57,6 +57,9 @@ public:
void describePipeline(FormatSettings & settings) const override;
/// Append extra processors for this step.
/// The given processors are added after the ones this step already holds, so they are
/// included in the processor list that describePipeline() reports for the step.
void appendExtraProcessors(const Processors & extra_processors);
protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);

View File

@ -1,5 +1,6 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
@ -307,7 +308,15 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
right->pipe.dropExtremes();
left->pipe.collected_processors = collected_processors;
right->pipe.collected_processors = collected_processors;
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right);
/// Remember the last step of the right pipeline.
ExpressionStep* step = typeid_cast<ExpressionStep*>(right->pipe.processors.back()->getQueryPlanStep());
if (!step)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "The top step of the right pipeline should be ExpressionStep");
}
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;
@ -377,6 +386,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
left->pipe.processors.emplace_back(std::move(joining));
}
/// Move the collected processors to the last step in the right pipeline.
Processors processors = collector.detachProcessors();
step->appendExtraProcessors(processors);
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->pipe.holder = std::move(right->pipe.holder);
left->pipe.header = left->pipe.output_ports.front()->getHeader();

View File

@ -0,0 +1,19 @@
(Expression)
ExpressionTransform
(Join)
JoiningTransform 2 → 1
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1
(Expression)
FillingRightJoinSide
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1

View File

@ -0,0 +1,10 @@
-- Prints the pipeline of a LEFT JOIN between two identical subqueries,
-- each reading 10 rows from system.numbers, joined on `number`.
-- The output is compared against the stored reference for this test.
EXPLAIN PIPELINE
SELECT * FROM
(
SELECT * FROM system.numbers LIMIT 10
) t1
ALL LEFT JOIN
(
SELECT * FROM system.numbers LIMIT 10
) t2
USING number;