This commit is contained in:
vdimir 2024-12-17 18:52:33 +00:00
parent eb99a17fed
commit 01ff5c1182
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
11 changed files with 123 additions and 24 deletions

View File

@ -78,4 +78,13 @@ const char * toString(JoinTableSide join_table_side)
}
}
JoinKind reverseJoinKind(JoinKind kind)
{
if (kind == JoinKind::Right)
return JoinKind::Left;
if (kind == JoinKind::Left)
return JoinKind::Right;
return kind;
}
}

View File

@ -30,6 +30,8 @@ constexpr bool isInnerOrRight(JoinKind kind) { return kind == JoinKind::Inner ||
constexpr bool isInnerOrLeft(JoinKind kind) { return kind == JoinKind::Inner || kind == JoinKind::Left; }
constexpr bool isPaste(JoinKind kind) { return kind == JoinKind::Paste; }
JoinKind reverseJoinKind(JoinKind kind);
/// Allows more optimal JOIN for typical cases.
enum class JoinStrictness : uint8_t
{

View File

@ -1095,7 +1095,7 @@ size_t TableJoin::getMaxMemoryUsage() const
return max_memory_usage;
}
void TableJoin::swapSides()
void TableJoin::swapSides(JoinKind updated_kind)
{
assertEnableEnalyzer();
@ -1111,10 +1111,7 @@ void TableJoin::swapSides()
std::swap(columns_from_left_table, columns_from_joined_table);
std::swap(result_columns_from_left_table, columns_added_by_join);
if (table_join.kind == JoinKind::Left)
table_join.kind = JoinKind::Right;
else if (table_join.kind == JoinKind::Right)
table_join.kind = JoinKind::Left;
setKind(updated_kind);
}
void TableJoin::assertEnableEnalyzer() const

View File

@ -290,7 +290,7 @@ public:
}
bool allowParallelHashJoin() const;
void swapSides();
void swapSides(JoinKind updated_kind);
bool joinUseNulls() const { return join_use_nulls; }

View File

@ -485,8 +485,15 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
{
table_join->setStorageJoin(storage_);
});
swap_inputs = false;
}
if (join_expression.is_using)
swap_inputs = false;
if (swap_inputs)
join_info.kind = reverseJoinKind(join_info.kind);
join_context.is_asof = join_info.strictness == JoinStrictness::Asof;
join_context.is_using = join_expression.is_using;
@ -503,7 +510,7 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
table_join_clauses.pop_back();
bool can_convert_to_cross = (isInner(join_info.kind) || isCrossOrComma(join_info.kind))
&& join_info.strictness == JoinStrictness::All
&& join_info.expression.disjunctive_conditions.empty();
&& join_expression.disjunctive_conditions.empty();
if (!can_convert_to_cross)
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "No equality condition found in JOIN ON expression {}",
@ -530,12 +537,12 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
if (join_info.strictness == JoinStrictness::Asof)
{
if (!join_info.expression.disjunctive_conditions.empty())
if (!join_expression.disjunctive_conditions.empty())
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "ASOF join does not support multiple disjuncts in JOIN ON expression");
/// Find strictly only one inequality in predicate list for ASOF join
chassert(table_join_clauses.size() == 1);
auto & join_predicates = join_info.expression.condition.predicates;
auto & join_predicates = join_expression.condition.predicates;
bool asof_predicate_found = false;
for (auto & predicate : join_predicates)
{
@ -552,10 +559,10 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
}
if (!asof_predicate_found)
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "ASOF join requires one inequality predicate in JOIN ON expression, in {}",
formatJoinCondition(join_info.expression.condition));
formatJoinCondition(join_expression.condition));
}
for (auto & join_condition : join_info.expression.disjunctive_conditions)
for (auto & join_condition : join_expression.disjunctive_conditions)
{
auto & table_join_clause = table_join_clauses.emplace_back();
addJoinConditionToTableJoin(join_condition, table_join_clause, expression_actions, query_context, join_context);
@ -575,15 +582,15 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
}
JoinActionRef residual_filter_condition(nullptr);
if (join_info.expression.disjunctive_conditions.empty())
if (join_expression.disjunctive_conditions.empty())
{
residual_filter_condition = concatMergeConditions(
join_info.expression.condition.residual_conditions, expression_actions.post_join_actions, query_context);
join_expression.condition.residual_conditions, expression_actions.post_join_actions, query_context);
}
else
{
bool need_residual_filter = !join_info.expression.condition.residual_conditions.empty();
for (const auto & join_condition : join_info.expression.disjunctive_conditions)
bool need_residual_filter = !join_expression.condition.residual_conditions.empty();
for (const auto & join_condition : join_expression.disjunctive_conditions)
{
need_residual_filter = need_residual_filter || !join_condition.residual_conditions.empty();
if (need_residual_filter)
@ -591,7 +598,7 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
}
if (need_residual_filter)
residual_filter_condition = buildSingleActionForJoinExpression(join_info.expression, expression_actions, query_context);
residual_filter_condition = buildSingleActionForJoinExpression(join_expression, expression_actions, query_context);
}
if (residual_filter_condition && canPushDownFromOn(join_info))
@ -656,6 +663,12 @@ JoinPtr JoinStepLogical::convertToPhysical(JoinActionRef & left_filter, JoinActi
Block left_sample_block = blockWithColumns(expression_actions.left_pre_join_actions.getResultColumns());
Block right_sample_block = blockWithColumns(expression_actions.right_pre_join_actions.getResultColumns());
if (swap_inputs)
{
table_join->swapSides(join_info.kind);
std::swap(left_sample_block, right_sample_block);
}
auto join_algorithm_ptr = chooseJoinAlgorithm(
table_join,
prepared_join_storage,

View File

@ -66,6 +66,9 @@ public:
const JoinInfo & getJoinInfo() const { return join_info; }
JoinInfo & getJoinInfo() { return join_info; }
void setSwapInputs() { swap_inputs = true; }
bool areInputsSwapped() const { return swap_inputs; }
JoinPtr convertToPhysical(JoinActionRef & left_filter, JoinActionRef & right_filter, JoinActionRef & post_filter, bool is_explain_logical);
JoinExpressionActions & getExpressionActions() { return expression_actions; }
@ -78,6 +81,7 @@ protected:
JoinExpressionActions expression_actions;
JoinInfo join_info;
bool swap_inputs = false;
Names required_output_columns;
ContextPtr query_context;

View File

@ -122,6 +122,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &);
bool optimizeJoinLogical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &);
bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings & optimization_settings);
void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);

View File

@ -31,6 +31,7 @@ namespace Setting
extern const SettingsBool query_plan_remove_redundant_sorting;
extern const SettingsBool optimize_sorting_by_input_stream_properties;
extern const SettingsBool query_plan_reuse_storage_ordering_for_window_functions;
extern const SettingsBoolAuto query_plan_join_swap_table;
extern const SettingsBool query_plan_split_filter;
}
@ -82,6 +83,8 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.force_projection_name = settings.optimize_projection ? from[Setting::force_optimize_projection_name].value : "";
settings.optimize_use_implicit_projections = settings.optimize_projection && from[Setting::optimize_use_implicit_projections];
settings.join_swap_table = from[Setting::query_plan_join_swap_table].get();
return settings;
}

View File

@ -40,6 +40,11 @@ struct QueryPlanOptimizationSettings
/// If convert OUTER JOIN to INNER JOIN optimization is enabled.
bool convert_outer_join_to_inner_join = true;
/// If we can swap probe/build tables in join
/// true/false - always/never swap
/// nullopt - swap if it's beneficial
std::optional<bool> join_swap_table = std::nullopt;
/// If reorder-functions-after-sorting optimization is enabled.
bool execute_functions_after_sorting = true;

View File

@ -35,11 +35,20 @@ namespace DB::Setting
namespace DB::QueryPlanOptimizations
{
static std::optional<UInt64> estimateReadRowsCount(QueryPlan::Node & node, bool has_filter = false)
static std::optional<UInt64> estimateReadRowsCount(QueryPlan::Node & node, bool has_filter, Stack & steps_stack)
{
IQueryPlanStep * step = node.step.get();
steps_stack.push_back({.node = &node});
SCOPE_EXIT({ steps_stack.pop_back(); });
constexpr size_t max_chain_length = 64;
if (steps_stack.size() >= max_chain_length)
return {};
if (const auto * reading = typeid_cast<const ReadFromMergeTree *>(step))
{
optimizePrimaryKeyConditionAndLimit(steps_stack);
ReadFromMergeTree::AnalysisResultPtr analyzed_result = nullptr;
analyzed_result = analyzed_result ? analyzed_result : reading->getAnalyzedResult();
analyzed_result = analyzed_result ? analyzed_result : reading->selectRangesToRead();
@ -52,7 +61,9 @@ static std::optional<UInt64> estimateReadRowsCount(QueryPlan::Node & node, bool
is_filtered_by_index = is_filtered_by_index
|| (idx_stat.type == ReadFromMergeTree::IndexType::PrimaryKey && !idx_stat.used_keys.empty())
|| idx_stat.type == ReadFromMergeTree::IndexType::Skip
|| idx_stat.type == ReadFromMergeTree::IndexType::MinMax;
|| idx_stat.type == ReadFromMergeTree::IndexType::MinMax
|| idx_stat.type == ReadFromMergeTree::IndexType::Partition;
if (is_filtered_by_index)
break;
}
@ -73,13 +84,19 @@ static std::optional<UInt64> estimateReadRowsCount(QueryPlan::Node & node, bool
return {};
if (typeid_cast<ExpressionStep *>(step))
return estimateReadRowsCount(*node.children.front(), has_filter);
return estimateReadRowsCount(*node.children.front(), has_filter, steps_stack);
if (typeid_cast<FilterStep *>(step))
return estimateReadRowsCount(*node.children.front(), true);
return estimateReadRowsCount(*node.children.front(), true, steps_stack);
return {};
}
static std::optional<UInt64> estimateReadRowsCount(QueryPlan::Node & node)
{
Stack steps_stack;
return estimateReadRowsCount(node, false, steps_stack);
}
bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings &)
{
auto * join_step = typeid_cast<JoinStep *>(node.step.get());
@ -107,7 +124,7 @@ bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryP
{
auto lhs_extimation = estimateReadRowsCount(*node.children[0]);
auto rhs_extimation = estimateReadRowsCount(*node.children[1]);
LOG_TRACE(getLogger("optimizeJoin"), "Left table estimation: {}, right table estimation: {}",
LOG_TRACE(getLogger("optimizeJoinLegacy"), "Left table estimation: {}, right table estimation: {}",
lhs_extimation.transform(toString<UInt64>).value_or("unknown"),
rhs_extimation.transform(toString<UInt64>).value_or("unknown"));
@ -130,7 +147,8 @@ bool optimizeJoinLegacy(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryP
const auto & right_stream_input_header = headers.back();
auto updated_table_join = std::make_shared<TableJoin>(table_join);
updated_table_join->swapSides();
auto updated_kind = reverseJoinKind(table_join.kind());
updated_table_join->swapSides(updated_kind);
auto updated_join = join->clone(updated_table_join, right_stream_input_header, left_stream_input_header);
join_step->setJoin(std::move(updated_join), /* swap_streams= */ true);
return true;
@ -257,6 +275,9 @@ bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes & nod
if (node.children.size() >= 2)
new_right_node = makeExpressionNodeOnTopOf(node.children.at(1), std::move(join_expression_actions.right_pre_join_actions), right_filter.column_name, nodes);
if (join_step->areInputsSwapped() && new_right_node)
std::swap(new_left_node, new_right_node);
const auto & settings = join_step->getContext()->getSettingsRef();
auto & new_join_node = nodes.emplace_back();
@ -304,4 +325,47 @@ bool convertLogicalJoinToPhysical(QueryPlan::Node & node, QueryPlan::Nodes & nod
return true;
}
bool optimizeJoinLogical(QueryPlan::Node & node, QueryPlan::Nodes &, const QueryPlanOptimizationSettings & optimization_settings)
{
auto * join_step = typeid_cast<JoinStepLogical *>(node.step.get());
if (!join_step)
return false;
if (join_step->hasPreparedJoinStorage())
return false;
if (node.children.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStepLogical should have exactly 2 children, but has {}", node.children.size());
bool need_swap = false;
if (!optimization_settings.join_swap_table.has_value())
{
auto lhs_extimation = estimateReadRowsCount(*node.children[0]);
auto rhs_extimation = estimateReadRowsCount(*node.children[1]);
LOG_TRACE(getLogger("optimizeJoin"), "Left table estimation: {}, right table estimation: {}",
lhs_extimation.transform(toString<UInt64>).value_or("unknown"),
rhs_extimation.transform(toString<UInt64>).value_or("unknown"));
if (lhs_extimation && rhs_extimation && *lhs_extimation < *rhs_extimation)
need_swap = true;
}
else if (optimization_settings.join_swap_table.value())
{
need_swap = true;
}
if (!need_swap)
return false;
/// fixme: USING clause handled specially in join algorithm, so swap breaks it
/// fixme: Swapping for SEMI and ANTI joins should be alright, need to try to enable it and test
const auto & join_info = join_step->getJoinInfo();
if (join_info.expression.is_using || join_info.strictness != JoinStrictness::All)
return true;
join_step->setSwapInputs();
return true;
}
}

View File

@ -472,8 +472,9 @@ void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_sett
QueryPlanOptimizations::optimizeTreeFirstPass(optimization_settings, *root, nodes);
bool has_optimized_join = QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::convertLogicalJoinToPhysical);
if (!has_optimized_join)
QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::optimizeJoinLogical);
bool has_join_logical = QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::convertLogicalJoinToPhysical);
if (!has_join_logical)
QueryPlanOptimizations::optimizeTreeWithDFS(optimization_settings, *root, nodes, QueryPlanOptimizations::optimizeJoinLegacy);
QueryPlanOptimizations::optimizeTreeSecondPass(optimization_settings, *root, nodes);