mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
dbms: Server: add support for multiple connections in PoolWithFailoverBase. [#METR-14410]
This commit is contained in:
parent
83234d153c
commit
218bfa5316
@ -27,6 +27,12 @@ class IConnectionPool : private boost::noncopyable
|
||||
public:
|
||||
typedef PoolBase<Connection>::Entry Entry;
|
||||
virtual Entry get(Settings * settings = nullptr) = 0;
|
||||
|
||||
virtual std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr)
|
||||
{
|
||||
return std::vector<Entry>{ get(settings) };
|
||||
}
|
||||
|
||||
virtual ~IConnectionPool() {}
|
||||
};
|
||||
|
||||
|
@ -20,11 +20,11 @@ namespace DB
|
||||
*
|
||||
* Замечание: если один из вложенных пулов заблокируется из-за переполнения, то этот пул тоже заблокируется.
|
||||
*/
|
||||
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool, Settings*>, public IConnectionPool
|
||||
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool, Settings *>, public IConnectionPool
|
||||
{
|
||||
public:
|
||||
typedef IConnectionPool::Entry Entry;
|
||||
typedef PoolWithFailoverBase<IConnectionPool, Settings*> Base;
|
||||
typedef PoolWithFailoverBase<IConnectionPool, Settings *> Base;
|
||||
|
||||
ConnectionPoolWithFailover(ConnectionPools & nested_pools_,
|
||||
LoadBalancing load_balancing,
|
||||
@ -53,25 +53,17 @@ public:
|
||||
/** Выделяет соединение для работы. */
|
||||
Entry get(Settings * settings = nullptr) override
|
||||
{
|
||||
LoadBalancing load_balancing = default_load_balancing;
|
||||
if (settings)
|
||||
load_balancing = settings->load_balancing;
|
||||
|
||||
for (size_t i = 0; i < nested_pools.size(); ++i)
|
||||
{
|
||||
if (load_balancing == LoadBalancing::NEAREST_HOSTNAME)
|
||||
nested_pools[i].priority = hostname_differences[i];
|
||||
else if (load_balancing == LoadBalancing::RANDOM)
|
||||
nested_pools[i].priority = 0;
|
||||
else if (load_balancing == LoadBalancing::IN_ORDER)
|
||||
nested_pools[i].priority = i;
|
||||
else
|
||||
throw Exception("Unknown load_balancing_mode: " + toString(static_cast<int>(load_balancing)), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
applyLoadBalancing(settings);
|
||||
return Base::get(settings);
|
||||
}
|
||||
|
||||
/** Выделяет до указанного количества соединений. */
|
||||
std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr) override
|
||||
{
|
||||
applyLoadBalancing(settings);
|
||||
return Base::getMany(max_connections, settings);
|
||||
}
|
||||
|
||||
protected:
|
||||
bool tryGet(ConnectionPoolPtr pool, Settings * settings, Entry & out_entry, std::stringstream & fail_message) override
|
||||
{
|
||||
@ -94,6 +86,25 @@ protected:
|
||||
private:
|
||||
std::vector<size_t> hostname_differences; /// Расстояния от имени этого хоста до имен хостов пулов.
|
||||
LoadBalancing default_load_balancing;
|
||||
|
||||
void applyLoadBalancing(Settings * settings)
|
||||
{
|
||||
LoadBalancing load_balancing = default_load_balancing;
|
||||
if (settings)
|
||||
load_balancing = settings->load_balancing;
|
||||
|
||||
for (size_t i = 0; i < nested_pools.size(); ++i)
|
||||
{
|
||||
if (load_balancing == LoadBalancing::NEAREST_HOSTNAME)
|
||||
nested_pools[i].priority = hostname_differences[i];
|
||||
else if (load_balancing == LoadBalancing::RANDOM)
|
||||
nested_pools[i].priority = 0;
|
||||
else if (load_balancing == LoadBalancing::IN_ORDER)
|
||||
nested_pools[i].priority = i;
|
||||
else
|
||||
throw Exception("Unknown load_balancing_mode: " + toString(static_cast<int>(load_balancing)), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
@ -87,6 +87,11 @@ struct Settings
|
||||
M(SettingBool, compile, false) \
|
||||
/** Количество одинаковых по структуре запросов перед тем, как инициируется их компиляция. */ \
|
||||
M(SettingUInt64, min_count_to_compile, 0) \
|
||||
\
|
||||
/** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \
|
||||
M(SettingUInt64, max_parallel_replicas, 1) \
|
||||
M(SettingUInt64, parallel_replicas_count, 0) \
|
||||
M(SettingUInt64, parallel_replica_offset, 0) \
|
||||
|
||||
/// Всевозможные ограничения на выполнение запроса.
|
||||
Limits limits;
|
||||
|
Loading…
Reference in New Issue
Block a user