From fa8aafa94222da68791492561aaa0ee70e395249 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 22 May 2024 21:28:33 +0000 Subject: [PATCH] Local plan for parallel replicas: save --- .../ClusterProxy/executeQuery.cpp | 83 +++++++++++++++---- src/Interpreters/ClusterProxy/executeQuery.h | 9 +- src/Planner/PlannerJoinTree.cpp | 4 +- .../QueryPlan/DistributedCreateLocalPlan.cpp | 2 - .../QueryPlan/DistributedCreateLocalPlan.h | 5 -- .../QueryPlan/ParallelReplicasLocalPlan.cpp | 78 +++++++++++++++++ .../QueryPlan/ParallelReplicasLocalPlan.h | 19 +++++ src/Processors/QueryPlan/ReadFromRemote.cpp | 7 +- src/Processors/QueryPlan/ReadFromRemote.h | 4 +- 9 files changed, 182 insertions(+), 29 deletions(-) create mode 100644 src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp create mode 100644 src/Processors/QueryPlan/ParallelReplicasLocalPlan.h diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index d1701d268f1..71912fa1081 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace DB { @@ -403,7 +404,8 @@ void executeQueryWithParallelReplicas( QueryProcessingStage::Enum processed_stage, const ASTPtr & query_ast, ContextPtr context, - std::shared_ptr storage_limits) + std::shared_ptr storage_limits, + QueryPlanStepPtr read_from_merge_tree) { const auto & settings = context->getSettingsRef(); @@ -486,21 +488,66 @@ void executeQueryWithParallelReplicas( auto coordinator = std::make_shared( new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size); auto external_tables = new_context->getExternalTables(); - auto read_from_remote = std::make_unique( - query_ast, - new_cluster, - storage_id, - std::move(coordinator), - header, - processed_stage, - new_context, - getThrottler(new_context), - std::move(scalars), - std::move(external_tables), - getLogger("ReadFromParallelRemoteReplicasStep"), - std::move(storage_limits)); - query_plan.addStep(std::move(read_from_remote)); + if (settings.allow_experimental_analyzer) + { + auto read_from_remote = std::make_unique( + query_ast, + new_cluster, + storage_id, + std::move(coordinator), + header, + processed_stage, + new_context, + getThrottler(new_context), + std::move(scalars), + std::move(external_tables), + getLogger("ReadFromParallelRemoteReplicasStep"), + std::move(storage_limits), + /*exclude_local_replica*/ true); + + auto remote_plan = std::make_unique(); + remote_plan->addStep(std::move(read_from_remote)); + + auto local_plan = createLocalPlanForParallelReplicas( + query_ast, + header, + new_context, + processed_stage, + coordinator, + std::move(read_from_merge_tree), + /*has_missing_objects=*/false); + + DataStreams input_streams; + input_streams.reserve(2); + input_streams.emplace_back(local_plan->getCurrentDataStream()); + input_streams.emplace_back(remote_plan->getCurrentDataStream()); + + std::vector plans; + plans.emplace_back(std::move(local_plan)); + plans.emplace_back(std::move(remote_plan)); + + auto union_step = std::make_unique(std::move(input_streams)); + query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + else { + auto read_from_remote = std::make_unique( + query_ast, + new_cluster, + storage_id, + std::move(coordinator), + header, + processed_stage, + new_context, + getThrottler(new_context), + std::move(scalars), + std::move(external_tables), + getLogger("ReadFromParallelRemoteReplicasStep"), + std::move(storage_limits), + /*exclude_local_replica*/ false); + + query_plan.addStep(std::move(read_from_remote)); + } } void executeQueryWithParallelReplicas( @@ -510,7 +557,8 @@ void executeQueryWithParallelReplicas( const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context, ContextPtr context, - std::shared_ptr storage_limits) + std::shared_ptr storage_limits, + QueryPlanStepPtr read_from_merge_tree) { QueryTreeNodePtr modified_query_tree = query_tree->clone(); rewriteJoinToGlobalJoin(modified_query_tree, context); @@ -520,7 +568,8 @@ void executeQueryWithParallelReplicas( = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_tree, context, SelectQueryOptions(processed_stage).analyze()); auto modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree); - executeQueryWithParallelReplicas(query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits); + executeQueryWithParallelReplicas( + query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(read_from_merge_tree)); } void executeQueryWithParallelReplicas( diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 6548edf8939..1b38d1921b1 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -30,6 +30,9 @@ using QueryTreeNodePtr = std::shared_ptr; class PlannerContext; using PlannerContextPtr = std::shared_ptr; +class IQueryPlanStep; +using QueryPlanStepPtr = std::unique_ptr; + namespace ClusterProxy { @@ -73,7 +76,8 @@ void executeQueryWithParallelReplicas( QueryProcessingStage::Enum processed_stage, const ASTPtr & query_ast, ContextPtr context, - std::shared_ptr storage_limits); + std::shared_ptr storage_limits, + QueryPlanStepPtr read_from_merge_tree = nullptr); void executeQueryWithParallelReplicas( QueryPlan & query_plan, @@ -90,7 +94,8 @@ void executeQueryWithParallelReplicas( const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context, ContextPtr context, - std::shared_ptr storage_limits); + std::shared_ptr storage_limits, + QueryPlanStepPtr read_from_merge_tree); } } diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index a6e4a8ebcde..275461fa109 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -934,6 +934,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres { from_stage = QueryProcessingStage::WithMergeableState; QueryPlan query_plan_parallel_replicas; + QueryPlanStepPtr reading_step = std::move(node->step); ClusterProxy::executeQueryWithParallelReplicas( query_plan_parallel_replicas, storage->getStorageID(), @@ -941,7 +942,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres table_expression_query_info.query_tree, table_expression_query_info.planner_context, query_context, - table_expression_query_info.storage_limits); + table_expression_query_info.storage_limits, + std::move(reading_step)); query_plan = std::move(query_plan_parallel_replicas); const Block & query_plan_header = query_plan.getCurrentDataStream().header; diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp index d4545482477..1f4f271fa6e 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp @@ -1,8 +1,6 @@ #include -#include #include -#include #include #include #include diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.h b/src/Processors/QueryPlan/DistributedCreateLocalPlan.h index 50545d9ae81..f59123a7d88 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.h +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.h @@ -1,17 +1,12 @@ #pragma once #include -#include #include #include -#include namespace DB { -class PreparedSets; -using PreparedSetsPtr = std::shared_ptr; - std::unique_ptr createLocalPlan( const ASTPtr & query_ast, const Block & header, diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp new file mode 100644 index 00000000000..4d78e049b58 --- /dev/null +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace +{ + +void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects) +{ + if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) + return; + + auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name; + + auto get_converting_dag = [mode](const Block & block_, const Block & header_) + { + /// Convert header structure to expected. + /// Also we ignore constants from result and replace it with constants from header. + /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. + return ActionsDAG::makeConvertingActions( + block_.getColumnsWithTypeAndName(), + header_.getColumnsWithTypeAndName(), + mode, + true); + }; + + auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header); + auto converting = std::make_unique(plan.getCurrentDataStream(), convert_actions_dag); + plan.addStep(std::move(converting)); +} + +} + +std::unique_ptr createLocalPlanForParallelReplicas( + const ASTPtr & query_ast, + const Block & header, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + ParallelReplicasReadingCoordinatorPtr /*coordinator*/, + QueryPlanStepPtr /*read_from_merge_tree*/, + bool has_missing_objects) +{ + checkStackSize(); + + auto query_plan = std::make_unique(); + auto new_context = Context::createCopy(context); + + /// Do not push down limit to local plan, as it will break `rows_before_limit_at_least` counter. + if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit) + processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation; + + /// Do not apply AST optimizations, because query + /// is already optimized and some optimizations + /// can be applied only for non-distributed tables + /// and we can produce query, inconsistent with remote plans. + auto select_query_options = SelectQueryOptions(processed_stage).ignoreASTOptimizations(); + + /// For Analyzer, identifier in GROUP BY/ORDER BY/LIMIT BY lists has been resolved to + /// ConstantNode in QueryTree if it is an alias of a constant, so we should not replace + /// ConstantNode with ProjectionNode again(https://github.com/ClickHouse/ClickHouse/issues/62289). + new_context->setSetting("enable_positional_arguments", Field(false)); + auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options); + query_plan = std::make_unique(std::move(interpreter).extractQueryPlan()); + + addConvertingActions(*query_plan, header, has_missing_objects); + return query_plan; +} + +} diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h new file mode 100644 index 00000000000..89d2019f807 --- /dev/null +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +std::unique_ptr createLocalPlanForParallelReplicas( + const ASTPtr & query_ast, + const Block & header, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + ParallelReplicasReadingCoordinatorPtr coordinator, + QueryPlanStepPtr read_from_merge_tree, + bool has_missing_objects); +} diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index b4e35af85d6..6e6edfa1208 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -369,7 +369,8 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( Scalars scalars_, Tables external_tables_, LoggerPtr log_, - std::shared_ptr storage_limits_) + std::shared_ptr storage_limits_, + bool exclude_local_replica_) : ISourceStep(DataStream{.header = std::move(header_)}) , cluster(cluster_) , query_ast(query_ast_) @@ -382,6 +383,7 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( , external_tables{external_tables_} , storage_limits(std::move(storage_limits_)) , log(log_) + , exclude_local_replica(exclude_local_replica_) { chassert(cluster->getShardCount() == 1); @@ -410,6 +412,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder const auto & shard = cluster->getShardsInfo().at(0); size_t all_replicas_count = current_settings.max_parallel_replicas; + if (exclude_local_replica) + --all_replicas_count; + if (all_replicas_count > shard.getAllNodeCount()) { LOG_INFO( diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index eb15269155a..442da098a17 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -78,7 +78,8 @@ public: Scalars scalars_, Tables external_tables_, LoggerPtr log_, - std::shared_ptr storage_limits_); + std::shared_ptr storage_limits_, + bool exclude_local_replica = false); String getName() const override { return "ReadFromRemoteParallelReplicas"; } @@ -101,6 +102,7 @@ private: Tables external_tables; std::shared_ptr storage_limits; LoggerPtr log; + bool exclude_local_replica; }; }