Fix use of join data after it was released

This commit is contained in:
vdimir 2023-01-23 12:34:36 +00:00
parent e30ab0874b
commit dac86d48d2
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
2 changed files with 18 additions and 9 deletions

View File

@ -394,7 +394,7 @@ void GraceHashJoin::addBucket(Buckets & destination)
/// Checks the key types of `block` by forwarding the call to the in-memory `hash_join`.
/// `hash_join` is dereferenced unconditionally, so it must be non-null when this is called.
/// NOTE(review): no mutex is taken here, whereas getTotalByteCount() locks `hash_join_mutex`
/// before touching `hash_join` — confirm this is intentional.
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
{
// NOTE(review): this text is a diff view with the +/- markers stripped; `assert` is presumably
// the removed line and `chassert` its replacement, not two checks meant to coexist.
assert(hash_join);
chassert(hash_join);
return hash_join->checkTypesOfKeys(block);
}
@ -447,7 +447,7 @@ size_t GraceHashJoin::getTotalRowCount() const
/// Returns the byte count reported by the in-memory `hash_join`.
/// `hash_join` is dereferenced unconditionally, so it must be non-null when this is called.
size_t GraceHashJoin::getTotalByteCount() const
{
/// Hold `hash_join_mutex` for the whole call, including the read through `hash_join`.
std::lock_guard lock(hash_join_mutex);
// NOTE(review): this text is a diff view with the +/- markers stripped; `assert` is presumably
// the removed line and `chassert` its replacement, not two checks meant to coexist.
assert(hash_join);
chassert(hash_join);
return hash_join->getTotalByteCount();
}
@ -461,9 +461,14 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
std::shared_lock lock(rehash_mutex);
return std::all_of(buckets.begin(), buckets.end(), [](const auto & bucket) { return bucket->empty(); });
}();
bool hash_join_is_empty = hash_join && hash_join->alwaysReturnsEmptySet();
return hash_join_is_empty && file_buckets_are_empty;
if (!file_buckets_are_empty)
return false;
chassert(hash_join);
bool hash_join_is_empty = hash_join->alwaysReturnsEmptySet();
return hash_join_is_empty;
}
IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
@ -621,6 +626,9 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
{
std::lock_guard lock(hash_join_mutex);
if (!hash_join)
hash_join = makeInMemoryJoin();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
if (!hasMemoryOverflow(hash_join))
@ -629,6 +637,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;
buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

View File

@ -221,8 +221,8 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, right_sample_block(right_sample_block_)
, log(&Poco::Logger::get("HashJoin"))
{
LOG_DEBUG(log, "Datatype: {}, kind: {}, strictness: {}, right header: {}", data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_DEBUG(log, "Keys: {}", TableJoin::formatClauses(table_join->getClauses(), true));
LOG_DEBUG(log, "({}) Datatype: {}, kind: {}, strictness: {}, right header: {}", fmt::ptr(this), data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_DEBUG(log, "({}) Keys: {}", fmt::ptr(this), TableJoin::formatClauses(table_join->getClauses(), true));
if (isCrossOrComma(kind))
{
@ -1774,10 +1774,10 @@ HashJoin::~HashJoin()
{
if (!data)
{
LOG_TRACE(log, "Join data has been released");
LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
return;
}
LOG_TRACE(log, "Join data is being destroyed, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "({}) Join data is being destroyed, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
}
template <typename Mapped>
@ -2058,7 +2058,7 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
{
LOG_TRACE(log, "Join data is being released, {} bytes and {} rows in hash table", getTotalByteCount(), getTotalRowCount());
LOG_TRACE(log, "({}) Join data is being released, {} bytes and {} rows in hash table", fmt::ptr(this), getTotalByteCount(), getTotalRowCount());
BlocksList right_blocks = std::move(data->blocks);
if (!restructure)