mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
Merge pull request #72325 from ClickHouse/backport/24.9/72233
Backport #72233 to 24.9: Fix race in `GraceHashJoin`
This commit is contained in:
commit
fb9d68f864
@ -13,6 +13,7 @@
|
|||||||
#include <Core/Settings.h>
|
#include <Core/Settings.h>
|
||||||
|
|
||||||
#include <numeric>
|
#include <numeric>
|
||||||
|
#include <shared_mutex>
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
|
|
||||||
|
|
||||||
@ -533,6 +534,7 @@ public:
|
|||||||
Block nextImpl() override
|
Block nextImpl() override
|
||||||
{
|
{
|
||||||
ExtraBlockPtr not_processed = nullptr;
|
ExtraBlockPtr not_processed = nullptr;
|
||||||
|
std::shared_lock shared(eof_mutex);
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(extra_block_mutex);
|
std::lock_guard lock(extra_block_mutex);
|
||||||
@ -566,7 +568,24 @@ public:
|
|||||||
block = left_reader.read();
|
block = left_reader.read();
|
||||||
if (!block)
|
if (!block)
|
||||||
{
|
{
|
||||||
return {};
|
shared.unlock();
|
||||||
|
bool there_are_still_might_be_rows_to_process = false;
|
||||||
|
{
|
||||||
|
/// The following race condition could happen without this mutex:
|
||||||
|
/// * we're called from `IBlocksStream::next()`
|
||||||
|
/// * another thread just read the last block from `left_reader` and now is in the process of or about to call `joinBlock()`
|
||||||
|
/// * `joinBlock()` may leave some rows in `not_processed`
|
||||||
|
/// * but if the current thread returns an empty block now, `finished` will be set to true in `IBlocksStream::next()` and
|
||||||
|
/// these unprocessed rows will be lost
|
||||||
|
/// So we shouldn't finish execution while there is at least one in-flight `joinBlock()` call. Let's wait until we're alone
|
||||||
|
/// and double check if there are any not processed rows left.
|
||||||
|
std::unique_lock exclusive(eof_mutex);
|
||||||
|
|
||||||
|
std::lock_guard lock(extra_block_mutex);
|
||||||
|
if (!not_processed_blocks.empty())
|
||||||
|
there_are_still_might_be_rows_to_process = true;
|
||||||
|
}
|
||||||
|
return there_are_still_might_be_rows_to_process ? nextImpl() : Block();
|
||||||
}
|
}
|
||||||
|
|
||||||
// block comes from left_reader, need to join with right table to get the result.
|
// block comes from left_reader, need to join with right table to get the result.
|
||||||
@ -601,7 +620,7 @@ public:
|
|||||||
return block;
|
return block;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t current_bucket;
|
const size_t current_bucket;
|
||||||
Buckets buckets;
|
Buckets buckets;
|
||||||
InMemoryJoinPtr hash_join;
|
InMemoryJoinPtr hash_join;
|
||||||
|
|
||||||
@ -612,6 +631,8 @@ public:
|
|||||||
|
|
||||||
std::mutex extra_block_mutex;
|
std::mutex extra_block_mutex;
|
||||||
std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex);
|
std::list<ExtraBlockPtr> not_processed_blocks TSA_GUARDED_BY(extra_block_mutex);
|
||||||
|
|
||||||
|
std::shared_mutex eof_mutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
|
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
180 15
|
@ -0,0 +1,15 @@
|
|||||||
|
DROP TABLE IF EXISTS t0;
|
||||||
|
DROP TABLE IF EXISTS t1;
|
||||||
|
|
||||||
|
CREATE TABLE t0 (x UInt64) ENGINE = MergeTree ORDER BY x;
|
||||||
|
INSERT INTO t0 SELECT number from numbers(20);
|
||||||
|
|
||||||
|
CREATE TABLE t1 (x UInt64) ENGINE = MergeTree ORDER BY x;
|
||||||
|
INSERT INTO t1 SELECT number from numbers(5, 20);
|
||||||
|
|
||||||
|
SET max_joined_block_size_rows = 1;
|
||||||
|
SET grace_hash_join_initial_buckets = 2;
|
||||||
|
SET join_algorithm = 'grace_hash';
|
||||||
|
|
||||||
|
SELECT sum(x), count() FROM t0 JOIN t1 USING x;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user