From f8d4aabfe0df44638184017ee90bfbe63ee79e90 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Thu, 6 Jun 2024 14:28:17 +0000 Subject: [PATCH] Initiliaze working set on pipelie initialization, right after analysis --- src/Planner/PlannerJoinTree.cpp | 8 ++-- .../QueryPlan/ParallelReplicasLocalPlan.cpp | 39 ++++--------------- .../QueryPlan/ReadFromMergeTree.cpp | 39 +++++++++++++++---- src/Processors/QueryPlan/ReadFromMergeTree.h | 6 ++- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 25206763a40..26e78ea69ac 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -908,13 +908,13 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres chassert(reading); - auto result_ptr = reading->selectRangesToRead(); - UInt64 rows_to_read = result_ptr->selected_rows; - reading->setAnalyzedResult(std::move(result_ptr)); - // (2) if it's ReadFromMergeTree - run index analysis and check number of rows to read if (settings.parallel_replicas_min_number_of_rows_per_replica > 0) { + auto result_ptr = reading->selectRangesToRead(); + UInt64 rows_to_read = result_ptr->selected_rows; + reading->setAnalyzedResult(std::move(result_ptr)); + if (table_expression_query_info.limit > 0 && table_expression_query_info.limit < rows_to_read) rows_to_read = table_expression_query_info.limit; diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 8ea826b861c..27d86f656c5 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -273,44 +273,21 @@ std::unique_ptr createLocalPlanForParallelReplicas( } chassert(reading); - if (!analyzed_read_from_merge_tree) - analyzed_read_from_merge_tree = std::move(node->step); - auto * analyzed_merge_tree = typeid_cast(analyzed_read_from_merge_tree.get()); - /// if no analysis is done yet, let's do it (happens with JOINs) - if (!analyzed_merge_tree->hasAnalyzedResult()) - analyzed_merge_tree->setAnalyzedResult(analyzed_merge_tree->selectRangesToRead()); + ReadFromMergeTree * analyzed_merge_tree = nullptr; + if (analyzed_read_from_merge_tree.get()) + analyzed_merge_tree = typeid_cast(analyzed_read_from_merge_tree.get()); - chassert(analyzed_merge_tree->hasAnalyzedResult()); - - CoordinationMode mode = CoordinationMode::Default; - switch (analyzed_merge_tree->getReadType()) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - chassert(false); - UNREACHABLE(); - } - - const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1; - coordinator->handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement( - mode, analyzed_merge_tree->getAnalysisResult().parts_with_ranges.getDescriptions(), number_of_local_replica)); - - MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement) {}; + MergeTreeAllRangesCallback all_ranges_cb + = [coordinator](InitialAllRangesAnnouncement announcement) { coordinator->handleInitialAllRangesAnnouncement(announcement); }; MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional { return coordinator->handleRequest(std::move(req)); }; + const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1; + auto read_from_merge_tree_parallel_replicas - = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb, number_of_local_replica); + = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica); node->step = std::move(read_from_merge_tree_parallel_replicas); addConvertingActions(*query_plan, header, has_missing_objects); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 7a13f072e4a..1aa48dbcff8 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -337,7 +337,6 @@ ReadFromMergeTree::ReadFromMergeTree( std::unique_ptr ReadFromMergeTree::createLocalParallelReplicasReadingStep( const ReadFromMergeTree * analyzed_merge_tree, - bool enable_parallel_reading_, std::optional all_ranges_callback_, std::optional read_task_callback_, std::optional number_of_current_replica_) @@ -354,8 +353,8 @@ std::unique_ptr ReadFromMergeTree::createLocalParallelReplica requested_num_streams, max_block_numbers_to_read, log, - analyzed_merge_tree->analyzed_result_ptr, - enable_parallel_reading_, + (analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr), + true, all_ranges_callback_, read_task_callback_, number_of_current_replica_); @@ -1424,11 +1423,8 @@ static void buildIndexes( const auto & settings = context->getSettingsRef(); - indexes.emplace(ReadFromMergeTree::Indexes{{ - filter_actions_dag, - context, - primary_key_column_names, - primary_key.expression}, {}, {}, {}, {}, false, {}}); + indexes.emplace( + ReadFromMergeTree::Indexes{KeyCondition{filter_actions_dag, context, primary_key_column_names, primary_key.expression}}); if (metadata_snapshot->hasPartitionKey()) { @@ -1951,6 +1947,33 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); + if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator()) + { + CoordinationMode mode = CoordinationMode::Default; + switch (result.read_type) + { + case ReadFromMergeTree::ReadType::Default: + mode = CoordinationMode::Default; + break; + case ReadFromMergeTree::ReadType::InOrder: + mode = CoordinationMode::WithOrder; + break; + case ReadFromMergeTree::ReadType::InReverseOrder: + mode = CoordinationMode::ReverseOrder; + break; + case ReadFromMergeTree::ReadType::ParallelReplicas: + chassert(false); + UNREACHABLE(); + } + + chassert(number_of_current_replica.has_value()); + chassert(all_ranges_callback.has_value()); + + /// initialize working set from local replica + all_ranges_callback.value()( + InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); + } + if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index b541481fa62..dc1d20cc807 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -126,7 +126,6 @@ public: std::unique_ptr createLocalParallelReplicasReadingStep( const ReadFromMergeTree * analyzed_merge_tree, - bool enable_parallel_reading_, std::optional all_ranges_callback_, std::optional read_task_callback_, std::optional number_of_current_replica_); @@ -151,6 +150,11 @@ public: struct Indexes { + explicit Indexes(KeyCondition key_condition_) + : key_condition(std::move(key_condition_)) + , use_skip_indexes(false) + {} + KeyCondition key_condition; std::optional partition_pruner; std::optional minmax_idx_condition;