feat: Temporarily enable grace hash join for all supported queries

This commit is contained in:
Sergey Skvortsov 2022-06-18 01:38:18 +03:00
parent ef2302910e
commit 29b6be9a00
5 changed files with 31 additions and 9 deletions

View File

@ -1045,9 +1045,13 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
if (analyzed_join->tryInitDictJoin(sample_block, context))
return std::make_shared<HashJoin>(analyzed_join, sample_block);
if (analyzed_join->forceGraceHashJoin())
return std::make_shared<GraceHashJoin>(context, analyzed_join, sample_block);
bool allow_merge_join = analyzed_join->allowMergeJoin();
auto make_merge_join = [analyzed_join, sample_block] { return std::make_shared<MergeJoin>(analyzed_join, sample_block); };
if (analyzed_join->allowGraceHashJoin())
{
return std::make_shared<GraceHashJoin>(context, analyzed_join, sample_block);
}
if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join))
{
if (analyzed_join->allowParallelHashJoin())
@ -1057,8 +1061,8 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> ana
return std::make_shared<HashJoin>(analyzed_join, sample_block);
}
else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join))
return std::make_shared<MergeJoin>(analyzed_join, sample_block);
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block);
return make_merge_join();
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block, make_merge_join);
}
static std::unique_ptr<QueryPlan> buildJoinedPlan(

View File

@ -1,17 +1,17 @@
#include <Common/typeid_cast.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/join_common.h>
namespace DB
{
JoinSwitcher::JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_)
JoinSwitcher::JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, OnDiskJoinFactory factory)
: limits(table_join_->sizeLimits())
, switched(false)
, table_join(table_join_)
, right_sample_block(right_sample_block_.cloneEmpty())
, make_on_disk_join(std::move(factory))
{
join = std::make_shared<HashJoin>(table_join, right_sample_block);
@ -44,7 +44,7 @@ void JoinSwitcher::switchJoin()
BlocksList right_blocks = std::move(hash_join).releaseJoinedBlocks();
/// Destroy old join & create new one.
join = std::make_shared<MergeJoin>(table_join, right_sample_block);
join = make_on_disk_join();
for (const Block & saved_block : right_blocks)
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <mutex>
#include <functional>
#include <Core/Block.h>
#include <Interpreters/IJoin.h>
@ -12,11 +13,13 @@ namespace DB
/// Used when the 'join_algorithm' setting is set to JoinAlgorithm::AUTO.
/// Starts JOIN with join-in-memory algorithm and switches to join-on-disk on the fly if there's no memory to place right table.
/// Current join-in-memory and join-on-disk are JoinAlgorithm::HASH and JoinAlgorithm::PARTIAL_MERGE joins respectively.
/// Current join-in-memory and join-on-disk are JoinAlgorithm::HASH and JoinAlgorithm::PARTIAL_MERGE/JoinAlgorithm::GRACE_HASH joins respectively.
class JoinSwitcher : public IJoin
{
public:
JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_);
using OnDiskJoinFactory = std::function<JoinPtr()>;
JoinSwitcher(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, OnDiskJoinFactory factory);
const TableJoin & getTableJoin() const override { return *table_join; }
@ -66,6 +69,11 @@ public:
return join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size);
}
/// Delegates the request for delayed blocks to the join object currently held in `join`,
/// passing `prev_cursor` through unchanged and returning whatever that join returns.
std::unique_ptr<IDelayedJoinedBlocksStream> getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor) override
{
    auto & active_join = *join;
    return active_join.getDelayedBlocks(prev_cursor);
}
private:
JoinPtr join;
SizeLimits limits;
@ -73,6 +81,7 @@ private:
mutable std::mutex switch_mutex;
std::shared_ptr<TableJoin> table_join;
const Block right_sample_block;
OnDiskJoinFactory make_on_disk_join;
/// Change join-in-memory to join-on-disk moving right hand JOIN data from one to another.
/// Throws an error if join-on-disk does not support the JOIN kind or strictness.

View File

@ -413,6 +413,14 @@ bool TableJoin::allowMergeJoin() const
return (all_join || special_left) && oneDisjunct();
}
/// Tells whether GraceHashJoin may be used for this JOIN.
/// Returns true only when the join is neither RIGHT nor FULL, its strictness is not ASOF,
/// it is not a CROSS/comma join, and oneDisjunct() holds.
bool TableJoin::allowGraceHashJoin() const
{
    if (isRight(kind()) || isFull(kind()))
        return false;

    if (strictness() == ASTTableJoin::Strictness::Asof)
        return false;

    if (isCrossOrComma(kind()))
        return false;

    return oneDisjunct();
}
bool TableJoin::needStreamWithNonJoinedRows() const
{
if (strictness() == ASTTableJoin::Strictness::Asof ||

View File

@ -189,6 +189,7 @@ public:
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool allowGraceHashJoin() const;
bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; }
bool forceMergeJoin() const { return join_algorithm == JoinAlgorithm::PARTIAL_MERGE; }
bool forceGraceHashJoin() const { return join_algorithm == JoinAlgorithm::GRACE_HASH; }