From 35d83887057e245dc038e918913be45fa3ca66ae Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 12 May 2023 09:38:21 +0800 Subject: [PATCH] update --- src/Interpreters/GraceHashJoin.cpp | 13 +++++++++---- src/Processors/Transforms/JoiningTransform.cpp | 2 +- src/Processors/Transforms/JoiningTransform.h | 1 + 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 129d58a5436..4a4c69ff473 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -471,10 +471,15 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const return hash_join_is_empty; } - -// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform, -// only the last processor could call this function to ensure all used flags have been inserted. -// To support delayed stream mode, need to keep the hash join before next getDelayedBlocks call. +/// Each bucket is handled by the following steps: +/// 1. build hash_join by the right side blocks. +/// 2. join left side with the hash_join. +/// 3. read right non-joined blocks from hash_join. +/// Buckets are handled one by one; each hash_join will not be released before the right non-joined blocks are emitted. +/// +/// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform, +/// only one processor could take the non-joined blocks from right stream, and ensure all rows from +/// left stream have been emitted before this. 
IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const { diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index d4ea601209f..256ef66a27d 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -504,7 +504,7 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare() if (delayed_blocks) { // This counter is used to ensure that only the last DelayedJoinedBlocksWorkerTransform - // could read non-joined blocks from the join. + // could read right non-joined blocks from the join. auto left_delayed_stream_finished_counter = std::make_shared(outputs.size()); for (auto & output : outputs) { diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index 17222f0e514..3577906b26a 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -171,6 +171,7 @@ private: DelayedBlocksTaskPtr task; Chunk output_chunk; + /// All joined and non-joined rows from left stream are emitted, only right non-joined rows are left bool left_delayed_stream_finished = false; bool setup_non_joined_stream = false; IBlocksStreamPtr non_joined_delayed_stream = nullptr;