Merge pull request #72323 from ClickHouse/backport/24.8/72233

Backport #72233 to 24.8: Fix race in `GraceHashJoin`
This commit is contained in:
robot-clickhouse 2024-11-23 01:12:02 +01:00 committed by GitHub
commit af707a8294
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 2 deletions

View File

@ -13,6 +13,7 @@
#include <Core/Settings.h>
#include <numeric>
#include <shared_mutex>
#include <fmt/format.h>
@ -527,6 +528,7 @@ public:
Block nextImpl() override
{
ExtraBlockPtr not_processed = nullptr;
std::shared_lock shared(eof_mutex);
{
std::lock_guard lock(extra_block_mutex);
@ -560,7 +562,24 @@ public:
block = left_reader.read();
if (!block)
{
return {};
shared.unlock();
bool there_are_still_might_be_rows_to_process = false;
{
/// The following race condition could happen without this mutex:
/// * we're called from `IBlocksStream::next()`
/// * another thread just read the last block from `left_reader` and now is in the process of or about to call `joinBlock()`
/// * it might be that `joinBlock()` will leave some rows in the `not_processed`
/// * but if the current thread will return now an empty block `finished` will be set to true in `IBlocksStream::next()` and
/// these not processed rows will be lost
/// So we shouldn't finish execution while there is at least one in-flight `joinBlock()` call. Let's wait until we're alone
/// and double check if there are any not processed rows left.
std::unique_lock exclusive(eof_mutex);
std::lock_guard lock(extra_block_mutex);
if (!not_processed_blocks.empty())
there_are_still_might_be_rows_to_process = true;
}
return there_are_still_might_be_rows_to_process ? nextImpl() : Block();
}
// block comes from left_reader, need to join with right table to get the result.
@ -595,7 +614,7 @@ public:
return block;
}
size_t current_bucket;
const size_t current_bucket;
Buckets buckets;
InMemoryJoinPtr hash_join;
@ -606,6 +625,8 @@ public:
std::mutex extra_block_mutex;
std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex);
std::shared_mutex eof_mutex;
};
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS t0;
DROP TABLE IF EXISTS t1;
CREATE TABLE t0 (x UInt64) ENGINE = MergeTree ORDER BY x;
INSERT INTO t0 SELECT number from numbers(20);
CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x;
INSERT INTO t1 SELECT number from numbers(5, 20);
SET max_joined_block_size_rows = 1;
SET grace_hash_join_initial_buckets = 2;
SET join_algorithm = 'grace_hash';
SELECT sum(x), count() FROM t0 JOIN t1 USING x;