mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
dbms: added nearest_host mode to ConnectionPoolWithFailover [#METR-9350]
This commit is contained in:
parent
cc3882b81f
commit
b9caf82430
@ -140,6 +140,10 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
const std::string & getHost() const
|
||||
{
|
||||
return host;
|
||||
}
|
||||
private:
|
||||
/** Максимально возможное количество соедиений. */
|
||||
unsigned max_connections;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/DNS.h>
|
||||
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
|
||||
@ -22,22 +23,20 @@ class ConnectionPoolWithFailover : public IConnectionPool
|
||||
public:
|
||||
typedef detail::ConnectionPoolEntry Entry;
|
||||
|
||||
|
||||
ConnectionPoolWithFailover(ConnectionPools & nested_pools_, size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES)
|
||||
: nested_pools(nested_pools_.begin(), nested_pools_.end()), max_tries(max_tries_),
|
||||
ConnectionPoolWithFailover(ConnectionPools & nested_pools_, size_t load_balancing_mode_,
|
||||
size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
|
||||
size_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD)
|
||||
: nested_pools(nested_pools_.begin(), nested_pools_.end(), load_balancing_mode_, decrease_error_period_), max_tries(max_tries_),
|
||||
log(&Logger::get("ConnectionPoolWithFailover"))
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/** Выделяет соединение для работы. */
|
||||
Entry get()
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
|
||||
|
||||
for (size_t i = 0, size = nested_pools.size(); i < size; ++i)
|
||||
nested_pools[i].randomize();
|
||||
|
||||
nested_pools.update();
|
||||
std::sort(nested_pools.begin(), nested_pools.end());
|
||||
|
||||
std::stringstream fail_messages;
|
||||
@ -82,12 +81,25 @@ private:
|
||||
UInt64 error_count;
|
||||
UInt32 random;
|
||||
drand48_data rand_state;
|
||||
size_t load_balancing_mode;
|
||||
|
||||
PoolWithErrorCount(const ConnectionPoolPtr & pool_)
|
||||
: pool(pool_), error_count(0), random(0)
|
||||
{
|
||||
/// Инициализация плохая, но это не важно.
|
||||
srand48_r(reinterpret_cast<ptrdiff_t>(this), &rand_state);
|
||||
|
||||
std::string local_hostname = Poco::Net::DNS::hostName();
|
||||
|
||||
|
||||
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*pool);
|
||||
const std::string & host = connection_pool.getHost();
|
||||
hostname_difference = 0;
|
||||
for (size_t i = 0; i < std::min(local_hostname.length(), host.length()); ++i)
|
||||
{
|
||||
if (local_hostname[i] != host[i])
|
||||
++hostname_difference;
|
||||
}
|
||||
}
|
||||
|
||||
void randomize()
|
||||
@ -99,16 +111,72 @@ private:
|
||||
|
||||
bool operator< (const PoolWithErrorCount & rhs) const
|
||||
{
|
||||
return error_count < rhs.error_count
|
||||
|| (error_count == rhs.error_count && random < rhs.random);
|
||||
if (load_balancing_mode == LoadBalancing::RANDOM)
|
||||
{
|
||||
return error_count < rhs.error_count
|
||||
|| (error_count == rhs.error_count && random < rhs.random);
|
||||
}
|
||||
else if (load_balancing_mode == LoadBalancing::NEAREST_HOSTNAME)
|
||||
{
|
||||
return error_count < rhs.error_count
|
||||
|| (error_count == rhs.error_count && hostname_difference < rhs.hostname_difference);
|
||||
}
|
||||
else
|
||||
throw Poco::Exception("Unsupported LoadBalancing mode = " + load_balancing_mode);
|
||||
}
|
||||
|
||||
private:
|
||||
/// берётся имя локального сервера (Poco::Net::DNS::hostName) и имя хоста из конфига; строки обрезаются до минимальной длины;
|
||||
/// затем считается количество отличающихся позиций
|
||||
/// Пример example01-01-1 и example01-02-2 отличаются в двух позициях.
|
||||
size_t hostname_difference;
|
||||
};
|
||||
|
||||
class PoolsWithErrorCount : public std::vector<PoolWithErrorCount>
|
||||
{
|
||||
public:
|
||||
PoolsWithErrorCount(DB::ConnectionPools::iterator first, DB::ConnectionPools::iterator last,
|
||||
size_t load_balancing_mode_, size_t decrease_error_period_) :
|
||||
std::vector<PoolWithErrorCount>(first, last), load_balancing_mode(load_balancing_mode_), last_get_time(0),
|
||||
decrease_error_period(decrease_error_period_)
|
||||
{
|
||||
for (PoolsWithErrorCount::iterator it = begin(); it != end(); ++it)
|
||||
{
|
||||
it->load_balancing_mode = load_balancing_mode;
|
||||
}
|
||||
}
|
||||
|
||||
void update()
|
||||
{
|
||||
if (load_balancing_mode == LoadBalancing::RANDOM)
|
||||
{
|
||||
for (PoolsWithErrorCount::iterator it = begin(); it != end(); ++it)
|
||||
it->randomize();
|
||||
}
|
||||
/// В режиме NEAREST_HOSTNAME каждые N секунд уменьшаем количество ошибок в 2 раза
|
||||
else if (last_get_time && load_balancing_mode == LoadBalancing::NEAREST_HOSTNAME)
|
||||
{
|
||||
time_t delta = time(0) - last_get_time;
|
||||
for (PoolsWithErrorCount::iterator it = begin(); it != end(); ++it)
|
||||
{
|
||||
it->error_count = it->error_count >> (delta/decrease_error_period);
|
||||
}
|
||||
}
|
||||
last_get_time = time(0);
|
||||
}
|
||||
|
||||
private:
|
||||
size_t load_balancing_mode;
|
||||
|
||||
/// время, когда последний раз вызывался update
|
||||
time_t last_get_time;
|
||||
time_t decrease_error_period;
|
||||
};
|
||||
|
||||
Poco::FastMutex mutex;
|
||||
|
||||
typedef std::vector<PoolWithErrorCount> PoolsWithErrorCount;
|
||||
PoolsWithErrorCount nested_pools;
|
||||
|
||||
|
||||
size_t max_tries;
|
||||
|
||||
Logger * log;
|
||||
|
@ -25,6 +25,7 @@
|
||||
#define DEFAULT_INTERACTIVE_DELAY 100000
|
||||
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
|
||||
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
|
||||
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 300 /// каждый период уменьшаем счетчик ошибок в 2 раза
|
||||
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Максимальное время ожидания в очереди запросов.
|
||||
|
||||
/// Используется в методе reserve, когда известно число строк, но неизвестны их размеры.
|
||||
|
@ -10,6 +10,18 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace LoadBalancing
|
||||
{
|
||||
enum LoadBalancing
|
||||
{
|
||||
/// среди реплик с минимальным количеством ошибок выбирается случайная
|
||||
RANDOM = 1,
|
||||
/// среди реплик с минимальным количеством ошибок выбирается реплика
|
||||
/// с минимальным количеством отличающихся символов в имени реплики и имени локального хоста
|
||||
NEAREST_HOSTNAME = 2
|
||||
};
|
||||
}
|
||||
|
||||
/** Настройки выполнения запроса.
|
||||
*/
|
||||
struct Settings
|
||||
@ -48,14 +60,6 @@ struct Settings
|
||||
/// Использовать ли SplittingAggregator вместо обычного. Он быстрее для запросов с большим состоянием агрегации.
|
||||
bool use_splitting_aggregator;
|
||||
|
||||
enum LoadBalancing
|
||||
{
|
||||
/// среди реплик с минимальным количеством ошибок выбирается случайная
|
||||
RANDOM = 1,
|
||||
/// среди реплик с минимальным количеством ошибок выбирается реплика
|
||||
/// с минимальным количеством отличающихся символов в имени реплики и имени локального хоста
|
||||
NEAREST_HOSTNAME = 2
|
||||
};
|
||||
size_t load_balancing;
|
||||
|
||||
/// Всевозможные ограничения на выполнение запроса.
|
||||
@ -77,7 +81,7 @@ struct Settings
|
||||
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
|
||||
connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES),
|
||||
sign_rewrite(false), extremes(false), use_uncompressed_cache(true), use_splitting_aggregator(false),
|
||||
load_balancing(NEAREST_HOSTNAME)
|
||||
load_balancing(LoadBalancing::RANDOM)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
|
||||
if (has_local_replics)
|
||||
++local_nodes_num;
|
||||
else
|
||||
pools.push_back(new ConnectionPoolWithFailover(replicas, settings.connections_with_failover_max_tries));
|
||||
pools.push_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
|
||||
}
|
||||
}
|
||||
else if (addresses.size())
|
||||
|
Loading…
Reference in New Issue
Block a user