Cleanup logic around join_algorithm setting

This commit is contained in:
vdimir 2022-07-15 14:57:58 +00:00
parent 466fceb3ee
commit 96bcae419c
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
13 changed files with 120 additions and 92 deletions

View File

@ -355,7 +355,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).", 0) \
M(OverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(Bool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \
M(JoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge', 'parallel_hash'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \
M(JoinAlgorithm, join_algorithm, JoinAlgorithm::DEFAULT, "Specify join algorithm.", 0) \
M(UInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \
M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \

View File

@ -31,7 +31,8 @@ IMPLEMENT_SETTING_ENUM(JoinStrictness, ErrorCodes::UNKNOWN_JOIN,
IMPLEMENT_SETTING_MULTI_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN,
{{"auto", JoinAlgorithm::AUTO},
{{"default", JoinAlgorithm::DEFAULT},
{"auto", JoinAlgorithm::AUTO},
{"hash", JoinAlgorithm::HASH},
{"partial_merge", JoinAlgorithm::PARTIAL_MERGE},
{"prefer_partial_merge", JoinAlgorithm::PREFER_PARTIAL_MERGE},

View File

@ -38,7 +38,8 @@ DECLARE_SETTING_ENUM(JoinStrictness)
enum class JoinAlgorithm
{
AUTO = 0,
DEFAULT = 0,
AUTO,
HASH,
PARTIAL_MERGE,
PREFER_PARTIAL_MERGE,

View File

@ -89,7 +89,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER;
extern const int UNKNOWN_TYPE_OF_AST_NODE;
extern const int UNSUPPORTED_METHOD;
}
namespace
@ -1079,34 +1078,58 @@ static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoi
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block, ContextPtr context)
{
/// HashJoin with Dictionary optimisation
if (analyzed_join->tryInitDictJoin(right_sample_block, context))
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);
bool allow_merge_join = analyzed_join->allowMergeJoin();
if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join))
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, std::unique_ptr<QueryPlan> & joined_plan, ContextPtr context)
{
Block right_sample_block = joined_plan->getCurrentDataStream().header;
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
{
if (analyzed_join->allowParallelHashJoin())
if (JoinPtr kvjoin = tryKeyValueJoin(analyzed_join, right_sample_block))
{
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, right_sample_block);
/// Do not need to execute plan for right part
joined_plan.reset();
return kvjoin;
}
/// It's not a hash join actually, that's why we check JoinAlgorithm::DIRECT
/// It's would be fixed in https://github.com/ClickHouse/ClickHouse/pull/38956
if (analyzed_join->tryInitDictJoin(right_sample_block, context))
{
joined_plan.reset();
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join))
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE))
{
if (MergeJoin::isSupported(analyzed_join))
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block);
}
else if (analyzed_join->forceFullSortingMergeJoin())
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::HASH) ||
/// partial_merge is preferred, but can't be used for specified kind of join, fallback to hash
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH))
{
if (analyzed_join->getClauses().size() != 1)
throw Exception("Full sorting merge join is supported only for single-condition joins", ErrorCodes::NOT_IMPLEMENTED);
if (analyzed_join->isSpecialStorage())
throw Exception("Full sorting merge join is not supported for special storage", ErrorCodes::NOT_IMPLEMENTED);
if (analyzed_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, right_sample_block);
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE))
{
if (FullSortingMergeJoin::isSupported(analyzed_join))
return std::make_shared<FullSortingMergeJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::AUTO))
return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
throw Exception("Can't execute any of specified algorithms for specified strictness/kind and right storage type",
ErrorCodes::NOT_IMPLEMENTED);
}
static std::unique_ptr<QueryPlan> buildJoinedPlan(
@ -1164,27 +1187,26 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block)
{
auto error_or_null = [&](const String & msg)
{
if (analyzed_join->isForcedAlgorithm(JoinAlgorithm::DIRECT))
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, "Can't use '{}' join algorithm: {}", JoinAlgorithm::DIRECT, msg);
return nullptr;
};
if (!analyzed_join->isAllowedAlgorithm(JoinAlgorithm::DIRECT))
if (!analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
return nullptr;
auto storage = analyzed_join->getStorageKeyValue();
if (!storage)
return error_or_null("unsupported storage");
{
return nullptr;
}
if (!isInnerOrLeft(analyzed_join->kind()))
return error_or_null("illegal kind");
{
return nullptr;
}
if (analyzed_join->strictness() != ASTTableJoin::Strictness::All &&
analyzed_join->strictness() != ASTTableJoin::Strictness::Any &&
analyzed_join->strictness() != ASTTableJoin::Strictness::RightAny)
return error_or_null("illegal strictness");
{
return nullptr;
}
const auto & clauses = analyzed_join->getClauses();
bool only_one_key = clauses.size() == 1 &&
@ -1194,15 +1216,16 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
!clauses[0].on_filter_condition_right;
if (!only_one_key)
return error_or_null("multiple keys is not allowed");
{
return nullptr;
}
String key_name = clauses[0].key_names_right[0];
String original_key_name = analyzed_join->getOriginalName(key_name);
const auto & storage_primary_key = storage->getPrimaryKey();
if (storage_primary_key.size() != 1 || storage_primary_key[0] != original_key_name)
{
return error_or_null(fmt::format("key '{}'{} doesn't match storage '{}'",
key_name, (key_name != original_key_name ? " (aka '" + original_key_name + "')" : ""), fmt::join(storage_primary_key, ",")));
return nullptr;
}
return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, storage);
@ -1240,18 +1263,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeJoin(
joined_plan->addStep(std::move(converting_step));
}
const Block & right_sample_block = joined_plan->getCurrentDataStream().header;
if (JoinPtr kvjoin = tryKeyValueJoin(analyzed_join, right_sample_block))
{
joined_plan.reset();
return kvjoin;
}
JoinPtr join = chooseJoinAlgorithm(analyzed_join, right_sample_block, getContext());
/// Do not make subquery for join over dictionary.
if (analyzed_join->getDictionaryReader())
joined_plan.reset();
JoinPtr join = chooseJoinAlgorithm(analyzed_join, joined_plan, getContext());
return join;
}

View File

@ -34,14 +34,26 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called");
}
void checkTypesOfKeys(const Block & left_block) const override
static bool isSupported(const std::shared_ptr<TableJoin> & table_join)
{
if (table_join->getClauses().size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin supports only one join key");
if (!table_join->oneDisjunct())
return false;
bool support_storage = !table_join->isSpecialStorage();
const auto & on_expr = table_join->getOnlyClause();
bool support_conditions = !on_expr.on_filter_condition_left && !on_expr.on_filter_condition_right;
/// Key column can change nullability and it's not handled on type conversion stage, so algorithm should be aware of it
if (table_join->hasUsing() && table_join->joinUseNulls())
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support USING with join_use_nulls");
bool support_using_and_nulls = !table_join->hasUsing() || !table_join->joinUseNulls();
return support_conditions && support_using_and_nulls && support_storage;
}
void checkTypesOfKeys(const Block & left_block) const override
{
if (!isSupported(table_join))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support specified query");
const auto & onexpr = table_join->getOnlyClause();
for (size_t i = 0; i < onexpr.key_names_left.size(); ++i)

View File

@ -718,7 +718,7 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
bool multiple_disjuncts = !table_join->oneDisjunct();
/// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any).
bool save_key_columns = !table_join->forceHashJoin() || isRightOrFull(kind) || multiple_disjuncts;
bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) || isRightOrFull(kind) || multiple_disjuncts;
if (save_key_columns)
{
saved_block_sample = right_table_keys.cloneEmpty();

View File

@ -311,7 +311,8 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
{
table_join->setStorageJoin(storage_join);
}
else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage); storage_dict)
else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
storage_dict && join_algorithm.isSet(JoinAlgorithm::DIRECT))
{
table_join->setStorageJoin(storage_dict);
}

View File

@ -1135,6 +1135,20 @@ void MergeJoin::addConditionJoinColumn(Block & block, JoinTableSide block_side)
}
}
bool MergeJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{
auto kind = table_join->kind();
auto strictness = table_join->strictness();
bool is_any = (strictness == ASTTableJoin::Strictness::Any);
bool is_all = (strictness == ASTTableJoin::Strictness::All);
bool is_semi = (strictness == ASTTableJoin::Strictness::Semi);
bool all_join = is_all && (isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind));
bool special_left = isInnerOrLeft(kind) && (is_any || is_semi);
return (all_join || special_left) && table_join->oneDisjunct();
}
MergeJoin::RightBlockInfo::RightBlockInfo(std::shared_ptr<Block> block_, size_t block_number_, size_t & skip_, RowBitmaps * bitmaps_)
: block(block_)

View File

@ -37,6 +37,8 @@ public:
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
static bool isSupported(const std::shared_ptr<TableJoin> & table_join);
private:
friend class NotJoinedMerge;

View File

@ -363,7 +363,7 @@ void TableJoin::addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, boo
* For `JOIN ON expr1 == expr2` we will infer common type later in makeTableJoin,
* when part of plan built and types of expression will be known.
*/
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), forceFullSortingMergeJoin());
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
if (auto it = left_type_map.find(col.name); it != left_type_map.end())
{
@ -409,18 +409,6 @@ bool TableJoin::oneDisjunct() const
return clauses.size() == 1;
}
bool TableJoin::allowMergeJoin() const
{
bool is_any = (strictness() == ASTTableJoin::Strictness::Any);
bool is_all = (strictness() == ASTTableJoin::Strictness::All);
bool is_semi = (strictness() == ASTTableJoin::Strictness::Semi);
bool all_join = is_all && (isInner(kind()) || isLeft(kind()) || isRight(kind()) || isFull(kind()));
bool special_left = isLeft(kind()) && (is_any || is_semi);
return (all_join || special_left) && oneDisjunct();
}
bool TableJoin::needStreamWithNonJoinedRows() const
{
if (strictness() == ASTTableJoin::Strictness::Asof ||
@ -511,7 +499,7 @@ TableJoin::createConvertingActions(
const ColumnsWithTypeAndName & left_sample_columns,
const ColumnsWithTypeAndName & right_sample_columns)
{
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), forceFullSortingMergeJoin());
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
NameToNameMap left_key_column_rename;
NameToNameMap right_key_column_rename;

View File

@ -193,24 +193,20 @@ public:
bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool isAllowedAlgorithm(JoinAlgorithm val) const { return join_algorithm.isSet(val) || join_algorithm.isSet(JoinAlgorithm::AUTO); }
bool isForcedAlgorithm(JoinAlgorithm val) const { return join_algorithm == MultiEnum<JoinAlgorithm>(val); }
bool preferMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PREFER_PARTIAL_MERGE); }
bool forceMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARTIAL_MERGE); }
bool isEnabledAlgorithm(JoinAlgorithm val) const
{
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
/// It's behaviour that was initially supported by clickhouse.
bool is_enbaled_by_default = val == JoinAlgorithm::DEFAULT
|| val == JoinAlgorithm::HASH
|| val == JoinAlgorithm::DIRECT;
if (join_algorithm.isSet(JoinAlgorithm::DEFAULT) && is_enbaled_by_default)
return true;
return join_algorithm.isSet(val);
}
bool allowParallelHashJoin() const;
bool forceFullSortingMergeJoin() const { return !isSpecialStorage() && join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); }
bool forceHashJoin() const
{
/// HashJoin always used for DictJoin
return dictionary_reader
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::HASH)
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARALLEL_HASH);
}
bool joinUseNulls() const { return join_use_nulls; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Common/checkStackSize.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/LogicalExpressionsOptimizer.h>
@ -683,7 +684,7 @@ bool tryJoinOnConst(TableJoin & analyzed_join, ASTPtr & on_expression, ContextPt
else
return false;
if (!analyzed_join.forceHashJoin())
if (!analyzed_join.isEnabledAlgorithm(JoinAlgorithm::HASH))
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"JOIN ON constant ({}) supported only with join algorithm 'hash'",
queryToString(on_expression));
@ -770,7 +771,7 @@ void collectJoinedColumns(TableJoin & analyzed_join, ASTTableJoin & table_join,
data.asofToJoinKeys();
}
if (!analyzed_join.oneDisjunct() && !analyzed_join.forceHashJoin())
if (!analyzed_join.oneDisjunct() && !analyzed_join.isEnabledAlgorithm(JoinAlgorithm::HASH))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Only `hash` join supports multiple ORs for keys in JOIN ON section");
}
}

View File

@ -47,16 +47,16 @@ SELECT '--- totals';
SELECT rdb.key % 2, sum(k), max(value2) FROM t2 INNER JOIN rdb ON rdb.key == t2.k GROUP BY (rdb.key % 2) WITH TOTALS;
SELECT '---';
SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD }
SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD }
SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k; -- { serverError UNSUPPORTED_METHOD }
SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD }
SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
DROP TABLE IF EXISTS rdb;