ClickHouse/src/Interpreters/GraceHashJoin.h

138 lines
5.5 KiB
C++
Raw Normal View History

2022-06-18 00:25:26 +00:00
#pragma once
2022-06-16 12:09:23 +00:00
#include <Interpreters/Context_fwd.h>
2022-06-06 17:26:22 +00:00
#include <Interpreters/IJoin.h>
#include <Core/Block.h>
2022-06-16 12:09:23 +00:00
#include <Common/MultiVersion.h>

#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
2022-06-06 17:26:22 +00:00
namespace DB
{
class TableJoin;
2022-06-16 12:09:23 +00:00
class HashJoin;
2022-06-06 17:26:22 +00:00
2022-06-23 16:54:04 +00:00
/**
* Efficient and highly parallel implementation of external memory JOIN based on HashJoin.
* Supports most of the JOIN modes, except CROSS and ASOF.
*
* The joining algorithm consists of three stages:
*
* 1) During the first stage we accumulate blocks of the right table via @addJoinedBlock.
* Each input block is split into multiple buckets based on the hash of the row join keys.
* The first bucket is added to the in-memory HashJoin, and the remaining buckets are written to disk for further processing.
* When the size of HashJoin exceeds the limits, we double the number of buckets.
* There can be multiple threads calling addJoinedBlock, just like @ConcurrentHashJoin.
*
* 2) At the second stage we process left table blocks via @joinBlock.
* Again, each input block is split into multiple buckets by hash.
* The first bucket is joined in-memory via HashJoin::joinBlock, and the remaining buckets are written to the disk.
*
* 3) When the last thread reading left table block finishes, the last stage begins.
* Each @DelayedJoiningBlocksProcessor calls repeatedly @getDelayedBlocks until there are no more unfinished buckets left.
* Inside @getDelayedBlocks we select the next not processed bucket, load right table blocks from disk into in-memory HashJoin,
* And then join them with left table blocks.
*
* After joining the left table blocks, we can load non-joined rows from the right talble for RIGHT/FULL JOINs.
* Note that non-joined rows are processed in multiple threads, unlike HashJoin/ConcurrentHashJoin/MergeJoin.
*/
2022-06-06 17:26:22 +00:00
class GraceHashJoin final : public IJoin
{
2022-06-16 12:09:23 +00:00
class FileBucket;
class DelayedBlocks;
class InMemoryJoin;
2022-06-16 12:09:23 +00:00
using Buckets = std::vector<std::shared_ptr<FileBucket>>;
using BucketsSnapshot = std::shared_ptr<const Buckets>;
using InMemoryJoinPtr = std::unique_ptr<InMemoryJoin>;
2022-06-16 12:09:23 +00:00
2022-06-06 17:26:22 +00:00
public:
2022-06-16 12:09:23 +00:00
GraceHashJoin(
ContextPtr context_, std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_ = false);
2022-06-23 01:45:44 +00:00
~GraceHashJoin() override;
2022-06-11 11:03:44 +00:00
const TableJoin & getTableJoin() const override { return *table_join; }
2022-06-06 17:26:22 +00:00
bool addJoinedBlock(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
2022-06-16 12:09:23 +00:00
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
2022-06-06 17:26:22 +00:00
2022-06-23 05:41:45 +00:00
void setTotals(const Block & block) override;
const Block & getTotals() const override;
2022-06-06 17:26:22 +00:00
size_t getTotalRowCount() const override;
size_t getTotalByteCount() const override;
bool alwaysReturnsEmptySet() const override;
bool supportParallelJoin() const override { return true; }
2022-06-06 17:26:22 +00:00
std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
2022-06-16 12:09:23 +00:00
/// Open iterator over joined blocks.
/// Must be called after all @joinBlock calls.
2022-06-17 17:36:24 +00:00
std::unique_ptr<IDelayedJoinedBlocksStream> getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor) override;
2022-06-16 12:09:23 +00:00
2022-06-06 17:26:22 +00:00
private:
2022-06-23 15:52:30 +00:00
/// Check that the current join kind is supported.
/// Throw exception if it is not.
void checkJoinKind();
2022-06-16 12:09:23 +00:00
/// Split block into multiple shards by hash.
template <bool right>
Blocks scatterBlock(const Block & block, size_t shards) const;
/// Create empty join for in-memory processing.
InMemoryJoinPtr makeInMemoryJoin();
/// Read right table blocks from @bucket to the @join. Calls @rehash on overflow.
void fillInMemoryJoin(InMemoryJoinPtr & join, FileBucket * bucket);
/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block);
/// Rebuild @join after rehash: scatter the blocks in join and write parts that belongs to the other shards to disk.
2022-06-17 17:36:24 +00:00
void rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket);
2022-06-16 12:09:23 +00:00
/// Check that @join satisifes limits on rows/bytes in @table_join.
bool fitsInMemory(InMemoryJoin * join) const;
/// Create new bucket at the end of @destination.
void addBucket(Buckets & destination, const FileBucket * parent);
/// Read and join left table block. Called by DelayedBlocks itself (see @DelayedBlocks::next).
Block joinNextBlockInBucket(DelayedBlocks & iterator);
/// Increase number of buckets to match desired_size.
/// Called when HashJoin in-memory table for one bucket exceeds the limits.
2022-06-23 16:54:04 +00:00
///
/// NB: after @rehash there may be rows that are written to the buckets that they do not belong to.
/// It is fine; these rows will be written to the corresponding buckets during the third stage.
2022-06-16 12:09:23 +00:00
BucketsSnapshot rehash(size_t desired_size);
/// Perform some bookkeeping after all calls to @joinBlock.
void startReadingDelayedBlocks();
Poco::Logger * log;
ContextPtr context;
2022-06-06 17:26:22 +00:00
std::shared_ptr<TableJoin> table_join;
std::atomic<bool> need_left_sample_block{true};
Block left_sample_block;
2022-06-16 12:09:23 +00:00
Block right_sample_block;
Block output_sample_block;
2022-06-16 12:09:23 +00:00
bool any_take_last_row;
size_t initial_num_buckets;
size_t max_num_buckets;
size_t max_block_size;
2022-06-16 12:09:23 +00:00
InMemoryJoinPtr first_bucket;
std::mutex first_bucket_mutex;
2022-06-16 12:09:23 +00:00
MultiVersion<Buckets> buckets;
std::mutex rehash_mutex;
std::atomic<bool> started_reading_delayed_blocks{false};
2022-06-06 17:26:22 +00:00
2022-06-16 12:09:23 +00:00
Block totals;
2022-06-23 05:41:45 +00:00
mutable std::mutex totals_mutex;
2022-06-06 17:26:22 +00:00
};
}