fix: Add signal ports in JoiningTransform conditionally

This commit is contained in:
Sergey Skvortsov 2022-06-20 03:13:01 +03:00
parent aa38076b0d
commit 6b25a36816
No known key found for this signature in database
GPG Key ID: 120217CE540C3670
3 changed files with 11 additions and 8 deletions

View File

@ -29,7 +29,7 @@ JoiningTransform::JoiningTransform(
bool on_totals_,
bool default_totals_,
FinishCounterPtr finish_counter_)
: IProcessor({input_header}, {transformHeader(input_header, join_), Block()})
: IProcessor({input_header}, {transformHeader(input_header, join_)})
, join(std::move(join_))
, on_totals(on_totals_)
, default_totals(default_totals_)
@ -37,15 +37,19 @@ JoiningTransform::JoiningTransform(
, max_block_size(max_block_size_)
{
if (!join->isFilled())
inputs.emplace_back(Block(), this);
{
inputs.emplace_back(Block(), this); // Wait for FillingRightJoinSideTransform
if (!on_totals)
outputs.emplace_back(Block(), this); // Signal for DelayedJoinedBlocksTransform
}
}
JoiningTransform::~JoiningTransform() = default;
OutputPort* JoiningTransform::getFinishedSignal()
OutputPort & JoiningTransform::getFinishedSignal()
{
assert(outputs.size() == 2);
return &outputs.back();
return outputs.back();
}
IProcessor::Status JoiningTransform::prepare()
@ -310,8 +314,7 @@ void FillingRightJoinSideTransform::work()
DelayedJoinedBlocksTransform::DelayedJoinedBlocksTransform(Block input_header, JoinPtr join_)
: IProcessor({Block()}, {JoiningTransform::transformHeader(input_header, join_)})
, join{std::move(join_)}
: IProcessor({Block()}, {JoiningTransform::transformHeader(input_header, join_)}), join{std::move(join_)}
{
}

View File

@ -53,7 +53,7 @@ public:
static Block transformHeader(Block header, const JoinPtr & join);
OutputPort* getFinishedSignal();
OutputPort & getFinishedSignal();
Status prepare() override;
void work() override;

View File

@ -401,7 +401,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, false, default_totals, finish_counter);
connect(**lit, joining->getInputs().front());
connect(**rit, joining->getInputs().back());
connect(*joining->getFinishedSignal(), barrier->addInputPort());
connect(joining->getFinishedSignal(), barrier->addInputPort());
// Process delayed joined blocks when all JoiningTransform are finished.
auto delayed = std::make_shared<DelayedJoinedBlocksTransform>(left->getHeader(), join);