diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index 2e622a4c18e..ad4457f924e 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -472,9 +472,9 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const return hash_join_is_empty; } -// This is only be called for bucket[0] at present. other buckets' non-joined blocks are generated in -// DelayedBlocks. -// There is a finished counter in JoiningTransform, only the last JoiningTransform could call this function. +// A finish counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform is used so that +// only the last processor calls this function, i.e. after all used flags have been set. +// To support delayed stream mode, the hash join must be kept until the next getDelayedBlocks call. IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const { @@ -489,60 +489,30 @@ public: Buckets buckets_, InMemoryJoinPtr hash_join_, const Names & left_key_names_, - const Names & right_key_names_, - const Block & left_sample_block_, - const Block & result_sample_block_, - size_t max_block_size_) + const Names & right_key_names_) : current_bucket(current_bucket_) , buckets(std::move(buckets_)) , hash_join(std::move(hash_join_)) , left_reader(buckets[current_bucket]->getLeftTableReader()) , left_key_names(left_key_names_) , right_key_names(right_key_names_) - , left_sample_block(left_sample_block_) - , result_sample_block(result_sample_block_) - , max_block_size(max_block_size_) { } - // One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransforms, need locks for - // - reading from left_reader. left_reader.read() has had a lock inside. - // - reading non-joined blocks from hash_join. Since iterate on non-joined blocks will has state - // changed inside. Block nextImpl() override { - // there is data race case wihthout this lock: - // 1. 
thread 1 read the last block from left_reader, but not finish the join - // 2. thread 2 try to read from non-joined blocks. Since thread 1 has not finished, - // the used flags in the hash_join is incomplete, then thread 2 return invalid mismatch rows. - std::lock_guard lock(non_joined_blocks_mutex); - Block block; size_t num_buckets = buckets.size(); size_t current_idx = buckets[current_bucket]->idx; do { - // When left reader finish, return non-joined blocks. - // empty block means the end of this stream. - if (!is_left_reader_finished) + // One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransforms. + // left_reader.read() takes a lock internally; an empty block ends this stream. + block = left_reader.read(); + if (!block) { - block = left_reader.read(); - if (!block) - { - is_left_reader_finished = true; - non_joined_blocks = hash_join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size); - } - } - - if (is_left_reader_finished) - { - if (non_joined_blocks) - { - return non_joined_blocks->next(); - } - else - return {}; + return {}; } // block comes from left_reader, need to join with right table to get the result. 
@@ -585,12 +555,6 @@ public: Names left_key_names; Names right_key_names; - Block left_sample_block; - Block result_sample_block; - size_t max_block_size = 0; - bool is_left_reader_finished = false; - IBlocksStreamPtr non_joined_blocks = nullptr; - std::mutex non_joined_blocks_mutex; }; IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() @@ -636,18 +600,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows", bucket_idx, hash_join->getTotalRowCount(), num_rows); - auto result_sample_block = left_sample_block; - ExtraBlockPtr tmp; - hash_join->joinBlock(result_sample_block, tmp); - return std::make_unique( - current_bucket->idx, - buckets, - hash_join, - left_key_names, - right_key_names, - left_sample_block, - result_sample_block, - max_block_size); + return std::make_unique(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names); } LOG_TRACE(log, "Finished loading all {} buckets", buckets.size()); diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index b638a5582c5..d80d6b8e387 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -311,8 +311,16 @@ void FillingRightJoinSideTransform::work() } -DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform(Block output_header) - : IProcessor(InputPorts{Block()}, OutputPorts{output_header}) +DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform( + Block left_header_, + Block output_header_, + size_t max_block_size_, + JoinPtr join_) + : IProcessor(InputPorts{Block()}, OutputPorts{output_header_}) + , left_header(left_header_) + , output_header(output_header_) + , max_block_size(max_block_size_) + , join(join_) { } @@ -366,6 +374,7 @@ IProcessor::Status DelayedJoinedBlocksWorkerTransform::prepare() if (!data.chunk.hasChunkInfo()) throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform 
must have chunk info"); task = std::dynamic_pointer_cast(data.chunk.getChunkInfo()); + } else { @@ -386,12 +395,24 @@ void DelayedJoinedBlocksWorkerTransform::work() { if (!task) return; + Block block; + if (!left_delayed_stream_finished) + { + block = task->delayed_blocks->next(); - Block block = task->delayed_blocks->next(); - + if (!block) + { + left_delayed_stream_finished = true; + block = nextNonJoinedBlock(); + } + } + else + { + block = nextNonJoinedBlock(); + } if (!block) { - task.reset(); + resetTask(); return; } @@ -400,6 +421,38 @@ void DelayedJoinedBlocksWorkerTransform::work() output_chunk.setColumns(block.getColumns(), rows); } +void DelayedJoinedBlocksWorkerTransform::resetTask() +{ + task.reset(); + left_delayed_stream_finished = false; + setup_non_joined_stream = false; + non_joined_delay_stream = nullptr; +} + +Block DelayedJoinedBlocksWorkerTransform::nextNonJoinedBlock() +{ + if (!setup_non_joined_stream) + { + setup_non_joined_stream = true; /* isLast() is called at most once per task by this worker. */ + // Before reading from the non-joined stream, all blocks from the left file reader must have been joined. + // Otherwise, e.g. in HashJoin, the non-joined stream may return invalid mismatched rows, + // because the used flags are still incomplete while some left blocks are being joined. + // Letting only the last processor read from the non-joined stream seems to be an easy way to ensure this. 
+ if (task && task->left_delayed_stream_finish_counter->isLast()) + { + if (!non_joined_delay_stream) + { + non_joined_delay_stream = join->getNonJoinedBlocks(left_header, output_header, max_block_size); + } + } + } + if (non_joined_delay_stream) + { + return non_joined_delay_stream->next(); + } + return {}; +} + DelayedJoinedBlocksTransform::DelayedJoinedBlocksTransform(size_t num_streams, JoinPtr join_) : IProcessor(InputPorts{}, OutputPorts(num_streams, Block())) , join(std::move(join_)) @@ -451,10 +504,14 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare() if (delayed_blocks) { + // One counter, sized by the number of outputs, is shared by all tasks pushed below so that only the last + // DelayedJoinedBlocksWorkerTransform reads non-joined blocks from the join. + auto left_delayed_stream_finished_counter = std::make_shared(outputs.size()); for (auto & output : outputs) { Chunk chunk; - chunk.setChunkInfo(std::make_shared(delayed_blocks)); + auto task = std::make_shared(delayed_blocks, left_delayed_stream_finished_counter); + chunk.setChunkInfo(task); output.push(std::move(chunk)); } delayed_blocks = nullptr; diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index e7edff40c56..e9bd79bf623 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -116,9 +116,14 @@ class DelayedBlocksTask : public ChunkInfo public: explicit DelayedBlocksTask() : finished(true) {} - explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_) : delayed_blocks(std::move(delayed_blocks_)) {} + explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_, JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter_) + : delayed_blocks(std::move(delayed_blocks_)) + , left_delayed_stream_finish_counter(left_delayed_stream_finish_counter_) + { + } IBlocksStreamPtr delayed_blocks = nullptr; + JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter = nullptr; /* Checked via isLast() by a worker once its left delayed stream is finished. */ bool finished = false; }; @@ -147,7 
+152,11 @@ private: class DelayedJoinedBlocksWorkerTransform : public IProcessor { public: - explicit DelayedJoinedBlocksWorkerTransform(Block output_header); + explicit DelayedJoinedBlocksWorkerTransform( + Block left_header_, + Block output_header_, + size_t max_block_size_, + JoinPtr join_); String getName() const override { return "DelayedJoinedBlocksWorkerTransform"; } @@ -155,10 +164,19 @@ public: void work() override; private: + Block left_header; + Block output_header; + size_t max_block_size; + JoinPtr join; /* Used with left_header, output_header and max_block_size to call getNonJoinedBlocks(). */ DelayedBlocksTaskPtr task; Chunk output_chunk; - bool finished = false; + bool left_delayed_stream_finished = false; /* Set once task->delayed_blocks returns an empty block; cleared by resetTask(). */ + bool setup_non_joined_stream = false; /* Set on the first nextNonJoinedBlock() call for the current task; cleared by resetTask(). */ + IBlocksStreamPtr non_joined_delay_stream = nullptr; /* Created only if the task's finish counter reports this worker as the last one. */ + + void resetTask(); /* Drops the task and clears the per-task non-joined stream state. */ + Block nextNonJoinedBlock(); /* Next block of the non-joined stream, or an empty block if this worker has no such stream. */ }; } diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index a4edf107b2f..764997e7b7e 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -491,7 +491,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe if (delayed_root) { // Process delayed joined blocks when all JoiningTransform are finished. 
- auto delayed = std::make_shared(joined_header); + auto delayed = std::make_shared(left_header, joined_header, max_block_size, join); /* The extra arguments let the worker call join->getNonJoinedBlocks() itself. */ if (delayed->getInputs().size() != 1 || delayed->getOutputs().size() != 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform should have one input and one output"); diff --git a/tests/queries/0_stateless/02274_full_sort_join_nodistinct.reference.j2 b/tests/queries/0_stateless/02274_full_sort_join_nodistinct.reference.j2 index 6b5b5bfd298..2dc7fe33423 100644 --- a/tests/queries/0_stateless/02274_full_sort_join_nodistinct.reference.j2 +++ b/tests/queries/0_stateless/02274_full_sort_join_nodistinct.reference.j2 @@ -360,6 +360,7 @@ t1 ALL FULL JOIN tn2 | bs = {{ block_size }} 2 \N 5 0 3 3 4 4 t1 ALL FULL JOIN USING tn2 | bs = {{ block_size }} +{% if join_algorithm == 'full_sorting_merge' -%} 1 4 5 1 4 5 2 5 0 @@ -377,6 +378,25 @@ t1 ALL FULL JOIN USING tn2 | bs = {{ block_size }} \N 0 5 \N 0 5 \N 0 5 +{% else -%} +0 0 5 +0 0 5 +0 0 5 +0 0 5 +0 0 5 +0 0 5 +0 0 5 +1 4 5 +1 4 5 +2 5 0 +2 5 0 +2 5 0 +2 5 0 +2 5 0 +2 5 0 +2 5 0 +3 4 4 +{% endif -%} tn1 ALL INNER JOIN t2 | bs = {{ block_size }} 1 1 4 5 1 1 4 5