mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 05:22:17 +00:00
fix excessive copy
This commit is contained in:
parent
4798ab1924
commit
994d1c0fe0
@ -122,6 +122,47 @@ String Cluster::Address::toStringFull() const
|
||||
+ ((secure == Protocol::Secure::Enable) ? "+secure" : "");
|
||||
}
|
||||
|
||||
void Cluster::Address::fromFullString(const String & full_string, Cluster::Address & address)
|
||||
{
|
||||
const char * address_begin = full_string.data();
|
||||
const char * address_end = address_begin + full_string.size();
|
||||
|
||||
Protocol::Secure secure = Protocol::Secure::Disable;
|
||||
const char * secure_tag = "+secure";
|
||||
if (endsWith(full_string, secure_tag))
|
||||
{
|
||||
address_end -= strlen(secure_tag);
|
||||
secure = Protocol::Secure::Enable;
|
||||
}
|
||||
|
||||
const char * user_pw_end = strchr(full_string.data(), '@');
|
||||
const char * colon = strchr(full_string.data(), ':');
|
||||
if (!user_pw_end || !colon)
|
||||
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
const bool has_pw = colon < user_pw_end;
|
||||
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
|
||||
if (!host_end)
|
||||
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
const char * has_db = strchr(full_string.data(), '#');
|
||||
const char * port_end = has_db ? has_db : address_end;
|
||||
|
||||
address.secure = secure;
|
||||
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
|
||||
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
|
||||
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
|
||||
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
|
||||
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
|
||||
}
|
||||
|
||||
bool Cluster::Address::operator==(const Cluster::Address & other) const
|
||||
{
|
||||
return other.host_name == host_name && other.port == port
|
||||
&& other.secure == secure && other.user == user
|
||||
&& other.password == password && other.default_database == default_database;
|
||||
}
|
||||
|
||||
|
||||
/// Implementation of Clusters class
|
||||
|
||||
@ -198,7 +239,6 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
const auto weight = config.getInt(prefix + ".weight", default_weight);
|
||||
|
||||
addresses.emplace_back(config, prefix);
|
||||
addresses.back().replica_num = 1;
|
||||
const auto & address = addresses.back();
|
||||
|
||||
ShardInfo info;
|
||||
@ -253,7 +293,6 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
if (startsWith(replica_key, "replica"))
|
||||
{
|
||||
replica_addresses.emplace_back(config, partial_prefix + replica_key);
|
||||
replica_addresses.back().replica_num = current_replica_num;
|
||||
++current_replica_num;
|
||||
|
||||
if (!replica_addresses.back().is_local)
|
||||
|
@ -59,7 +59,6 @@ public:
|
||||
String password;
|
||||
/// This database is selected when no database is specified for Distributed table
|
||||
String default_database;
|
||||
UInt32 replica_num;
|
||||
/// The locality is determined at the initialization, and is not changed even if DNS is changed
|
||||
bool is_local;
|
||||
Protocol::Compression compression = Protocol::Compression::Enable;
|
||||
@ -82,12 +81,16 @@ public:
|
||||
/// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
|
||||
String toStringFull() const;
|
||||
|
||||
static void fromFullString(const String & address_full_string, Address & address);
|
||||
|
||||
/// Returns initially resolved address
|
||||
Poco::Net::SocketAddress getResolvedAddress() const
|
||||
{
|
||||
return initially_resolved_address;
|
||||
}
|
||||
|
||||
bool operator==(const Address & other) const;
|
||||
|
||||
private:
|
||||
Poco::Net::SocketAddress initially_resolved_address;
|
||||
};
|
||||
|
@ -48,40 +48,9 @@ namespace
|
||||
|
||||
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
|
||||
{
|
||||
const auto address = boost::copy_range<std::string>(*it);
|
||||
const char * address_begin = static_cast<const char*>(address.data());
|
||||
const char * address_end = address_begin + address.size();
|
||||
|
||||
Protocol::Secure secure = Protocol::Secure::Disable;
|
||||
const char * secure_tag = "+secure";
|
||||
if (endsWith(address, secure_tag))
|
||||
{
|
||||
address_end -= strlen(secure_tag);
|
||||
secure = Protocol::Secure::Enable;
|
||||
}
|
||||
|
||||
const char * user_pw_end = strchr(address.data(), '@');
|
||||
const char * colon = strchr(address.data(), ':');
|
||||
if (!user_pw_end || !colon)
|
||||
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port#default_database' pattern",
|
||||
ErrorCodes::INCORRECT_FILE_NAME};
|
||||
|
||||
const bool has_pw = colon < user_pw_end;
|
||||
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
|
||||
if (!host_end)
|
||||
throw Exception{"Shard address '" + address + "' does not contain port", ErrorCodes::INCORRECT_FILE_NAME};
|
||||
|
||||
const char * has_db = strchr(address.data(), '#');
|
||||
const char * port_end = has_db ? has_db : address_end;
|
||||
|
||||
const auto user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
|
||||
const auto password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
|
||||
const auto host = unescapeForFileName(std::string(user_pw_end + 1, host_end));
|
||||
const auto port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
|
||||
const auto database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end))
|
||||
: std::string();
|
||||
|
||||
pools.emplace_back(factory(host, port, secure, user, password, database));
|
||||
Cluster::Address address;
|
||||
Cluster::Address::fromFullString(boost::copy_range<std::string>(*it), address);
|
||||
pools.emplace_back(factory(address));
|
||||
}
|
||||
|
||||
return pools;
|
||||
@ -175,33 +144,29 @@ void StorageDistributedDirectoryMonitor::run()
|
||||
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
|
||||
{
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.context.getSettingsRef());
|
||||
const auto pool_factory = [&storage, &timeouts](const std::string & host, const UInt16 port,
|
||||
const Protocol::Secure secure,
|
||||
const std::string & user, const std::string & password,
|
||||
const std::string & default_database) -> ConnectionPoolPtr
|
||||
const auto pool_factory = [&storage, &timeouts](const Cluster::Address & address) -> ConnectionPoolPtr
|
||||
{
|
||||
ClusterPtr cluster = storage.getCluster();
|
||||
const auto shards_info = cluster->getShardsInfo();
|
||||
const auto shards_addresses = cluster->getShardsAddresses();
|
||||
const auto & cluster = storage.getCluster();
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
const auto & shards_addresses = cluster->getShardsAddresses();
|
||||
|
||||
/// existing connections pool have a higher priority
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
{
|
||||
Cluster::Addresses replicas_addresses = shards_addresses[shard_index];
|
||||
const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
|
||||
|
||||
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
|
||||
{
|
||||
Cluster::Address replica_address = replicas_addresses[replica_index];
|
||||
const Cluster::Address & replica_address = replicas_addresses[replica_index];
|
||||
|
||||
if (replica_address.host_name == host && replica_address.port == port
|
||||
&& replica_address.secure == secure && replica_address.user == user
|
||||
&& replica_address.password == password && replica_address.default_database == default_database)
|
||||
if (address == replica_address)
|
||||
return shards_info[shard_index].per_replica_pools[replica_index];
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_shared<ConnectionPool>(1, host, port, default_database, user, password, timeouts,
|
||||
storage.getName() + '_' + user, Protocol::Compression::Enable, secure);
|
||||
return std::make_shared<ConnectionPool>(
|
||||
1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
|
||||
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
|
||||
};
|
||||
|
||||
auto pools = createPoolsForAddresses(name, pool_factory);
|
||||
|
@ -26,44 +26,33 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
|
||||
|
||||
void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
|
||||
{
|
||||
auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info, const Cluster::Address & address)
|
||||
for (const auto & name_and_cluster : context.getClusters().getContainer())
|
||||
{
|
||||
size_t i = 0;
|
||||
res_columns[i++]->insert(cluster_name);
|
||||
res_columns[i++]->insert(shard_info.shard_num);
|
||||
res_columns[i++]->insert(shard_info.weight);
|
||||
res_columns[i++]->insert(address.replica_num);
|
||||
res_columns[i++]->insert(address.host_name);
|
||||
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
|
||||
res_columns[i++]->insert(address.port);
|
||||
res_columns[i++]->insert(shard_info.isLocal());
|
||||
res_columns[i++]->insert(address.user);
|
||||
res_columns[i++]->insert(address.default_database);
|
||||
};
|
||||
|
||||
auto clusters = context.getClusters().getContainer();
|
||||
for (const auto & entry : clusters)
|
||||
{
|
||||
const std::string cluster_name = entry.first;
|
||||
const ClusterPtr cluster = entry.second;
|
||||
const auto & addresses_with_failover = cluster->getShardsAddresses();
|
||||
const String & cluster_name = name_and_cluster.first;
|
||||
const ClusterPtr & cluster = name_and_cluster.second;
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
const auto & addresses_with_failover = cluster->getShardsAddresses();
|
||||
|
||||
if (!addresses_with_failover.empty())
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
{
|
||||
auto it1 = addresses_with_failover.cbegin();
|
||||
auto it2 = shards_info.cbegin();
|
||||
const auto & shard_info = shards_info[shard_index];
|
||||
const auto & shard_addresses = addresses_with_failover[shard_index];
|
||||
|
||||
while (it1 != addresses_with_failover.cend())
|
||||
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
|
||||
{
|
||||
const auto & addresses = *it1;
|
||||
const auto & shard_info = *it2;
|
||||
size_t i = 0;
|
||||
const auto & address = shard_addresses[replica_index];
|
||||
|
||||
for (const auto & address : addresses)
|
||||
updateColumns(cluster_name, shard_info, address);
|
||||
|
||||
++it1;
|
||||
++it2;
|
||||
res_columns[i++]->insert(cluster_name);
|
||||
res_columns[i++]->insert(shard_info.shard_num);
|
||||
res_columns[i++]->insert(shard_info.weight);
|
||||
res_columns[i++]->insert(replica_index + 1);
|
||||
res_columns[i++]->insert(address.host_name);
|
||||
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
|
||||
res_columns[i++]->insert(address.port);
|
||||
res_columns[i++]->insert(shard_info.isLocal());
|
||||
res_columns[i++]->insert(address.user);
|
||||
res_columns[i++]->insert(address.default_database);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user