Fix FULL JOINs queries

This commit is contained in:
Igor Nikonov 2024-12-02 12:59:01 +00:00
parent 56159ca90d
commit dc9c1439b0
2 changed files with 32 additions and 10 deletions

View File

@ -667,7 +667,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
bool is_single_table_expression, bool is_single_table_expression,
bool wrap_read_columns_in_subquery) bool wrap_read_columns_in_subquery)
{ {
// LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString()); LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString());
const auto & query_context = planner_context->getQueryContext(); const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef(); const auto & settings = query_context->getSettingsRef();
@ -962,7 +962,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// query_plan can be empty if there is nothing to read /// query_plan can be empty if there is nothing to read
if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings)) if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings))
{ {
const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node) const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node, const QueryTreeNodePtr & table_expression_node)
{ {
const JoinNode * join_node = join_tree_node->as<JoinNode>(); const JoinNode * join_node = join_tree_node->as<JoinNode>();
if (!join_node) if (!join_node)
@ -970,12 +970,25 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
const auto join_kind = join_node->getKind(); const auto join_kind = join_node->getKind();
const auto join_strictness = join_node->getStrictness(); const auto join_strictness = join_node->getStrictness();
if (join_kind == JoinKind::Left || join_kind == JoinKind::Right if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All))
|| (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All)) {
// Current implementation is rely on choosing left table for inner join (1).
// Without the check below right table can be selected for PR execution in a query
// which will lead to incorrect result
// Example: SELECT * FROM t1 FULL JOIN t2 INNER JOIN t3
// Here, joins done in the following order (t1 FULL JOIN t2) INNER JOIN t3
// J1 i.e. (t1 FULL JOIN t2) can't be choosen because FULL JOIN can't be executed with PR
// J1 INNER JOIN t3 shouldn't be parallelized since (1) and t3 is on right side
// To parallelize INNER JOIN, the query it can be rewritten into
// SELECT * FROM t3 INNER JOIN (SELECT * FROM t1 FULL JOIN t2) as j1
if (join_node->getLeftTableExpression() == table_expression_node)
return true;
}
if (join_kind == JoinKind::Right && join_node->getRightTableExpression() == table_expression_node)
return true; return true;
return false; return false;
}(parent_join_tree); }(parent_join_tree, table_expression);
if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0) if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0)
{ {
@ -1177,6 +1190,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
subquery_planner_context = planner_context->getGlobalPlannerContext(); subquery_planner_context = planner_context->getGlobalPlannerContext();
auto subquery_options = select_query_options.subquery(); auto subquery_options = select_query_options.subquery();
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "subquery_planner");
Planner subquery_planner(table_expression, subquery_options, subquery_planner_context); Planner subquery_planner(table_expression, subquery_options, subquery_planner_context);
/// Propagate storage limits to subquery /// Propagate storage limits to subquery
subquery_planner.addStorageLimits(*select_query_info.storage_limits); subquery_planner.addStorageLimits(*select_query_info.storage_limits);
@ -1285,17 +1300,19 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
const SelectQueryInfo & select_query_info) const SelectQueryInfo & select_query_info)
{ {
auto & join_node = join_table_expression->as<JoinNode &>(); auto & join_node = join_table_expression->as<JoinNode &>();
if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns) if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns
&& left_join_tree_query_plan.from_stage != QueryProcessingStage::WithMergeableState)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"JOIN {} left table expression expected to process query to fetch columns stage. Actual {}", "JOIN {} left table expression expected to process query to fetch columns or mergeable stage. Actual {}",
join_node.formatASTForErrorMessage(), join_node.formatASTForErrorMessage(),
QueryProcessingStage::toString(left_join_tree_query_plan.from_stage)); QueryProcessingStage::toString(left_join_tree_query_plan.from_stage));
auto left_plan = std::move(left_join_tree_query_plan.query_plan); auto left_plan = std::move(left_join_tree_query_plan.query_plan);
auto left_plan_output_columns = left_plan.getCurrentHeader().getColumnsWithTypeAndName(); auto left_plan_output_columns = left_plan.getCurrentHeader().getColumnsWithTypeAndName();
if (right_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns) if (right_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns
&& right_join_tree_query_plan.from_stage != QueryProcessingStage::WithMergeableState)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"JOIN {} right table expression expected to process query to fetch columns stage. Actual {}", "JOIN {} right table expression expected to process query to fetch columns or mergeable stage. Actual {}",
join_node.formatASTForErrorMessage(), join_node.formatASTForErrorMessage(),
QueryProcessingStage::toString(right_join_tree_query_plan.from_stage)); QueryProcessingStage::toString(right_join_tree_query_plan.from_stage));
@ -1724,6 +1741,8 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
auto & r_mapping = right_join_tree_query_plan.query_node_to_plan_step_mapping; auto & r_mapping = right_join_tree_query_plan.query_node_to_plan_step_mapping;
mapping.insert(r_mapping.begin(), r_mapping.end()); mapping.insert(r_mapping.begin(), r_mapping.end());
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", dumpQueryPlan(result_plan));
return JoinTreeQueryPlan{ return JoinTreeQueryPlan{
.query_plan = std::move(result_plan), .query_plan = std::move(result_plan),
.from_stage = QueryProcessingStage::FetchColumns, .from_stage = QueryProcessingStage::FetchColumns,
@ -1833,6 +1852,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
const ColumnIdentifierSet & outer_scope_columns, const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context) PlannerContextPtr & planner_context)
{ {
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "{}", StackTrace().toString());
const QueryTreeNodePtr & join_tree_node = query_node->as<QueryNode &>().getJoinTree(); const QueryTreeNodePtr & join_tree_node = query_node->as<QueryNode &>().getJoinTree();
auto table_expressions_stack = buildTableExpressionsStack(join_tree_node); auto table_expressions_stack = buildTableExpressionsStack(join_tree_node);
size_t table_expressions_stack_size = table_expressions_stack.size(); size_t table_expressions_stack_size = table_expressions_stack.size();

View File

@ -5611,7 +5611,8 @@ void StorageReplicatedMergeTree::read(
cluster->getName()); cluster->getName());
} }
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); } readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams);
}
void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl( void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl(
QueryPlan & query_plan, QueryPlan & query_plan,