Use cluster connections pool in DirectoryMonitor

This commit is contained in:
zhang2014 2018-12-02 01:25:33 +08:00
parent c399038e19
commit 4798ab1924

View File

@ -175,17 +175,33 @@ void StorageDistributedDirectoryMonitor::run()
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.context.getSettingsRef());
const auto pool_factory = [&storage, &timeouts] (const std::string & host, const UInt16 port,
const Protocol::Secure secure,
const std::string & user, const std::string & password,
const std::string & default_database)
const auto pool_factory = [&storage, &timeouts](const std::string & host, const UInt16 port,
const Protocol::Secure secure,
const std::string & user, const std::string & password,
const std::string & default_database) -> ConnectionPoolPtr
{
return std::make_shared<ConnectionPool>(
1, host, port, default_database,
user, password, timeouts,
storage.getName() + '_' + user,
Protocol::Compression::Enable,
secure);
ClusterPtr cluster = storage.getCluster();
const auto shards_info = cluster->getShardsInfo();
const auto shards_addresses = cluster->getShardsAddresses();
/// existing connections pool have a higher priority
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
Cluster::Addresses replicas_addresses = shards_addresses[shard_index];
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
{
Cluster::Address replica_address = replicas_addresses[replica_index];
if (replica_address.host_name == host && replica_address.port == port
&& replica_address.secure == secure && replica_address.user == user
&& replica_address.password == password && replica_address.default_database == default_database)
return shards_info[shard_index].per_replica_pools[replica_index];
}
}
return std::make_shared<ConnectionPool>(1, host, port, default_database, user, password, timeouts,
storage.getName() + '_' + user, Protocol::Compression::Enable, secure);
};
auto pools = createPoolsForAddresses(name, pool_factory);