diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index bd5a4793872..6822d8b0a71 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -20,34 +20,7 @@ namespace DB { -namespace -{ - -void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects) -{ - if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) - return; - - auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name; - - auto get_converting_dag = [mode](const Block & block_, const Block & header_) - { - /// Convert header structure to expected. - /// Also we ignore constants from result and replace it with constants from header. - /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. - return ActionsDAG::makeConvertingActions( - block_.getColumnsWithTypeAndName(), - header_.getColumnsWithTypeAndName(), - mode, - true); - }; - - auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header); - auto converting = std::make_unique(plan.getCurrentDataStream(), convert_actions_dag); - plan.addStep(std::move(converting)); -} - -} +void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects); std::unique_ptr createLocalPlanForParallelReplicas( const ASTPtr & query_ast, @@ -55,8 +28,7 @@ std::unique_ptr createLocalPlanForParallelReplicas( ContextPtr context, QueryProcessingStage::Enum processed_stage, ParallelReplicasReadingCoordinatorPtr coordinator, - QueryPlanStepPtr analyzed_read_from_merge_tree, - bool has_missing_objects) + QueryPlanStepPtr analyzed_read_from_merge_tree) { checkStackSize(); @@ -112,7 +84,7 @@ std::unique_ptr createLocalPlanForParallelReplicas( = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica); node->step = std::move(read_from_merge_tree_parallel_replicas); - addConvertingActions(*query_plan, header, has_missing_objects); + addConvertingActions(*query_plan, header, /*has_missing_objects=*/false); return query_plan; } diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h index 89d2019f807..123754458a1 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h @@ -14,6 +14,5 @@ std::unique_ptr createLocalPlanForParallelReplicas( ContextPtr context, QueryProcessingStage::Enum processed_stage, ParallelReplicasReadingCoordinatorPtr coordinator, - QueryPlanStepPtr read_from_merge_tree, - bool has_missing_objects); + QueryPlanStepPtr read_from_merge_tree); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 71caece30ed..1de701cff0b 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -339,9 +339,9 @@ ReadFromMergeTree::ReadFromMergeTree( std::unique_ptr ReadFromMergeTree::createLocalParallelReplicasReadingStep( const ReadFromMergeTree * analyzed_merge_tree, std::optional all_ranges_callback_, - std::optional read_task_callback_, - std::optional number_of_current_replica_) + std::optional read_task_callback_) { + const auto number_of_local_replica = 0; return std::make_unique( prepared_parts, alter_conversions_for_parts, @@ -356,9 +356,9 @@ std::unique_ptr ReadFromMergeTree::createLocalParallelReplica log, (analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr), true, - all_ranges_callback_, - read_task_callback_, - number_of_current_replica_); + all_ranges_callback, + read_task_callback, + number_of_local_replica); } Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 079f95be908..59a85413dd9 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -123,14 +123,13 @@ public: AnalysisResultPtr analyzed_result_ptr_, bool enable_parallel_reading_, std::optional all_ranges_callback_ = std::nullopt, - std::optional read_task_callback_ = std::nullopt, + std::optional read_task_callback_ = std::nullopt); std::optional number_of_current_replica_ = std::nullopt); std::unique_ptr createLocalParallelReplicasReadingStep( const ReadFromMergeTree * analyzed_merge_tree, - std::optional all_ranges_callback_, - std::optional read_task_callback_, - std::optional number_of_current_replica_); + std::optional all_ranges_callback, + std::optional read_task_callback); static constexpr auto name = "ReadFromMergeTree"; String getName() const override { return name; } @@ -291,8 +290,6 @@ private: bool enable_vertical_final = false; bool enable_remove_parts_from_snapshot_optimization = true; std::optional number_of_current_replica; - - friend class ReadFromMergeTreeCoordinated; }; } diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h index afde5f336af..8b463fda395 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h @@ -30,8 +30,8 @@ public: /// needed to report total rows to read void setProgressCallback(ProgressCallback callback); - void initialize(CoordinationMode mode); private: + void initialize(CoordinationMode mode); std::mutex mutex; const size_t replicas_count{0}; diff --git a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql index 337b05263d0..7dbc389b55c 100644 --- a/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql +++ b/tests/queries/0_stateless/02769_parallel_replicas_unavailable_shards.sql @@ -6,7 +6,7 @@ SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=1 SET send_logs_level='error'; -- with local plan for initiator, the query can be executed fast on initator, we can simply not come to the point where unavailable replica can be detected -- therefore disable local plan for now -SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79', allow_experimental_analyzer=0; +SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79', parallel_replicas_local_plan=0; SYSTEM FLUSH LOGS;