diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 4bf446b107b..b562f986bee 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -122,6 +122,47 @@ String Cluster::Address::toStringFull() const + ((secure == Protocol::Secure::Enable) ? "+secure" : ""); } +void Cluster::Address::fromFullString(const String & full_string, Cluster::Address & address) +{ + const char * address_begin = full_string.data(); + const char * address_end = address_begin + full_string.size(); + + Protocol::Secure secure = Protocol::Secure::Disable; + const char * secure_tag = "+secure"; + if (endsWith(full_string, secure_tag)) + { + address_end -= strlen(secure_tag); + secure = Protocol::Secure::Enable; + } + + const char * user_pw_end = strchr(full_string.data(), '@'); + const char * colon = strchr(full_string.data(), ':'); + if (!user_pw_end || !colon) + throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR); + + const bool has_pw = colon < user_pw_end; + const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon; + if (!host_end) + throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR); + + const char * has_db = strchr(full_string.data(), '#'); + const char * port_end = has_db ? has_db : address_end; + + address.secure = secure; + address.port = parse(host_end + 1, port_end - (host_end + 1)); + address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end)); + address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end)); + address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string(); + address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string(); +} + +bool Cluster::Address::operator==(const Cluster::Address & other) const +{ + return other.host_name == host_name && other.port == port + && other.secure == secure && other.user == user + && other.password == password && other.default_database == default_database; +} + /// Implementation of Clusters class @@ -198,7 +239,6 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se const auto weight = config.getInt(prefix + ".weight", default_weight); addresses.emplace_back(config, prefix); - addresses.back().replica_num = 1; const auto & address = addresses.back(); ShardInfo info; @@ -253,7 +293,6 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se if (startsWith(replica_key, "replica")) { replica_addresses.emplace_back(config, partial_prefix + replica_key); - replica_addresses.back().replica_num = current_replica_num; ++current_replica_num; if (!replica_addresses.back().is_local) diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index 8bfbc073c61..4338447dbd7 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -59,7 +59,6 @@ public: String password; /// This database is selected when no database is specified for Distributed table String default_database; - UInt32 replica_num; /// The locality is determined at the initialization, and is not changed even if DNS is changed bool is_local; Protocol::Compression compression = Protocol::Compression::Enable; @@ -82,12 +81,16 @@ public: /// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database String toStringFull() const; + static void fromFullString(const String & address_full_string, Address & address); + /// Returns initially resolved address Poco::Net::SocketAddress getResolvedAddress() const { return initially_resolved_address; } + bool operator==(const Address & other) const; + private: Poco::Net::SocketAddress initially_resolved_address; }; diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 7cff6705fa5..9ef4e012d8f 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -48,40 +48,9 @@ namespace for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) { - const auto address = boost::copy_range(*it); - const char * address_begin = static_cast(address.data()); - const char * address_end = address_begin + address.size(); - - Protocol::Secure secure = Protocol::Secure::Disable; - const char * secure_tag = "+secure"; - if (endsWith(address, secure_tag)) - { - address_end -= strlen(secure_tag); - secure = Protocol::Secure::Enable; - } - - const char * user_pw_end = strchr(address.data(), '@'); - const char * colon = strchr(address.data(), ':'); - if (!user_pw_end || !colon) - throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port#default_database' pattern", - ErrorCodes::INCORRECT_FILE_NAME}; - - const bool has_pw = colon < user_pw_end; - const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon; - if (!host_end) - throw Exception{"Shard address '" + address + "' does not contain port", ErrorCodes::INCORRECT_FILE_NAME}; - - const char * has_db = strchr(address.data(), '#'); - const char * port_end = has_db ? has_db : address_end; - - const auto user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end)); - const auto password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string(); - const auto host = unescapeForFileName(std::string(user_pw_end + 1, host_end)); - const auto port = parse(host_end + 1, port_end - (host_end + 1)); - const auto database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) - : std::string(); - - pools.emplace_back(factory(host, port, secure, user, password, database)); + Cluster::Address address; + Cluster::Address::fromFullString(boost::copy_range(*it), address); + pools.emplace_back(factory(address)); } return pools; @@ -175,33 +144,29 @@ void StorageDistributedDirectoryMonitor::run() ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage) { auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.context.getSettingsRef()); - const auto pool_factory = [&storage, &timeouts](const std::string & host, const UInt16 port, - const Protocol::Secure secure, - const std::string & user, const std::string & password, - const std::string & default_database) -> ConnectionPoolPtr + const auto pool_factory = [&storage, &timeouts](const Cluster::Address & address) -> ConnectionPoolPtr { - ClusterPtr cluster = storage.getCluster(); - const auto shards_info = cluster->getShardsInfo(); - const auto shards_addresses = cluster->getShardsAddresses(); + const auto & cluster = storage.getCluster(); + const auto & shards_info = cluster->getShardsInfo(); + const auto & shards_addresses = cluster->getShardsAddresses(); /// existing connections pool have a higher priority for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) { - Cluster::Addresses replicas_addresses = shards_addresses[shard_index]; + const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index]; for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index) { - Cluster::Address replica_address = replicas_addresses[replica_index]; + const Cluster::Address & replica_address = replicas_addresses[replica_index]; - if (replica_address.host_name == host && replica_address.port == port - && replica_address.secure == secure && replica_address.user == user - && replica_address.password == password && replica_address.default_database == default_database) + if (address == replica_address) return shards_info[shard_index].per_replica_pools[replica_index]; } } - return std::make_shared(1, host, port, default_database, user, password, timeouts, - storage.getName() + '_' + user, Protocol::Compression::Enable, secure); + return std::make_shared( + 1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts, + storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure); }; auto pools = createPoolsForAddresses(name, pool_factory); diff --git a/dbms/src/Storages/System/StorageSystemClusters.cpp b/dbms/src/Storages/System/StorageSystemClusters.cpp index b0ad56e8eb5..b33b2d86d0e 100644 --- a/dbms/src/Storages/System/StorageSystemClusters.cpp +++ b/dbms/src/Storages/System/StorageSystemClusters.cpp @@ -26,44 +26,33 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes() void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const { - auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info, const Cluster::Address & address) + for (const auto & name_and_cluster : context.getClusters().getContainer()) { - size_t i = 0; - res_columns[i++]->insert(cluster_name); - res_columns[i++]->insert(shard_info.shard_num); - res_columns[i++]->insert(shard_info.weight); - res_columns[i++]->insert(address.replica_num); - res_columns[i++]->insert(address.host_name); - res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString()); - res_columns[i++]->insert(address.port); - res_columns[i++]->insert(shard_info.isLocal()); - res_columns[i++]->insert(address.user); - res_columns[i++]->insert(address.default_database); - }; - - auto clusters = context.getClusters().getContainer(); - for (const auto & entry : clusters) - { - const std::string cluster_name = entry.first; - const ClusterPtr cluster = entry.second; - const auto & addresses_with_failover = cluster->getShardsAddresses(); + const String & cluster_name = name_and_cluster.first; + const ClusterPtr & cluster = name_and_cluster.second; const auto & shards_info = cluster->getShardsInfo(); + const auto & addresses_with_failover = cluster->getShardsAddresses(); - if (!addresses_with_failover.empty()) + for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) { - auto it1 = addresses_with_failover.cbegin(); - auto it2 = shards_info.cbegin(); + const auto & shard_info = shards_info[shard_index]; + const auto & shard_addresses = addresses_with_failover[shard_index]; - while (it1 != addresses_with_failover.cend()) + for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index) { - const auto & addresses = *it1; - const auto & shard_info = *it2; + size_t i = 0; + const auto & address = shard_addresses[replica_index]; - for (const auto & address : addresses) - updateColumns(cluster_name, shard_info, address); - - ++it1; - ++it2; + res_columns[i++]->insert(cluster_name); + res_columns[i++]->insert(shard_info.shard_num); + res_columns[i++]->insert(shard_info.weight); + res_columns[i++]->insert(replica_index + 1); + res_columns[i++]->insert(address.host_name); + res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString()); + res_columns[i++]->insert(address.port); + res_columns[i++]->insert(shard_info.isLocal()); + res_columns[i++]->insert(address.user); + res_columns[i++]->insert(address.default_database); } } }