mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
debug
This commit is contained in:
parent
97f76d712c
commit
179a7ce202
@ -385,11 +385,23 @@ GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
|
||||
|
||||
void GraceHashJoin::addBucket(Buckets & destination)
|
||||
{
|
||||
auto & left_file = tmp_data->createStream(left_sample_block);
|
||||
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
|
||||
// There could be exceptions from createStream, In ci tests
|
||||
// there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability.
|
||||
// It may terminate this thread and leave a broken hash_join, and another thread cores when it tries to
|
||||
// use the broken hash_join. So we print an exception message here to help debug.
|
||||
try
|
||||
{
|
||||
auto & left_file = tmp_data->createStream(left_sample_block);
|
||||
auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
|
||||
|
||||
BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
|
||||
destination.emplace_back(std::move(new_bucket));
|
||||
BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
|
||||
destination.emplace_back(std::move(new_bucket));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_ERROR(&Poco::Logger::get("GraceHashJoin"), "Can't create bucket. current buckets size: {}", destination.size());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
|
||||
@ -626,7 +638,11 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
|
||||
if (current_block.rows() > 0)
|
||||
{
|
||||
std::lock_guard lock(hash_join_mutex);
|
||||
|
||||
auto current_buckets = getCurrentBuckets();
|
||||
if (!isPowerOf2(current_buckets.size())) [[unlikely]]
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Broken buckets. its size({}) is not power of 2", current_buckets.size());
|
||||
}
|
||||
if (!hash_join)
|
||||
hash_join = makeInMemoryJoin();
|
||||
|
||||
@ -637,11 +653,11 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
|
||||
|
||||
current_block = {};
|
||||
|
||||
// Must use the latest buckets snapshot in case that it has been rehashed by other threads.
|
||||
buckets_snapshot = rehashBuckets(current_buckets.size() * 2);
|
||||
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
|
||||
hash_join = nullptr;
|
||||
|
||||
buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);
|
||||
|
||||
{
|
||||
Blocks current_blocks;
|
||||
current_blocks.reserve(right_blocks.size());
|
||||
|
@ -38,6 +38,9 @@ def get_options(i, upgrade_check):
|
||||
client_options.append("join_algorithm='partial_merge'")
|
||||
if join_alg_num % 5 == 2:
|
||||
client_options.append("join_algorithm='full_sorting_merge'")
|
||||
if join_alg_num % 5 == 3 and not upgrade_check:
|
||||
# Some crashes are not fixed in 23.2 yet, so ignore the setting in Upgrade check
|
||||
client_options.append("join_algorithm='grace_hash'")
|
||||
if join_alg_num % 5 == 4:
|
||||
client_options.append("join_algorithm='auto'")
|
||||
client_options.append("max_rows_in_join=1000")
|
||||
|
Loading…
Reference in New Issue
Block a user