diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 74821f432e7..7cff6705fa5 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -175,17 +175,33 @@ void StorageDistributedDirectoryMonitor::run() ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage) { auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.context.getSettingsRef()); - const auto pool_factory = [&storage, &timeouts] (const std::string & host, const UInt16 port, - const Protocol::Secure secure, - const std::string & user, const std::string & password, - const std::string & default_database) + const auto pool_factory = [&storage, &timeouts](const std::string & host, const UInt16 port, + const Protocol::Secure secure, + const std::string & user, const std::string & password, + const std::string & default_database) -> ConnectionPoolPtr { - return std::make_shared( - 1, host, port, default_database, - user, password, timeouts, - storage.getName() + '_' + user, - Protocol::Compression::Enable, - secure); + ClusterPtr cluster = storage.getCluster(); + const auto shards_info = cluster->getShardsInfo(); + const auto shards_addresses = cluster->getShardsAddresses(); + + /// existing connections pool have a higher priority + for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) + { + Cluster::Addresses replicas_addresses = shards_addresses[shard_index]; + + for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index) + { + Cluster::Address replica_address = replicas_addresses[replica_index]; + + if (replica_address.host_name == host && replica_address.port == port + && replica_address.secure == secure && replica_address.user == user + && replica_address.password == password && replica_address.default_database == default_database) + return shards_info[shard_index].per_replica_pools[replica_index]; + } + } + + return std::make_shared(1, host, port, default_database, user, password, timeouts, + storage.getName() + '_' + user, Protocol::Compression::Enable, secure); }; auto pools = createPoolsForAddresses(name, pool_factory);