From 42cb2ed81b3a807db5ec9b34dc8901cf8123308c Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 23:50:12 +0530 Subject: [PATCH] handled flattening nodes in circular topology --- dbms/src/Interpreters/Cluster.cpp | 50 ++++++++++++++++++------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index e645a8553d4..bc5f453e02b 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -467,6 +467,7 @@ std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{} { + std::set> hosts; if (!from.addresses_with_failover.empty()) { for (size_t shard_index : ext::range(0, from.shards_info.size())) @@ -475,27 +476,36 @@ Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{ for (size_t replica_index : ext::range(0, replicas.size())) { ShardInfo info; - Address address; - address = replicas[replica_index]; - if (address.is_local) - info.local_addresses.push_back(replicas[replica_index]); + Address address = replicas[replica_index]; + auto position = find_if(hosts.begin(), hosts.end(), [=](auto item) { + return std::get<0>(item) == address.host_name && std::get<1>(item) == address.port; + }); + if (position == hosts.end()) + { + if (address.is_local) + info.local_addresses.push_back(replicas[replica_index]); + hosts.insert(std::tuple (address.host_name, address.port)); + ConnectionPoolPtr pool = std::make_shared( + settings.distributed_connections_pool_size, + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + "server", + address.compression, + address.secure); - ConnectionPoolPtr pool = std::make_shared( - settings.distributed_connections_pool_size, - address.host_name, - address.port, - address.default_database, - address.user, - address.password, - "server", - address.compression, - address.secure); - - info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); - info.per_replica_pools = {std::move(pool)}; - std ::vector newAddress = {address}; - addresses_with_failover.emplace_back(newAddress); - shards_info.emplace_back(std::move(info)); + info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); + info.per_replica_pools = {std::move(pool)}; + std ::vector newAddress = {address}; + addresses_with_failover.emplace_back(newAddress); + shards_info.emplace_back(std::move(info)); + } + else + { + continue; + } } } }