From e8a1a800dcdc44001c6c9f08f78390df9b3133e3 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 29 May 2024 20:57:16 +0000 Subject: [PATCH] Fix crash with JOINs --- src/Interpreters/ClusterProxy/executeQuery.cpp | 8 ++++---- src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp | 9 ++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 464ce2ec586..0eeae112062 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -405,7 +405,7 @@ void executeQueryWithParallelReplicas( const ASTPtr & query_ast, ContextPtr context, std::shared_ptr storage_limits, - QueryPlanStepPtr read_from_merge_tree) + QueryPlanStepPtr analyzed_read_from_merge_tree) { auto logger = getLogger("executeQueryWithParallelReplicas"); LOG_DEBUG(logger, "Executing read from {}, header {}, query ({}), stage {} with parallel replicas", @@ -519,7 +519,7 @@ void executeQueryWithParallelReplicas( new_context, processed_stage, coordinator, - std::move(read_from_merge_tree), + std::move(analyzed_read_from_merge_tree), /*has_missing_objects=*/false); DataStreams input_streams; @@ -563,7 +563,7 @@ void executeQueryWithParallelReplicas( const PlannerContextPtr & planner_context, ContextPtr context, std::shared_ptr storage_limits, - QueryPlanStepPtr read_from_merge_tree) + QueryPlanStepPtr analyzed_read_from_merge_tree) { QueryTreeNodePtr modified_query_tree = query_tree->clone(); rewriteJoinToGlobalJoin(modified_query_tree, context); @@ -574,7 +574,7 @@ void executeQueryWithParallelReplicas( auto modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree); executeQueryWithParallelReplicas( - query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(read_from_merge_tree)); + query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(analyzed_read_from_merge_tree)); } void executeQueryWithParallelReplicas( diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 48184cb2b5f..e847a0ece26 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -273,7 +273,14 @@ std::unique_ptr createLocalPlanForParallelReplicas( } chassert(reading); - const auto * analyzed_merge_tree = typeid_cast(read_from_merge_tree.get()); + if (!read_from_merge_tree) + read_from_merge_tree = std::move(node->step); + + auto * analyzed_merge_tree = typeid_cast(read_from_merge_tree.get()); + /// if no analysis is done yet, let's do it (happens with JOINs) + if (!analyzed_merge_tree->hasAnalyzedResult()) + analyzed_merge_tree->setAnalyzedResult(analyzed_merge_tree->selectRangesToRead()); + chassert(analyzed_merge_tree->hasAnalyzedResult()); CoordinationMode mode = CoordinationMode::Default;