2022-06-18 00:25:26 +00:00
|
|
|
#pragma once
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
#include <Interpreters/Context_fwd.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TemporaryDataOnDisk.h>

#include <Core/Block.h>

#include <Common/MultiVersion.h>

#include <atomic>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>
|
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
class TableJoin;
|
2022-06-16 12:09:23 +00:00
|
|
|
class HashJoin;
|
2022-06-06 17:26:22 +00:00
|
|
|
|
2022-06-23 16:54:04 +00:00
|
|
|
/**
|
|
|
|
* Efficient and highly parallel implementation of external memory JOIN based on HashJoin.
|
|
|
|
* Supports most of the JOIN modes, except CROSS and ASOF.
|
|
|
|
*
|
|
|
|
* The joining algorithm consists of three stages:
|
|
|
|
*
|
|
|
|
* 1) During the first stage we accumulate blocks of the right table via @addJoinedBlock.
|
|
|
|
* Each input block is split into multiple buckets based on the hash of the row join keys.
|
|
|
|
* The first bucket is added to the in-memory HashJoin, and the remaining buckets are written to disk for further processing.
|
|
|
|
* When the size of HashJoin exceeds the limits, we double the number of buckets.
|
|
|
|
* There can be multiple threads calling addJoinedBlock, just like @ConcurrentHashJoin.
|
|
|
|
*
|
|
|
|
* 2) At the second stage we process left table blocks via @joinBlock.
|
|
|
|
* Again, each input block is split into multiple buckets by hash.
|
|
|
|
* The first bucket is joined in-memory via HashJoin::joinBlock, and the remaining buckets are written to the disk.
|
|
|
|
*
|
|
|
|
* 3) When the last thread reading left table block finishes, the last stage begins.
|
|
|
|
* Each @DelayedJoiningBlocksProcessor calls repeatedly @getDelayedBlocks until there are no more unfinished buckets left.
|
|
|
|
* Inside @getDelayedBlocks we select the next not processed bucket, load right table blocks from disk into in-memory HashJoin,
|
|
|
|
* And then join them with left table blocks.
|
|
|
|
*
|
2022-09-27 12:33:09 +00:00
|
|
|
* After joining the left table blocks, we can load non-joined rows from the right table for RIGHT/FULL JOINs.
|
2022-06-23 16:54:04 +00:00
|
|
|
* Note that non-joined rows are processed in multiple threads, unlike HashJoin/ConcurrentHashJoin/MergeJoin.
|
|
|
|
*/
|
2022-06-06 17:26:22 +00:00
|
|
|
class GraceHashJoin final : public IJoin
|
|
|
|
{
|
2022-06-16 12:09:23 +00:00
|
|
|
class FileBucket;
|
|
|
|
class DelayedBlocks;
|
2022-10-05 12:40:32 +00:00
|
|
|
using InMemoryJoin = HashJoin;
|
2022-06-23 01:41:21 +00:00
|
|
|
|
2022-10-05 12:40:32 +00:00
|
|
|
using InMemoryJoinPtr = std::shared_ptr<InMemoryJoin>;
|
|
|
|
|
|
|
|
public:
|
2022-10-04 08:20:13 +00:00
|
|
|
using BucketPtr = std::shared_ptr<FileBucket>;
|
|
|
|
using Buckets = std::vector<BucketPtr>;
|
2022-06-16 12:09:23 +00:00
|
|
|
|
|
|
|
GraceHashJoin(
|
2022-09-30 11:07:49 +00:00
|
|
|
ContextPtr context_, std::shared_ptr<TableJoin> table_join_,
|
|
|
|
const Block & left_sample_block_, const Block & right_sample_block_,
|
|
|
|
TemporaryDataOnDiskScopePtr tmp_data_,
|
|
|
|
bool any_take_last_row_ = false);
|
2022-06-16 12:09:23 +00:00
|
|
|
|
2022-06-23 01:45:44 +00:00
|
|
|
~GraceHashJoin() override;
|
|
|
|
|
2022-06-11 11:03:44 +00:00
|
|
|
const TableJoin & getTableJoin() const override { return *table_join; }
|
2022-06-06 17:26:22 +00:00
|
|
|
|
|
|
|
bool addJoinedBlock(const Block & block, bool check_limits) override;
|
|
|
|
void checkTypesOfKeys(const Block & block) const override;
|
2022-06-16 12:09:23 +00:00
|
|
|
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
|
2022-06-06 17:26:22 +00:00
|
|
|
|
2022-06-23 05:41:45 +00:00
|
|
|
void setTotals(const Block & block) override;
|
2022-06-06 17:26:22 +00:00
|
|
|
|
|
|
|
size_t getTotalRowCount() const override;
|
|
|
|
size_t getTotalByteCount() const override;
|
|
|
|
bool alwaysReturnsEmptySet() const override;
|
|
|
|
|
2022-06-23 01:41:21 +00:00
|
|
|
bool supportParallelJoin() const override { return true; }
|
|
|
|
|
2022-10-18 11:43:01 +00:00
|
|
|
IBlocksStreamPtr
|
2022-06-06 17:26:22 +00:00
|
|
|
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Open iterator over joined blocks.
|
|
|
|
/// Must be called after all @joinBlock calls.
|
2022-10-18 11:43:01 +00:00
|
|
|
IBlocksStreamPtr getDelayedBlocks() override;
|
2022-10-06 17:47:40 +00:00
|
|
|
bool hasDelayedBlocks() const override { return true; }
|
2022-06-16 12:09:23 +00:00
|
|
|
|
2022-09-15 12:14:27 +00:00
|
|
|
static bool isSupported(const std::shared_ptr<TableJoin> & table_join);
|
2022-09-07 15:00:15 +00:00
|
|
|
|
2022-06-06 17:26:22 +00:00
|
|
|
private:
|
2022-10-06 14:26:56 +00:00
|
|
|
void initBuckets();
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Create empty join for in-memory processing.
|
|
|
|
InMemoryJoinPtr makeInMemoryJoin();
|
2022-10-05 12:40:32 +00:00
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Add right table block to the @join. Calls @rehash on overflow.
|
2022-10-05 12:40:32 +00:00
|
|
|
void addJoinedBlockImpl(Block block);
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Check that @join satisifes limits on rows/bytes in @table_join.
|
2022-10-05 12:40:32 +00:00
|
|
|
bool fitsInMemory() const;
|
2022-06-16 12:09:23 +00:00
|
|
|
|
|
|
|
/// Create new bucket at the end of @destination.
|
2022-10-05 12:40:32 +00:00
|
|
|
void addBucket(Buckets & destination);
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Increase number of buckets to match desired_size.
|
|
|
|
/// Called when HashJoin in-memory table for one bucket exceeds the limits.
|
2022-06-23 16:54:04 +00:00
|
|
|
///
|
2022-09-30 11:54:40 +00:00
|
|
|
/// NB: after @rehashBuckets there may be rows that are written to the buckets that they do not belong to.
|
2022-06-23 16:54:04 +00:00
|
|
|
/// It is fine; these rows will be written to the corresponding buckets during the third stage.
|
2022-10-05 12:40:32 +00:00
|
|
|
Buckets rehashBuckets(size_t to_size);
|
2022-10-04 08:20:13 +00:00
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
/// Perform some bookkeeping after all calls to @joinBlock.
|
|
|
|
void startReadingDelayedBlocks();
|
|
|
|
|
2022-09-30 11:54:40 +00:00
|
|
|
size_t getNumBuckets() const;
|
|
|
|
Buckets getCurrentBuckets() const;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
Poco::Logger * log;
|
|
|
|
ContextPtr context;
|
2022-06-06 17:26:22 +00:00
|
|
|
std::shared_ptr<TableJoin> table_join;
|
2022-06-23 14:41:11 +00:00
|
|
|
std::atomic<bool> need_left_sample_block{true};
|
|
|
|
Block left_sample_block;
|
2022-06-16 12:09:23 +00:00
|
|
|
Block right_sample_block;
|
2022-06-23 14:41:11 +00:00
|
|
|
Block output_sample_block;
|
2022-06-16 12:09:23 +00:00
|
|
|
bool any_take_last_row;
|
|
|
|
size_t max_num_buckets;
|
2022-06-23 14:41:11 +00:00
|
|
|
size_t max_block_size;
|
2022-06-16 12:09:23 +00:00
|
|
|
|
2022-10-04 08:20:13 +00:00
|
|
|
Names left_key_names;
|
|
|
|
Names right_key_names;
|
|
|
|
|
2022-09-30 11:07:49 +00:00
|
|
|
TemporaryDataOnDiskPtr tmp_data;
|
2022-09-30 11:54:40 +00:00
|
|
|
|
|
|
|
Buckets buckets;
|
2022-10-04 08:20:13 +00:00
|
|
|
mutable std::shared_mutex rehash_mutex;
|
2022-09-30 11:54:40 +00:00
|
|
|
|
2022-10-05 12:40:32 +00:00
|
|
|
FileBucket * current_bucket = nullptr;
|
|
|
|
mutable std::mutex current_bucket_mutex;
|
|
|
|
|
|
|
|
InMemoryJoinPtr hash_join;
|
|
|
|
mutable std::mutex hash_join_mutex;
|
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
std::atomic<bool> started_reading_delayed_blocks{false};
|
2022-06-06 17:26:22 +00:00
|
|
|
|
2022-06-16 12:09:23 +00:00
|
|
|
Block totals;
|
2022-06-23 05:41:45 +00:00
|
|
|
mutable std::mutex totals_mutex;
|
2022-06-06 17:26:22 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|