From 29b6be9a004403b54b28be4014f7a664310ad0c0 Mon Sep 17 00:00:00 2001 From: Sergey Skvortsov Date: Sat, 18 Jun 2022 01:38:18 +0300 Subject: [PATCH] feat: Temporarily enable grace hash join for all supported queries --- src/Interpreters/ExpressionAnalyzer.cpp | 12 ++++++++---- src/Interpreters/JoinSwitcher.cpp | 6 +++--- src/Interpreters/JoinSwitcher.h | 13 +++++++++++-- src/Interpreters/TableJoin.cpp | 8 ++++++++ src/Interpreters/TableJoin.h | 1 + 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 1d4b556084f..929ce15b012 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1045,9 +1045,13 @@ static std::shared_ptr chooseJoinAlgorithm(std::shared_ptr ana if (analyzed_join->tryInitDictJoin(sample_block, context)) return std::make_shared(analyzed_join, sample_block); - if (analyzed_join->forceGraceHashJoin()) - return std::make_shared(context, analyzed_join, sample_block); bool allow_merge_join = analyzed_join->allowMergeJoin(); + auto make_merge_join = [analyzed_join, sample_block] { return std::make_shared(analyzed_join, sample_block); }; + + if (analyzed_join->allowGraceHashJoin()) + { + return std::make_shared(context, analyzed_join, sample_block); + } if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join)) { if (analyzed_join->allowParallelHashJoin()) @@ -1057,8 +1061,8 @@ static std::shared_ptr chooseJoinAlgorithm(std::shared_ptr ana return std::make_shared(analyzed_join, sample_block); } else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join)) - return std::make_shared(analyzed_join, sample_block); - return std::make_shared(analyzed_join, sample_block); + return make_merge_join(); + return std::make_shared(analyzed_join, sample_block, make_merge_join); } static std::unique_ptr buildJoinedPlan( diff --git a/src/Interpreters/JoinSwitcher.cpp b/src/Interpreters/JoinSwitcher.cpp index b5c51b8273c..570b25dc52e 100644 --- a/src/Interpreters/JoinSwitcher.cpp +++ b/src/Interpreters/JoinSwitcher.cpp @@ -1,17 +1,17 @@ #include #include #include -#include #include namespace DB { -JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) +JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_, OnDiskJoinFactory factory) : limits(table_join_->sizeLimits()) , switched(false) , table_join(table_join_) , right_sample_block(right_sample_block_.cloneEmpty()) + , make_on_disk_join(std::move(factory)) { join = std::make_shared(table_join, right_sample_block); @@ -44,7 +44,7 @@ void JoinSwitcher::switchJoin() BlocksList right_blocks = std::move(hash_join).releaseJoinedBlocks(); /// Destroy old join & create new one. - join = std::make_shared(table_join, right_sample_block); + join = make_on_disk_join(); for (const Block & saved_block : right_blocks) { diff --git a/src/Interpreters/JoinSwitcher.h b/src/Interpreters/JoinSwitcher.h index 30115710e22..32e3fe44f77 100644 --- a/src/Interpreters/JoinSwitcher.h +++ b/src/Interpreters/JoinSwitcher.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -12,11 +13,13 @@ namespace DB /// Used when setting 'join_algorithm' set to JoinAlgorithm::AUTO. /// Starts JOIN with join-in-memory algorithm and switches to join-on-disk on the fly if there's no memory to place right table. -/// Current join-in-memory and join-on-disk are JoinAlgorithm::HASH and JoinAlgorithm::PARTIAL_MERGE joins respectively. +/// Current join-in-memory and join-on-disk are JoinAlgorithm::HASH and JoinAlgorithm::PARTIAL_MERGE/JoinAlgorithm::GRACE_HASH joins respectively. class JoinSwitcher : public IJoin { public: - JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_); + using OnDiskJoinFactory = std::function; + + JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_, OnDiskJoinFactory factory); const TableJoin & getTableJoin() const override { return *table_join; } @@ -66,6 +69,11 @@ public: return join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size); } + std::unique_ptr getDelayedBlocks(IDelayedJoinedBlocksStream * prev_cursor) override + { + return join->getDelayedBlocks(prev_cursor); + } + private: JoinPtr join; SizeLimits limits; @@ -73,6 +81,7 @@ private: mutable std::mutex switch_mutex; std::shared_ptr table_join; const Block right_sample_block; + OnDiskJoinFactory make_on_disk_join; /// Change join-in-memory to join-on-disk moving right hand JOIN data from one to another. /// Throws an error if join-on-disk do not support JOIN kind or strictness. diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index 87502e5965e..77e67c60402 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -413,6 +413,14 @@ bool TableJoin::allowMergeJoin() const return (all_join || special_left) && oneDisjunct(); } +bool TableJoin::allowGraceHashJoin() const +{ + bool is_asof = (strictness() == ASTTableJoin::Strictness::Asof); + bool is_right_or_full = isRight(kind()) || isFull(kind()); + + return !is_right_or_full && !is_asof && !isCrossOrComma(kind()) && oneDisjunct(); +} + bool TableJoin::needStreamWithNonJoinedRows() const { if (strictness() == ASTTableJoin::Strictness::Asof || diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 3f4ab79d1c1..4f449013432 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -189,6 +189,7 @@ public: const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; + bool allowGraceHashJoin() const; bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; } bool forceMergeJoin() const { return join_algorithm == JoinAlgorithm::PARTIAL_MERGE; } bool forceGraceHashJoin() const { return join_algorithm == JoinAlgorithm::GRACE_HASH; }