diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index a241d1cd258..e4323e31704 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -13,6 +13,7 @@ #include #include +#include #include @@ -527,6 +528,7 @@ public: Block nextImpl() override { ExtraBlockPtr not_processed = nullptr; + std::shared_lock shared(eof_mutex); { std::lock_guard lock(extra_block_mutex); @@ -560,7 +562,24 @@ public: block = left_reader.read(); if (!block) { - return {}; + shared.unlock(); + bool there_are_still_might_be_rows_to_process = false; + { + /// The following race condition could happen without this mutex: + /// * we're called from `IBlocksStream::next()` + /// * another thread just read the last block from `left_reader` and now is in the process of or about to call `joinBlock()` + /// * it might be that `joinBlock()` will leave some rows in the `not_processed` + /// * but if the current thread will return now an empty block `finished` will be set to true in `IBlocksStream::next()` and + /// these not processed rows will be lost + /// So we shouldn't finish execution while there is at least one in-flight `joinBlock()` call. Let's wait until we're alone + /// and double check if there are any not processed rows left. + std::unique_lock exclusive(eof_mutex); + + std::lock_guard lock(extra_block_mutex); + if (!not_processed_blocks.empty()) + there_are_still_might_be_rows_to_process = true; + } + return there_are_still_might_be_rows_to_process ? nextImpl() : Block(); } // block comes from left_reader, need to join with right table to get the result. @@ -595,7 +614,7 @@ public: return block; } - size_t current_bucket; + const size_t current_bucket; Buckets buckets; InMemoryJoinPtr hash_join; @@ -606,6 +625,8 @@ public: std::mutex extra_block_mutex; std::list not_processed_blocks TSA_GUARDED_BY(extra_block_mutex); + + std::shared_mutex eof_mutex; }; IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() diff --git a/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.reference b/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.reference new file mode 100644 index 00000000000..8b3a805c6a2 --- /dev/null +++ b/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.reference @@ -0,0 +1 @@ +180 15 diff --git a/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.sql b/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.sql new file mode 100644 index 00000000000..c91e226fdef --- /dev/null +++ b/tests/queries/0_stateless/03274_grace_hash_max_joined_block_size_rows_bug.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS t0; +DROP TABLE IF EXISTS t1; + +CREATE TABLE t0 (x UInt64) ENGINE = MergeTree ORDER BY x; +INSERT INTO t0 SELECT number from numbers(20); + +CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x; +INSERT INTO t1 SELECT number from numbers(5, 20); + +SET max_joined_block_size_rows = 1; +SET grace_hash_join_initial_buckets = 2; +SET join_algorithm = 'grace_hash'; + +SELECT sum(x), count() FROM t0 JOIN t1 USING x; +