dbms: Server: add support for multiple connections in PoolWithFailoverBase. [#METR-14410]

This commit is contained in:
Alexey Arno 2014-12-30 17:11:02 +03:00
parent 00324f2f9c
commit 2f643191f6
3 changed files with 38 additions and 16 deletions

View File

@ -27,6 +27,12 @@ class IConnectionPool : private boost::noncopyable
public:
typedef PoolBase<Connection>::Entry Entry;
virtual Entry get(Settings * settings = nullptr) = 0;
virtual std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr)
{
return std::vector<Entry>{ get(settings) };
}
virtual ~IConnectionPool() {}
};

View File

@ -20,11 +20,11 @@ namespace DB
*
* Замечание: если один из вложенных пулов заблокируется из-за переполнения, то этот пул тоже заблокируется.
*/
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool, Settings*>, public IConnectionPool
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool, Settings *>, public IConnectionPool
{
public:
typedef IConnectionPool::Entry Entry;
typedef PoolWithFailoverBase<IConnectionPool, Settings*> Base;
typedef PoolWithFailoverBase<IConnectionPool, Settings *> Base;
ConnectionPoolWithFailover(ConnectionPools & nested_pools_,
LoadBalancing load_balancing,
@ -53,23 +53,17 @@ public:
/** Выделяет соединение для работы. */
Entry get(Settings * settings = nullptr) override
{
LoadBalancing load_balancing = default_load_balancing;
if (settings)
load_balancing = settings->load_balancing;
for (size_t i = 0; i < nested_pools.size(); ++i)
{
if (load_balancing == LoadBalancing::NEAREST_HOSTNAME)
nested_pools[i].priority = hostname_differences[i];
else if (load_balancing == LoadBalancing::RANDOM)
nested_pools[i].priority = 0;
else
throw Exception("Unknown load_balancing_mode: " + toString(static_cast<int>(load_balancing)), ErrorCodes::LOGICAL_ERROR);
}
applyLoadBalancing(settings);
return Base::get(settings);
}
/** Выделяет до указанного количества соединений. */
std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr) override
{
applyLoadBalancing(settings);
return Base::getMany(max_connections, settings);
}
protected:
bool tryGet(ConnectionPoolPtr pool, Settings * settings, Entry & out_entry, std::stringstream & fail_message) override
{
@ -92,6 +86,23 @@ protected:
private:
std::vector<size_t> hostname_differences; /// Расстояния от имени этого хоста до имен хостов пулов.
LoadBalancing default_load_balancing;
void applyLoadBalancing(Settings * settings)
{
LoadBalancing load_balancing = default_load_balancing;
if (settings)
load_balancing = settings->load_balancing;
for (size_t i = 0; i < nested_pools.size(); ++i)
{
if (load_balancing == LoadBalancing::NEAREST_HOSTNAME)
nested_pools[i].priority = hostname_differences[i];
else if (load_balancing == LoadBalancing::RANDOM)
nested_pools[i].priority = 0;
else
throw Exception("Unknown load_balancing_mode: " + toString(static_cast<int>(load_balancing)), ErrorCodes::LOGICAL_ERROR);
}
}
};

View File

@ -87,6 +87,11 @@ struct Settings
M(SettingBool, compile, false) \
/** Количество одинаковых по структуре запросов перед тем, как инициируется их компиляция. */ \
M(SettingUInt64, min_count_to_compile, 0) \
\
/** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \
M(SettingUInt64, max_parallel_replicas, 1) \
M(SettingUInt64, parallel_replicas_count, 0) \
M(SettingUInt64, parallel_replica_offset, 0) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;