Backport #72233 to 24.10: Fix race in GraceHashJoin

This commit is contained in:
robot-clickhouse 2024-11-22 22:08:38 +00:00
parent 2f013c243f
commit b7250aaf64
3 changed files with 39 additions and 2 deletions

View File

@ -13,6 +13,7 @@
#include <Core/Settings.h> #include <Core/Settings.h>
#include <numeric> #include <numeric>
#include <shared_mutex>
#include <fmt/format.h> #include <fmt/format.h>
@ -533,6 +534,7 @@ public:
Block nextImpl() override Block nextImpl() override
{ {
ExtraBlockPtr not_processed = nullptr; ExtraBlockPtr not_processed = nullptr;
std::shared_lock shared(eof_mutex);
{ {
std::lock_guard lock(extra_block_mutex); std::lock_guard lock(extra_block_mutex);
@ -566,7 +568,24 @@ public:
block = left_reader.read(); block = left_reader.read();
if (!block) if (!block)
{ {
return {}; shared.unlock();
bool there_are_still_might_be_rows_to_process = false;
{
/// The following race condition could happen without this mutex:
/// * we're called from `IBlocksStream::next()`
/// * another thread just read the last block from `left_reader` and now is in the process of or about to call `joinBlock()`
/// * it might be that `joinBlock()` will leave some rows in the `not_processed`
/// * but if the current thread will return now an empty block `finished` will be set to true in `IBlocksStream::next()` and
/// these not processed rows will be lost
/// So we shouldn't finish execution while there is at least one in-flight `joinBlock()` call. Let's wait until we're alone
/// and double check if there are any not processed rows left.
std::unique_lock exclusive(eof_mutex);
std::lock_guard lock(extra_block_mutex);
if (!not_processed_blocks.empty())
there_are_still_might_be_rows_to_process = true;
}
return there_are_still_might_be_rows_to_process ? nextImpl() : Block();
} }
// block comes from left_reader, need to join with right table to get the result. // block comes from left_reader, need to join with right table to get the result.
@ -601,7 +620,7 @@ public:
return block; return block;
} }
size_t current_bucket; const size_t current_bucket;
Buckets buckets; Buckets buckets;
InMemoryJoinPtr hash_join; InMemoryJoinPtr hash_join;
@ -612,6 +631,8 @@ public:
std::mutex extra_block_mutex; std::mutex extra_block_mutex;
std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex); std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex);
std::shared_mutex eof_mutex;
}; };
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks() IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS t0;
DROP TABLE IF EXISTS t1;
CREATE TABLE t0 (x UInt64) ENGINE = MergeTree ORDER BY x;
INSERT INTO t0 SELECT number from numbers(20);
CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x;
INSERT INTO t1 SELECT number from numbers(5, 20);
SET max_joined_block_size_rows = 1;
SET grace_hash_join_initial_buckets = 2;
SET join_algorithm = 'grace_hash';
SELECT sum(x), count() FROM t0 JOIN t1 USING x;