Merge pull request #51737 from ClickHouse/fix_logical_error_grace_hash_join

Fix: logical error in grace hash join
robot-clickhouse-ci-2 2023-07-06 19:27:03 +02:00 committed by GitHub
commit 99c64971bd
2 changed files with 48 additions and 41 deletions
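
In short: GraceHashJoin keeps its bucket count a power of two (rehashing always doubles it), but creating a bucket can throw, e.g. when allocating a temporary file fails. The old addBucket() appended to the shared bucket vector one element at a time, so an exception partway through a rehash could leave a non-power-of-two count, which a later check in addJoinedBlockImpl() reported as a LOGICAL_ERROR ("Broken buckets"). The fix replaces it with an all-or-nothing addBuckets() that creates every new bucket in a temporary vector before publishing them, and moves both the doubling and the power-of-two check into rehashBuckets() itself, under its mutex.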

src/Interpreters/GraceHashJoin.cpp

@@ -288,10 +288,7 @@ void GraceHashJoin::initBuckets()
     size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
-    for (size_t i = 0; i < initial_num_buckets; ++i)
-    {
-        addBucket(buckets);
-    }
+    addBuckets(initial_num_buckets);
 
     if (buckets.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
@@ -356,52 +353,66 @@ bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
     return hasMemoryOverflow(total_rows, total_bytes);
 }
 
-GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
+GraceHashJoin::Buckets GraceHashJoin::rehashBuckets()
 {
     std::unique_lock lock(rehash_mutex);
 
+    if (!isPowerOf2(buckets.size())) [[unlikely]]
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of buckets should be power of 2 but it's {}", buckets.size());
+
+    const size_t to_size = buckets.size() * 2;
     size_t current_size = buckets.size();
 
-    if (to_size <= current_size)
-        return buckets;
-
-    chassert(isPowerOf2(to_size));
-
     if (to_size > max_num_buckets)
     {
-        throw Exception(ErrorCodes::LIMIT_EXCEEDED,
+        throw Exception(
+            ErrorCodes::LIMIT_EXCEEDED,
             "Too many grace hash join buckets ({} > {}), "
             "consider increasing grace_hash_join_max_buckets or max_rows_in_join/max_bytes_in_join",
-            to_size, max_num_buckets);
+            to_size,
+            max_num_buckets);
     }
 
     LOG_TRACE(log, "Rehashing from {} to {}", current_size, to_size);
 
     buckets.reserve(to_size);
-    for (size_t i = current_size; i < to_size; ++i)
-        addBucket(buckets);
+    addBuckets(to_size - current_size);
 
     return buckets;
 }
-void GraceHashJoin::addBucket(Buckets & destination)
+void GraceHashJoin::addBuckets(const size_t bucket_count)
 {
-    // There could be exceptions from createStream, In ci tests
-    // there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability.
-    // It may terminate this thread and leave a broken hash_join, and another thread cores when it tries to
-    // use the broken hash_join. So we print an exception message here to help debug.
-    try
-    {
-        auto & left_file = tmp_data->createStream(left_sample_block);
-        auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
-
-        BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
-        destination.emplace_back(std::move(new_bucket));
-    }
-    catch (...)
-    {
-        LOG_ERROR(&Poco::Logger::get("GraceHashJoin"), "Can't create bucket. current buckets size: {}", destination.size());
-        throw;
-    }
+    // Exception can be thrown in number of cases:
+    // - during creation of temporary files for buckets
+    // - in CI tests, there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability
+    // Therefore, new buckets are added only after all of them created successfully,
+    // otherwise we can end up having unexpected number of buckets
+
+    const size_t current_size = buckets.size();
+    Buckets tmp_buckets;
+    tmp_buckets.reserve(bucket_count);
+    for (size_t i = 0; i < bucket_count; ++i)
+        try
+        {
+            auto & left_file = tmp_data->createStream(left_sample_block);
+            auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
+
+            BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
+            tmp_buckets.emplace_back(std::move(new_bucket));
+        }
+        catch (...)
+        {
+            LOG_ERROR(
+                &Poco::Logger::get("GraceHashJoin"),
+                "Can't create bucket {} due to error: {}",
+                current_size + i,
+                getCurrentExceptionMessage(false));
+            throw;
+        }
+
+    buckets.reserve(buckets.size() + bucket_count);
+    for (auto & bucket : tmp_buckets)
+        buckets.emplace_back(std::move(bucket));
 }
 
 void GraceHashJoin::checkTypesOfKeys(const Block & block) const
@@ -638,11 +649,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
     if (current_block.rows() > 0)
     {
         std::lock_guard lock(hash_join_mutex);
-        auto current_buckets = getCurrentBuckets();
-        if (!isPowerOf2(current_buckets.size())) [[unlikely]]
-        {
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Broken buckets. its size({}) is not power of 2", current_buckets.size());
-        }
 
         if (!hash_join)
             hash_join = makeInMemoryJoin();
@@ -654,7 +660,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
             current_block = {};
 
             // Must use the latest buckets snapshot in case that it has been rehashed by other threads.
-            buckets_snapshot = rehashBuckets(current_buckets.size() * 2);
+            buckets_snapshot = rehashBuckets();
 
             auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
             hash_join = nullptr;
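
The comment inside the new addBuckets() above captures the key idea: everything that can throw runs against a scratch vector, and the shared buckets vector is only extended once every bucket has been created. A minimal standalone sketch of the same strong-exception-safety idiom, with hypothetical stand-ins for ClickHouse's FileBucket and temporary-file streams:

```cpp
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <vector>

struct Bucket
{
    explicit Bucket(size_t index_) : index(index_) {}
    size_t index;
};

// Hypothetical stand-in for the fallible step (in the real code:
// creating two temporary-file streams for the new FileBucket).
std::shared_ptr<Bucket> createBucketOrThrow(size_t index)
{
    if (false) // imagine a temporary-file allocation failing here
        throw std::runtime_error("cannot create bucket");
    return std::make_shared<Bucket>(index);
}

// All-or-nothing append: if any creation throws, `buckets` is left
// untouched, so invariants on buckets.size() (here: power of two)
// keep holding for everyone else.
void addBuckets(std::vector<std::shared_ptr<Bucket>> & buckets, size_t bucket_count)
{
    std::vector<std::shared_ptr<Bucket>> tmp;
    tmp.reserve(bucket_count);
    for (size_t i = 0; i < bucket_count; ++i)
        tmp.push_back(createBucketOrThrow(buckets.size() + i)); // may throw

    buckets.insert(buckets.end(),
                   std::make_move_iterator(tmp.begin()),
                   std::make_move_iterator(tmp.end()));
}
```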

src/Interpreters/GraceHashJoin.h

@@ -101,15 +101,16 @@ private:
     bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
     bool hasMemoryOverflow(const BlocksList & blocks) const;
 
-    /// Create new bucket at the end of @destination.
-    void addBucket(Buckets & destination);
+    /// Add bucket_count new buckets
+    /// Throws if a bucket creation fails
+    void addBuckets(size_t bucket_count);
 
     /// Increase number of buckets to match desired_size.
     /// Called when HashJoin in-memory table for one bucket exceeds the limits.
     ///
     /// NB: after @rehashBuckets there may be rows that are written to the buckets that they do not belong to.
     /// It is fine; these rows will be written to the corresponding buckets during the third stage.
-    Buckets rehashBuckets(size_t to_size);
+    Buckets rehashBuckets();
 
     /// Perform some bookkeeping after all calls to @joinBlock.
     void startReadingDelayedBlocks();
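
The NB in the header is easiest to see with the power-of-two arithmetic spelled out. The sketch below is illustrative only, not ClickHouse's actual routing code: with a power-of-two bucket count n, the bucket index can be computed as `hash & (n - 1)`, and after doubling to 2n a row either keeps its index or moves up by exactly n. So a row written before a rehash sits in a predictable "parent" bucket and can be re-routed to its real bucket in a later stage; a non-power-of-two count would break this property, which is why both checks in the diff treat it as a logical error.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Illustrative only: why the bucket count must stay a power of two.
size_t bucketIndex(uint64_t hash, size_t num_buckets)
{
    assert(num_buckets != 0 && (num_buckets & (num_buckets - 1)) == 0); // power of two
    return hash & (num_buckets - 1); // equivalent to hash % num_buckets
}

int main()
{
    const uint64_t hash = 0xDEADBEEFULL;
    const size_t before = bucketIndex(hash, 8);  // index under 8 buckets
    const size_t after = bucketIndex(hash, 16);  // index after one doubling
    // After a rehash from n to 2n, each row either stays put or moves
    // to bucket (old index + n); no other bucket can receive it.
    assert(after == before || after == before + 8);
    return 0;
}
```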