mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 18:50:49 +00:00
second attempt
This commit is contained in:
parent
236f461420
commit
165afabdd2
@ -139,53 +139,17 @@ String Cluster::Address::toFullString() const
|
||||
{
|
||||
return
|
||||
((shard_number == 0) ? "" : "shard" + std::to_string(shard_number)) +
|
||||
((replica_number == 0) ? "" : "_replica" + std::to_string(replica_number)) + '@' +
|
||||
escapeForFileName(host_name) + ':' +
|
||||
std::to_string(port) +
|
||||
(default_database.empty() ? "" : ('#' +
|
||||
escapeForFileName(default_database))) +
|
||||
((secure == Protocol::Secure::Enable) ? "+secure" : "");
|
||||
((replica_number == 0) ? "" : "_replica" + std::to_string(replica_number));
|
||||
}
|
||||
|
||||
Cluster::Address Cluster::Address::fromFullString(const String & full_string)
|
||||
{
|
||||
const char * address_begin = full_string.data();
|
||||
const char * address_end = address_begin + full_string.size();
|
||||
|
||||
Protocol::Secure secure = Protocol::Secure::Disable;
|
||||
const char * secure_tag = "+secure";
|
||||
if (endsWith(full_string, secure_tag))
|
||||
{
|
||||
address_end -= strlen(secure_tag);
|
||||
secure = Protocol::Secure::Enable;
|
||||
}
|
||||
|
||||
|
||||
const char * underscore = strchr(full_string.data(), '_');
|
||||
const char * slash = strchr(full_string.data(), '/');
|
||||
const char * user_pw_end = strchr(full_string.data(), '@');
|
||||
const char * colon = strchr(full_string.data(), ':');
|
||||
const bool has_shard = startsWith(full_string, "shard");
|
||||
if (has_shard && !slash)
|
||||
throw Exception("Incorrect [shard{shard_number}[_replica{replica_number}]]/user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
|
||||
if (!user_pw_end || !colon)
|
||||
throw Exception("Incorrect user[:password]@host:port#default_database format " + full_string, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
const bool has_pw = colon < user_pw_end;
|
||||
const char * host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
|
||||
if (!host_end)
|
||||
throw Exception("Incorrect address '" + full_string + "', it does not contain port", ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
const char * has_db = strchr(full_string.data(), '#');
|
||||
const char * port_end = has_db ? has_db : address_end;
|
||||
bool has_shard = startsWith("shard", full_string);
|
||||
bool underscore = strchr(full_string.data(), '_');
|
||||
|
||||
Address address;
|
||||
address.secure = secure;
|
||||
address.port = parse<UInt16>(host_end + 1, port_end - (host_end + 1));
|
||||
address.host_name = unescapeForFileName(std::string(user_pw_end + 1, host_end));
|
||||
address.user = unescapeForFileName(std::string(slash + 1, has_pw ? colon : user_pw_end));
|
||||
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
|
||||
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
|
||||
address.shard_number = has_shard ? parse<UInt32>(address_begin + 5) : 0;
|
||||
address.replica_number = underscore ? parse<UInt32>(underscore + 8) : 0;
|
||||
return address;
|
||||
|
@ -57,8 +57,9 @@ public:
|
||||
UInt16 port;
|
||||
String user;
|
||||
String password;
|
||||
UInt32 shard_number{};
|
||||
UInt32 replica_number{};
|
||||
UInt32 shard_number{}; /// shard serial number in configuration file
|
||||
UInt32 replica_number{}; /// replica serial number in this shard
|
||||
|
||||
/// This database is selected when no database is specified for Distributed table
|
||||
String default_database;
|
||||
/// The locality is determined at the initialization, and is not changed even if DNS is changed
|
||||
|
@ -188,25 +188,14 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
|
||||
{
|
||||
const auto & cluster = storage.getCluster();
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
const auto & shards_addresses = cluster->getShardsAddresses();
|
||||
// const auto & shards_addresses = cluster->getShardsInfo();
|
||||
|
||||
/// existing connections pool have a higher priority
|
||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||
{
|
||||
const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
|
||||
return shards_info[address.shard_number].per_replica_pools[address.replica_number];
|
||||
|
||||
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
|
||||
{
|
||||
const Cluster::Address & replica_address = replicas_addresses[replica_index];
|
||||
|
||||
if (address == replica_address)
|
||||
return shards_info[shard_index].per_replica_pools[replica_index];
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_shared<ConnectionPool>(
|
||||
/*return std::make_shared<ConnectionPool>(
|
||||
1, address.host_name, address.port, address.default_database, address.user, address.password,
|
||||
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
|
||||
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);*/
|
||||
};
|
||||
|
||||
auto pools = createPoolsForAddresses(name, pool_factory);
|
||||
|
Loading…
Reference in New Issue
Block a user