fix: Cleanup

This commit is contained in:
Sergey Skvortsov 2022-06-18 02:43:37 +03:00
parent 520dd637df
commit d3e7cb093b

View File

@ -227,7 +227,6 @@ GraceHashJoin::GraceHashJoin(
bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
{
LOG_TRACE(log, "addJoinedBlock(block: {} rows)", block.rows());
Block materialized = materializeBlock(block);
addJoinedBlockImpl(first_bucket, 0, materialized);
return true;
@ -314,8 +313,6 @@ void GraceHashJoin::checkTypesOfKeys(const Block & block) const
void GraceHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not_processed*/)
{
LOG_TRACE(log, "JoinBlock: {}", block.dumpStructure());
if (block.rows() == 0)
{
ExtraBlockPtr not_processed;
@ -377,19 +374,13 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
std::shared_ptr<NotJoinedBlocks> GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
{
auto snapshot = buckets.get();
size_t unfinished = 0;
for (size_t i = 1; i < snapshot->size(); ++i)
{
if (!snapshot->at(i)->finished())
{
LOG_ERROR(log, "Bucket {} is not finished", i);
++unfinished;
}
}
if (unfinished > 0)
{
LOG_ERROR(log, "Total {} unfinished buckets", unfinished);
}
if (!JoinCommon::hasNonJoinedBlocks(*table_join))
return nullptr;
@ -439,14 +430,13 @@ std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDel
}
else
{
LOG_TRACE(log, "getDelayedBlocks: start joining bucket {}", bucket->index());
InMemoryJoinPtr join = makeInMemoryJoin();
fillInMemoryJoin(join, bucket);
return std::make_unique<DelayedBlocks>(this, bucket, std::move(join));
}
}
// NB: this logic is a bit racy. Now can be more buckets in the @snapshot in case of rehashing in different thread reading delayed blocks.
// NB: this logic is a bit racy. There can be more buckets in the @snapshot in case of rehashing in different thread reading delayed blocks.
// But it's ok to finish current thread: the thread that called rehash() will join the rest of the blocks.
return nullptr;
}