Fix right join - disabling PR lead to dup result

This commit is contained in:
Igor Nikonov 2024-10-28 23:25:38 +00:00
parent c952d9d815
commit b03a296542

View File

@ -665,11 +665,15 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
bool is_single_table_expression,
bool wrap_read_columns_in_subquery)
{
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "table_expression:\n{}", table_expression->dumpTree());
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
LOG_DEBUG(
getLogger(__PRETTY_FUNCTION__),
"pr_enabled={} table_expression:\n{}",
settings[Setting::allow_experimental_parallel_reading_from_replicas].toString(),
table_expression->dumpTree());
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
QueryProcessingStage::Enum from_stage = QueryProcessingStage::Enum::FetchColumns;
@ -914,11 +918,11 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// It is just a safety check needed until we have a proper sending plan to replicas.
/// If we have a non-trivial storage like View it might create its own Planner inside read(), run findTableForParallelReplicas()
/// and find some other table that might be used for reading with parallel replicas. It will lead to errors.
const bool other_table_already_chosen_for_reading_with_parallel_replicas
= planner_context->getGlobalPlannerContext()->parallel_replicas_table
&& !table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas;
if (other_table_already_chosen_for_reading_with_parallel_replicas)
planner_context->getMutableQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
// const bool other_table_already_chosen_for_reading_with_parallel_replicas
// = planner_context->getGlobalPlannerContext()->parallel_replicas_table
// && !table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas;
// if (other_table_already_chosen_for_reading_with_parallel_replicas)
// planner_context->getMutableQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
storage->read(
query_plan,
@ -930,6 +934,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
max_block_size,
max_streams);
LOG_DEBUG(getLogger("dumpQueryPlan"), "\n{}", dumpQueryPlan(query_plan));
auto parallel_replicas_enabled_for_storage = [](const StoragePtr & table, const Settings & query_settings)
{
if (!table->isMergeTree())
@ -1249,6 +1255,8 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
// LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "Join expression: {}", join_table_expression->dumpTree());
auto & join_node = join_table_expression->as<JoinNode &>();
if (left_join_tree_query_plan.from_stage != QueryProcessingStage::FetchColumns)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
@ -1921,6 +1929,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
"Expected 1 query plan for JOIN TREE. Actual {}",
query_plans_stack.size());
// LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "JOIN query plan:\n{}", dumpQueryPlan(query_plans_stack.back().query_plan));
return std::move(query_plans_stack.back());
}