From d2d4118730dcd846dd7889bbc0d0e056e1163e8f Mon Sep 17 00:00:00 2001 From: chertus Date: Tue, 11 Feb 2020 21:27:52 +0300 Subject: [PATCH 01/15] add JoinSwitcher --- dbms/src/Interpreters/AnalyzedJoin.cpp | 27 ++------ dbms/src/Interpreters/AnalyzedJoin.h | 4 +- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 4 +- dbms/src/Interpreters/JoinSwitcher.h | 72 ++++++++++++++++++++ 4 files changed, 81 insertions(+), 26 deletions(-) create mode 100644 dbms/src/Interpreters/JoinSwitcher.h diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index e7115816920..5d79ad71ae2 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -1,6 +1,4 @@ #include -#include -#include #include @@ -229,27 +227,14 @@ bool AnalyzedJoin::sameStrictnessAndKind(ASTTableJoin::Strictness strictness_, A return false; } -JoinPtr makeJoin(std::shared_ptr table_join, const Block & right_sample_block) +bool AnalyzedJoin::allowMergeJoin() const { - auto kind = table_join->kind(); - auto strictness = table_join->strictness(); + bool is_any = (strictness() == ASTTableJoin::Strictness::Any); + bool is_all = (strictness() == ASTTableJoin::Strictness::All); + bool is_semi = (strictness() == ASTTableJoin::Strictness::Semi); - bool is_any = (strictness == ASTTableJoin::Strictness::Any); - bool is_all = (strictness == ASTTableJoin::Strictness::All); - bool is_semi = (strictness == ASTTableJoin::Strictness::Semi); - - bool allow_merge_join = (isLeft(kind) && (is_any || is_all || is_semi)) || (isInner(kind) && is_all); - - if (table_join->partial_merge_join && allow_merge_join) - return std::make_shared(table_join, right_sample_block); - return std::make_shared(table_join, right_sample_block); -} - -bool isMergeJoin(const JoinPtr & join) -{ - if (join) - return typeid_cast(join.get()); - return false; + bool allow_merge_join = (isLeft(kind()) && (is_any || is_all || is_semi)) || (isInner(kind()) && is_all); + return 
allow_merge_join && partial_merge_join; } } diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index fe89b6f47ef..a96ea54d5fe 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -87,6 +87,7 @@ public: bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const; const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } + bool allowMergeJoin() const; bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); } bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); } @@ -128,9 +129,6 @@ public: void setRightKeys(const Names & keys) { key_names_right = keys; } static bool sameJoin(const AnalyzedJoin * x, const AnalyzedJoin * y); - friend JoinPtr makeJoin(std::shared_ptr table_join, const Block & right_sample_block); }; -bool isMergeJoin(const JoinPtr &); - } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index f131afb86c6..c430e348e13 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #include @@ -564,7 +564,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer /// TODO You do not need to set this up when JOIN is only needed on remote servers. 
subquery_for_join.setJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside - subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block); + subquery_for_join.join = std::make_shared(syntax->analyzed_join, subquery_for_join.sample_block); } return subquery_for_join.join; diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h new file mode 100644 index 00000000000..4c627bd7b8e --- /dev/null +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class JoinSwitcher : public IJoin +{ +public: + JoinSwitcher(std::shared_ptr table_join, const Block & right_sample_block) + { + if (table_join->allowMergeJoin()) + join = std::make_shared(table_join, right_sample_block); + else + join = std::make_shared(table_join, right_sample_block); + } + + bool addJoinedBlock(const Block & block) override + { + /// TODO: switch Join -> MergeJoin + return join->addJoinedBlock(block); + } + + void joinBlock(Block & block, std::shared_ptr & not_processed) override + { + join->joinBlock(block, not_processed); + } + + bool hasTotals() const override + { + return join->hasTotals(); + } + + void setTotals(const Block & block) override + { + join->setTotals(block); + } + + void joinTotals(Block & block) const override + { + join->joinTotals(block); + } + + size_t getTotalRowCount() const override + { + return join->getTotalRowCount(); + } + + bool alwaysReturnsEmptySet() const override + { + return join->alwaysReturnsEmptySet(); + } + + BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & block, UInt64 max_block_size) const override + { + return join->createStreamWithNonJoinedRows(block, max_block_size); + } + + bool hasStreamWithNonJoinedRows() const override + { + return join->hasStreamWithNonJoinedRows(); + } + +private: + JoinPtr join; +}; + +} From 4a658f4325a6e87de98ca07a00e8602359c03205 Mon Sep 17 
00:00:00 2001 From: chertus Date: Mon, 17 Feb 2020 20:08:31 +0300 Subject: [PATCH 02/15] swtich HashJoin to MergeJoin if JOIN limit exceeded --- dbms/src/Interpreters/AnalyzedJoin.cpp | 2 +- dbms/src/Interpreters/AnalyzedJoin.h | 2 + dbms/src/Interpreters/ExpressionAnalyzer.cpp | 11 +++- dbms/src/Interpreters/IJoin.h | 3 +- dbms/src/Interpreters/Join.cpp | 5 +- dbms/src/Interpreters/Join.h | 9 ++- dbms/src/Interpreters/JoinSwitcher.cpp | 64 ++++++++++++++++++++ dbms/src/Interpreters/JoinSwitcher.h | 31 +++++++--- dbms/src/Interpreters/MergeJoin.cpp | 2 +- dbms/src/Interpreters/MergeJoin.h | 3 +- 10 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 dbms/src/Interpreters/JoinSwitcher.cpp diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index 5d79ad71ae2..33876080860 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -234,7 +234,7 @@ bool AnalyzedJoin::allowMergeJoin() const bool is_semi = (strictness() == ASTTableJoin::Strictness::Semi); bool allow_merge_join = (isLeft(kind()) && (is_any || is_all || is_semi)) || (isInner(kind()) && is_all); - return allow_merge_join && partial_merge_join; + return allow_merge_join; } } diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index a96ea54d5fe..713ebc3b9d5 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -88,6 +88,8 @@ public: const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; + bool forceMergeJoin() const { return allowMergeJoin() && partial_merge_join; } + bool forceHashJoin() const { return !allowMergeJoin(); } bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); } bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); } diff --git 
a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index c430e348e13..a83d235a6f4 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -538,6 +538,15 @@ static ExpressionActionsPtr createJoinedBlockActions(const Context & context, co return ExpressionAnalyzer(expression_list, syntax_result, context).getActions(true, false); } +static std::shared_ptr makeJoin(std::shared_ptr analyzed_join, const Block & sample_block) +{ + if (analyzed_join->forceHashJoin()) + return std::make_shared(analyzed_join, sample_block); + else if (analyzed_join->forceMergeJoin()) + return std::make_shared(analyzed_join, sample_block); + return std::make_shared(analyzed_join, sample_block); +} + JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQueryElement & join_element) { /// Two JOINs are not supported with the same subquery, but different USINGs. @@ -564,7 +573,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer /// TODO You do not need to set this up when JOIN is only needed on remote servers. subquery_for_join.setJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside - subquery_for_join.join = std::make_shared(syntax->analyzed_join, subquery_for_join.sample_block); + subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block); } return subquery_for_join.join; diff --git a/dbms/src/Interpreters/IJoin.h b/dbms/src/Interpreters/IJoin.h index 6d290d143da..fb59e02544d 100644 --- a/dbms/src/Interpreters/IJoin.h +++ b/dbms/src/Interpreters/IJoin.h @@ -20,7 +20,7 @@ public: /// Add block of data from right hand of JOIN. /// @returns false, if some limit was exceeded and you should not insert more data. 
- virtual bool addJoinedBlock(const Block & block) = 0; + virtual bool addJoinedBlock(const Block & block, bool check_limits = true) = 0; /// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock). /// Could be called from different threads in parallel. @@ -31,6 +31,7 @@ public: virtual void joinTotals(Block & block) const = 0; virtual size_t getTotalRowCount() const = 0; + virtual size_t getTotalByteCount() const = 0; virtual bool alwaysReturnsEmptySet() const { return false; } virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 7e6f66f1f7f..89c20afc409 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -518,7 +518,7 @@ Block Join::structureRightBlock(const Block & block) const return structured_block; } -bool Join::addJoinedBlock(const Block & source_block) +bool Join::addJoinedBlock(const Block & source_block, bool check_limits) { if (empty()) throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR); @@ -565,6 +565,9 @@ bool Join::addJoinedBlock(const Block & source_block) if (save_nullmap) data->blocks_nullmaps.emplace_back(stored_block, null_map_holder); + if (!check_limits) + return true; + /// TODO: Do not calculate them every time total_rows = getTotalRowCount(); total_bytes = getTotalByteCount(); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 337c18a5980..1d513564d72 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -153,7 +153,7 @@ public: /** Add block of data from right hand of JOIN to the map. * Returns false, if some limit was exceeded and you should not insert more data. 
*/ - bool addJoinedBlock(const Block & block) override; + bool addJoinedBlock(const Block & block, bool check_limits = true) override; /** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table. * Could be called from different threads in parallel. @@ -184,7 +184,7 @@ public: /// Number of keys in all built JOIN maps. size_t getTotalRowCount() const final; /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools. - size_t getTotalByteCount() const; + size_t getTotalByteCount() const final; bool alwaysReturnsEmptySet() const final { return isInnerOrRight(getKind()) && data->empty; } @@ -320,6 +320,11 @@ public: data = join.data; } + std::shared_ptr getJoinedData() const + { + return data; + } + private: friend class NonJoinedBlockInputStream; friend class JoinBlockInputStream; diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp new file mode 100644 index 00000000000..3ded6124091 --- /dev/null +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -0,0 +1,64 @@ +#include +#include +#include + +namespace DB +{ + +static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, bool nullable) +{ + if (nullable) + JoinCommon::convertColumnToNullable(column); + else + JoinCommon::removeColumnNullability(column); + + return std::move(column); +} + +bool JoinSwitcher::addJoinedBlock(const Block & block, bool) +{ + /// Trying to make MergeJoin without lock + + if (switched) + return join->addJoinedBlock(block); + + std::lock_guard lock(switch_mutex); + + if (switched) + return join->addJoinedBlock(block); + + /// HashJoin with external limits check + + join->addJoinedBlock(block, false); + size_t rows = join->getTotalRowCount(); + size_t bytes = join->getTotalByteCount(); + + auto & limits = table_join->sizeLimits(); + if (!limits.softCheck(rows, bytes)) + switchJoin(); + + return true; +} + +void JoinSwitcher::switchJoin() +{ + 
std::shared_ptr joined_data = static_cast(*join).getJoinedData(); + BlocksList right_blocks = std::move(joined_data->blocks); + + /// Destroy old join & create new one. Destroy first in case of memory saving. + join = std::make_shared(table_join, right_sample_block); + + for (Block & block : right_blocks) + { + for (const auto & sample_column : right_sample_block) + { + auto & column = block.getByName(sample_column.name); + block.insert(correctNullability(std::move(column), sample_column.type->isNullable())); + join->addJoinedBlock(block); + } + } + + switched = true; +} + +} diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 4c627bd7b8e..b2c60f698d0 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -1,5 +1,9 @@ #pragma once +#include +#include + +#include #include #include #include @@ -11,19 +15,15 @@ namespace DB class JoinSwitcher : public IJoin { public: - JoinSwitcher(std::shared_ptr table_join, const Block & right_sample_block) + JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) + : switched(false) + , table_join(table_join_) + , right_sample_block(right_sample_block_.cloneEmpty()) { - if (table_join->allowMergeJoin()) - join = std::make_shared(table_join, right_sample_block); - else - join = std::make_shared(table_join, right_sample_block); + join = std::make_shared(table_join, right_sample_block); } - bool addJoinedBlock(const Block & block) override - { - /// TODO: switch Join -> MergeJoin - return join->addJoinedBlock(block); - } + bool addJoinedBlock(const Block & block, bool check_limits = true) override; void joinBlock(Block & block, std::shared_ptr & not_processed) override { @@ -50,6 +50,11 @@ public: return join->getTotalRowCount(); } + size_t getTotalByteCount() const override + { + return join->getTotalByteCount(); + } + bool alwaysReturnsEmptySet() const override { return join->alwaysReturnsEmptySet(); @@ -67,6 +72,12 @@ public: 
private: JoinPtr join; + bool switched; + mutable std::mutex switch_mutex; + std::shared_ptr table_join; + Block right_sample_block; + + void switchJoin(); }; } diff --git a/dbms/src/Interpreters/MergeJoin.cpp b/dbms/src/Interpreters/MergeJoin.cpp index 3f65782c4e1..2538863db85 100644 --- a/dbms/src/Interpreters/MergeJoin.cpp +++ b/dbms/src/Interpreters/MergeJoin.cpp @@ -585,7 +585,7 @@ bool MergeJoin::saveRightBlock(Block && block) return true; } -bool MergeJoin::addJoinedBlock(const Block & src_block) +bool MergeJoin::addJoinedBlock(const Block & src_block, bool) { Block block = materializeBlock(src_block); JoinCommon::removeLowCardinalityInplace(block); diff --git a/dbms/src/Interpreters/MergeJoin.h b/dbms/src/Interpreters/MergeJoin.h index 58b38b102bb..7d934aed06a 100644 --- a/dbms/src/Interpreters/MergeJoin.h +++ b/dbms/src/Interpreters/MergeJoin.h @@ -50,12 +50,13 @@ class MergeJoin : public IJoin public: MergeJoin(std::shared_ptr table_join_, const Block & right_sample_block); - bool addJoinedBlock(const Block & block) override; + bool addJoinedBlock(const Block & block, bool check_limits = true) override; void joinBlock(Block &, ExtraBlockPtr & not_processed) override; void joinTotals(Block &) const override; void setTotals(const Block &) override; bool hasTotals() const override { return totals; } size_t getTotalRowCount() const override { return right_blocks_row_count; } + size_t getTotalByteCount() const override { return right_blocks_bytes; } private: struct NotProcessed : public ExtraBlock From 5717d4833364ea839ec5c272ffceee945a94df6d Mon Sep 17 00:00:00 2001 From: chertus Date: Mon, 17 Feb 2020 20:21:03 +0300 Subject: [PATCH 03/15] better includes --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 2 ++ dbms/src/Interpreters/JoinSwitcher.cpp | 10 ++++++++++ dbms/src/Interpreters/JoinSwitcher.h | 11 +---------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp 
b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index a83d235a6f4..c4df430b355 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include #include diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index 3ded6124091..4c23d0f8749 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include namespace DB @@ -15,6 +17,14 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, return std::move(column); } +JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) + : switched(false) + , table_join(table_join_) + , right_sample_block(right_sample_block_.cloneEmpty()) +{ + join = std::make_shared(table_join, right_sample_block); +} + bool JoinSwitcher::addJoinedBlock(const Block & block, bool) { /// Trying to make MergeJoin without lock diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index b2c60f698d0..138b4382868 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -1,12 +1,9 @@ #pragma once #include -#include #include #include -#include -#include #include namespace DB @@ -15,13 +12,7 @@ namespace DB class JoinSwitcher : public IJoin { public: - JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) - : switched(false) - , table_join(table_join_) - , right_sample_block(right_sample_block_.cloneEmpty()) - { - join = std::make_shared(table_join, right_sample_block); - } + JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_); bool addJoinedBlock(const Block & block, bool check_limits = true) override; From a9e743d8bd4ac88e067ec2b02f002b6acf588220 Mon Sep 17 00:00:00 2001 From: chertus Date: Mon, 17 Feb 2020 20:41:38 +0300 
Subject: [PATCH 04/15] minor JoinSwitcher optimisation --- dbms/src/Interpreters/JoinSwitcher.cpp | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index 4c23d0f8749..082f96815d3 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -58,12 +58,26 @@ void JoinSwitcher::switchJoin() /// Destroy old join & create new one. Destroy first in case of memory saving. join = std::make_shared(table_join, right_sample_block); - for (Block & block : right_blocks) + /// names to positions optimization + std::vector positions; + std::vector is_nullable; + if (right_blocks.size()) { + positions.reserve(right_sample_block.columns()); + Block & tmp_block = *right_blocks.begin(); for (const auto & sample_column : right_sample_block) { - auto & column = block.getByName(sample_column.name); - block.insert(correctNullability(std::move(column), sample_column.type->isNullable())); + positions.emplace_back(tmp_block.getPositionByName(sample_column.name)); + is_nullable.emplace_back(sample_column.type->isNullable()); + } + } + + for (Block & block : right_blocks) + { + for (size_t i = 0; i < positions.size(); ++i) + { + auto & column = block.getByPosition(positions[i]); + block.insert(correctNullability(std::move(column), is_nullable[i])); join->addJoinedBlock(block); } } From f1673e13442724a84c8adf9570f940251432918b Mon Sep 17 00:00:00 2001 From: chertus Date: Tue, 18 Feb 2020 15:41:23 +0300 Subject: [PATCH 05/15] fix JoinSwitcher logic --- dbms/src/Interpreters/Join.cpp | 11 +++++------ dbms/src/Interpreters/Join.h | 2 +- dbms/src/Interpreters/JoinSwitcher.cpp | 11 ++++++----- dbms/src/Interpreters/JoinSwitcher.h | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 89c20afc409..c53e61a565c 100644 --- a/dbms/src/Interpreters/Join.cpp +++ 
b/dbms/src/Interpreters/Join.cpp @@ -316,7 +316,7 @@ void Join::setSampleBlock(const Block & block) ColumnRawPtrs key_columns = JoinCommon::extractKeysForJoin(key_names_right, block, right_table_keys, sample_block_with_columns_to_add); - initRightBlockStructure(); + initRightBlockStructure(data->sample_block); initRequiredRightKeys(); JoinCommon::createMissedColumns(sample_block_with_columns_to_add); @@ -481,13 +481,12 @@ void Join::initRequiredRightKeys() } } -void Join::initRightBlockStructure() +void Join::initRightBlockStructure(Block & saved_block_sample) { - auto & saved_block_sample = data->sample_block; - - if (isRightOrFull(kind)) + /// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any). + bool save_key_columns = !table_join->forceHashJoin() || isRightOrFull(kind); + if (save_key_columns) { - /// Save keys for NonJoinedBlockInputStream saved_block_sample = right_table_keys.cloneEmpty(); } else if (strictness == ASTTableJoin::Strictness::Asof) diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 1d513564d72..6311d75c65e 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -369,7 +369,7 @@ private: /// Modify (structure) right block to save it in block list Block structureRightBlock(const Block & stored_block) const; - void initRightBlockStructure(); + void initRightBlockStructure(Block & saved_block_sample); void initRequiredRightKeys(); template diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index 082f96815d3..102129df409 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -64,7 +64,7 @@ void JoinSwitcher::switchJoin() if (right_blocks.size()) { positions.reserve(right_sample_block.columns()); - Block & tmp_block = *right_blocks.begin(); + const Block & tmp_block = *right_blocks.begin(); for (const auto & sample_column : right_sample_block) { 
positions.emplace_back(tmp_block.getPositionByName(sample_column.name)); @@ -72,14 +72,15 @@ void JoinSwitcher::switchJoin() } } - for (Block & block : right_blocks) + for (Block & saved_block : right_blocks) { + Block restored_block; for (size_t i = 0; i < positions.size(); ++i) { - auto & column = block.getByPosition(positions[i]); - block.insert(correctNullability(std::move(column), is_nullable[i])); - join->addJoinedBlock(block); + auto & column = saved_block.getByPosition(positions[i]); + restored_block.insert(correctNullability(std::move(column), is_nullable[i])); } + join->addJoinedBlock(restored_block); } switched = true; diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 138b4382868..414ba1309f6 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -66,7 +66,7 @@ private: bool switched; mutable std::mutex switch_mutex; std::shared_ptr table_join; - Block right_sample_block; + const Block right_sample_block; void switchJoin(); }; From fbecd0c155311ce38ac47946d7eaf7710888b8db Mon Sep 17 00:00:00 2001 From: chertus Date: Tue, 18 Feb 2020 17:19:13 +0300 Subject: [PATCH 06/15] use default_max_bytes_in_join in JoinSwitcher if no join limits --- dbms/src/Core/Settings.h | 2 +- dbms/src/Interpreters/JoinSwitcher.cpp | 5 ++++- dbms/src/Interpreters/JoinSwitcher.h | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 22c759b4c4b..934078bc3b9 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -318,7 +318,7 @@ struct Settings : public SettingsCollection M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. 
When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \ M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for joins (ANY|ALL|SEMI LEFT and ALL INNER are supported for now).", 0) \ M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join", 0) \ - M(SettingUInt64, default_max_bytes_in_join, 100000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ + M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ \ M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.", 0) \ diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index 102129df409..ff0c507036f 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -19,10 +19,14 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) : switched(false) + , limits(table_join_->sizeLimits()) , table_join(table_join_) , right_sample_block(right_sample_block_.cloneEmpty()) { join = std::make_shared(table_join, right_sample_block); + + if (!limits.hasLimits()) + limits.max_bytes = table_join->defaultMaxBytes(); } bool JoinSwitcher::addJoinedBlock(const Block & block, bool) @@ -43,7 +47,6 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool) size_t rows = join->getTotalRowCount(); size_t bytes = join->getTotalByteCount(); - auto & limits = 
table_join->sizeLimits(); if (!limits.softCheck(rows, bytes)) switchJoin(); diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 414ba1309f6..0ecce1a0bdd 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -64,6 +64,7 @@ public: private: JoinPtr join; bool switched; + SizeLimits limits; mutable std::mutex switch_mutex; std::shared_ptr table_join; const Block right_sample_block; From 74a5227ac465c97dd55b4822a15c34ab314b136d Mon Sep 17 00:00:00 2001 From: chertus Date: Tue, 18 Feb 2020 20:31:22 +0300 Subject: [PATCH 07/15] add join_algorithm setting --- dbms/src/Common/ErrorCodes.cpp | 2 +- dbms/src/Core/Settings.h | 3 ++- dbms/src/Core/SettingsCollection.cpp | 10 ++++++++-- dbms/src/Core/SettingsCollection.h | 8 ++++++++ dbms/src/Interpreters/AnalyzedJoin.cpp | 4 +++- dbms/src/Interpreters/AnalyzedJoin.h | 9 ++++++--- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 2 +- 7 files changed, 29 insertions(+), 9 deletions(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 718fc0cbf89..b6f8e4d0d77 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -390,7 +390,7 @@ namespace ErrorCodes extern const int ALL_REPLICAS_LOST = 415; extern const int REPLICA_STATUS_CHANGED = 416; extern const int EXPECTED_ALL_OR_ANY = 417; - extern const int UNKNOWN_JOIN_STRICTNESS = 418; + extern const int UNKNOWN_JOIN = 418; extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN = 419; extern const int CANNOT_UPDATE_COLUMN = 420; extern const int CANNOT_ADD_DIFFERENT_AGGREGATE_STATES = 421; diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 934078bc3b9..450a253651c 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -316,7 +316,8 @@ struct Settings : public SettingsCollection M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).", 0) \ 
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \ - M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for joins (ANY|ALL|SEMI LEFT and ALL INNER are supported for now).", 0) \ + M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \ + M(SettingJoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join", 0) \ M(SettingUInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. 
It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ diff --git a/dbms/src/Core/SettingsCollection.cpp b/dbms/src/Core/SettingsCollection.cpp index 606328f400c..9edfddb435d 100644 --- a/dbms/src/Core/SettingsCollection.cpp +++ b/dbms/src/Core/SettingsCollection.cpp @@ -22,7 +22,7 @@ namespace ErrorCodes extern const int UNKNOWN_COMPRESSION_METHOD; extern const int UNKNOWN_DISTRIBUTED_PRODUCT_MODE; extern const int UNKNOWN_GLOBAL_SUBQUERIES_METHOD; - extern const int UNKNOWN_JOIN_STRICTNESS; + extern const int UNKNOWN_JOIN; extern const int UNKNOWN_LOG_LEVEL; extern const int SIZE_OF_FIXED_STRING_DOESNT_MATCH; extern const int BAD_ARGUMENTS; @@ -495,8 +495,14 @@ IMPLEMENT_SETTING_ENUM(LoadBalancing, LOAD_BALANCING_LIST_OF_NAMES, ErrorCodes:: M(Unspecified, "") \ M(ALL, "ALL") \ M(ANY, "ANY") -IMPLEMENT_SETTING_ENUM(JoinStrictness, JOIN_STRICTNESS_LIST_OF_NAMES, ErrorCodes::UNKNOWN_JOIN_STRICTNESS) +IMPLEMENT_SETTING_ENUM(JoinStrictness, JOIN_STRICTNESS_LIST_OF_NAMES, ErrorCodes::UNKNOWN_JOIN) +#define JOIN_ALGORITHM_NAMES(M) \ + M(AUTO, "auto") \ + M(HASH, "hash") \ + M(PARTIAL_MERGE, "partial_merge") \ + M(PREFER_PARTIAL_MERGE, "prefer_partial_merge") +IMPLEMENT_SETTING_ENUM(JoinAlgorithm, JOIN_ALGORITHM_NAMES, ErrorCodes::UNKNOWN_JOIN) #define TOTALS_MODE_LIST_OF_NAMES(M) \ M(BEFORE_HAVING, "before_having") \ diff --git a/dbms/src/Core/SettingsCollection.h b/dbms/src/Core/SettingsCollection.h index 4f44612c36f..700e96f0d40 100644 --- a/dbms/src/Core/SettingsCollection.h +++ b/dbms/src/Core/SettingsCollection.h @@ -242,6 +242,14 @@ enum class JoinStrictness }; using SettingJoinStrictness = SettingEnum; +enum class JoinAlgorithm +{ + AUTO = 0, + HASH, + PARTIAL_MERGE, + PREFER_PARTIAL_MERGE, +}; +using SettingJoinAlgorithm = SettingEnum; /// Which rows should be included in TOTALS. 
enum class TotalsMode diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index 33876080860..b35ddfaeff8 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -22,7 +22,9 @@ AnalyzedJoin::AnalyzedJoin(const Settings & settings, VolumePtr tmp_volume_) , default_max_bytes(settings.default_max_bytes_in_join) , join_use_nulls(settings.join_use_nulls) , max_joined_block_rows(settings.max_joined_block_size_rows) - , partial_merge_join(settings.partial_merge_join) + , force_hash_join(settings.join_algorithm == JoinAlgorithm::HASH) + , force_partial_merge_join(settings.join_algorithm == JoinAlgorithm::PARTIAL_MERGE) + , prefer_partial_merge_join(settings.join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE || settings.partial_merge_join) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) , tmp_volume(tmp_volume_) diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index 713ebc3b9d5..879de9e5073 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -44,7 +44,9 @@ class AnalyzedJoin const size_t default_max_bytes; const bool join_use_nulls; const size_t max_joined_block_rows = 0; - const bool partial_merge_join = false; + const bool force_hash_join = false; + const bool force_partial_merge_join = false; + const bool prefer_partial_merge_join = false; const bool partial_merge_join_optimizations = false; const size_t partial_merge_join_rows_in_right_blocks = 0; @@ -88,8 +90,9 @@ public: const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; - bool forceMergeJoin() const { return allowMergeJoin() && partial_merge_join; } - bool forceHashJoin() const { return !allowMergeJoin(); } + bool preferMergeJoin() const 
{ return allowMergeJoin() && prefer_partial_merge_join; } + bool forceMergeJoin() const { return force_partial_merge_join; } + bool forceHashJoin() const { return force_hash_join; } bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); } bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index c4df430b355..59115d30422 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -544,7 +544,7 @@ static std::shared_ptr makeJoin(std::shared_ptr analyzed_jo { if (analyzed_join->forceHashJoin()) return std::make_shared(analyzed_join, sample_block); - else if (analyzed_join->forceMergeJoin()) + else if (analyzed_join->forceMergeJoin() || analyzed_join->preferMergeJoin()) return std::make_shared(analyzed_join, sample_block); return std::make_shared(analyzed_join, sample_block); } From f748427ffd4ce11c9e0d225286b87a8a02e45371 Mon Sep 17 00:00:00 2001 From: chertus Date: Wed, 19 Feb 2020 14:20:35 +0300 Subject: [PATCH 08/15] minor fix (remove logic dependent on partial_merge_join setting) --- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index c100ef1e0fa..10ac3441d6c 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include @@ -862,6 +863,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (expressions.hasJoin()) { Block header_before_join; + JoinPtr join = expressions.before_join->getTableJoinAlgo(); if constexpr (pipeline_with_processors) { @@ -879,10 +881,11 @@ void 
InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS { bool on_totals = type == QueryPipeline::StreamType::Totals; std::shared_ptr ret; - if (settings.partial_merge_join) - ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); - else + if (!join || typeid_cast(join.get())) ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); + else + ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); + return ret; }); } @@ -894,7 +897,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS stream = std::make_shared(stream, expressions.before_join); } - if (JoinPtr join = expressions.before_join->getTableJoinAlgo()) + if (join) { Block join_result_sample = ExpressionBlockInputStream( std::make_shared(header_before_join), expressions.before_join).getHeader(); From 3348697128935e35007ff374424365ffb4019f24 Mon Sep 17 00:00:00 2001 From: chertus Date: Wed, 19 Feb 2020 15:37:56 +0300 Subject: [PATCH 09/15] fix storage join --- dbms/src/Interpreters/AnalyzedJoin.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index 879de9e5073..dc3c3acad08 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -78,6 +78,7 @@ public: : size_limits(limits) , default_max_bytes(0) , join_use_nulls(use_nulls) + , force_hash_join(true) , key_names_right(key_names_right_) { table_join.kind = kind; From 1c297412d4260599d5ae88eac07b810c34612844 Mon Sep 17 00:00:00 2001 From: chertus Date: Wed, 19 Feb 2020 16:10:03 +0300 Subject: [PATCH 10/15] fix test --- .../queries/0_stateless/01010_pmj_on_disk.reference | 4 ++++ .../tests/queries/0_stateless/01010_pmj_on_disk.sql | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/dbms/tests/queries/0_stateless/01010_pmj_on_disk.reference 
b/dbms/tests/queries/0_stateless/01010_pmj_on_disk.reference index d2ae3db9528..ba1d03fcc5d 100644 --- a/dbms/tests/queries/0_stateless/01010_pmj_on_disk.reference +++ b/dbms/tests/queries/0_stateless/01010_pmj_on_disk.reference @@ -10,3 +10,7 @@ 1 0 2 11 3 0 +0 10 +1 0 +2 11 +3 0 diff --git a/dbms/tests/queries/0_stateless/01010_pmj_on_disk.sql b/dbms/tests/queries/0_stateless/01010_pmj_on_disk.sql index d84d5b1c52a..2b88b528bbf 100644 --- a/dbms/tests/queries/0_stateless/01010_pmj_on_disk.sql +++ b/dbms/tests/queries/0_stateless/01010_pmj_on_disk.sql @@ -1,4 +1,4 @@ -SET partial_merge_join = 0; +SET join_algorithm = 'hash'; SELECT number as n, j FROM numbers(4) ANY LEFT JOIN ( @@ -16,7 +16,7 @@ ANY LEFT JOIN ( ) js2 USING n; -- { serverError 191 } -SET partial_merge_join = 1; +SET join_algorithm = 'partial_merge'; SELECT number as n, j FROM numbers(4) ANY LEFT JOIN ( @@ -33,3 +33,12 @@ ANY LEFT JOIN ( FROM numbers(4000) ) js2 USING n; + +SET join_algorithm = 'auto'; + +SELECT number as n, j FROM numbers(4) +ANY LEFT JOIN ( + SELECT number * 2 AS n, number + 10 AS j + FROM numbers(4000) +) js2 +USING n; From 9f43fa87c3dd732925b6923375ead0250a214f29 Mon Sep 17 00:00:00 2001 From: chertus Date: Wed, 19 Feb 2020 17:23:21 +0300 Subject: [PATCH 11/15] make switched flag atomic --- dbms/src/Interpreters/JoinSwitcher.cpp | 4 ++-- dbms/src/Interpreters/JoinSwitcher.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index ff0c507036f..1fa5283c121 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -33,7 +33,7 @@ bool JoinSwitcher::addJoinedBlock(const Block & block, bool) { /// Trying to make MergeJoin without lock - if (switched) + if (switched.load(std::memory_order_relaxed)) return join->addJoinedBlock(block); std::lock_guard lock(switch_mutex); @@ -58,7 +58,7 @@ void JoinSwitcher::switchJoin() std::shared_ptr joined_data = 
static_cast(*join).getJoinedData(); BlocksList right_blocks = std::move(joined_data->blocks); - /// Destroy old join & create new one. Destroy first in case of memory saving. + /// Destroy old join & create new one. Early destroy for memory saving. join = std::make_shared(table_join, right_sample_block); /// names to positions optimization diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 0ecce1a0bdd..487f35d4d6b 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -63,8 +63,8 @@ public: private: JoinPtr join; - bool switched; SizeLimits limits; + mutable std::atomic switched; mutable std::mutex switch_mutex; std::shared_ptr table_join; const Block right_sample_block; From 3d7959d2e88b1629c0f86cfffeee30def83c78cc Mon Sep 17 00:00:00 2001 From: chertus Date: Wed, 19 Feb 2020 22:11:23 +0300 Subject: [PATCH 12/15] fix join_algorithm and partial_merge_join settings check --- dbms/src/Interpreters/AnalyzedJoin.cpp | 9 +++++---- dbms/src/Interpreters/AnalyzedJoin.h | 13 ++++++------- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 7 ++++--- dbms/src/Interpreters/JoinSwitcher.cpp | 4 ++-- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index b35ddfaeff8..b3a14541fa3 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -22,13 +22,14 @@ AnalyzedJoin::AnalyzedJoin(const Settings & settings, VolumePtr tmp_volume_) , default_max_bytes(settings.default_max_bytes_in_join) , join_use_nulls(settings.join_use_nulls) , max_joined_block_rows(settings.max_joined_block_size_rows) - , force_hash_join(settings.join_algorithm == JoinAlgorithm::HASH) - , force_partial_merge_join(settings.join_algorithm == JoinAlgorithm::PARTIAL_MERGE) - , prefer_partial_merge_join(settings.join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE || settings.partial_merge_join) + 
, join_algorithm(settings.join_algorithm) , partial_merge_join_optimizations(settings.partial_merge_join_optimizations) , partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks) , tmp_volume(tmp_volume_) -{} +{ + if (settings.partial_merge_join) + join_algorithm = JoinAlgorithm::PREFER_PARTIAL_MERGE; +} void AnalyzedJoin::addUsingKey(const ASTPtr & ast) { diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index dc3c3acad08..a2df2ff6bab 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -44,9 +45,7 @@ class AnalyzedJoin const size_t default_max_bytes; const bool join_use_nulls; const size_t max_joined_block_rows = 0; - const bool force_hash_join = false; - const bool force_partial_merge_join = false; - const bool prefer_partial_merge_join = false; + JoinAlgorithm join_algorithm; const bool partial_merge_join_optimizations = false; const size_t partial_merge_join_rows_in_right_blocks = 0; @@ -78,7 +77,7 @@ public: : size_limits(limits) , default_max_bytes(0) , join_use_nulls(use_nulls) - , force_hash_join(true) + , join_algorithm(JoinAlgorithm::HASH) , key_names_right(key_names_right_) { table_join.kind = kind; @@ -91,9 +90,9 @@ public: const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; - bool preferMergeJoin() const { return allowMergeJoin() && prefer_partial_merge_join; } - bool forceMergeJoin() const { return force_partial_merge_join; } - bool forceHashJoin() const { return force_hash_join; } + bool preferMergeJoin() const { return allowMergeJoin() && join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; } + bool forceMergeJoin() const { return join_algorithm == JoinAlgorithm::PARTIAL_MERGE; } + bool forceHashJoin() const { return join_algorithm == JoinAlgorithm::HASH; } bool 
forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); } bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 10ac3441d6c..0cead565cc8 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -864,6 +864,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS { Block header_before_join; JoinPtr join = expressions.before_join->getTableJoinAlgo(); + bool inflating_join = join && !typeid_cast(join.get()); if constexpr (pipeline_with_processors) { @@ -881,10 +882,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS { bool on_totals = type == QueryPipeline::StreamType::Totals; std::shared_ptr ret; - if (!join || typeid_cast(join.get())) - ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); - else + if (inflating_join) ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); + else + ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); return ret; }); diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index 1fa5283c121..d500f55fc5a 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -18,8 +18,8 @@ static ColumnWithTypeAndName correctNullability(ColumnWithTypeAndName && column, } JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_) - : switched(false) - , limits(table_join_->sizeLimits()) + : limits(table_join_->sizeLimits()) + , switched(false) , table_join(table_join_) , right_sample_block(right_sample_block_.cloneEmpty()) { From 56fa3cc1eb4f9b4d2baf26baee717cf5059f1cd3 Mon Sep 17 00:00:00 2001 From: chertus Date: Thu, 20 Feb 2020 14:26:00 +0300 
Subject: [PATCH 13/15] correct fallback for PREFER_PARTIAL_MERGE: HashJoin -> MergeJoin instead of HashJoin -> JoinSwitcher --- dbms/src/Interpreters/AnalyzedJoin.h | 2 +- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index a2df2ff6bab..6c5c65cde54 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -90,7 +90,7 @@ public: const SizeLimits & sizeLimits() const { return size_limits; } VolumePtr getTemporaryVolume() { return tmp_volume; } bool allowMergeJoin() const; - bool preferMergeJoin() const { return allowMergeJoin() && join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; } + bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; } bool forceMergeJoin() const { return join_algorithm == JoinAlgorithm::PARTIAL_MERGE; } bool forceHashJoin() const { return join_algorithm == JoinAlgorithm::HASH; } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 59115d30422..1a727795edf 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -542,9 +542,11 @@ static ExpressionActionsPtr createJoinedBlockActions(const Context & context, co static std::shared_ptr makeJoin(std::shared_ptr analyzed_join, const Block & sample_block) { - if (analyzed_join->forceHashJoin()) + bool allow_merge_join = analyzed_join->allowMergeJoin(); + + if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join)) return std::make_shared(analyzed_join, sample_block); - else if (analyzed_join->forceMergeJoin() || analyzed_join->preferMergeJoin()) + else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join)) return std::make_shared(analyzed_join, sample_block); return 
std::make_shared(analyzed_join, sample_block); } From f683437a8cb0cd40f0cc45ab4745824028e3949a Mon Sep 17 00:00:00 2001 From: chertus Date: Thu, 20 Feb 2020 20:19:16 +0300 Subject: [PATCH 14/15] remove dangerous optimisation --- dbms/src/Interpreters/JoinSwitcher.cpp | 5 ----- dbms/src/Interpreters/JoinSwitcher.h | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/dbms/src/Interpreters/JoinSwitcher.cpp b/dbms/src/Interpreters/JoinSwitcher.cpp index d500f55fc5a..142a7a1af94 100644 --- a/dbms/src/Interpreters/JoinSwitcher.cpp +++ b/dbms/src/Interpreters/JoinSwitcher.cpp @@ -31,11 +31,6 @@ JoinSwitcher::JoinSwitcher(std::shared_ptr table_join_, const Bloc bool JoinSwitcher::addJoinedBlock(const Block & block, bool) { - /// Trying to make MergeJoin without lock - - if (switched.load(std::memory_order_relaxed)) - return join->addJoinedBlock(block); - std::lock_guard lock(switch_mutex); if (switched) diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 487f35d4d6b..47ed2b8bc57 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -64,7 +64,7 @@ public: private: JoinPtr join; SizeLimits limits; - mutable std::atomic switched; + bool switched; mutable std::mutex switch_mutex; std::shared_ptr table_join; const Block right_sample_block; From aeefb78157f3c80f478a608ada1b5d24ec87d630 Mon Sep 17 00:00:00 2001 From: chertus Date: Thu, 20 Feb 2020 22:13:12 +0300 Subject: [PATCH 15/15] add comments to JoinSwitcher --- dbms/src/Interpreters/JoinSwitcher.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dbms/src/Interpreters/JoinSwitcher.h b/dbms/src/Interpreters/JoinSwitcher.h index 47ed2b8bc57..5e677b5205f 100644 --- a/dbms/src/Interpreters/JoinSwitcher.h +++ b/dbms/src/Interpreters/JoinSwitcher.h @@ -9,11 +9,17 @@ namespace DB { +/// Used when the 'join_algorithm' setting is set to JoinAlgorithm::AUTO.
+/// Starts the JOIN with the join-in-memory algorithm and switches to join-on-disk on the fly if there is not enough memory to hold the right table. +/// Currently, join-in-memory and join-on-disk are the JoinAlgorithm::HASH and JoinAlgorithm::PARTIAL_MERGE joins respectively. class JoinSwitcher : public IJoin { public: JoinSwitcher(std::shared_ptr table_join_, const Block & right_sample_block_); + /// Adds a block of data from the right-hand side of the JOIN to the current join object. + /// If the join-in-memory memory limit is exceeded, switches to join-on-disk and continues with it. + /// @returns false if the join-on-disk disk limit is exceeded bool addJoinedBlock(const Block & block, bool check_limits = true) override; void joinBlock(Block & block, std::shared_ptr & not_processed) override @@ -69,6 +75,8 @@ private: std::shared_ptr table_join; const Block right_sample_block; + /// Changes join-in-memory to join-on-disk, moving the right-hand JOIN data from one to the other. + /// Throws an error if join-on-disk does not support the JOIN kind or strictness. void switchJoin(); };