Fix crash with JOINs

This commit is contained in:
Igor Nikonov 2024-05-29 20:57:16 +00:00
parent 40aab93db1
commit e8a1a800dc
2 changed files with 12 additions and 5 deletions

View File

@ -405,7 +405,7 @@ void executeQueryWithParallelReplicas(
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree)
QueryPlanStepPtr analyzed_read_from_merge_tree)
{
auto logger = getLogger("executeQueryWithParallelReplicas");
LOG_DEBUG(logger, "Executing read from {}, header {}, query ({}), stage {} with parallel replicas",
@ -519,7 +519,7 @@ void executeQueryWithParallelReplicas(
new_context,
processed_stage,
coordinator,
std::move(read_from_merge_tree),
std::move(analyzed_read_from_merge_tree),
/*has_missing_objects=*/false);
DataStreams input_streams;
@ -563,7 +563,7 @@ void executeQueryWithParallelReplicas(
const PlannerContextPtr & planner_context,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree)
QueryPlanStepPtr analyzed_read_from_merge_tree)
{
QueryTreeNodePtr modified_query_tree = query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree, context);
@ -574,7 +574,7 @@ void executeQueryWithParallelReplicas(
auto modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
executeQueryWithParallelReplicas(
query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(read_from_merge_tree));
query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(analyzed_read_from_merge_tree));
}
void executeQueryWithParallelReplicas(

View File

@ -273,7 +273,14 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
}
chassert(reading);
const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
if (!read_from_merge_tree)
read_from_merge_tree = std::move(node->step);
auto * analyzed_merge_tree = typeid_cast<ReadFromMergeTree *>(read_from_merge_tree.get());
/// if no analysis is done yet, let's do it (happens with JOINs)
if (!analyzed_merge_tree->hasAnalyzedResult())
analyzed_merge_tree->setAnalyzedResult(analyzed_merge_tree->selectRangesToRead());
chassert(analyzed_merge_tree->hasAnalyzedResult());
CoordinationMode mode = CoordinationMode::Default;