diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index 537d94b9b3d..b3c314dc8ce 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -29,7 +29,7 @@ JoiningTransform::JoiningTransform( bool on_totals_, bool default_totals_, FinishCounterPtr finish_counter_) - : IProcessor({input_header}, {transformHeader(input_header, join_), Block()}) + : IProcessor({input_header}, {transformHeader(input_header, join_)}) , join(std::move(join_)) , on_totals(on_totals_) , default_totals(default_totals_) @@ -37,15 +37,19 @@ JoiningTransform::JoiningTransform( , max_block_size(max_block_size_) { if (!join->isFilled()) - inputs.emplace_back(Block(), this); + { + inputs.emplace_back(Block(), this); // Wait for FillingRightJoinSideTransform + if (!on_totals) + outputs.emplace_back(Block(), this); // Signal for DelayedJoinedBlocksTransform + } } JoiningTransform::~JoiningTransform() = default; -OutputPort* JoiningTransform::getFinishedSignal() +OutputPort & JoiningTransform::getFinishedSignal() { assert(outputs.size() == 2); - return &outputs.back(); + return outputs.back(); } IProcessor::Status JoiningTransform::prepare() @@ -310,8 +314,7 @@ void FillingRightJoinSideTransform::work() DelayedJoinedBlocksTransform::DelayedJoinedBlocksTransform(Block input_header, JoinPtr join_) - : IProcessor({Block()}, {JoiningTransform::transformHeader(input_header, join_)}) - , join{std::move(join_)} + : IProcessor({Block()}, {JoiningTransform::transformHeader(input_header, join_)}), join{std::move(join_)} { } diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index d5d56163c78..53024356566 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -53,7 +53,7 @@ public: static Block transformHeader(Block header, const JoinPtr & join); - OutputPort* getFinishedSignal(); + OutputPort & getFinishedSignal(); Status prepare() override; void work() override; diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 1bb8e857b91..caa059a0a3f 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -401,7 +401,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelines( auto joining = std::make_shared(left->getHeader(), join, max_block_size, false, default_totals, finish_counter); connect(**lit, joining->getInputs().front()); connect(**rit, joining->getInputs().back()); - connect(*joining->getFinishedSignal(), barrier->addInputPort()); + connect(joining->getFinishedSignal(), barrier->addInputPort()); // Process delayed joined blocks when all JoiningTransform are finished. auto delayed = std::make_shared(left->getHeader(), join);