2022-06-18 00:25:26 +00:00
|
|
|
#pragma once
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
#include <Interpreters/Context_fwd.h>
#include <Interpreters/IJoin.h>
#include <Core/Block.h>
#include <Common/MultiVersion.h>

#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
|
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
class TableJoin;
|
2022-06-16 12:09:23 +00:00
|
|
|
class HashJoin;
|
2022-06-06 17:26:22 +00:00
|
|
|
|
|
|
|
class GraceHashJoin final : public IJoin
|
|
|
|
{
|
2022-06-16 12:09:23 +00:00
|
|
|
class FileBucket;
|
|
|
|
class DelayedBlocks;
|
2022-06-23 01:41:21 +00:00
|
|
|
class InMemoryJoin;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
using Buckets = std::vector<std::shared_ptr<FileBucket>>;
|
|
|
|
using BucketsSnapshot = std::shared_ptr<const Buckets>;
|
2022-06-23 01:41:21 +00:00
|
|
|
using InMemoryJoinPtr = std::unique_ptr<InMemoryJoin>;
|
2022-06-16 12:09:23 +00:00
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
public:
|
2022-06-16 12:09:23 +00:00
|
|
|
GraceHashJoin(
|
|
|
|
ContextPtr context_, std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_ = false);
|
|
|
|
|
2022-06-11 11:03:44 +00:00
|
|
|
const TableJoin & getTableJoin() const override { return *table_join; }
|
2022-06-06 17:26:22 +00:00
|
|
|
|
|
|
|
bool addJoinedBlock(const Block & block, bool check_limits) override;
|
|
|
|
void checkTypesOfKeys(const Block & block) const override;
|
2022-06-16 12:09:23 +00:00
|
|
|
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
|
2022-06-06 17:26:22 +00:00
|
|
|
|
|
|
|
void setTotals(const Block & block) override { totals = block; }
|
|
|
|
const Block & getTotals() const override { return totals; }
|
|
|
|
|
|
|
|
size_t getTotalRowCount() const override;
|
|
|
|
size_t getTotalByteCount() const override;
|
|
|
|
bool alwaysReturnsEmptySet() const override;
|
|
|
|
|
2022-06-23 01:41:21 +00:00
|
|
|
bool supportParallelJoin() const override { return true; }
|
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
std::shared_ptr<NotJoinedBlocks>
|
|
|
|
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Open iterator over joined blocks.
|
|
|
|
/// Must be called after all @joinBlock calls.
|
2022-06-17 17:36:24 +00:00
|
|
|
std::unique_ptr<IDelayedJoinedBlocksStream> getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor) override;
|
2022-06-16 12:09:23 +00:00
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
private:
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Split block into multiple shards by hash.
|
|
|
|
template <bool right>
|
|
|
|
Blocks scatterBlock(const Block & block, size_t shards) const;
|
|
|
|
|
|
|
|
/// Create empty join for in-memory processing.
|
|
|
|
InMemoryJoinPtr makeInMemoryJoin();
|
|
|
|
/// Read right table blocks from @bucket to the @join. Calls @rehash on overflow.
|
|
|
|
void fillInMemoryJoin(InMemoryJoinPtr & join, FileBucket * bucket);
|
|
|
|
/// Add right table block to the @join. Calls @rehash on overflow.
|
|
|
|
void addJoinedBlockImpl(InMemoryJoinPtr & join, size_t bucket_index, const Block & block);
|
|
|
|
/// Rebuild @join after rehash: scatter the blocks in join and write parts that belongs to the other shards to disk.
|
2022-06-17 17:36:24 +00:00
|
|
|
void rehashInMemoryJoin(InMemoryJoinPtr & join, const BucketsSnapshot & snapshot, size_t bucket);
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Check that @join satisifes limits on rows/bytes in @table_join.
|
|
|
|
bool fitsInMemory(InMemoryJoin * join) const;
|
|
|
|
|
|
|
|
/// Create new bucket at the end of @destination.
|
|
|
|
void addBucket(Buckets & destination, const FileBucket * parent);
|
|
|
|
/// Read and join left table block. Called by DelayedBlocks itself (see @DelayedBlocks::next).
|
|
|
|
Block joinNextBlockInBucket(DelayedBlocks & iterator);
|
|
|
|
|
|
|
|
/// Increase number of buckets to match desired_size.
|
|
|
|
/// Called when HashJoin in-memory table for one bucket exceeds the limits.
|
|
|
|
BucketsSnapshot rehash(size_t desired_size);
|
|
|
|
/// Perform some bookkeeping after all calls to @joinBlock.
|
|
|
|
void startReadingDelayedBlocks();
|
|
|
|
|
|
|
|
Poco::Logger * log;
|
|
|
|
ContextPtr context;
|
2022-06-06 17:26:22 +00:00
|
|
|
std::shared_ptr<TableJoin> table_join;
|
2022-06-16 12:09:23 +00:00
|
|
|
Block right_sample_block;
|
|
|
|
bool any_take_last_row;
|
|
|
|
size_t initial_num_buckets;
|
|
|
|
size_t max_num_buckets;
|
|
|
|
|
|
|
|
InMemoryJoinPtr first_bucket;
|
2022-06-23 01:41:21 +00:00
|
|
|
std::mutex first_bucket_mutex;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
MultiVersion<Buckets> buckets;
|
|
|
|
std::mutex rehash_mutex;
|
|
|
|
std::atomic<bool> started_reading_delayed_blocks{false};
|
2022-06-06 17:26:22 +00:00
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
Block totals;
|
2022-06-06 17:26:22 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|