From 842b51c782f0cc5cf9bd7b695c466933310a7e6c Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 23 Jul 2024 13:30:40 +0000 Subject: [PATCH] Try assigning replica numbers consistently independent of initiator --- .../ClusterProxy/executeQuery.cpp | 67 +++++++++++- .../QueryPlan/ParallelReplicasLocalPlan.cpp | 7 +- .../QueryPlan/ParallelReplicasLocalPlan.h | 3 +- .../QueryPlan/ReadFromMergeTree.cpp | 8 +- src/Processors/QueryPlan/ReadFromMergeTree.h | 3 +- src/Processors/QueryPlan/ReadFromRemote.cpp | 102 ++++-------------- src/Processors/QueryPlan/ReadFromRemote.h | 6 +- .../ParallelReplicasReadingCoordinator.cpp | 3 +- 8 files changed, 101 insertions(+), 98 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index a8cf3022a61..80f6d7a864b 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -40,6 +40,7 @@ namespace ErrorCodes extern const int TOO_LARGE_DISTRIBUTED_DEPTH; extern const int LOGICAL_ERROR; extern const int UNEXPECTED_CLUSTER; + extern const int INCONSISTENT_CLUSTER_DEFINITION; } namespace ClusterProxy @@ -519,22 +520,75 @@ void executeQueryWithParallelReplicas( "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); } - const auto replica_count = std::min(settings.max_parallel_replicas.value, new_cluster->getShardsInfo().begin()->getAllNodeCount()); + const auto & shard = new_cluster->getShardsInfo().at(0); + size_t max_replicas_to_use = settings.max_parallel_replicas; + if (max_replicas_to_use > shard.getAllNodeCount()) + { + LOG_INFO( + getLogger("ReadFromParallelRemoteReplicasStep"), + "The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). " + "Will use the latter number to execute the query.", + settings.max_parallel_replicas, + shard.getAllNodeCount()); + max_replicas_to_use = shard.getAllNodeCount(); + } - auto coordinator = std::make_shared(replica_count, settings.parallel_replicas_mark_segment_size); + auto coordinator = std::make_shared(max_replicas_to_use, settings.parallel_replicas_mark_segment_size); auto external_tables = new_context->getExternalTables(); + std::vector shuffled_pool; + if (max_replicas_to_use < shard.getAllNodeCount()) + { + shuffled_pool = shard.pool->getShuffledPools(settings); + shuffled_pool.resize(max_replicas_to_use); + } + else + { + /// if all replicas in cluster are used for query execution + /// try to preserve replicas order as in cluster definition + /// it's important for data locality during query execution + auto priority_func = [](size_t i) { return Priority{static_cast(i)}; }; + shuffled_pool = shard.pool->getShuffledPools(settings, priority_func); + } + + std::vector pools_to_use; + pools_to_use.reserve(shuffled_pool.size()); + for (auto & pool : shuffled_pool) + pools_to_use.emplace_back(std::move(pool.pool)); + /// do not build local plan for distributed queries for now (address it later) if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num) { + /// find local replica index in pool, to assign it as replica number + std::optional local_replica_number; + for (size_t i = 0, s = pools_to_use.size(); i < s; ++i) + { + const auto & hostname = pools_to_use[i]->getHost(); + const auto found = std::find_if( + begin(shard.local_addresses), + end(shard.local_addresses), + [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; }); + if (found != shard.local_addresses.end()) + { + local_replica_number = i; + break; + } + } + if (!local_replica_number) + throw Exception( + ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, + "Local replica is not found in '{}' cluster definition, see 'cluster_for_parallel_replicas' setting", + new_cluster->getName()); + auto [local_plan, with_parallel_replicas] = createLocalPlanForParallelReplicas( query_ast, header, new_context, processed_stage, coordinator, - std::move(analyzed_read_from_merge_tree)); + std::move(analyzed_read_from_merge_tree), + local_replica_number.value()); if (!with_parallel_replicas) { @@ -542,6 +596,8 @@ void executeQueryWithParallelReplicas( return; } + LOG_DEBUG(logger, "Local replica got replica number {}", local_replica_number.value()); + auto read_from_remote = std::make_unique( query_ast, new_cluster, @@ -555,7 +611,8 @@ void executeQueryWithParallelReplicas( std::move(external_tables), getLogger("ReadFromParallelRemoteReplicasStep"), std::move(storage_limits), - /*exclude_local_replica*/ true); + std::move(pools_to_use), + local_replica_number); auto remote_plan = std::make_unique(); remote_plan->addStep(std::move(read_from_remote)); @@ -587,7 +644,7 @@ void executeQueryWithParallelReplicas( std::move(external_tables), getLogger("ReadFromParallelRemoteReplicasStep"), std::move(storage_limits), - /*exclude_local_replica*/ false); + std::move(pools_to_use)); query_plan.addStep(std::move(read_from_remote)); } diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index e8ff0f417dc..050044edd3a 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -27,7 +27,8 @@ std::pair, bool> createLocalPlanForParallelReplicas( ContextPtr context, QueryProcessingStage::Enum processed_stage, ParallelReplicasReadingCoordinatorPtr coordinator, - QueryPlanStepPtr analyzed_read_from_merge_tree) + QueryPlanStepPtr analyzed_read_from_merge_tree, + size_t replica_number) { checkStackSize(); @@ -84,8 +85,8 @@ std::pair, bool> createLocalPlanForParallelReplicas( MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional { return coordinator->handleRequest(std::move(req)); }; - auto read_from_merge_tree_parallel_replicas - = reading->createLocalParallelReplicasReadingStep(analyzed_result_ptr, std::move(all_ranges_cb), std::move(read_task_cb)); + auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep( + analyzed_result_ptr, std::move(all_ranges_cb), std::move(read_task_cb), replica_number); node->step = std::move(read_from_merge_tree_parallel_replicas); addConvertingActions(*query_plan, header, /*has_missing_objects=*/false); diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h index 2a49be6347a..ab0bbeaeeff 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.h @@ -14,5 +14,6 @@ std::pair, bool> createLocalPlanForParallelReplicas( ContextPtr context, QueryProcessingStage::Enum processed_stage, ParallelReplicasReadingCoordinatorPtr coordinator, - QueryPlanStepPtr read_from_merge_tree); + QueryPlanStepPtr read_from_merge_tree, + size_t replica_number); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 5922550eaf3..aea9d02fe01 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -348,9 +348,9 @@ ReadFromMergeTree::ReadFromMergeTree( std::unique_ptr ReadFromMergeTree::createLocalParallelReplicasReadingStep( AnalysisResultPtr analyzed_result_ptr_, MergeTreeAllRangesCallback all_ranges_callback_, - MergeTreeReadTaskCallback read_task_callback_) + MergeTreeReadTaskCallback read_task_callback_, + size_t replica_number) { - const auto number_of_local_replica = 0; const bool enable_parallel_reading = true; return std::make_unique( prepared_parts, @@ -364,11 +364,11 @@ std::unique_ptr ReadFromMergeTree::createLocalParallelReplica requested_num_streams, max_block_numbers_to_read, log, - analyzed_result_ptr_, + std::move(analyzed_result_ptr_), enable_parallel_reading, all_ranges_callback_, read_task_callback_, - number_of_local_replica); + replica_number); } Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 307b605c01c..69e20a7864d 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -129,7 +129,8 @@ public: std::unique_ptr createLocalParallelReplicasReadingStep( AnalysisResultPtr analyzed_result_ptr_, MergeTreeAllRangesCallback all_ranges_callback_, - MergeTreeReadTaskCallback read_task_callback_); + MergeTreeReadTaskCallback read_task_callback_, + size_t replica_number); static constexpr auto name = "ReadFromMergeTree"; String getName() const override { return name; } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 46d4aa29e70..3df46eb1987 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -371,7 +371,8 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( Tables external_tables_, LoggerPtr log_, std::shared_ptr storage_limits_, - bool exclude_local_replica_) + std::vector pools_to_use_, + std::optional exclude_pool_index_) : ISourceStep(DataStream{.header = std::move(header_)}) , cluster(cluster_) , query_ast(query_ast_) @@ -384,24 +385,20 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( , external_tables{external_tables_} , storage_limits(std::move(storage_limits_)) , log(log_) - , exclude_local_replica(exclude_local_replica_) + , pools_to_use(std::move(pools_to_use_)) + , exclude_pool_index(exclude_pool_index_) { chassert(cluster->getShardCount() == 1); std::vector replicas; - replicas.reserve(cluster->getShardsAddresses().front().size()); + replicas.reserve(pools_to_use.size()); - for (const auto & addr : cluster->getShardsAddresses().front()) + for (size_t i = 0, l = pools_to_use.size(); i < l; ++i) { - if (exclude_local_replica && addr.is_local) + if (exclude_pool_index.has_value() && i == exclude_pool_index) continue; - /// replace hostname with replica name if the hostname started with replica namespace, - /// it makes description shorter and more readable - if (!addr.database_replica_name.empty() && addr.host_name.starts_with(addr.database_replica_name)) - replicas.push_back(fmt::format("{}", addr.database_replica_name)); - else - replicas.push_back(fmt::format("{}", addr.host_name)); + replicas.push_back(pools_to_use[i]->getAddress()); } auto description = fmt::format("Query: {} Replicas: {}", formattedAST(query_ast), fmt::join(replicas, ", ")); @@ -421,86 +418,29 @@ void ReadFromParallelRemoteReplicasStep::enforceAggregationInOrder() void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { Pipes pipes; - const Settings & current_settings = context->getSettingsRef(); - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - - const auto & shard = cluster->getShardsInfo().at(0); - size_t max_replicas_to_use = current_settings.max_parallel_replicas; - if (max_replicas_to_use > shard.getAllNodeCount()) - { - LOG_INFO( - getLogger("ReadFromParallelRemoteReplicasStep"), - "The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). " - "Will use the latter number to execute the query.", - current_settings.max_parallel_replicas, - shard.getAllNodeCount()); - max_replicas_to_use = shard.getAllNodeCount(); - } - - std::vector shuffled_pool; - if (max_replicas_to_use < shard.getAllNodeCount()) - { - shuffled_pool = shard.pool->getShuffledPools(current_settings); - } - else - { - /// try to preserve replicas order if all replicas in cluster are used for query execution - /// it's important for data locality during query execution - auto priority_func = [](size_t i) { return Priority{static_cast(i)}; }; - shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); - } - - std::vector pools_to_use; - pools_to_use.reserve(shuffled_pool.size()); - for (const auto & pool : shuffled_pool) - { - if (exclude_local_replica) - { - const auto & hostname = pool.pool->getHost(); - auto it = std::find_if( - begin(shard.local_addresses), - end(shard.local_addresses), - [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; }); - if (it == shard.local_addresses.end()) - pools_to_use.push_back(pool.pool); - } - else - { - pools_to_use.push_back(pool.pool); - } - } - - pools_to_use.resize(std::min(pools_to_use.size(), max_replicas_to_use)); - // if local plan is used for local replica, we should exclude one remote replica - if (exclude_local_replica && !pools_to_use.empty()) - pools_to_use.resize(max_replicas_to_use - 1); - - LOG_DEBUG( - getLogger("ReadFromParallelRemoteReplicasStep"), - "Number of pools to use is {}. Originally {}", - pools_to_use.size(), - shuffled_pool.size()); - - if (pools_to_use.empty()) - return; std::vector addresses; addresses.reserve(pools_to_use.size()); - for (const auto & pool : pools_to_use) - addresses.emplace_back(pool->getAddress()); + for (size_t i = 0, l = pools_to_use.size(); i < l; ++i) + { + if (exclude_pool_index.has_value() && i == exclude_pool_index) + continue; + + addresses.emplace_back(pools_to_use[i]->getAddress()); + } LOG_DEBUG(getLogger("ReadFromParallelRemoteReplicasStep"), "Addresses to use: {}", fmt::join(addresses, ", ")); - /// when using local plan for local replica, 0 is assigned to local replica as replica num, - in this case, starting from 1 here - size_t replica_num = (exclude_local_replica ? 1 : 0); - for (const auto & pool : pools_to_use) + for (size_t i = 0, l = pools_to_use.size(); i < l; ++i) { + if (exclude_pool_index.has_value() && i == exclude_pool_index) + continue; + IConnections::ReplicaInfo replica_info{ /// we should use this number specifically because efficiency of data distribution by consistent hash depends on it. - .number_of_current_replica = replica_num, + .number_of_current_replica = i, }; - ++replica_num; - addPipeForSingeReplica(pipes, pool, replica_info); + addPipeForSingeReplica(pipes, pools_to_use[i], replica_info); } auto pipe = Pipe::unitePipes(std::move(pipes)); diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 442da098a17..74389c8f9eb 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -79,7 +79,8 @@ public: Tables external_tables_, LoggerPtr log_, std::shared_ptr storage_limits_, - bool exclude_local_replica = false); + std::vector pools_to_use, + std::optional exclude_pool_index_ = std::nullopt); String getName() const override { return "ReadFromRemoteParallelReplicas"; } @@ -102,7 +103,8 @@ private: Tables external_tables; std::shared_ptr storage_limits; LoggerPtr log; - bool exclude_local_replica; + std::vector pools_to_use; + std::optional exclude_pool_index; }; } diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index a1b36fe1299..a1264eabdea 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -185,7 +185,8 @@ public: void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) { if (++sent_initial_requests > replicas_count) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Initiator received more initial requests than there are replicas"); + throw Exception( + ErrorCodes::LOGICAL_ERROR, "Initiator received more initial requests than there are replicas: replica_num={}", announcement.replica_num); doHandleInitialAllRangesAnnouncement(std::move(announcement)); }