diff --git a/docs/en/sql-reference/dictionaries/index.md b/docs/en/sql-reference/dictionaries/index.md index 9f6c9868ae6..60f00840042 100644 --- a/docs/en/sql-reference/dictionaries/index.md +++ b/docs/en/sql-reference/dictionaries/index.md @@ -1286,6 +1286,7 @@ Setting fields: - `table` – Name of the table and schema if exists. - `connection_string` – Connection string. - `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Refreshing dictionary data using LIFETIME](#refreshing-dictionary-data-using-lifetime). +- `background_reconnect` – Reconnect to replica in background if connection fails. Optional parameter. - `query` – The custom query. Optional parameter. :::note @@ -1877,6 +1878,7 @@ Setting fields: - `table` – Name of the table. - `where` – The selection criteria. The syntax for conditions is the same as for `WHERE` clause in PostgreSQL. For example, `id > 10 AND id < 20`. Optional parameter. - `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Refreshing dictionary data using LIFETIME](#refreshing-dictionary-data-using-lifetime). +- `background_reconnect` – Reconnect to replica in background if connection fails. Optional parameter. - `query` – The custom query. Optional parameter. :::note diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index ad456791b0a..b142c1407a8 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -91,6 +91,7 @@ #include #include #include +#include #include "MetricsTransmitter.h" #include #include @@ -2242,6 +2243,8 @@ try if (dns_cache_updater) dns_cache_updater->start(); + auto replicas_reconnector = ReplicasReconnector::init(global_context); + /// Set current database name before loading tables and databases because /// system logs may copy global context. std::string default_database = server_settings[ServerSetting::default_database].toString(); diff --git a/src/Common/ReplicasReconnector.h b/src/Common/ReplicasReconnector.h new file mode 100644 index 00000000000..1f15c50de50 --- /dev/null +++ b/src/Common/ReplicasReconnector.h @@ -0,0 +1,116 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_INITIALIZED; + extern const int LOGICAL_ERROR; +} + +namespace ServerSetting +{ + extern const ServerSettingsUInt64 bg_reconnect_mysql_dict_interval; +} + +class ReplicasReconnector : private boost::noncopyable +{ +public: + using Reconnector = std::function; + using ReconnectorsList = std::list; + + ReplicasReconnector(const ReplicasReconnector &) = delete; + + ~ReplicasReconnector() + { + emergency_stop = true; + task_handle->deactivate(); + instance_ptr = nullptr; + } + + [[nodiscard]] + static std::unique_ptr init(ContextPtr context) + { + if (instance_ptr) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas reconnector is already initialized."); + + std::unique_ptr ret(new ReplicasReconnector(context)); + instance_ptr = ret.get(); + return ret; + } + + static ReplicasReconnector & instance() + { + if (!instance_ptr) + throw Exception(ErrorCodes::NOT_INITIALIZED, "Replicas reconnector is not initialized."); + + return *instance_ptr; + } + + void add(const Reconnector & reconnector) + { + std::lock_guard lock(mutex); + reconnectors.push_back(reconnector); + task_handle->activateAndSchedule(); + } + +private: + inline static ReplicasReconnector * instance_ptr = nullptr; + ReconnectorsList reconnectors; + std::mutex mutex; + std::atomic_bool emergency_stop{false}; + BackgroundSchedulePoolTaskHolder task_handle; + LoggerPtr log = nullptr; + + explicit ReplicasReconnector(ContextPtr context) + : task_handle(context->getSchedulePool().createTask("ReplicasReconnector", [this]{ run(); })) + , log(getLogger("ReplicasReconnector")) + { + } + + void run() + { + auto interval_milliseconds = Context::getGlobalContextInstance()->getServerSettings()[ServerSetting::bg_reconnect_mysql_dict_interval]; + std::unique_lock lock(mutex); + + for (auto it = reconnectors.cbegin(); !emergency_stop && it != reconnectors.end();) + { + bool res = true; + lock.unlock(); + + try + { + res = (*it)(interval_milliseconds); + } + catch (...) + { + LOG_WARNING(log, "Failed reconnection routine."); + } + + lock.lock(); + + if (res) + ++it; + else + it = reconnectors.erase(it); + } + + if (!reconnectors.empty()) + task_handle->scheduleAfter(Context::getGlobalContextInstance()->getServerSettings()[ServerSetting::bg_reconnect_mysql_dict_interval]); + } +}; + +} diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index b52e4a60571..9091f0d09fd 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -58,7 +58,7 @@ public: explicit ThreadFromThreadPool(ThreadPoolImpl& parent_pool); // Shift the thread state from Preparing to Running to allow the worker to start. - void start(ThreadList::iterator& it); + void start(typename ThreadList::iterator& it); void join(); @@ -195,7 +195,7 @@ private: const bool shutdown_on_exception = true; boost::heap::priority_queue> jobs; - ThreadFromThreadPool::ThreadList threads; + typename ThreadFromThreadPool::ThreadList threads; std::exception_ptr first_exception; std::stack on_destroy_callbacks; diff --git a/src/Common/mysqlxx/Pool.cpp b/src/Common/mysqlxx/Pool.cpp index d61f8e751d0..1911181d9d6 100644 --- a/src/Common/mysqlxx/Pool.cpp +++ b/src/Common/mysqlxx/Pool.cpp @@ -179,7 +179,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout) initialize(); for (;;) { - LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescription()); + LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescriptionImpl()); for (auto & connection : connections) { @@ -190,18 +190,18 @@ Pool::Entry Pool::get(uint64_t wait_timeout) } } - LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescription()); + LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescriptionImpl()); if (connections.size() < static_cast(max_connections)) { Connection * conn = allocConnection(); if (conn) return Entry(conn, this); - LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescription()); + LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescriptionImpl()); } else { - LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescription()); + LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescriptionImpl()); } if (!wait_timeout) @@ -211,7 +211,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout) throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)"); lock.unlock(); - LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); + LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescriptionImpl(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); lock.lock(); } @@ -236,7 +236,7 @@ Pool::Entry Pool::tryGet() return res; } - LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescription()); + LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescriptionImpl()); /// This one is disconnected, cannot be reestablished and so needs to be disposed of. connection_it = connections.erase(connection_it); @@ -259,7 +259,7 @@ Pool::Entry Pool::tryGet() void Pool::removeConnection(Connection* connection) { - LOG_TRACE(log, "{}: Removing connection.", getDescription()); + LOG_TRACE(log, "{}: Removing connection.", getDescriptionImpl()); std::lock_guard lock(mutex); if (connection) @@ -381,18 +381,22 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time) { LOG_ERROR(log, "Failed to connect to MySQL ({}): {}", description, e.what()); - if ((!was_successful && !dont_throw_if_failed_first_time) + if (!online + || (!was_successful && !dont_throw_if_failed_first_time) || e.errnum() == ER_ACCESS_DENIED_ERROR || e.errnum() == ER_DBACCESS_DENIED_ERROR || e.errnum() == ER_BAD_DB_ERROR) { + online = false; throw; } + online = false; return nullptr; } connections.push_back(conn_ptr.get()); + online = true; was_successful = true; return conn_ptr.release(); } diff --git a/src/Common/mysqlxx/PoolWithFailover.cpp b/src/Common/mysqlxx/PoolWithFailover.cpp index 8c7dae739b7..ddc9eb607e3 100644 --- a/src/Common/mysqlxx/PoolWithFailover.cpp +++ b/src/Common/mysqlxx/PoolWithFailover.cpp @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -15,6 +17,38 @@ namespace DB::ErrorCodes using namespace mysqlxx; +auto connectionReistablisher(std::weak_ptr pool) +{ + return [weak_pool = pool](UInt64 interval_milliseconds) + { + auto shared_pool = weak_pool.lock(); + if (!shared_pool) + return false; + + if (!shared_pool->isOnline()) + { + try + { + shared_pool->get(); + Poco::Util::Application::instance().logger().information("Reistablishing connection to " + shared_pool->getDescription() + " has succeeded."); + } + catch (const Poco::Exception & e) + { + if (interval_milliseconds >= 1000) + Poco::Util::Application::instance().logger().warning("Reistablishing connection to " + shared_pool->getDescription() + " has failed: " + e.displayText()); + } + catch (...) + { + if (interval_milliseconds >= 1000) + Poco::Util::Application::instance().logger().warning("Reistablishing connection to " + shared_pool->getDescription() + " has failed."); + } + } + + return true; + }; +} + + PoolWithFailover::PoolWithFailover( const Poco::Util::AbstractConfiguration & config_, const std::string & config_name_, @@ -24,6 +58,7 @@ PoolWithFailover::PoolWithFailover( : max_tries(max_tries_) , shareable(config_.getBool(config_name_ + ".share_connection", false)) , wait_timeout(UINT64_MAX) + , bg_reconnect(config_.getBool(config_name_ + ".background_reconnect", false)) { if (config_.has(config_name_ + ".replica")) { @@ -40,6 +75,9 @@ PoolWithFailover::PoolWithFailover( replicas_by_priority[priority].emplace_back( std::make_shared(config_, replica_name, default_connections_, max_connections_, config_name_.c_str())); + + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[priority].back()))); } } @@ -57,7 +95,10 @@ PoolWithFailover::PoolWithFailover( { replicas_by_priority[0].emplace_back( std::make_shared(config_, config_name_, default_connections_, max_connections_)); + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[0].back()))); } + } @@ -82,10 +123,12 @@ PoolWithFailover::PoolWithFailover( size_t max_tries_, uint64_t wait_timeout_, size_t connect_timeout_, - size_t rw_timeout_) + size_t rw_timeout_, + bool bg_reconnect_) : max_tries(max_tries_) , shareable(false) , wait_timeout(wait_timeout_) + , bg_reconnect(bg_reconnect_) { /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. for (const auto & [host, port] : addresses) @@ -97,7 +140,10 @@ PoolWithFailover::PoolWithFailover( rw_timeout_, default_connections_, max_connections_)); + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[0].back()))); } + } @@ -105,6 +151,7 @@ PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) : max_tries{other.max_tries} , shareable{other.shareable} , wait_timeout(other.wait_timeout) + , bg_reconnect(other.bg_reconnect) { if (shareable) { @@ -117,9 +164,14 @@ PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) Replicas replicas; replicas.reserve(priority_replicas.second.size()); for (const auto & pool : priority_replicas.second) + { replicas.emplace_back(std::make_shared(*pool)); + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas.back()))); + } replicas_by_priority.emplace(priority_replicas.first, std::move(replicas)); } + } } @@ -150,6 +202,9 @@ PoolWithFailover::Entry PoolWithFailover::get() { PoolPtr & pool = replicas[i]; + if (bg_reconnect && !pool->isOnline()) + continue; + try { Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet(); diff --git a/src/Common/mysqlxx/mysqlxx/Pool.h b/src/Common/mysqlxx/mysqlxx/Pool.h index f9bb06ea444..2df5e55388d 100644 --- a/src/Common/mysqlxx/mysqlxx/Pool.h +++ b/src/Common/mysqlxx/mysqlxx/Pool.h @@ -188,11 +188,17 @@ public: /// Get description of database. std::string getDescription() const { - return description; + std::lock_guard lock(mutex); + return getDescriptionImpl(); } void removeConnection(Connection * connection); + bool isOnline() + { + return online; + } + protected: LoggerPtr log = getLogger("mysqlxx::Pool"); @@ -209,7 +215,7 @@ private: /// List of connections. Connections connections; /// Lock for connections list access - std::mutex mutex; + mutable std::mutex mutex; /// Description of connection. std::string description; @@ -234,8 +240,16 @@ private: /// Initialises class if it wasn't. void initialize(); + /// Pool is online. + std::atomic online{true}; + /** Create new connection. */ Connection * allocConnection(bool dont_throw_if_failed_first_time = false); + + std::string getDescriptionImpl() const + { + return description; + } }; } diff --git a/src/Common/mysqlxx/mysqlxx/PoolWithFailover.h b/src/Common/mysqlxx/mysqlxx/PoolWithFailover.h index db3c499846c..24bf1b28e66 100644 --- a/src/Common/mysqlxx/mysqlxx/PoolWithFailover.h +++ b/src/Common/mysqlxx/mysqlxx/PoolWithFailover.h @@ -85,6 +85,8 @@ namespace mysqlxx bool shareable; /// Timeout for waiting free connection. uint64_t wait_timeout = 0; + /// Attempt to reconnect in background thread + bool bg_reconnect = false; public: using Entry = Pool::Entry; @@ -126,7 +128,8 @@ namespace mysqlxx size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, uint64_t wait_timeout_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_CONNECTION_WAIT_TIMEOUT, size_t connect_timeout = MYSQLXX_DEFAULT_TIMEOUT, - size_t rw_timeout = MYSQLXX_DEFAULT_RW_TIMEOUT); + size_t rw_timeout = MYSQLXX_DEFAULT_RW_TIMEOUT, + bool bg_reconnect_ = false); PoolWithFailover(const PoolWithFailover & other); diff --git a/src/Core/PostgreSQL/PoolWithFailover.cpp b/src/Core/PostgreSQL/PoolWithFailover.cpp index 054fc3b2226..29f6a2f4d3d 100644 --- a/src/Core/PostgreSQL/PoolWithFailover.cpp +++ b/src/Core/PostgreSQL/PoolWithFailover.cpp @@ -1,8 +1,10 @@ #include "PoolWithFailover.h" +#include #if USE_LIBPQXX #include "Utils.h" +#include #include #include #include @@ -22,16 +24,76 @@ namespace ErrorCodes namespace postgres { +auto PoolWithFailover::connectionReistablisher(std::weak_ptr pool, size_t pool_wait_timeout) +{ + return [weak_pool = pool, pool_wait_timeout](UInt64 interval_milliseconds) + { + auto shared_pool = weak_pool.lock(); + if (!shared_pool) + return false; + + if (!shared_pool->online) + { + auto logger = getLogger("PostgreSQLConnectionPool"); + + ConnectionPtr connection; + auto connection_available = shared_pool->pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout); + + if (!connection_available) + { + LOG_WARNING(logger, "Reistablishing connection to {} has failed: unable to fetch connection within the timeout.", connection->getInfoForLog()); + return true; + } + + try + { + /// Create a new connection or reopen an old connection if it became invalid. + if (!connection) + connection = std::make_unique(shared_pool->connection_info); + + connection->connect(); + shared_pool->online = true; + LOG_DEBUG(logger, "Reistablishing connection to {} has succeeded.", connection->getInfoForLog()); + } + catch (const pqxx::broken_connection & pqxx_error) + { + if (interval_milliseconds >= 1000) + LOG_ERROR(logger, "Reistablishing connection to {} has failed: {}", connection->getInfoForLog(), pqxx_error.what()); + shared_pool->online = false; + shared_pool->pool->returnObject(std::move(connection)); + } + catch (const Poco::Exception & e) + { + if (interval_milliseconds >= 1000) + LOG_ERROR(logger, "Reistablishing connection to {} has failed: {}", connection->getInfoForLog(), e.displayText()); + shared_pool->online = false; + shared_pool->pool->returnObject(std::move(connection)); + } + catch (...) + { + if (interval_milliseconds >= 1000) + LOG_ERROR(logger, "Reistablishing connection to {} has failed.", connection->getInfoForLog()); + shared_pool->online = false; + shared_pool->pool->returnObject(std::move(connection)); + } + } + + return true; + }; +} + PoolWithFailover::PoolWithFailover( const ReplicasConfigurationByPriority & configurations_by_priority, size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_, bool auto_close_connection_, - size_t connection_attempt_timeout_) + size_t connection_attempt_timeout_, + bool bg_reconnect_) : pool_wait_timeout(pool_wait_timeout_) , max_tries(max_tries_) , auto_close_connection(auto_close_connection_) + , bg_reconnect(bg_reconnect_) { LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}", pool_size, pool_wait_timeout, max_tries_); @@ -47,7 +109,9 @@ PoolWithFailover::PoolWithFailover( replica_configuration.username, replica_configuration.password, connection_attempt_timeout_); - replicas_with_priority[priority].emplace_back(connection_info, pool_size); + replicas_with_priority[priority].emplace_back(std::make_shared(connection_info, pool_size)); + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_with_priority[priority].back()), pool_wait_timeout)); } } } @@ -58,10 +122,12 @@ PoolWithFailover::PoolWithFailover( size_t pool_wait_timeout_, size_t max_tries_, bool auto_close_connection_, - size_t connection_attempt_timeout_) + size_t connection_attempt_timeout_, + bool bg_reconnect_) : pool_wait_timeout(pool_wait_timeout_) , max_tries(max_tries_) , auto_close_connection(auto_close_connection_) + , bg_reconnect(bg_reconnect_) { LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}", pool_size, pool_wait_timeout, max_tries_); @@ -77,7 +143,9 @@ PoolWithFailover::PoolWithFailover( configuration.username, configuration.password, connection_attempt_timeout_); - replicas_with_priority[0].emplace_back(connection_string, pool_size); + replicas_with_priority[0].emplace_back(std::make_shared(connection_string, pool_size)); + if (bg_reconnect) + DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_with_priority[0].back()), pool_wait_timeout)); } } @@ -98,8 +166,11 @@ ConnectionHolderPtr PoolWithFailover::get() { auto & replica = replicas[i]; + if (bg_reconnect && !replica->online) + continue; + ConnectionPtr connection; - auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout); + auto connection_available = replica->pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout); if (!connection_available) { @@ -112,29 +183,32 @@ ConnectionHolderPtr PoolWithFailover::get() /// Create a new connection or reopen an old connection if it became invalid. if (!connection) { - connection = std::make_unique(replica.connection_info); + connection = std::make_unique(replica->connection_info); LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog()); } connection->connect(); + replica->online = true; } catch (const pqxx::broken_connection & pqxx_error) { LOG_ERROR(log, "Connection error: {}", pqxx_error.what()); error_message = PreformattedMessage::create( "Try {}. Connection to {} failed with error: {}\n", - try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what()); + try_idx + 1, DB::backQuote(replica->connection_info.host_port), pqxx_error.what()); - replica.pool->returnObject(std::move(connection)); + replica->online = false; + replica->pool->returnObject(std::move(connection)); continue; } catch (...) { - replica.pool->returnObject(std::move(connection)); + replica->online = false; + replica->pool->returnObject(std::move(connection)); throw; } - auto connection_holder = std::make_unique(replica.pool, std::move(connection), auto_close_connection); + auto connection_holder = std::make_unique(replica->pool, std::move(connection), auto_close_connection); /// Move all traversed replicas to the end. if (replicas.size() > 1) diff --git a/src/Core/PostgreSQL/PoolWithFailover.h b/src/Core/PostgreSQL/PoolWithFailover.h index 2237c752367..6b37cb6353a 100644 --- a/src/Core/PostgreSQL/PoolWithFailover.h +++ b/src/Core/PostgreSQL/PoolWithFailover.h @@ -1,6 +1,7 @@ #pragma once #include "config.h" +#include #if USE_LIBPQXX @@ -29,7 +30,8 @@ public: size_t pool_wait_timeout, size_t max_tries_, bool auto_close_connection_, - size_t connection_attempt_timeout_); + size_t connection_attempt_timeout_, + bool bg_reconnect_ = false); explicit PoolWithFailover( const DB::StoragePostgreSQL::Configuration & configuration, @@ -37,7 +39,8 @@ public: size_t pool_wait_timeout, size_t max_tries_, bool auto_close_connection_, - size_t connection_attempt_timeout_); + size_t connection_attempt_timeout_, + bool bg_reconnect_ = false); PoolWithFailover(const PoolWithFailover & other) = delete; @@ -48,19 +51,25 @@ private: { ConnectionInfo connection_info; PoolPtr pool; + /// Pool is online. + std::atomic online{true}; PoolHolder(const ConnectionInfo & connection_info_, size_t pool_size) : connection_info(connection_info_), pool(std::make_shared(pool_size)) {} }; /// Highest priority is 0, the bigger the number in map, the less the priority - using Replicas = std::vector; + using PoolHolderPtr = std::shared_ptr; + using Replicas = std::vector; using ReplicasWithPriority = std::map; + static auto connectionReistablisher(std::weak_ptr pool, size_t pool_wait_timeout); + ReplicasWithPriority replicas_with_priority; size_t pool_wait_timeout; size_t max_tries; bool auto_close_connection; + bool bg_reconnect; std::mutex mutex; LoggerPtr log = getLogger("PostgreSQLConnectionPool"); }; diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index bda109ec3c3..d05e13fee15 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -30,6 +30,7 @@ namespace DB // clang-format off #define LIST_OF_SERVER_SETTINGS(DECLARE, ALIAS) \ + DECLARE(UInt64, bg_reconnect_mysql_dict_interval, 1000, "Interval in milliseconds for reconnection attempts of failed MySQL and Postgres dictionaries having `background_reconnect` enabled.", 0) \ DECLARE(Bool, show_addresses_in_stack_traces, true, "If it is set true will show addresses in stack traces", 0) \ DECLARE(Bool, shutdown_wait_unfinished_queries, false, "If set true ClickHouse will wait for running queries finish before shutdown.", 0) \ DECLARE(UInt64, shutdown_wait_unfinished, 5, "Delay in seconds to wait for unfinished queries", 0) \ diff --git a/src/Dictionaries/MySQLDictionarySource.cpp b/src/Dictionaries/MySQLDictionarySource.cpp index e79b73e6587..4fc0bbcbd1a 100644 --- a/src/Dictionaries/MySQLDictionarySource.cpp +++ b/src/Dictionaries/MySQLDictionarySource.cpp @@ -119,6 +119,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) .update_field = named_collection->getOrDefault("update_field", ""), .update_lag = named_collection->getOrDefault("update_lag", 1), .dont_check_update_time = named_collection->getOrDefault("dont_check_update_time", false), + .bg_reconnect = named_collection->getOrDefault("background_reconnect", false), }); const auto & settings = global_context->getSettingsRef(); @@ -147,7 +148,8 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) .invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""), .update_field = config.getString(settings_config_prefix + ".update_field", ""), .update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1), - .dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false) + .dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false), + .bg_reconnect = config.getBool(settings_config_prefix + ".background_reconnect", false), }); pool = std::make_shared( diff --git a/src/Dictionaries/MySQLDictionarySource.h b/src/Dictionaries/MySQLDictionarySource.h index d9eea3f3e26..6b6de9a6578 100644 --- a/src/Dictionaries/MySQLDictionarySource.h +++ b/src/Dictionaries/MySQLDictionarySource.h @@ -39,6 +39,7 @@ public: const std::string update_field; const UInt64 update_lag; const bool dont_check_update_time; + const bool bg_reconnect; }; MySQLDictionarySource( diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index fbf199567b3..6735072bd82 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -37,7 +37,7 @@ namespace ErrorCodes } static const ValidateKeysMultiset dictionary_allowed_keys = { - "host", "port", "user", "password", "db", "database", "table", "schema", + "host", "port", "user", "password", "db", "database", "table", "schema", "background_reconnect", "update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"}; #if USE_LIBPQXX @@ -217,6 +217,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) std::optional dictionary_configuration; postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority; + bool bg_reconnect = false; + auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr; if (named_collection) { @@ -242,6 +244,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) .update_lag = named_collection->getOrDefault("update_lag", 1), }); + bg_reconnect = named_collection->getOrDefault("background_reconnect", false); + replicas_by_priority[0].emplace_back(common_configuration); } else @@ -269,6 +273,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) .update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1) }); + bg_reconnect = config.getBool(fmt::format("{}.background_reconnect", settings_config_prefix), false); if (config.has(settings_config_prefix + ".replica")) { @@ -319,7 +324,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory) settings[Setting::postgresql_connection_pool_wait_timeout], settings[Setting::postgresql_connection_pool_retries], settings[Setting::postgresql_connection_pool_auto_close_connection], - settings[Setting::postgresql_connection_attempt_timeout]); + settings[Setting::postgresql_connection_attempt_timeout], + bg_reconnect); return std::make_unique(dict_struct, dictionary_configuration.value(), pool, sample_block); diff --git a/tests/integration/test_dictionaries_mysql/configs/bg_reconnect.xml b/tests/integration/test_dictionaries_mysql/configs/bg_reconnect.xml new file mode 100644 index 00000000000..b2f32214b43 --- /dev/null +++ b/tests/integration/test_dictionaries_mysql/configs/bg_reconnect.xml @@ -0,0 +1,3 @@ + + 7000 + diff --git a/tests/integration/test_dictionaries_mysql/test.py b/tests/integration/test_dictionaries_mysql/test.py index 0dd62d819ec..bf391b63d82 100644 --- a/tests/integration/test_dictionaries_mysql/test.py +++ b/tests/integration/test_dictionaries_mysql/test.py @@ -7,9 +7,14 @@ import pymysql.cursors import pytest from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager DICTS = ["configs/dictionaries/mysql_dict1.xml", "configs/dictionaries/mysql_dict2.xml"] -CONFIG_FILES = ["configs/remote_servers.xml", "configs/named_collections.xml"] +CONFIG_FILES = [ + "configs/remote_servers.xml", + "configs/named_collections.xml", + "configs/bg_reconnect.xml", +] USER_CONFIGS = ["configs/users.xml"] cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( @@ -436,3 +441,78 @@ def execute_mysql_query(connection, query): def create_mysql_table(conn, table_name): with conn.cursor() as cursor: cursor.execute(create_table_mysql_template.format(table_name)) + + +def test_background_dictionary_reconnect(started_cluster): + mysql_connection = get_mysql_conn(started_cluster) + + execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.dict;") + execute_mysql_query( + mysql_connection, + "CREATE TABLE test.dict (id Integer, value Text);", + ) + execute_mysql_query( + mysql_connection, "INSERT INTO test.dict VALUES (1, 'Value_1');" + ) + + query = instance.query + query( + f""" + DROP DICTIONARY IF EXISTS dict; + CREATE DICTIONARY dict + ( + id UInt64, + value String + ) + PRIMARY KEY id + LAYOUT(DIRECT()) + SOURCE(MYSQL( + USER 'root' + PASSWORD 'clickhouse' + DB 'test' + QUERY $doc$SELECT * FROM test.dict;$doc$ + BACKGROUND_RECONNECT 'true' + REPLICA(HOST 'mysql80' PORT 3306 PRIORITY 1))) + """ + ) + + result = query("SELECT value FROM dict WHERE id = 1") + assert result == "Value_1\n" + + class MySQL_Instance: + pass + + mysql_instance = MySQL_Instance() + mysql_instance.ip_address = started_cluster.mysql8_ip + + with PartitionManager() as pm: + # Break connection to mysql server + pm.partition_instances( + instance, mysql_instance, action="REJECT --reject-with tcp-reset" + ) + + # Exhaust possible connection pool and initiate reconnection attempts + for _ in range(5): + try: + result = query("SELECT value FROM dict WHERE id = 1") + except Exception as e: + pass + + counter = 0 + # Based on bg_reconnect_mysql_dict_interval = 7000 in "configs/bg_reconnect.xml": + # connection should not be available for about 5-7 seconds + while counter <= 8: + try: + counter += 1 + time.sleep(1) + result = query("SELECT value FROM dict WHERE id = 1") + break + except Exception as e: + pass + + query("DROP DICTIONARY IF EXISTS dict;") + execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.dict;") + + assert ( + counter >= 4 and counter <= 8 + ), f"Connection reistablisher didn't meet anticipated time interval [4..8]: {counter}" diff --git a/tests/integration/test_dictionaries_postgresql/configs/bg_reconnect.xml b/tests/integration/test_dictionaries_postgresql/configs/bg_reconnect.xml new file mode 100644 index 00000000000..b2f32214b43 --- /dev/null +++ b/tests/integration/test_dictionaries_postgresql/configs/bg_reconnect.xml @@ -0,0 +1,3 @@ + + 7000 + diff --git a/tests/integration/test_dictionaries_postgresql/test.py b/tests/integration/test_dictionaries_postgresql/test.py index f8f07d04c62..6ab42cbc3df 100644 --- a/tests/integration/test_dictionaries_postgresql/test.py +++ b/tests/integration/test_dictionaries_postgresql/test.py @@ -7,6 +7,7 @@ import pytest from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager from helpers.postgres_utility import get_postgres_conn cluster = ClickHouseCluster(__file__) @@ -16,6 +17,7 @@ node1 = cluster.add_instance( "configs/config.xml", "configs/dictionaries/postgres_dict.xml", "configs/named_collections.xml", + "configs/bg_reconnect.xml", ], with_postgres=True, with_postgres_cluster=True, @@ -588,6 +590,86 @@ def test_named_collection_from_ddl(started_cluster): ) +def test_background_dictionary_reconnect(started_cluster): + postgres_conn = get_postgres_conn( + ip=started_cluster.postgres_ip, + database=True, + port=started_cluster.postgres_port, + ) + + postgres_conn.cursor().execute("DROP TABLE IF EXISTS dict") + postgres_conn.cursor().execute( + f""" + CREATE TABLE dict ( + id integer NOT NULL, value text NOT NULL, PRIMARY KEY (id)) + """ + ) + + postgres_conn.cursor().execute("INSERT INTO dict VALUES (1, 'Value_1')") + + query = node1.query + query( + f""" + DROP DICTIONARY IF EXISTS dict; + CREATE DICTIONARY dict + ( + id UInt64, + value String + ) + PRIMARY KEY id + LAYOUT(DIRECT()) + SOURCE(POSTGRESQL( + USER 'postgres' + PASSWORD 'mysecretpassword' + DB 'postgres_database' + QUERY $doc$SELECT * FROM dict;$doc$ + BACKGROUND_RECONNECT 'true' + REPLICA(HOST '{started_cluster.postgres_ip}' PORT {started_cluster.postgres_port} PRIORITY 1))) + """ + ) + + result = query("SELECT value FROM dict WHERE id = 1") + assert result == "Value_1\n" + + class PostgreSQL_Instance: + pass + + postgres_instance = PostgreSQL_Instance() + postgres_instance.ip_address = started_cluster.postgres_ip + + with PartitionManager() as pm: + # Break connection to mysql server + pm.partition_instances( + node1, postgres_instance, action="REJECT --reject-with tcp-reset" + ) + + # Exhaust possible connection pool and initiate reconnection attempts + for _ in range(5): + try: + result = query("SELECT value FROM dict WHERE id = 1") + except Exception as e: + pass + + counter = 0 + # Based on bg_reconnect_mysql_dict_interval = 7000 in "configs/bg_reconnect.xml": + # connection should not be available for about 5-7 seconds + while counter <= 8: + try: + counter += 1 + time.sleep(1) + result = query("SELECT value FROM dict WHERE id = 1") + break + except Exception as e: + pass + + query("DROP DICTIONARY IF EXISTS dict;") + postgres_conn.cursor().execute("DROP TABLE IF EXISTS dict") + + assert ( + counter >= 4 and counter <= 8 + ), f"Connection reistablisher didn't meet anticipated time interval [4..8]: {counter}" + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...")