From dc9c1439b02c3435a4130be9ed11a43296f0c9e8 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Mon, 2 Dec 2024 12:59:01 +0000 Subject: [PATCH] Fix FULL JOINs queries --- src/Planner/PlannerJoinTree.cpp | 39 ++++++++++++++++----- src/Storages/StorageReplicatedMergeTree.cpp | 3 +- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index b02ec653403..06e3ade80f2 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -667,7 +667,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres bool is_single_table_expression, bool wrap_read_columns_in_subquery) { - // LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString()); + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString()); const auto & query_context = planner_context->getQueryContext(); const auto & settings = query_context->getSettingsRef(); @@ -962,7 +962,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// query_plan can be empty if there is nothing to read if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings)) { - const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node) + const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node, const QueryTreeNodePtr & table_expression_node) { const JoinNode * join_node = join_tree_node->as(); if (!join_node) @@ -970,12 +970,25 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres const auto join_kind = join_node->getKind(); const auto join_strictness = join_node->getStrictness(); - if (join_kind == JoinKind::Left || join_kind == JoinKind::Right - || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All)) + if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All)) + { + // Current implementation is rely on choosing left table for inner join (1). + // Without the check below right table can be selected for PR execution in a query + // which will lead to incorrect result + // Example: SELECT * FROM t1 FULL JOIN t2 INNER JOIN t3 + // Here, joins done in the following order (t1 FULL JOIN t2) INNER JOIN t3 + // J1 i.e. (t1 FULL JOIN t2) can't be choosen because FULL JOIN can't be executed with PR + // J1 INNER JOIN t3 shouldn't be parallelized since (1) and t3 is on right side + // To parallelize INNER JOIN, the query it can be rewritten into + // SELECT * FROM t3 INNER JOIN (SELECT * FROM t1 FULL JOIN t2) as j1 + if (join_node->getLeftTableExpression() == table_expression_node) + return true; + } + if (join_kind == JoinKind::Right && join_node->getRightTableExpression() == table_expression_node) return true; return false; - }(parent_join_tree); + }(parent_join_tree, table_expression); if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0) { @@ -1177,6 +1190,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres subquery_planner_context = planner_context->getGlobalPlannerContext(); auto subquery_options = select_query_options.subquery(); + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "subquery_planner"); + Planner subquery_planner(table_expression, subquery_options, subquery_planner_context); /// Propagate storage limits to subquery subquery_planner.addStorageLimits(*select_query_info.storage_limits); @@ -1285,17 +1300,19 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode( const SelectQueryInfo & select_query_info) { auto & join_node = join_table_expression->as(); - if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns) + if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns + && left_join_tree_query_plan.from_stage != QueryProcessingStage::WithMergeableState) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, - "JOIN {} left table expression expected to process query to fetch columns stage. Actual {}", + "JOIN {} left table expression expected to process query to fetch columns or mergeable stage. Actual {}", join_node.formatASTForErrorMessage(), QueryProcessingStage::toString(left_join_tree_query_plan.from_stage)); auto left_plan = std::move(left_join_tree_query_plan.query_plan); auto left_plan_output_columns = left_plan.getCurrentHeader().getColumnsWithTypeAndName(); - if (right_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns) + if (right_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns + && right_join_tree_query_plan.from_stage != QueryProcessingStage::WithMergeableState) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, - "JOIN {} right table expression expected to process query to fetch columns stage. Actual {}", + "JOIN {} right table expression expected to process query to fetch columns or mergeable stage. Actual {}", join_node.formatASTForErrorMessage(), QueryProcessingStage::toString(right_join_tree_query_plan.from_stage)); @@ -1724,6 +1741,8 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode( auto & r_mapping = right_join_tree_query_plan.query_node_to_plan_step_mapping; mapping.insert(r_mapping.begin(), r_mapping.end()); + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", dumpQueryPlan(result_plan)); + return JoinTreeQueryPlan{ .query_plan = std::move(result_plan), .from_stage = QueryProcessingStage::FetchColumns, @@ -1833,6 +1852,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node, const ColumnIdentifierSet & outer_scope_columns, PlannerContextPtr & planner_context) { + LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "{}", StackTrace().toString()); + const QueryTreeNodePtr & join_tree_node = query_node->as().getJoinTree(); auto table_expressions_stack = buildTableExpressionsStack(join_tree_node); size_t table_expressions_stack_size = table_expressions_stack.size(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index bd476625081..daa8068367d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5611,7 +5611,8 @@ void StorageReplicatedMergeTree::read( cluster->getName()); } - readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); } + readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); +} void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl( QueryPlan & query_plan,