This commit is contained in:
lgbo-ustc 2023-05-12 09:38:21 +08:00
parent 983514f946
commit 35d8388705
3 changed files with 11 additions and 5 deletions

View File

@ -471,10 +471,15 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
return hash_join_is_empty; return hash_join_is_empty;
} }
/// Each bucket are handled by the following steps
// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform, /// 1. build hash_join by the right side blocks.
// only the last processor could call this function to ensure all used flags have been inserted. /// 2. join left side with the hash_join,
// To support delayed stream mode, need to keep the hash join before next getDelayedBlocks call. /// 3. read right non-joined blocks from hash_join.
/// buckets are handled one by one, each hash_join will not be release before the right non-joined blocks are emitted.
///
/// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform,
/// only one processor could take the non-joined blocks from right stream, and ensure all rows from
/// left stream have been emitted before this.
IBlocksStreamPtr IBlocksStreamPtr
GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const
{ {

View File

@ -504,7 +504,7 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare()
if (delayed_blocks) if (delayed_blocks)
{ {
// This counter is used to ensure that only the last DelayedJoinedBlocksWorkerTransform // This counter is used to ensure that only the last DelayedJoinedBlocksWorkerTransform
// could read non-joined blocks from the join. // could read right non-joined blocks from the join.
auto left_delayed_stream_finished_counter = std::make_shared<JoiningTransform::FinishCounter>(outputs.size()); auto left_delayed_stream_finished_counter = std::make_shared<JoiningTransform::FinishCounter>(outputs.size());
for (auto & output : outputs) for (auto & output : outputs)
{ {

View File

@ -171,6 +171,7 @@ private:
DelayedBlocksTaskPtr task; DelayedBlocksTaskPtr task;
Chunk output_chunk; Chunk output_chunk;
/// All joined and non-joined rows from left stream are emitted, only right non-joined rows are left
bool left_delayed_stream_finished = false; bool left_delayed_stream_finished = false;
bool setup_non_joined_stream = false; bool setup_non_joined_stream = false;
IBlocksStreamPtr non_joined_delayed_stream = nullptr; IBlocksStreamPtr non_joined_delayed_stream = nullptr;