ClickHouse/src/Interpreters/GraceHashJoin.cpp

734 lines
22 KiB
C++
Raw Normal View History

2022-06-06 17:26:22 +00:00
#include <Interpreters/GraceHashJoin.h>
2022-06-11 11:03:44 +00:00
#include <Interpreters/HashJoin.h>
2022-06-16 12:09:23 +00:00
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
2022-06-16 12:09:23 +00:00
#include <Formats/NativeWriter.h>
#include <Formats/TemporaryFileStream.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/ProtocolDefines.h>
2022-06-16 12:09:23 +00:00
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Common/logger_useful.h>
#include <Common/thread_local_rng.h>
2022-06-16 12:09:23 +00:00
#include <base/FnTraits.h>
2022-06-16 12:09:23 +00:00
#include <fmt/format.h>
2022-06-06 17:26:22 +00:00
namespace DB
{
2022-06-16 12:09:23 +00:00
namespace ErrorCodes
2022-06-11 11:03:44 +00:00
{
2022-06-19 19:18:37 +00:00
extern const int LIMIT_EXCEEDED;
2022-06-18 00:25:26 +00:00
extern const int LOGICAL_ERROR;
2022-06-23 15:52:30 +00:00
extern const int NOT_IMPLEMENTED;
2022-06-16 12:09:23 +00:00
}
namespace
{
/// Reads blocks back from a temporary file on disk.
/// Data flows: file on disk -> CompressedReadBuffer -> NativeReader.
class FileBlockReader
{
public:
    /// Opens @file through the disk it lives on; @header is passed to the NativeReader.
    explicit FileBlockReader(const TemporaryFileOnDisk & file, const Block & header)
        : file_reader(file.getDisk()->readFile(file.getPath()))
        , compressed_reader(*file_reader)
        , block_reader(compressed_reader, header, DBMS_TCP_PROTOCOL_VERSION)
    {
    }

    /// Returns whatever the underlying NativeReader produces next.
    Block read()
    {
        return block_reader.read();
    }

private:
    /// Declaration order matters: every member is constructed on top of the previous one.
    std::unique_ptr<ReadBufferFromFileBase> file_reader;
    CompressedReadBuffer compressed_reader;
    NativeReader block_reader;
};
using FileBlockReaderPtr = std::unique_ptr<FileBlockReader>;
/// Collects blocks until their total row count reaches a desired size,
/// then hands them out concatenated into a single block.
class BlocksAccumulator
{
public:
    explicit BlocksAccumulator(size_t desired_block_size_) : desired_block_size(desired_block_size_) { }

    /// Takes ownership of @block and accounts for its rows.
    void addBlock(Block block)
    {
        sum_size += block.rows();
        /// The parameter is already our own copy, so move it instead of copying it once more.
        blocks.push_back(std::move(block));
    }

    /// Returns the concatenated blocks if enough rows were accumulated, otherwise an empty block.
    /// Despite the name, a non-empty result is removed from the accumulator (see flush()).
    Block peek()
    {
        if (sum_size < desired_block_size)
            return {};
        return flush();
    }

    /// Returns everything accumulated so far as one block and resets the accumulator.
    /// Returns an empty block when nothing is stored.
    Block flush()
    {
        if (blocks.empty())
            return {};
        Block result = concatenateBlocks(blocks);
        blocks.clear();
        sum_size = 0;
        return result;
    }

private:
    const size_t desired_block_size;
    size_t sum_size = 0; /// Total number of rows in @blocks
    Blocks blocks;
};
/// Wraps a FileBlockReader and glues the blocks it returns into larger ones
/// with the help of a BlocksAccumulator.
class MergingBlockReader
{
public:
    explicit MergingBlockReader(FileBlockReaderPtr reader_, size_t desired_block_size = DEFAULT_BLOCK_SIZE * 8)
        : reader{std::move(reader_)}, accumulator{desired_block_size}
    {
    }

    /// Returns the next merged block. Once the underlying reader is exhausted, the remaining
    /// accumulated rows are returned, and every subsequent call yields an empty block.
    Block read()
    {
        if (eof)
            return {};

        for (;;)
        {
            if (Block merged = accumulator.peek())
                return merged;

            Block piece = reader->read();
            if (!piece)
            {
                /// The source is drained: hand out whatever is left.
                eof = true;
                return accumulator.flush();
            }
            accumulator.addBlock(std::move(piece));
        }
    }

private:
    FileBlockReaderPtr reader;
    BlocksAccumulator accumulator;
    bool eof = false;
};
2022-06-16 12:09:23 +00:00
class FileBlockWriter
{
static std::string buildTemporaryFilePrefix(ContextPtr context, JoinTableSide side, size_t index)
2022-06-16 12:09:23 +00:00
{
std::string_view suffix = side == JoinTableSide::Left ? "left" : "right";
return fmt::format("tmp_{}_gracejoinbuf_{}_{}_", context->getCurrentQueryId(), suffix, index);
2022-06-16 12:09:23 +00:00
}
static CompressionCodecPtr getCompressionCodec(TableJoin & join)
{
return CompressionCodecFactory::instance().get(join.temporaryFilesCodec(), std::nullopt);
}
2022-06-16 12:09:23 +00:00
public:
explicit FileBlockWriter(ContextPtr context, TableJoin & join, JoinTableSide side, size_t index)
: disk{join.getTemporaryVolume()->getDisk()}
, file{disk, buildTemporaryFilePrefix(context, side, index)}
, file_writer{disk->writeFile(file.getPath(), context->getSettingsRef().grace_hash_join_buffer_size)}
, compressed_writer{*file_writer, getCompressionCodec(join), context->getSettingsRef().grace_hash_join_buffer_size}
2022-06-16 12:09:23 +00:00
{
}
void write(const Block & block)
{
if (finished.load())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing to finished temporary file");
if (!output)
reset(block);
output->write(block);
++num_blocks;
}
void finalize()
{
if (finished.exchange(true))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Flushing already finished temporary file");
compressed_writer.finalize();
file_writer->finalize();
}
MergingBlockReader makeReader() const
2022-06-16 12:09:23 +00:00
{
if (!finished.load())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Reading not finished file");
return MergingBlockReader{std::make_unique<FileBlockReader>(file, header)};
2022-06-16 12:09:23 +00:00
}
size_t numBlocks() const { return num_blocks; }
private:
void reset(const Block & sample)
{
header = sample.cloneEmpty();
output.emplace(compressed_writer, DBMS_TCP_PROTOCOL_VERSION, header);
2022-06-16 12:09:23 +00:00
}
Block header;
DiskPtr disk;
TemporaryFileOnDisk file;
std::unique_ptr<WriteBufferFromFileBase> file_writer;
CompressedWriteBuffer compressed_writer;
std::optional<NativeWriter> output;
std::atomic<bool> finished{false};
size_t num_blocks = 0;
};
/// Returns the integers of the half-open range [from, to) in random order.
/// An empty or inverted range (to <= from) yields an empty permutation: without this check
/// the unsigned subtraction below would wrap around and request a huge deque.
std::deque<size_t> generateRandomPermutation(size_t from, size_t to)
{
    if (to <= from)
        return {};

    std::deque<size_t> indices(to - from);
    std::iota(indices.begin(), indices.end(), from);
    std::shuffle(indices.begin(), indices.end(), thread_local_rng);
    return indices;
}
// Applies @callback to the entries of @indices, starting from the front.
// An entry for which @callback returns false is moved to the back of the queue and retried later;
// the function returns once @callback has returned true for every entry.
void retryForEach(std::deque<size_t> indices, Fn<bool(size_t)> auto callback)
{
    while (!indices.empty())
    {
        const size_t current = indices.front();
        indices.pop_front();

        const bool done = callback(current);
        if (done)
            continue;

        // Not processed this time: schedule another attempt.
        indices.push_back(current);
    }
}
2022-06-11 11:03:44 +00:00
}
2022-06-16 12:09:23 +00:00
/// One hash bucket spilled to disk: a temporary file for the left table rows and one for the
/// right table rows, plus the state that tells whether the bucket is still being written,
/// is being joined, or is done.
class GraceHashJoin::FileBucket
{
    enum class State : int
    {
        WRITING_BLOCKS,
        JOINING_BLOCKS,
        FINISHED,
        /// Wildcard for transition(): the previous state is not checked.
        ANY,
    };

public:
    /// @parent_ is the bucket that has to be finished before this one may be joined
    /// (see tryLockForJoining); nullptr means there is no such dependency.
    explicit FileBucket(ContextPtr context_, TableJoin & join, size_t bucket_index_, const FileBucket * parent_)
        : bucket_index{bucket_index_}
        , left_file{context_, join, JoinTableSide::Left, bucket_index}
        , right_file{context_, join, JoinTableSide::Right, bucket_index}
        , parent{parent_}
        , state{State::WRITING_BLOCKS}
    {
    }

    /// Blocking variants: wait for the file mutex, then write the block.
    void addLeftBlock(const Block & block) { return addBlockImpl(block, left_file_mutex, left_file); }
    void addRightBlock(const Block & block) { return addBlockImpl(block, right_file_mutex, right_file); }

    /// Non-blocking variants: return false without writing if the file mutex is currently held.
    bool tryAddLeftBlock(const Block & block) { return tryAddBlockImpl(block, left_file_mutex, left_file); }
    bool tryAddRightBlock(const Block & block) { return tryAddBlockImpl(block, right_file_mutex, right_file); }

    /// Finalizes both files so they can be read. The bucket must already be in JOINING_BLOCKS
    /// state (i.e. tryLockForJoining() succeeded), otherwise LOGICAL_ERROR is thrown.
    void startJoining()
    {
        ensureState(State::JOINING_BLOCKS);
        left_file.finalize();
        right_file.finalize();
    }

    size_t index() const { return bucket_index; }
    bool finished() const { return state.load() == State::FINISHED; }
    bool empty() const { return right_file.numBlocks() == 0 && left_file.numBlocks() == 0; }

    /// Atomically switches WRITING_BLOCKS -> JOINING_BLOCKS.
    /// Fails if the parent bucket is not finished yet or if the bucket is not in WRITING_BLOCKS state.
    bool tryLockForJoining()
    {
        if (parent && !parent->finished())
            return false;

        State expected = State::WRITING_BLOCKS;
        return state.compare_exchange_strong(expected, State::JOINING_BLOCKS);
    }

    /// Switches JOINING_BLOCKS -> FINISHED; throws LOGICAL_ERROR from any other state.
    void finish() { transition(State::JOINING_BLOCKS, State::FINISHED); }

    MergingBlockReader openLeftTableReader() const { return left_file.makeReader(); }
    MergingBlockReader openRightTableReader() const { return right_file.makeReader(); }

    std::mutex & joinMutex() { return join_mutex; }

private:
    bool tryAddBlockImpl(const Block & block, std::mutex & mutex, FileBlockWriter & writer)
    {
        ensureState(State::WRITING_BLOCKS);
        std::unique_lock lock{mutex, std::try_to_lock};
        if (!lock.owns_lock())
            return false;

        writer.write(block);
        return true;
    }

    void addBlockImpl(const Block & block, std::mutex & mutex, FileBlockWriter & writer)
    {
        ensureState(State::WRITING_BLOCKS);
        std::unique_lock lock{mutex};
        writer.write(block);
    }

    /// Moves the bucket to @desired state. Unless @expected is State::ANY, the change is applied
    /// only if the current state equals @expected; otherwise LOGICAL_ERROR is thrown and the
    /// state is left untouched, so a rejected transition cannot corrupt the state machine.
    void transition(State expected, State desired)
    {
        if (expected == State::ANY)
        {
            state.store(desired);
            return;
        }

        State observed = expected;
        if (!state.compare_exchange_strong(observed, desired))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid state transition");
    }

    /// Throws LOGICAL_ERROR unless the bucket is currently in @expected state.
    void ensureState(State expected)
    {
        if (state.load() != expected)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid state transition");
    }

    size_t bucket_index;
    FileBlockWriter left_file;
    FileBlockWriter right_file;
    std::mutex left_file_mutex;
    std::mutex right_file_mutex;
    std::mutex join_mutex; /// Protects external in-memory join
    const FileBucket * parent;
    std::atomic<State> state;
};
/// The in-memory join used by GraceHashJoin for a single bucket.
/// It is a HashJoin as is: the class only inherits the constructors of its base.
class GraceHashJoin::InMemoryJoin : public HashJoin
{
public:
using HashJoin::HashJoin;
};
2022-06-16 12:09:23 +00:00
/// Reads the bucket limits and block size from the context settings, creates the in-memory join
/// for the first bucket and creates the initial set of file buckets.
/// Throws NOT_IMPLEMENTED (from checkJoinKind) for unsupported join kinds or strictness.
GraceHashJoin::GraceHashJoin(
    ContextPtr context_, std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
    : log{&Poco::Logger::get("GraceHashJoin")}
    , context{context_}
    , table_join{std::move(table_join_)}
    , right_sample_block{right_sample_block_}
    , any_take_last_row{any_take_last_row_}
    , initial_num_buckets{context->getSettingsRef().grace_hash_join_initial_buckets}
    , max_num_buckets{context->getSettingsRef().grace_hash_join_max_buckets}
    , max_block_size{context->getSettingsRef().max_block_size}
    , first_bucket{makeInMemoryJoin()}
{
    checkJoinKind();

    /// Bucket 0 is always addressed (addJoinedBlockImpl takes snapshot->at(0) for the first bucket)
    /// and rehash() grows the bucket list by doubling, so it cannot grow from size zero.
    /// Hence at least one bucket must exist even if the setting is 0.
    initial_num_buckets = roundUpToPowerOfTwoOrZero(std::max<size_t>(initial_num_buckets, 1));

    auto tmp = std::make_unique<Buckets>();
    for (size_t i = 0; i < initial_num_buckets; ++i)
    {
        /// Initial buckets do not depend on any other bucket.
        addBucket(*tmp, nullptr);
    }
    buckets.set(std::move(tmp));

    LOG_TRACE(log, "Initialize {} buckets", initial_num_buckets);
}
2022-09-07 15:00:15 +00:00
/// Tells whether the join described by @table_join can be executed by GraceHashJoin:
/// it must not be RIGHT, FULL, CROSS/comma or ASOF and must consist of a single disjunct.
bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{
    const auto & kind = table_join->kind();

    if (isRight(kind) || isFull(kind))
        return false;

    if (table_join->strictness() == JoinStrictness::Asof)
        return false;

    if (isCrossOrComma(kind))
        return false;

    return table_join->oneDisjunct();
}
/// Validates the kind and the strictness of the join; throws NOT_IMPLEMENTED for anything
/// outside of the accepted sets.
/// NOTE(review): RIGHT and FULL are accepted here but rejected by isSupported() — confirm which is intended.
void GraceHashJoin::checkJoinKind()
{
    const auto kind = table_join->kind();
    const bool kind_is_accepted
        = kind == JoinKind::Inner
        || kind == JoinKind::Left
        || kind == JoinKind::Right
        || kind == JoinKind::Full;
    if (!kind_is_accepted)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not supported. GraceHashJoin supports only INNER/LEFT/RIGHT/FULL join variants");

    const auto strictness = table_join->strictness();
    const bool strictness_is_accepted
        = strictness == JoinStrictness::RightAny
        || strictness == JoinStrictness::All
        || strictness == JoinStrictness::Any
        || strictness == JoinStrictness::Semi
        || strictness == JoinStrictness::Anti;
    if (!strictness_is_accepted)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not supported. GraceHashJoin supports only ALL/ANY/SEMI/ANTI join strictness");
}
2022-06-23 01:44:16 +00:00
/// Defaulted in this translation unit, where FileBucket and InMemoryJoin are defined.
GraceHashJoin::~GraceHashJoin() = default;
2022-06-16 12:09:23 +00:00
bool GraceHashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
{
2022-06-17 21:17:05 +00:00
Block materialized = materializeBlock(block);
addJoinedBlockImpl(first_bucket, 0, materialized);
2022-06-16 12:09:23 +00:00
return true;
}
2022-06-17 17:36:24 +00:00
/// Rebuilds @join after the number of buckets has changed: the blocks stored in the old join are
/// scattered over the buckets of @snapshot, the part belonging to @bucket goes to a fresh
/// in-memory join and the other non-empty parts are appended to the right files of their buckets.
void GraceHashJoin::rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket)
{
    InMemoryJoinPtr old_join = std::move(join);
    auto stored_blocks = std::move(*old_join).releaseJoinedBlocks();
    join = makeInMemoryJoin();

    const size_t num_buckets = snapshot->size();
    for (const Block & stored : stored_blocks)
    {
        Blocks shards = scatterBlock<true>(stored, num_buckets);
        join->addJoinedBlock(shards[bucket], /*check_limits=*/false);

        /// Index 0 is skipped: that bucket has no file to spill into here.
        for (size_t target = 1; target < num_buckets; ++target)
        {
            if (target == bucket)
                continue;
            if (shards[target].rows() == 0)
                continue;
            snapshot->at(target)->addRightBlock(shards[target]);
        }
    }
}
/// Checks the row and byte counts of @join against the size limits of the table join
/// (soft check, i.e. the result is returned rather than thrown).
bool GraceHashJoin::fitsInMemory(InMemoryJoin * join) const
{
    const auto & limits = table_join->sizeLimits();
    return limits.softCheck(join->getTotalRowCount(), join->getTotalByteCount());
}
/// Grows the list of buckets to at least @desired_size (rounded up to a power of two) and
/// returns the resulting snapshot. If the list is already large enough, the current snapshot is
/// returned unchanged. Throws LIMIT_EXCEEDED if the new size would exceed max_num_buckets.
/// The whole operation runs under rehash_mutex.
GraceHashJoin::BucketsSnapshot GraceHashJoin::rehash(size_t desired_size)
{
    desired_size = roundUpToPowerOfTwoOrZero(desired_size);

    std::scoped_lock lock{rehash_mutex};

    BucketsSnapshot current = buckets.get();
    const size_t old_size = current->size();
    LOG_TRACE(log, "Rehashing from {} to {}", old_size, desired_size);

    if (old_size >= desired_size)
        return current;

    auto grown = std::make_unique<Buckets>(*current);
    const size_t target_size = std::max(old_size * 2, desired_size);
    if (target_size > max_num_buckets)
    {
        throw Exception(
            ErrorCodes::LIMIT_EXCEEDED, "Too many grace hash join buckets, consider increasing max_rows_in_join/max_bytes_in_join");
    }

    grown->reserve(target_size);
    while (grown->size() < target_size)
    {
        /// Double the list: every new bucket gets the existing bucket at the same offset as its
        /// parent, except for offset 0, whose counterpart gets no parent.
        const size_t size_before_doubling = grown->size();
        for (size_t offset = 0; offset < size_before_doubling; ++offset)
        {
            const FileBucket * parent_bucket = (offset == 0) ? nullptr : grown->at(offset).get();
            addBucket(*grown, parent_bucket);
        }
    }

    buckets.set(std::move(grown));
    return buckets.get();
}
2022-06-16 12:09:23 +00:00
/// Called once before the delayed (on-disk) buckets are processed.
void GraceHashJoin::startReadingDelayedBlocks()
{
    // The in-memory hash join of the first bucket is not needed anymore:
    // release it to reduce memory footprint.
    first_bucket = nullptr;
}
/// Appends a new file bucket to @destination. The index of the bucket is its position in
/// @destination; @parent is forwarded to the FileBucket constructor.
void GraceHashJoin::addBucket(Buckets & destination, const FileBucket * parent)
{
    const size_t new_index = destination.size();
    auto new_bucket = std::make_unique<FileBucket>(context, *table_join, new_index, parent);
    destination.emplace_back(std::move(new_bucket));
}
/// Delegates the check of key types in @block to the in-memory join of the first bucket.
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
{
    assert(first_bucket);
    first_bucket->checkTypesOfKeys(block);
}
2022-06-16 12:09:23 +00:00
/// Joins a block of the left table. The rows that fall into bucket 0 are joined right away with
/// the in-memory join and returned in @block; the rows of all other buckets are appended to the
/// left files of these buckets to be joined later. The not_processed argument is not used.
void GraceHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not_processed*/)
{
    /// On the first call remember the structure of the left block and of the join result.
    if (need_left_sample_block.exchange(false))
    {
        left_sample_block = block.cloneEmpty();
        output_sample_block = block.cloneEmpty();
        ExtraBlockPtr sample_leftover;
        first_bucket->joinBlock(output_sample_block, sample_leftover);
    }

    /// Nothing to scatter: let the in-memory join process the empty block.
    if (block.rows() == 0)
    {
        ExtraBlockPtr leftover;
        first_bucket->joinBlock(block, leftover);
        return;
    }

    materializeBlockInplace(block);

    auto snapshot = buckets.get();
    auto shards = scatterBlock<false>(block, snapshot->size());

    ExtraBlockPtr leftover;
    first_bucket->joinBlock(shards[0], leftover);
    block = std::move(shards[0]);
    if (leftover)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");

    /// Returns true when there is nothing left to do for the bucket, false to retry it later.
    auto try_spill_shard = [&](size_t bucket)
    {
        Block & shard = shards[bucket];
        if (shard.rows() == 0)
            return true;
        return snapshot->at(bucket)->tryAddLeftBlock(shard);
    };

    // We need to skip the first bucket that is already joined in memory, so we start with 1.
    retryForEach(generateRandomPermutation(1, snapshot->size()), try_spill_shard);
}
2022-06-23 05:41:45 +00:00
/// Stores @block as the totals block under totals_mutex. An empty block is ignored.
void GraceHashJoin::setTotals(const Block & block)
{
    if (!block)
        return;

    std::scoped_lock guard{totals_mutex};
    totals = block;
}
/// Returns a reference to the totals block stored by setTotals().
/// NOTE(review): the read is done without taking totals_mutex, unlike the write in setTotals() —
/// confirm that callers read totals only after all writers are done.
const Block & GraceHashJoin::getTotals() const
{
return totals;
}
2022-06-16 12:09:23 +00:00
/// Row count reported by the in-memory join of the first bucket.
/// The first bucket must still exist when this is called.
size_t GraceHashJoin::getTotalRowCount() const
{
    assert(first_bucket);
    const size_t rows_in_first_bucket = first_bucket->getTotalRowCount();
    return rows_in_first_bucket;
}
/// Byte count reported by the in-memory join of the first bucket.
/// The first bucket must still exist when this is called.
size_t GraceHashJoin::getTotalByteCount() const
{
    assert(first_bucket);
    const size_t bytes_in_first_bucket = first_bucket->getTotalByteCount();
    return bytes_in_first_bucket;
}
/// True only for INNER/RIGHT joins when every file bucket is empty and the in-memory join
/// of the first bucket exists and reports that it always returns an empty set.
bool GraceHashJoin::alwaysReturnsEmptySet() const
{
    auto snapshot = buckets.get();

    bool no_spilled_data = true;
    for (const auto & file_bucket : *snapshot)
    {
        if (!file_bucket->empty())
        {
            no_spilled_data = false;
            break;
        }
    }

    const bool in_memory_join_is_empty = first_bucket && first_bucket->alwaysReturnsEmptySet();
    return isInnerOrRight(table_join->kind()) && in_memory_join_is_empty && no_spilled_data;
}
/// Always returns nullptr; all arguments are ignored.
std::shared_ptr<NotJoinedBlocks> GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
{
/// We do not support returning non-joined blocks here.
/// They will be reported by getDelayedBlocks instead.
return nullptr;
2022-06-16 12:09:23 +00:00
}
/// Stream over the join result of one on-disk bucket: holds the reader of the bucket's left file
/// and the in-memory join built from its right file.
class GraceHashJoin::DelayedBlocks : public IDelayedJoinedBlocksStream
{
public:
    explicit DelayedBlocks(GraceHashJoin * parent_, FileBucket * bucket_, InMemoryJoinPtr join_)
        : parent{parent_}, bucket{bucket_}, left_reader{bucket->openLeftTableReader()}, join{std::move(join_)}
    {
    }

    /// Returns the next joined block of the bucket. When the left file is exhausted, the
    /// non-joined blocks of the in-memory join (if it provides any) are returned.
    /// An empty block means the end of the stream.
    Block next() override
    {
        if (Block joined = parent->joinNextBlockInBucket(*this))
            return joined;

        /// The non-joined blocks stream is requested only once.
        if (process_not_joined)
        {
            not_joined_blocks = join->getNonJoinedBlocks(parent->left_sample_block, parent->output_sample_block, parent->max_block_size);
            process_not_joined = false;
        }

        if (!not_joined_blocks)
            return {};
        return not_joined_blocks->read();
    }

    /// Declaration order matters: left_reader is constructed from @bucket.
    GraceHashJoin * parent;
    FileBucket * bucket;
    MergingBlockReader left_reader;
    InMemoryJoinPtr join;
    bool process_not_joined = true;
    std::shared_ptr<NotJoinedBlocks> not_joined_blocks;
};
2022-06-17 17:36:24 +00:00
/// Returns a stream over the next on-disk bucket that can be joined, or nullptr if no bucket
/// could be locked. @prev_cursor, if given, is the stream returned by the previous call:
/// its bucket is marked as finished first.
std::unique_ptr<IDelayedJoinedBlocksStream> GraceHashJoin::getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor)
{
    if (prev_cursor)
        assert_cast<DelayedBlocks *>(prev_cursor)->bucket->finish();

    /// Only the very first call performs the switch to reading delayed blocks.
    if (!started_reading_delayed_blocks.exchange(true))
        startReadingDelayedBlocks();

    auto snapshot = buckets.get();
    for (size_t idx = 1; idx < snapshot->size(); ++idx)
    {
        FileBucket * candidate = snapshot->at(idx).get();
        if (!candidate->tryLockForJoining())
            continue;

        if (candidate->empty())
        {
            /// Nothing was written to this bucket: mark it as done and look further.
            candidate->finish();
            continue;
        }

        InMemoryJoinPtr join = makeInMemoryJoin();
        fillInMemoryJoin(join, candidate);
        return std::make_unique<DelayedBlocks>(this, candidate, std::move(join));
    }

    // NB: this logic is a bit racy. There can be more buckets than in the @snapshot in case of rehashing
    // in a different thread that is reading delayed blocks.
    // But it's ok to finish the current thread: the thread that called rehash() will join the rest of the blocks.
    return nullptr;
}
/// Reads blocks from the left file of the iterator's bucket until one of them contains rows that
/// still belong to this bucket, joins these rows with the iterator's in-memory join and returns
/// the result. Returns an empty block when the left file is exhausted.
Block GraceHashJoin::joinNextBlockInBucket(DelayedBlocks & iterator)
{
    Block own_rows;
    while (own_rows.rows() == 0)
    {
        Block loaded = iterator.left_reader.read();
        if (!loaded) // EOF
            return loaded;

        BucketsSnapshot snapshot = buckets.get();
        Blocks shards = scatterBlock<false>(loaded, snapshot->size());

        // We need to filter out blocks that were written to the current bucket B0,
        // But then virtually moved to another bucket B1 in rehash().
        // Note that B1 is waiting for current bucket B0 to be processed
        // (via @parent field, see @FileBucket::tryLockForJoining),
        // So it is safe to add blocks to it.
        const size_t own_index = iterator.bucket->index();
        for (size_t idx = 0; idx < snapshot->size(); ++idx)
        {
            Block & shard = shards[idx];
            if (shard.rows() == 0) // No rows with that hash modulo
                continue;

            if (idx == own_index) // Rows that are still in our bucket
                own_rows = std::move(shard);
            else // Rows that were moved after rehashing
                snapshot->at(idx)->addLeftBlock(shard);
        }
    }

    ExtraBlockPtr not_processed;
    iterator.join->joinBlock(own_rows, not_processed);
    if (not_processed)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");

    return own_rows;
}
std::unique_ptr<GraceHashJoin::InMemoryJoin> GraceHashJoin::makeInMemoryJoin()
2022-06-16 12:09:23 +00:00
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
2022-06-16 12:09:23 +00:00
}
/// Finalizes the files of @bucket and loads all blocks of its right file into @join.
void GraceHashJoin::fillInMemoryJoin(InMemoryJoinPtr & join, FileBucket * bucket)
{
    bucket->startJoining();

    auto right_reader = bucket->openRightTableReader();
    for (;;)
    {
        auto right_block = right_reader.read();
        if (!right_block)
            break;
        addJoinedBlockImpl(join, bucket->index(), right_block);
    }
}
/// Adds a block of the right table on behalf of bucket @bucket_index: the rows of that bucket go
/// to the in-memory @join, the rows of all other buckets are appended to the right files of
/// these buckets. If @join stops fitting in memory, the number of buckets is doubled (repeatedly,
/// if needed) and @join is rebuilt.
void GraceHashJoin::addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block)
{
    BucketsSnapshot snapshot = buckets.get();
    Blocks shards = scatterBlock<true>(block, snapshot->size());

    // Add block to the in-memory join
    {
        auto own_bucket = snapshot->at(bucket_index);
        std::scoped_lock guard{own_bucket->joinMutex()};

        join->addJoinedBlock(shards[bucket_index], /*check_limits=*/false);

        if (!fitsInMemory(join.get()))
        {
            // We need to rebuild block without bucket_index part in case of overflow.
            shards.erase(shards.begin() + bucket_index);
            const Block rows_for_other_buckets = concatenateBlocks(shards);

            do
            {
                snapshot = rehash(snapshot->size() * 2);
                rehashInMemoryJoin(join, snapshot, bucket_index);
                shards = scatterBlock<true>(rows_for_other_buckets, snapshot->size());
            } while (!fitsInMemory(join.get()));
        }
    }

    // All blocks were added to the @join
    if (shards.empty())
        return;

    // Write the rest of the blocks to the disk buckets
    assert(shards.size() == snapshot->size());

    /// Returns true when there is nothing left to do for the bucket, false to retry it later.
    auto try_spill_shard = [&](size_t bucket)
    {
        if (bucket == bucket_index || !shards[bucket].rows())
            return true;
        return snapshot->at(bucket)->tryAddRightBlock(shards[bucket]);
    };
    retryForEach(generateRandomPermutation(1, snapshot->size()), try_spill_shard);
}
/// Splits @block into @shards parts by the hash of the join keys. The template argument selects
/// the right or the left key names of the only ON clause of the join.
/// An empty block yields an empty list of parts.
template <bool right>
Blocks GraceHashJoin::scatterBlock(const Block & block, size_t shards) const
{
    if (!block)
        return {};

    const TableJoin::JoinOnClause & clause = table_join->getOnlyClause();
    const Names & key_names = right ? clause.key_names_right : clause.key_names_left;
    return JoinCommon::scatterBlockByHash(key_names, block, shards);
}
2022-06-06 17:26:22 +00:00
}