From 01ff5c1182b4ee62ecf5b5797dd532a63b766302 Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 17 Dec 2024 18:52:33 +0000 Subject: [PATCH] swap --- src/Core/Joins.cpp | 9 +++ src/Core/Joins.h | 2 + src/Interpreters/TableJoin.cpp | 7 +- src/Interpreters/TableJoin.h | 2 +- src/Processors/QueryPlan/JoinStepLogical.cpp | 33 +++++--- src/Processors/QueryPlan/JoinStepLogical.h | 4 + .../QueryPlan/Optimizations/Optimizations.h | 1 + .../QueryPlanOptimizationSettings.cpp | 3 + .../QueryPlanOptimizationSettings.h | 5 ++ .../QueryPlan/Optimizations/optimizeJoin.cpp | 76 +++++++++++++++++-- src/Processors/QueryPlan/QueryPlan.cpp | 5 +- 11 files changed, 123 insertions(+), 24 deletions(-) diff --git a/src/Core/Joins.cpp b/src/Core/Joins.cpp index 77568223d71..62ae7ce5f86 100644 --- a/src/Core/Joins.cpp +++ b/src/Core/Joins.cpp @@ -78,4 +78,13 @@ const char * toString(JoinTableSide join_table_side) } } +JoinKind reverseJoinKind(JoinKind kind) +{ + if (kind == JoinKind::Right) + return JoinKind::Left; + if (kind == JoinKind::Left) + return JoinKind::Right; + return kind; +} + } diff --git a/src/Core/Joins.h b/src/Core/Joins.h index 0964bf86e6b..25fc8ee4e9a 100644 --- a/src/Core/Joins.h +++ b/src/Core/Joins.h @@ -30,6 +30,8 @@ constexpr bool isInnerOrRight(JoinKind kind) { return kind == JoinKind::Inner || constexpr bool isInnerOrLeft(JoinKind kind) { return kind == JoinKind::Inner || kind == JoinKind::Left; } constexpr bool isPaste(JoinKind kind) { return kind == JoinKind::Paste; } +JoinKind reverseJoinKind(JoinKind kind); + /// Allows more optimal JOIN for typical cases. enum class JoinStrictness : uint8_t { diff --git a/src/Interpreters/TableJoin.cpp b/src/Interpreters/TableJoin.cpp index abad818e83c..d79cad38722 100644 --- a/src/Interpreters/TableJoin.cpp +++ b/src/Interpreters/TableJoin.cpp @@ -1095,7 +1095,7 @@ size_t TableJoin::getMaxMemoryUsage() const return max_memory_usage; } -void TableJoin::swapSides() +void TableJoin::swapSides(JoinKind updated_kind) { assertEnableEnalyzer(); @@ -1111,10 +1111,7 @@ void TableJoin::swapSides() std::swap(columns_from_left_table, columns_from_joined_table); std::swap(result_columns_from_left_table, columns_added_by_join); - if (table_join.kind == JoinKind::Left) - table_join.kind = JoinKind::Right; - else if (table_join.kind == JoinKind::Right) - table_join.kind = JoinKind::Left; + setKind(updated_kind); } void TableJoin::assertEnableEnalyzer() const diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 9c15b6c0a17..ee4c458bf83 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -290,7 +290,7 @@ public: } bool allowParallelHashJoin() const; - void swapSides(); + void swapSides(JoinKind updated_kind); bool joinUseNulls() const { return join_use_nulls; } diff --git a/src/Processors/QueryPlan/JoinStepLogical.cpp b/src/Processors/QueryPlan/JoinStepLogical.cpp index 5449ed4541e..c0900c5f6f4 100644 --- a/src/Processors/QueryPlan/JoinStepLogical.cpp +++ b/src/Processors/QueryPlan/JoinStepLogical.cpp @@ -485,8 +485,15 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi { table_join->setStorageJoin(storage_); }); + swap_inputs = false; } + if (join_expression.is_using) + swap_inputs = false; + + if (swap_inputs) + join_info.kind = reverseJoinKind(join_info.kind); + join_context.is_asof = join_info.strictness == JoinStrictness::Asof; join_context.is_using = join_expression.is_using; @@ -503,7 +510,7 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi table_join_clauses.pop_back(); bool can_convert_to_cross = (isInner(join_info.kind) || isCrossOrComma(join_info.kind)) && join_info.strictness == JoinStrictness::All - && join_info.expression.disjunctive_conditions.empty(); + && join_expression.disjunctive_conditions.empty(); if (!can_convert_to_cross) throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "No equality condition found in JOIN ON expression {}", @@ -530,12 +537,12 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi if (join_info.strictness == JoinStrictness::Asof) { - if (!join_info.expression.disjunctive_conditions.empty()) + if (!join_expression.disjunctive_conditions.empty()) throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "ASOF join does not support multiple disjuncts in JOIN ON expression"); /// Find strictly only one inequality in predicate list for ASOF join chassert(table_join_clauses.size() == 1); - auto & join_predicates = join_info.expression.condition.predicates; + auto & join_predicates = join_expression.condition.predicates; bool asof_predicate_found = false; for (auto & predicate : join_predicates) { @@ -552,10 +559,10 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi } if (!asof_predicate_found) throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "ASOF join requires one inequality predicate in JOIN ON expression, in {}", - formatJoinCondition(join_info.expression.condition)); + formatJoinCondition(join_expression.condition)); } - for (auto & join_condition : join_info.expression.disjunctive_conditions) + for (auto & join_condition : join_expression.disjunctive_conditions) { auto & table_join_clause = table_join_clauses.emplace_back(); addJoinConditionToTableJoin(join_condition, table_join_clause, expression_actions, query_context, join_context); @@ -575,15 +582,15 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi } JoinActionRef residual_filter_condition(nullptr); - if (join_info.expression.disjunctive_conditions.empty()) + if (join_expression.disjunctive_conditions.empty()) { residual_filter_condition = concatMergeConditions( - join_info.expression.condition.residual_conditions, expression_actions.post_join_actions, query_context); + join_expression.condition.residual_conditions, expression_actions.post_join_actions, query_context); } else { - bool need_residual_filter = !join_info.expression.condition.residual_conditions.empty(); - for (const auto & join_condition : join_info.expression.disjunctive_conditions) + bool need_residual_filter = !join_expression.condition.residual_conditions.empty(); + for (const auto & join_condition : join_expression.disjunctive_conditions) { need_residual_filter = need_residual_filter || !join_condition.residual_conditions.empty(); if (need_residual_filter) @@ -591,7 +598,7 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi } if (need_residual_filter) - residual_filter_condition = buildSingleActionForJoinExpression(join_info.expression, expression_actions, query_context); + residual_filter_condition = buildSingleActionForJoinExpression(join_expression, expression_actions, query_context); } if (residual_filter_condition && canPushDownFromOn(join_info)) @@ -656,6 +663,12 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi Block left_sample_block = blockWithColumns(expression_actions.left_pre_join_actions.getResultColumns()); Block right_sample_block = blockWithColumns(expression_actions.right_pre_join_actions.getResultColumns()); + if (swap_inputs) + { + table_join->swapSides(join_info.kind); + std::swap(left_sample_block, right_sample_block); + } + auto join_algorithm_ptr = chooseJoinAlgorithm( table_join, prepared_join_storage, diff --git a/src/Processors/QueryPlan/JoinStepLogical.h b/src/Processors/QueryPlan/JoinStepLogical.h index ad6dce41008..02900d03b1a 100644 --- a/src/Processors/QueryPlan/JoinStepLogical.h +++ b/src/Processors/QueryPlan/JoinStepLogical.h @@ -66,6 +66,9 @@ public: const JoinInfo & getJoinInfo() const { return join_info; } JoinInfo & getJoinInfo() { return join_info; } + void setSwapInputs() { swap_inputs = true; } + bool areInputsSwapped() const { return swap_inputs; } + JoinPtr convertToPhysical(JoinActionRef & left_filter, JoinActionRef & right_filter, JoinActionRef & post_filter, bool is_explain_logical); JoinExpressionActions & getExpressionActions() { return expression_actions; } @@ -78,6 +81,7 @@ protected: JoinExpressionActions expression_actions; JoinInfo join_info; + bool swap_inputs = false; Names required_output_columns; ContextPtr query_context; diff --git a/src/Processors/QueryPlan/Optimizations/Optimizations.h b/src/Processors/QueryPlan/Optimizations/Optimizations.h index ce79c1f545c..1113f2e2a5b 100644 --- a/src/Processors/QueryPlan/Optimizations/Optimizations.h +++ b/src/Processors/QueryPlan/Optimizations/Optimizations.h @@ -122,6 +122,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes); void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes); void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &); bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &); +bool optimizeJoinLogical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &); bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings & optimization_settings); void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &); diff --git a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp index df3b170d002..387450d0a8b 100644 --- a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp +++ b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp @@ -31,6 +31,7 @@ namespace Setting extern const SettingsBool query_plan_remove_redundant_sorting; extern const SettingsBool optimize_sorting_by_input_stream_properties; extern const SettingsBool query_plan_reuse_storage_ordering_for_window_functions; + extern const SettingsBoolAuto query_plan_join_swap_table; extern const SettingsBool query_plan_split_filter; } @@ -82,6 +83,8 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const settings.force_projection_name = settings.optimize_projection ? from[Setting::force_optimize_projection_name].value : ""; settings.optimize_use_implicit_projections = settings.optimize_projection && from[Setting::optimize_use_implicit_projections]; + settings.join_swap_table = from[Setting::query_plan_join_swap_table].get(); + return settings; } diff --git a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h index 4a25779dab0..5ba1f91e7aa 100644 --- a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h +++ b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h @@ -40,6 +40,11 @@ struct QueryPlanOptimizationSettings /// If convert OUTER JOIN to INNER JOIN optimization is enabled. bool convert_outer_join_to_inner_join = true; + /// If we can swap probe/build tables in join + /// true/false - always/never swap + /// nullopt - swap if it's beneficial + std::optional join_swap_table = std::nullopt; + /// If reorder-functions-after-sorting optimization is enabled. bool execute_functions_after_sorting = true; diff --git a/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp b/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp index 46f79fa8c39..4e3e4b576f2 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp @@ -35,11 +35,20 @@ namespace DB::Setting namespace DB::QueryPlanOptimizations { -static std::optional estimateReadRowsCount(QueryPlan::Node & node, bool has_filter = false) +static std::optional estimateReadRowsCount(QueryPlan::Node & node, bool has_filter, Stack & steps_stack) { IQueryPlanStep * step = node.step.get(); + steps_stack.push_back({.node = &node}); + SCOPE_EXIT({ steps_stack.pop_back(); }); + + constexpr size_t max_chain_length = 64; + if (steps_stack.size() >= max_chain_length) + return {}; + if (const auto * reading = typeid_cast(step)) { + optimizePrimaryKeyConditionAndLimit(steps_stack); + ReadFromMergeTree::AnalysisResultPtr analyzed_result = nullptr; analyzed_result = analyzed_result ? analyzed_result : reading->getAnalyzedResult(); analyzed_result = analyzed_result ? analyzed_result : reading->selectRangesToRead(); @@ -52,7 +61,9 @@ static std::optional estimateReadRowsCount(QueryPlan::Node & node, bool is_filtered_by_index = is_filtered_by_index || (idx_stat.type == ReadFromMergeTree::IndexType::PrimaryKey && !idx_stat.used_keys.empty()) || idx_stat.type == ReadFromMergeTree::IndexType::Skip - || idx_stat.type == ReadFromMergeTree::IndexType::MinMax; + || idx_stat.type == ReadFromMergeTree::IndexType::MinMax + || idx_stat.type == ReadFromMergeTree::IndexType::Partition; + if (is_filtered_by_index) break; } @@ -73,13 +84,19 @@ static std::optional estimateReadRowsCount(QueryPlan::Node & node, bool return {}; if (typeid_cast(step)) - return estimateReadRowsCount(*node.children.front(), has_filter); + return estimateReadRowsCount(*node.children.front(), has_filter, steps_stack); if (typeid_cast(step)) - return estimateReadRowsCount(*node.children.front(), true); + return estimateReadRowsCount(*node.children.front(), true, steps_stack); return {}; } +static std::optional estimateReadRowsCount(QueryPlan::Node & node) +{ + Stack steps_stack; + return estimateReadRowsCount(node, false, steps_stack); +} + bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &) { auto * join_step = typeid_cast(node.step.get()); @@ -107,7 +124,7 @@ bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryP { auto lhs_extimation = estimateReadRowsCount(*node.children[0]); auto rhs_extimation = estimateReadRowsCount(*node.children[1]); - LOG_TRACE(getLogger("optimizeJoin"), "Left table estimation: {}, right table estimation: {}", + LOG_TRACE(getLogger("optimizeJoinLegacy"), "Left table estimation: {}, right table estimation: {}", lhs_extimation.transform(toString).value_or("unknown"), rhs_extimation.transform(toString).value_or("unknown")); @@ -130,7 +147,8 @@ bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryP const auto & right_stream_input_header = headers.back(); auto updated_table_join = std::make_shared(table_join); - updated_table_join->swapSides(); + auto updated_kind = reverseJoinKind(table_join.kind()); + updated_table_join->swapSides(updated_kind); auto updated_join = join->clone(updated_table_join, right_stream_input_header, left_stream_input_header); join_step->setJoin(std::move(updated_join), /* swap_streams= */ true); return true; @@ -257,6 +275,9 @@ bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes & nod if (node.children.size() >= 2) new_right_node = makeExpressionNodeOnTopOf(node.children.at(1), std::move(join_expression_actions.right_pre_join_actions), right_filter.column_name, nodes); + if (join_step->areInputsSwapped() && new_right_node) + std::swap(new_left_node, new_right_node); + const auto & settings = join_step->getContext()->getSettingsRef(); auto & new_join_node = nodes.emplace_back(); @@ -304,4 +325,47 @@ bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes & nod return true; } +bool optimizeJoinLogical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings & optimization_settings) +{ + auto * join_step = typeid_cast(node.step.get()); + if (!join_step) + return false; + + if (join_step->hasPreparedJoinStorage()) + return false; + + if (node.children.size() != 2) + throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStepLogical should have exactly 2 children, but has {}", node.children.size()); + + bool need_swap = false; + if (!optimization_settings.join_swap_table.has_value()) + { + auto lhs_extimation = estimateReadRowsCount(*node.children[0]); + auto rhs_extimation = estimateReadRowsCount(*node.children[1]); + LOG_TRACE(getLogger("optimizeJoin"), "Left table estimation: {}, right table estimation: {}", + lhs_extimation.transform(toString).value_or("unknown"), + rhs_extimation.transform(toString).value_or("unknown")); + + if (lhs_extimation && rhs_extimation && *lhs_extimation < *rhs_extimation) + need_swap = true; + } + else if (optimization_settings.join_swap_table.value()) + { + need_swap = true; + } + + if (!need_swap) + return false; + + /// fixme: USING clause handled specially in join algorithm, so swap breaks it + /// fixme: Swapping for SEMI and ANTI joins should be alright, need to try to enable it and test + const auto & join_info = join_step->getJoinInfo(); + if (join_info.expression.is_using || join_info.strictness != JoinStrictness::All) + return true; + + join_step->setSwapInputs(); + + return true; +} + } diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index a1979510fd2..831ca2d8957 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -472,8 +472,9 @@ void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_sett QueryPlanOptimizations::optimizeTreeFirstPass(optimization_settings, *root, nodes); - bool has_optimized_join = QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::convertLogicalJoinToPhysical); - if (!has_optimized_join) + QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::optimizeJoinLogical); + bool has_join_logical = QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::convertLogicalJoinToPhysical); + if (!has_join_logical) QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::optimizeJoinLegacy); QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);