Add replica priority for load_balancing

Make the default 1, to match <weight>.
Azat Khuzhin 2020-06-27 09:52:10 +03:00
parent 03def348c8
commit ebff4eae7d
7 changed files with 56 additions and 18 deletions

View File

@@ -267,6 +267,8 @@
<replica>
<host>localhost</host>
<port>9000</port>
<!-- Optional. Priority of the replica for load_balancing (a lower value means a higher priority). Default: 1. -->
<!-- <priority>1</priority> -->
</replica>
</shard>
</test_shard_localhost>
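
For illustration, a shard whose first replica should be preferred over the second could be configured as follows (the cluster name and hostnames are placeholders):

<example_cluster>
    <shard>
        <replica>
            <host>replica-1.example.net</host>
            <port>9000</port>
            <priority>1</priority> <!-- lower value, tried first -->
        </replica>
        <replica>
            <host>replica-2.example.net</host>
            <port>9000</port>
            <priority>2</priority> <!-- falls back here once replica-1 accumulates errors -->
        </replica>
    </shard>
</example_cluster>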

View File

@@ -33,6 +33,8 @@ public:
virtual Entry get(const ConnectionTimeouts & timeouts,
const Settings * settings = nullptr,
bool force_connected = true) = 0;
virtual Int64 getPriority() const { return 1; }
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@@ -54,7 +56,8 @@ public:
const String & password_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable)
Protocol::Secure secure_ = Protocol::Secure::Disable,
Int64 priority_ = 1)
: Base(max_connections_,
&Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_),
@@ -64,7 +67,8 @@ public:
password(password_),
client_name(client_name_),
compression(compression_),
secure{secure_}
secure(secure_),
priority(priority_)
{
}
@@ -93,6 +97,11 @@ public:
return host + ":" + toString(port);
}
Int64 getPriority() const override
{
return priority;
}
protected:
/** Creates a new object to put in the pool. */
ConnectionPtr allocObject() override
@@ -111,8 +120,9 @@ private:
String password;
String client_name;
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
Int64 priority; /// Priority from the <remote_servers> configuration.
};
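
A minimal usage sketch of the extended constructor; the argument order follows the Cluster.cpp call sites below, while the host name, pool size and priority are made-up values:

auto pool = std::make_shared<ConnectionPool>(
    /* max_connections */ 16,
    /* host */ "replica-1.example.net", /* port */ 9000,
    /* default_database */ "", /* user */ "default", /* password */ "",
    /* client_name */ "server",
    Protocol::Compression::Enable,
    Protocol::Secure::Disable,
    /* priority */ 2);

// pool->getPriority() now returns 2: the pool reports the priority it was constructed with.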

View File

@@ -90,6 +90,14 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
return Base::get(max_ignored_errors, fallback_to_stale_replicas, try_get_entry, get_priority);
}
Int64 ConnectionPoolWithFailover::getPriority() const
{
return (*std::max_element(nested_pools.begin(), nested_pools.end(), [](const auto & a, const auto & b)
{
return a->getPriority() < b->getPriority();
}))->getPriority();
}
ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
{
const Base::PoolStates states = getPoolStates();
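
With this, a failover pool reports the largest priority among its nested per-replica pools; a short sketch reusing the two-argument construction seen in Cluster.cpp below (pool_p1 and pool_p2 are hypothetical ConnectionPool instances with priorities 1 and 2):

ConnectionPoolPtrs nested{pool_p1, pool_p2};
auto failover = std::make_shared<ConnectionPoolWithFailover>(nested, settings.load_balancing);
// failover->getPriority() == 2, i.e. the maximum over the nested pools.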

View File

@@ -47,6 +47,8 @@ public:
const Settings * settings,
bool force_connected) override; /// From IConnectionPool
Int64 getPriority() const override; /// From IConnectionPool
/** Allocates up to the specified number of connections to work.
* Connections provide access to different replicas of one shard.
*/

View File

@@ -64,6 +64,8 @@ public:
, shared_pool_states(nested_pools.size())
, log(log_)
{
for (size_t i = 0; i < nested_pools.size(); ++i)
shared_pool_states[i].config_priority = nested_pools[i]->getPriority();
}
struct TryResult
@@ -304,6 +306,9 @@ template <typename TNestedPool>
struct PoolWithFailoverBase<TNestedPool>::PoolState
{
UInt64 error_count = 0;
/// Priority from the <remote_servers> configuration.
Int64 config_priority = 1;
/// Priority from the GetPriorityFunc.
Int64 priority = 0;
UInt32 random = 0;
@@ -314,8 +319,8 @@ struct PoolWithFailoverBase<TNestedPool>::PoolState
static bool compare(const PoolState & lhs, const PoolState & rhs)
{
return std::forward_as_tuple(lhs.error_count, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.priority, rhs.random);
return std::forward_as_tuple(lhs.error_count, lhs.config_priority, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.error_count, rhs.config_priority, rhs.priority, rhs.random);
}
private:
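
The practical effect of the new tuple element: for replicas with equal error counts, the configured <priority> is consulted before the load_balancing priority and the random tie-breaker. A self-contained illustration of that lexicographic ordering (all numbers are invented):

#include <cassert>
#include <tuple>

int main()
{
    // (error_count, config_priority, priority, random) for two hypothetical replicas.
    // Equal error counts, so the replica with config_priority 1 sorts first even though
    // its load_balancing priority (5) is worse than the other replica's (0).
    assert(std::make_tuple(0, 1, 5, 7) < std::make_tuple(0, 2, 0, 3));
    return 0;
}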

View File

@@ -85,18 +85,20 @@ Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, cons
default_database = config.getString(config_prefix + ".default_database", "");
secure = config.getBool(config_prefix + ".secure", false) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
compression = config.getBool(config_prefix + ".compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
priority = config.getInt(config_prefix + ".priority", 1);
const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port";
is_local = isLocal(config.getInt(port_type, 0));
}
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_)
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_, Int64 priority_)
: user(user_), password(password_)
{
auto parsed_host_port = parseAddress(host_port_, clickhouse_port);
host_name = parsed_host_port.first;
port = parsed_host_port.second;
secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = priority_;
is_local = isLocal(clickhouse_port);
}
@@ -208,6 +210,7 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
address.user = unescapeForFileName(std::string(address_begin, has_pw ? colon : user_pw_end));
address.password = has_pw ? unescapeForFileName(std::string(colon + 1, user_pw_end)) : std::string();
address.default_database = has_db ? unescapeForFileName(std::string(has_db + 1, address_end)) : std::string();
// address.priority is not encoded in the full string, so it keeps the default.
return address;
}
}
@@ -301,7 +304,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
"server", address.compression, address.secure);
"server", address.compression,
address.secure, address.priority);
info.pool = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, settings.load_balancing);
@@ -374,7 +378,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
"server", replica.compression, replica.secure);
"server", replica.compression,
replica.secure, replica.priority);
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local)
@@ -413,7 +418,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, bool secure)
const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote,
bool secure, Int64 priority)
{
UInt32 current_shard_num = 1;
@@ -421,7 +427,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
{
Addresses current;
for (const auto & replica : shard)
current.emplace_back(replica, username, password, clickhouse_port, secure);
current.emplace_back(replica, username, password, clickhouse_port, secure, priority);
addresses_with_failover.emplace_back(current);
@@ -435,7 +441,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
"server", replica.compression, replica.secure);
"server", replica.compression, replica.secure, replica.priority);
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
@@ -454,7 +460,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
std::move(shard_local_addresses),
std::move(shard_pool),
std::move(all_replicas),
false
false // has_internal_replication
});
++current_shard_num;
}
@@ -541,7 +547,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
address.password,
"server",
address.compression,
address.secure);
address.secure,
address.priority);
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
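
Taken together, the Cluster.cpp changes build each replica's pool with that replica's configured priority and then wrap the shard's replicas in a failover pool. A condensed sketch of that flow, using the same calls as the hunks above (shard_addresses stands in for the Addresses of one shard):

ConnectionPoolPtrs per_replica_pools;
for (const auto & replica : shard_addresses)
    per_replica_pools.emplace_back(std::make_shared<ConnectionPool>(
        settings.distributed_connections_pool_size,
        replica.host_name, replica.port,
        replica.default_database, replica.user, replica.password,
        "server", replica.compression, replica.secure, replica.priority));

auto shard_pool = std::make_shared<ConnectionPoolWithFailover>(per_replica_pools, settings.load_balancing);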

View File

@@ -28,7 +28,8 @@ public:
/// This parameter is needed only to check that some address is local (points to ourself).
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password,
UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false);
UInt16 clickhouse_port, bool treat_local_as_remote,
bool secure = false, Int64 priority = 1);
Cluster(const Cluster &)= delete;
Cluster & operator=(const Cluster &) = delete;
@@ -44,7 +45,7 @@ public:
* <node>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database>, <compression>. <secure> if needed -->
* <!-- <user>, <password>, <default_database>, <compression>, <priority>, <secure> if needed -->
* </node>
* ...
* or in <shard> and inside in <replica> elements:
@@ -52,7 +53,7 @@ public:
* <replica>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database>, <compression>. <secure> if needed -->
* <!-- <user>, <password>, <default_database>, <compression>, <priority>, <secure> if needed -->
* </replica>
* </shard>
*/
@@ -73,6 +74,8 @@ public:
Protocol::Compression compression = Protocol::Compression::Enable;
Protocol::Secure secure = Protocol::Secure::Disable;
Int64 priority = 1;
Address() = default;
Address(
const Poco::Util::AbstractConfiguration & config,
@@ -84,7 +87,8 @@ public:
const String & user_,
const String & password_,
UInt16 clickhouse_port,
bool secure_ = false);
bool secure_ = false,
Int64 priority_ = 1);
/// Returns 'escaped_host_name:port'
String toString() const;
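
For the programmatic constructor, the new trailing parameter applies a single priority to every replica built from names; a hedged sketch (the host:port strings and the priority value are placeholders):

std::vector<std::vector<String>> names{{"replica-1.example.net:9000", "replica-2.example.net:9000"}};
auto cluster = std::make_shared<Cluster>(
    settings, names, /* username */ "default", /* password */ "",
    /* clickhouse_port */ 9000, /* treat_local_as_remote */ false,
    /* secure */ false, /* priority */ 2);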