From 2f643191f6c83b1d15cc0467c8aaae374c3c9110 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 30 Dec 2014 17:11:02 +0300 Subject: [PATCH] dbms: Server: add support for multiple connections in PoolWithFailoverBase. [#METR-14410] --- dbms/include/DB/Client/ConnectionPool.h | 6 +++ .../DB/Client/ConnectionPoolWithFailover.h | 43 ++++++++++++------- dbms/include/DB/Interpreters/Settings.h | 5 +++ 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/dbms/include/DB/Client/ConnectionPool.h b/dbms/include/DB/Client/ConnectionPool.h index 92f7d6706c4..aece5616f36 100644 --- a/dbms/include/DB/Client/ConnectionPool.h +++ b/dbms/include/DB/Client/ConnectionPool.h @@ -27,6 +27,12 @@ class IConnectionPool : private boost::noncopyable public: typedef PoolBase::Entry Entry; virtual Entry get(Settings * settings = nullptr) = 0; + + virtual std::vector getMany(unsigned max_connections, Settings * settings = nullptr) + { + return std::vector{ get(settings) }; + } + virtual ~IConnectionPool() {} }; diff --git a/dbms/include/DB/Client/ConnectionPoolWithFailover.h b/dbms/include/DB/Client/ConnectionPoolWithFailover.h index f018b413367..169d71aa836 100644 --- a/dbms/include/DB/Client/ConnectionPoolWithFailover.h +++ b/dbms/include/DB/Client/ConnectionPoolWithFailover.h @@ -20,11 +20,11 @@ namespace DB * * Замечание: если один из вложенных пулов заблокируется из-за переполнения, то этот пул тоже заблокируется. */ -class ConnectionPoolWithFailover : public PoolWithFailoverBase, public IConnectionPool +class ConnectionPoolWithFailover : public PoolWithFailoverBase, public IConnectionPool { public: typedef IConnectionPool::Entry Entry; - typedef PoolWithFailoverBase Base; + typedef PoolWithFailoverBase Base; ConnectionPoolWithFailover(ConnectionPools & nested_pools_, LoadBalancing load_balancing, @@ -53,23 +53,17 @@ public: /** Выделяет соединение для работы. */ Entry get(Settings * settings = nullptr) override { - LoadBalancing load_balancing = default_load_balancing; - if (settings) - load_balancing = settings->load_balancing; - - for (size_t i = 0; i < nested_pools.size(); ++i) - { - if (load_balancing == LoadBalancing::NEAREST_HOSTNAME) - nested_pools[i].priority = hostname_differences[i]; - else if (load_balancing == LoadBalancing::RANDOM) - nested_pools[i].priority = 0; - else - throw Exception("Unknown load_balancing_mode: " + toString(static_cast(load_balancing)), ErrorCodes::LOGICAL_ERROR); - } - + applyLoadBalancing(settings); return Base::get(settings); } + /** Выделяет до указанного количества соединений. */ + std::vector getMany(unsigned max_connections, Settings * settings = nullptr) override + { + applyLoadBalancing(settings); + return Base::getMany(max_connections, settings); + } + protected: bool tryGet(ConnectionPoolPtr pool, Settings * settings, Entry & out_entry, std::stringstream & fail_message) override { @@ -92,6 +86,23 @@ protected: private: std::vector hostname_differences; /// Расстояния от имени этого хоста до имен хостов пулов. LoadBalancing default_load_balancing; + + void applyLoadBalancing(Settings * settings) + { + LoadBalancing load_balancing = default_load_balancing; + if (settings) + load_balancing = settings->load_balancing; + + for (size_t i = 0; i < nested_pools.size(); ++i) + { + if (load_balancing == LoadBalancing::NEAREST_HOSTNAME) + nested_pools[i].priority = hostname_differences[i]; + else if (load_balancing == LoadBalancing::RANDOM) + nested_pools[i].priority = 0; + else + throw Exception("Unknown load_balancing_mode: " + toString(static_cast(load_balancing)), ErrorCodes::LOGICAL_ERROR); + } + } }; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 40a644c4e9e..317f77f6832 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -87,6 +87,11 @@ struct Settings M(SettingBool, compile, false) \ /** Количество одинаковых по структуре запросов перед тем, как инициируется их компиляция. */ \ M(SettingUInt64, min_count_to_compile, 0) \ + \ + /** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \ + M(SettingUInt64, max_parallel_replicas, 1) \ + M(SettingUInt64, parallel_replicas_count, 0) \ + M(SettingUInt64, parallel_replica_offset, 0) \ /// Всевозможные ограничения на выполнение запроса. Limits limits;