Merge pull request #71101 from ClickHouse/imp-mysql-bg-reconnect

Add connection reestablisher to mysql and postgres dictionary pools
Nikita Mikhaylov 2024-12-16 11:30:25 +00:00 committed by GitHub
commit 0a42c6df2c
18 changed files with 489 additions and 31 deletions

View File

@ -1286,6 +1286,7 @@ Setting fields:
- `table` Name of the table and schema if exists.
- `connection_string` Connection string.
- `invalidate_query` Query for checking the dictionary status. Optional parameter. Read more in the section [Refreshing dictionary data using LIFETIME](#refreshing-dictionary-data-using-lifetime).
- `background_reconnect` Reconnect to the replica in the background if the connection fails. Optional parameter.
- `query` The custom query. Optional parameter.
:::note
@ -1877,6 +1878,7 @@ Setting fields:
- `table` Name of the table.
- `where` The selection criteria. The syntax for conditions is the same as for `WHERE` clause in PostgreSQL. For example, `id > 10 AND id < 20`. Optional parameter.
- `invalidate_query` Query for checking the dictionary status. Optional parameter. Read more in the section [Refreshing dictionary data using LIFETIME](#refreshing-dictionary-data-using-lifetime).
- `background_reconnect` Reconnect to the replica in the background if the connection fails. Optional parameter.
- `query` The custom query. Optional parameter.
:::note

View File

@ -91,6 +91,7 @@
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Common/Config/ConfigReloader.h>
#include <Server/HTTPHandlerFactory.h>
#include <Common/ReplicasReconnector.h>
#include "MetricsTransmitter.h"
#include <Common/StatusFile.h>
#include <Server/TCPHandlerFactory.h>
@ -2242,6 +2243,8 @@ try
if (dns_cache_updater)
dns_cache_updater->start();
auto replicas_reconnector = ReplicasReconnector::init(global_context);
/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database].toString();

View File

@ -0,0 +1,116 @@
#pragma once
#include <Core/ServerSettings.h>
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <boost/noncopyable.hpp>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <utility>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_INITIALIZED;
extern const int LOGICAL_ERROR;
}
namespace ServerSetting
{
extern const ServerSettingsUInt64 bg_reconnect_mysql_dict_interval;
}
class ReplicasReconnector : private boost::noncopyable
{
public:
using Reconnector = std::function<bool(UInt64)>;
using ReconnectorsList = std::list<Reconnector>;
ReplicasReconnector(const ReplicasReconnector &) = delete;
~ReplicasReconnector()
{
emergency_stop = true;
task_handle->deactivate();
instance_ptr = nullptr;
}
[[nodiscard]]
static std::unique_ptr<ReplicasReconnector> init(ContextPtr context)
{
if (instance_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replicas reconnector is already initialized.");
std::unique_ptr<ReplicasReconnector> ret(new ReplicasReconnector(context));
instance_ptr = ret.get();
return ret;
}
static ReplicasReconnector & instance()
{
if (!instance_ptr)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Replicas reconnector is not initialized.");
return *instance_ptr;
}
void add(const Reconnector & reconnector)
{
std::lock_guard lock(mutex);
reconnectors.push_back(reconnector);
task_handle->activateAndSchedule();
}
private:
inline static ReplicasReconnector * instance_ptr = nullptr;
ReconnectorsList reconnectors;
std::mutex mutex;
std::atomic_bool emergency_stop{false};
BackgroundSchedulePoolTaskHolder task_handle;
LoggerPtr log = nullptr;
explicit ReplicasReconnector(ContextPtr context)
: task_handle(context->getSchedulePool().createTask("ReplicasReconnector", [this]{ run(); }))
, log(getLogger("ReplicasReconnector"))
{
}
void run()
{
auto interval_milliseconds = Context::getGlobalContextInstance()->getServerSettings()[ServerSetting::bg_reconnect_mysql_dict_interval];
std::unique_lock lock(mutex);
for (auto it = reconnectors.cbegin(); !emergency_stop && it != reconnectors.end();)
{
bool res = true;
lock.unlock();
try
{
res = (*it)(interval_milliseconds);
}
catch (...)
{
LOG_WARNING(log, "Failed reconnection routine.");
}
lock.lock();
if (res)
++it;
else
it = reconnectors.erase(it);
}
if (!reconnectors.empty())
task_handle->scheduleAfter(Context::getGlobalContextInstance()->getServerSettings()[ServerSetting::bg_reconnect_mysql_dict_interval]);
}
};
}
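
The new header is essentially a registry of `bool(UInt64)` callbacks driven by a `BackgroundSchedulePool` task: `run()` calls each callback with the mutex released, erases a callback that returns false (its pool is gone), and reschedules itself as long as the list is non-empty. A minimal standalone sketch of that contract, with hypothetical names and no ClickHouse dependencies:

```cpp
// Standalone model of the loop in ReplicasReconnector::run() above
// (hypothetical names, no ClickHouse dependencies): a routine that returns
// false is dropped, and the loop keeps running while any routine remains.
#include <cstdint>
#include <functional>
#include <iostream>
#include <iterator>
#include <list>

using Reconnector = std::function<bool(uint64_t /* interval_ms */)>;

void runOnce(std::list<Reconnector> & reconnectors, uint64_t interval_ms)
{
    for (auto it = reconnectors.begin(); it != reconnectors.end();)
    {
        bool keep = true;
        try
        {
            keep = (*it)(interval_ms);          // one reconnection round
        }
        catch (...)
        {
            // A throwing routine is kept and simply retried next round.
        }
        it = keep ? std::next(it) : reconnectors.erase(it);
    }
}

int main()
{
    std::list<Reconnector> reconnectors;
    int rounds = 0;

    // A routine that unregisters itself after three rounds, the way the real
    // ones do once the pool they watch has been destroyed.
    reconnectors.push_back([&rounds](uint64_t interval_ms)
    {
        ++rounds;
        std::cout << "round " << rounds << ", interval " << interval_ms << " ms\n";
        return rounds < 3;                      // false == please unregister me
    });

    while (!reconnectors.empty())               // stands in for scheduleAfter()
        runOnce(reconnectors, 1000);
}
```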

View File

@ -58,7 +58,7 @@ public:
explicit ThreadFromThreadPool(ThreadPoolImpl& parent_pool);
// Shift the thread state from Preparing to Running to allow the worker to start.
void start(ThreadList::iterator& it);
void start(typename ThreadList::iterator& it);
void join();
@ -195,7 +195,7 @@ private:
const bool shutdown_on_exception = true;
boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;
ThreadFromThreadPool::ThreadList threads;
typename ThreadFromThreadPool::ThreadList threads;
std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;

View File

@ -179,7 +179,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
initialize();
for (;;)
{
LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescription());
LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescriptionImpl());
for (auto & connection : connections)
{
@ -190,18 +190,18 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
}
}
LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescription());
LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescriptionImpl());
if (connections.size() < static_cast<size_t>(max_connections))
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescription());
LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescriptionImpl());
}
else
{
LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescription());
LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescriptionImpl());
}
if (!wait_timeout)
@ -211,7 +211,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
lock.unlock();
LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescriptionImpl(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
@ -236,7 +236,7 @@ Pool::Entry Pool::tryGet()
return res;
}
LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescriptionImpl());
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
connection_it = connections.erase(connection_it);
@ -259,7 +259,7 @@ Pool::Entry Pool::tryGet()
void Pool::removeConnection(Connection* connection)
{
LOG_TRACE(log, "{}: Removing connection.", getDescription());
LOG_TRACE(log, "{}: Removing connection.", getDescriptionImpl());
std::lock_guard lock(mutex);
if (connection)
@ -381,18 +381,22 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
{
LOG_ERROR(log, "Failed to connect to MySQL ({}): {}", description, e.what());
if ((!was_successful && !dont_throw_if_failed_first_time)
if (!online
|| (!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR
|| e.errnum() == ER_BAD_DB_ERROR)
{
online = false;
throw;
}
online = false;
return nullptr;
}
connections.push_back(conn_ptr.get());
online = true;
was_successful = true;
return conn_ptr.release();
}

View File

@ -1,3 +1,5 @@
#include <Common/Exception.h>
#include <Common/ReplicasReconnector.h>
#include <algorithm>
#include <ctime>
#include <random>
@ -15,6 +17,38 @@ namespace DB::ErrorCodes
using namespace mysqlxx;
auto connectionReistablisher(std::weak_ptr<Pool> pool)
{
return [weak_pool = pool](UInt64 interval_milliseconds)
{
auto shared_pool = weak_pool.lock();
if (!shared_pool)
return false;
if (!shared_pool->isOnline())
{
try
{
shared_pool->get();
Poco::Util::Application::instance().logger().information("Reistablishing connection to " + shared_pool->getDescription() + " has succeeded.");
}
catch (const Poco::Exception & e)
{
if (interval_milliseconds >= 1000)
Poco::Util::Application::instance().logger().warning("Reistablishing connection to " + shared_pool->getDescription() + " has failed: " + e.displayText());
}
catch (...)
{
if (interval_milliseconds >= 1000)
Poco::Util::Application::instance().logger().warning("Reistablishing connection to " + shared_pool->getDescription() + " has failed.");
}
}
return true;
};
}
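
The routine ties its own lifetime to the pool through a `std::weak_ptr`: when `lock()` fails the pool has been destroyed, so the routine returns false and `ReplicasReconnector` drops it; otherwise it probes only while the pool is marked offline and returns true to stay registered for future disconnects. A small self-contained sketch of that lifetime tie, with a hypothetical `FakePool` standing in for `mysqlxx::Pool`:

```cpp
// Sketch of the weak_ptr lifetime tie used by connectionReistablisher()
// (hypothetical FakePool type, trivial "probe"): false means "unregister me",
// true means "keep calling me".
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>

struct FakePool { bool online = false; };

std::function<bool(uint64_t)> makeReconnector(std::weak_ptr<FakePool> weak_pool)
{
    return [weak_pool](uint64_t /* interval_ms */)
    {
        auto pool = weak_pool.lock();
        if (!pool)
            return false;           // the pool is gone -> drop this routine
        if (!pool->online)
            pool->online = true;    // stands in for a successful pool->get() probe
        return true;                // stay registered for future disconnects
    };
}

int main()
{
    auto pool = std::make_shared<FakePool>();
    auto routine = makeReconnector(pool);

    std::cout << routine(1000) << '\n';  // 1: pool alive, routine kept
    pool.reset();                        // the dictionary/pool goes away
    std::cout << routine(1000) << '\n';  // 0: routine asks to be unregistered
}
```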
PoolWithFailover::PoolWithFailover(
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_name_,
@ -24,6 +58,7 @@ PoolWithFailover::PoolWithFailover(
: max_tries(max_tries_)
, shareable(config_.getBool(config_name_ + ".share_connection", false))
, wait_timeout(UINT64_MAX)
, bg_reconnect(config_.getBool(config_name_ + ".background_reconnect", false))
{
if (config_.has(config_name_ + ".replica"))
{
@ -40,6 +75,9 @@ PoolWithFailover::PoolWithFailover(
replicas_by_priority[priority].emplace_back(
std::make_shared<Pool>(config_, replica_name, default_connections_, max_connections_, config_name_.c_str()));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[priority].back())));
}
}
@ -57,7 +95,10 @@ PoolWithFailover::PoolWithFailover(
{
replicas_by_priority[0].emplace_back(
std::make_shared<Pool>(config_, config_name_, default_connections_, max_connections_));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[0].back())));
}
}
@ -82,10 +123,12 @@ PoolWithFailover::PoolWithFailover(
size_t max_tries_,
uint64_t wait_timeout_,
size_t connect_timeout_,
size_t rw_timeout_)
size_t rw_timeout_,
bool bg_reconnect_)
: max_tries(max_tries_)
, shareable(false)
, wait_timeout(wait_timeout_)
, bg_reconnect(bg_reconnect_)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
@ -97,7 +140,10 @@ PoolWithFailover::PoolWithFailover(
rw_timeout_,
default_connections_,
max_connections_));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_by_priority[0].back())));
}
}
@ -105,6 +151,7 @@ PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}
, shareable{other.shareable}
, wait_timeout(other.wait_timeout)
, bg_reconnect(other.bg_reconnect)
{
if (shareable)
{
@ -117,9 +164,14 @@ PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
Replicas replicas;
replicas.reserve(priority_replicas.second.size());
for (const auto & pool : priority_replicas.second)
{
replicas.emplace_back(std::make_shared<Pool>(*pool));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas.back())));
}
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
}
}
}
@ -150,6 +202,9 @@ PoolWithFailover::Entry PoolWithFailover::get()
{
PoolPtr & pool = replicas[i];
if (bg_reconnect && !pool->isOnline())
continue;
try
{
Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet();
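
Combined with the `online` flag added to `mysqlxx::Pool`, the failover loop can now skip replicas that are already known to be down instead of paying a connect timeout for each of them; a replica is marked offline when a connection attempt fails and goes back online only once a connection succeeds (for example via the background routine). A simplified, self-contained model of that behaviour, with hypothetical types standing in for `Pool` and `PoolWithFailover::get()`:

```cpp
// Simplified model (hypothetical names) of the early skip added to
// PoolWithFailover::get(): offline replicas are not probed again in the hot
// path, failures mark a replica offline, success marks it online again.
#include <atomic>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct FakePool
{
    std::string name;
    std::atomic<bool> online{true};
    bool next_connect_fails = false;

    std::string get()                   // stands in for Pool::get()/tryGet()
    {
        if (next_connect_fails)
        {
            online = false;             // same transition as allocConnection()
            throw std::runtime_error(name + ": connect failed");
        }
        online = true;
        return name + ": entry";
    }
};

std::string getWithFailover(std::vector<std::shared_ptr<FakePool>> & replicas, bool bg_reconnect)
{
    for (auto & pool : replicas)
    {
        if (bg_reconnect && !pool->online)
            continue;                   // the new early skip
        try
        {
            return pool->get();
        }
        catch (const std::exception & e)
        {
            std::cerr << e.what() << ", trying the next replica\n";
        }
    }
    throw std::runtime_error("all replicas are unreachable");
}

int main()
{
    auto r1 = std::make_shared<FakePool>();
    r1->name = "replica1";
    r1->next_connect_fails = true;      // simulate an unreachable first replica
    auto r2 = std::make_shared<FakePool>();
    r2->name = "replica2";

    std::vector<std::shared_ptr<FakePool>> replicas{r1, r2};
    std::cout << getWithFailover(replicas, /*bg_reconnect=*/true) << '\n';  // replica1 fails, replica2 serves
    std::cout << getWithFailover(replicas, /*bg_reconnect=*/true) << '\n';  // replica1 skipped without a connect attempt
}
```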

View File

@ -188,11 +188,17 @@ public:
/// Get description of database.
std::string getDescription() const
{
return description;
std::lock_guard lock(mutex);
return getDescriptionImpl();
}
void removeConnection(Connection * connection);
bool isOnline()
{
return online;
}
protected:
LoggerPtr log = getLogger("mysqlxx::Pool");
@ -209,7 +215,7 @@ private:
/// List of connections.
Connections connections;
/// Lock for connections list access
std::mutex mutex;
mutable std::mutex mutex;
/// Description of connection.
std::string description;
@ -234,8 +240,16 @@ private:
/// Initialises class if it wasn't.
void initialize();
/// Pool is online.
std::atomic<bool> online{true};
/** Create new connection. */
Connection * allocConnection(bool dont_throw_if_failed_first_time = false);
std::string getDescriptionImpl() const
{
return description;
}
};
}
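
The `getDescription()`/`getDescriptionImpl()` split goes with the mutex becoming `mutable`: the public accessor now locks, while `Pool::get()` and the other members already hold the mutex when they log, so they call the unlocked `Impl` variant to avoid self-deadlock on the non-recursive mutex. A tiny standalone illustration of the idiom, with a hypothetical class:

```cpp
// Standalone illustration (hypothetical class) of the locking accessor +
// unlocked *Impl variant used above.
#include <iostream>
#include <mutex>
#include <string>

class Described
{
public:
    std::string getDescription() const
    {
        std::lock_guard lock(mutex);   // safe to call from any thread
        return getDescriptionImpl();
    }

    void doWorkAndLog()
    {
        std::lock_guard lock(mutex);
        // Calling getDescription() here would deadlock on the non-recursive
        // mutex, hence the unlocked Impl variant for code that holds the lock.
        std::cout << getDescriptionImpl() << ": working\n";
    }

private:
    std::string getDescriptionImpl() const { return description; }

    mutable std::mutex mutex;          // mutable so const accessors can lock it
    std::string description = "example pool";
};

int main()
{
    Described d;
    d.doWorkAndLog();
    std::cout << d.getDescription() << '\n';
}
```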

View File

@ -85,6 +85,8 @@ namespace mysqlxx
bool shareable;
/// Timeout for waiting free connection.
uint64_t wait_timeout = 0;
/// Attempt to reconnect in background thread
bool bg_reconnect = false;
public:
using Entry = Pool::Entry;
@ -126,7 +128,8 @@ namespace mysqlxx
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
uint64_t wait_timeout_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_CONNECTION_WAIT_TIMEOUT,
size_t connect_timeout = MYSQLXX_DEFAULT_TIMEOUT,
size_t rw_timeout = MYSQLXX_DEFAULT_RW_TIMEOUT);
size_t rw_timeout = MYSQLXX_DEFAULT_RW_TIMEOUT,
bool bg_reconnect_ = false);
PoolWithFailover(const PoolWithFailover & other);

View File

@ -1,8 +1,10 @@
#include "PoolWithFailover.h"
#include <memory>
#if USE_LIBPQXX
#include "Utils.h"
#include <Common/ReplicasReconnector.h>
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <Common/quoteString.h>
@ -22,16 +24,76 @@ namespace ErrorCodes
namespace postgres
{
auto PoolWithFailover::connectionReistablisher(std::weak_ptr<PoolHolder> pool, size_t pool_wait_timeout)
{
return [weak_pool = pool, pool_wait_timeout](UInt64 interval_milliseconds)
{
auto shared_pool = weak_pool.lock();
if (!shared_pool)
return false;
if (!shared_pool->online)
{
auto logger = getLogger("PostgreSQLConnectionPool");
ConnectionPtr connection;
auto connection_available = shared_pool->pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
if (!connection_available)
{
LOG_WARNING(logger, "Reistablishing connection to {} has failed: unable to fetch connection within the timeout.", shared_pool->connection_info.host_port);
return true;
}
try
{
/// Create a new connection or reopen an old connection if it became invalid.
if (!connection)
connection = std::make_unique<Connection>(shared_pool->connection_info);
connection->connect();
shared_pool->online = true;
LOG_DEBUG(logger, "Reistablishing connection to {} has succeeded.", connection->getInfoForLog());
}
catch (const pqxx::broken_connection & pqxx_error)
{
if (interval_milliseconds >= 1000)
LOG_ERROR(logger, "Reistablishing connection to {} has failed: {}", connection->getInfoForLog(), pqxx_error.what());
shared_pool->online = false;
shared_pool->pool->returnObject(std::move(connection));
}
catch (const Poco::Exception & e)
{
if (interval_milliseconds >= 1000)
LOG_ERROR(logger, "Reistablishing connection to {} has failed: {}", connection->getInfoForLog(), e.displayText());
shared_pool->online = false;
shared_pool->pool->returnObject(std::move(connection));
}
catch (...)
{
if (interval_milliseconds >= 1000)
LOG_ERROR(logger, "Reistablishing connection to {} has failed.", connection->getInfoForLog());
shared_pool->online = false;
shared_pool->pool->returnObject(std::move(connection));
}
}
return true;
};
}
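
Unlike the MySQL routine, the PostgreSQL one goes through the connection pool explicitly: it borrows a slot with the usual `pool_wait_timeout`, creates or reopens the connection, marks the holder online on success, and in the failure paths returns the slot with `returnObject()` so it is not lost. A simplified, self-contained model of that probe cycle, with hypothetical `FakeConnection`/`FakeSlotPool` types in place of the real connection and pool classes:

```cpp
// Simplified probe cycle (hypothetical names): borrow a pooled slot, try to
// (re)connect, flip the online flag accordingly, and give the slot back so
// the pool does not shrink. In this sketch the slot is returned on both paths.
#include <deque>
#include <iostream>
#include <memory>
#include <stdexcept>

struct FakeConnection
{
    bool broken = true;
    void connect()
    {
        if (broken)
            throw std::runtime_error("server unreachable");
    }
};

struct FakeSlotPool
{
    std::deque<std::unique_ptr<FakeConnection>> slots;

    bool tryBorrow(std::unique_ptr<FakeConnection> & out)
    {
        if (slots.empty())
            return false;                       // nothing free within the "timeout"
        out = std::move(slots.front());
        slots.pop_front();
        return true;
    }

    void giveBack(std::unique_ptr<FakeConnection> conn) { slots.push_back(std::move(conn)); }
};

bool probeOnce(FakeSlotPool & pool, bool & online)
{
    std::unique_ptr<FakeConnection> conn;
    if (!pool.tryBorrow(conn))
        return false;                           // no free slot, try again next round
    try
    {
        conn->connect();
        online = true;                          // replica reachable again
        pool.giveBack(std::move(conn));
        return true;
    }
    catch (const std::exception & e)
    {
        std::cerr << "probe failed: " << e.what() << '\n';
        online = false;
        pool.giveBack(std::move(conn));         // return the slot on failure too
        return false;
    }
}

int main()
{
    FakeSlotPool pool;
    pool.slots.push_back(std::make_unique<FakeConnection>());
    bool online = false;

    probeOnce(pool, online);                    // fails: the connection is still broken
    pool.slots.front()->broken = false;         // the server "comes back"
    probeOnce(pool, online);
    std::cout << (online ? "online" : "offline") << '\n';
}
```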
PoolWithFailover::PoolWithFailover(
const ReplicasConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_,
size_t connection_attempt_timeout_)
size_t connection_attempt_timeout_,
bool bg_reconnect_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
, bg_reconnect(bg_reconnect_)
{
LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -47,7 +109,9 @@ PoolWithFailover::PoolWithFailover(
replica_configuration.username,
replica_configuration.password,
connection_attempt_timeout_);
replicas_with_priority[priority].emplace_back(connection_info, pool_size);
replicas_with_priority[priority].emplace_back(std::make_shared<PoolHolder>(connection_info, pool_size));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_with_priority[priority].back()), pool_wait_timeout));
}
}
}
@ -58,10 +122,12 @@ PoolWithFailover::PoolWithFailover(
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_,
size_t connection_attempt_timeout_)
size_t connection_attempt_timeout_,
bool bg_reconnect_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
, bg_reconnect(bg_reconnect_)
{
LOG_TRACE(getLogger("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -77,7 +143,9 @@ PoolWithFailover::PoolWithFailover(
configuration.username,
configuration.password,
connection_attempt_timeout_);
replicas_with_priority[0].emplace_back(connection_string, pool_size);
replicas_with_priority[0].emplace_back(std::make_shared<PoolHolder>(connection_string, pool_size));
if (bg_reconnect)
DB::ReplicasReconnector::instance().add(connectionReistablisher(std::weak_ptr(replicas_with_priority[0].back()), pool_wait_timeout));
}
}
@ -98,8 +166,11 @@ ConnectionHolderPtr PoolWithFailover::get()
{
auto & replica = replicas[i];
if (bg_reconnect && !replica->online)
continue;
ConnectionPtr connection;
auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
auto connection_available = replica->pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);
if (!connection_available)
{
@ -112,29 +183,32 @@ ConnectionHolderPtr PoolWithFailover::get()
/// Create a new connection or reopen an old connection if it became invalid.
if (!connection)
{
connection = std::make_unique<Connection>(replica.connection_info);
connection = std::make_unique<Connection>(replica->connection_info);
LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog());
}
connection->connect();
replica->online = true;
}
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message = PreformattedMessage::create(
"Try {}. Connection to {} failed with error: {}\n",
try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what());
try_idx + 1, DB::backQuote(replica->connection_info.host_port), pqxx_error.what());
replica.pool->returnObject(std::move(connection));
replica->online = false;
replica->pool->returnObject(std::move(connection));
continue;
}
catch (...)
{
replica.pool->returnObject(std::move(connection));
replica->online = false;
replica->pool->returnObject(std::move(connection));
throw;
}
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);
auto connection_holder = std::make_unique<ConnectionHolder>(replica->pool, std::move(connection), auto_close_connection);
/// Move all traversed replicas to the end.
if (replicas.size() > 1)

View File

@ -1,6 +1,7 @@
#pragma once
#include "config.h"
#include <memory>
#if USE_LIBPQXX
@ -29,7 +30,8 @@ public:
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_,
size_t connection_attempt_timeout_);
size_t connection_attempt_timeout_,
bool bg_reconnect_ = false);
explicit PoolWithFailover(
const DB::StoragePostgreSQL::Configuration & configuration,
@ -37,7 +39,8 @@ public:
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_,
size_t connection_attempt_timeout_);
size_t connection_attempt_timeout_,
bool bg_reconnect_ = false);
PoolWithFailover(const PoolWithFailover & other) = delete;
@ -48,19 +51,25 @@ private:
{
ConnectionInfo connection_info;
PoolPtr pool;
/// Pool is online.
std::atomic<bool> online{true};
PoolHolder(const ConnectionInfo & connection_info_, size_t pool_size)
: connection_info(connection_info_), pool(std::make_shared<Pool>(pool_size)) {}
};
/// Highest priority is 0, the bigger the number in map, the less the priority
using Replicas = std::vector<PoolHolder>;
using PoolHolderPtr = std::shared_ptr<PoolHolder>;
using Replicas = std::vector<PoolHolderPtr>;
using ReplicasWithPriority = std::map<size_t, Replicas>;
static auto connectionReistablisher(std::weak_ptr<PoolHolder> pool, size_t pool_wait_timeout);
ReplicasWithPriority replicas_with_priority;
size_t pool_wait_timeout;
size_t max_tries;
bool auto_close_connection;
bool bg_reconnect;
std::mutex mutex;
LoggerPtr log = getLogger("PostgreSQLConnectionPool");
};

View File

@ -30,6 +30,7 @@ namespace DB
// clang-format off
#define LIST_OF_SERVER_SETTINGS(DECLARE, ALIAS) \
DECLARE(UInt64, bg_reconnect_mysql_dict_interval, 1000, "Interval in milliseconds for reconnection attempts of failed MySQL and Postgres dictionaries having `background_reconnect` enabled.", 0) \
DECLARE(Bool, show_addresses_in_stack_traces, true, "If it is set true will show addresses in stack traces", 0) \
DECLARE(Bool, shutdown_wait_unfinished_queries, false, "If set true ClickHouse will wait for running queries finish before shutdown.", 0) \
DECLARE(UInt64, shutdown_wait_unfinished, 5, "Delay in seconds to wait for unfinished queries", 0) \

View File

@ -119,6 +119,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
.update_field = named_collection->getOrDefault<String>("update_field", ""),
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
.dont_check_update_time = named_collection->getOrDefault<bool>("dont_check_update_time", false),
.bg_reconnect = named_collection->getOrDefault<bool>("background_reconnect", false),
});
const auto & settings = global_context->getSettingsRef();
@ -147,7 +148,8 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false)
.dont_check_update_time = config.getBool(settings_config_prefix + ".dont_check_update_time", false),
.bg_reconnect = config.getBool(settings_config_prefix + ".background_reconnect", false),
});
pool = std::make_shared<mysqlxx::PoolWithFailover>(

View File

@ -39,6 +39,7 @@ public:
const std::string update_field;
const UInt64 update_lag;
const bool dont_check_update_time;
const bool bg_reconnect;
};
MySQLDictionarySource(

View File

@ -37,7 +37,7 @@ namespace ErrorCodes
}
static const ValidateKeysMultiset<ExternalDatabaseEqualKeysSet> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"host", "port", "user", "password", "db", "database", "table", "schema", "background_reconnect",
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
#if USE_LIBPQXX
@ -217,6 +217,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
std::optional<PostgreSQLDictionarySource::Configuration> dictionary_configuration;
postgres::PoolWithFailover::ReplicasConfigurationByPriority replicas_by_priority;
bool bg_reconnect = false;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, context) : nullptr;
if (named_collection)
{
@ -242,6 +244,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
.update_lag = named_collection->getOrDefault<UInt64>("update_lag", 1),
});
bg_reconnect = named_collection->getOrDefault<bool>("background_reconnect", false);
replicas_by_priority[0].emplace_back(common_configuration);
}
else
@ -269,6 +273,7 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
.update_lag = config.getUInt64(fmt::format("{}.update_lag", settings_config_prefix), 1)
});
bg_reconnect = config.getBool(fmt::format("{}.background_reconnect", settings_config_prefix), false);
if (config.has(settings_config_prefix + ".replica"))
{
@ -319,7 +324,8 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
settings[Setting::postgresql_connection_pool_wait_timeout],
settings[Setting::postgresql_connection_pool_retries],
settings[Setting::postgresql_connection_pool_auto_close_connection],
settings[Setting::postgresql_connection_attempt_timeout]);
settings[Setting::postgresql_connection_attempt_timeout],
bg_reconnect);
return std::make_unique<PostgreSQLDictionarySource>(dict_struct, dictionary_configuration.value(), pool, sample_block);

View File

@ -0,0 +1,3 @@
<clickhouse>
<bg_reconnect_mysql_dict_interval>7000</bg_reconnect_mysql_dict_interval>
</clickhouse>

View File

@ -7,9 +7,14 @@ import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
DICTS = ["configs/dictionaries/mysql_dict1.xml", "configs/dictionaries/mysql_dict2.xml"]
CONFIG_FILES = ["configs/remote_servers.xml", "configs/named_collections.xml"]
CONFIG_FILES = [
"configs/remote_servers.xml",
"configs/named_collections.xml",
"configs/bg_reconnect.xml",
]
USER_CONFIGS = ["configs/users.xml"]
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
@ -436,3 +441,78 @@ def execute_mysql_query(connection, query):
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_mysql_template.format(table_name))
def test_background_dictionary_reconnect(started_cluster):
mysql_connection = get_mysql_conn(started_cluster)
execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.dict;")
execute_mysql_query(
mysql_connection,
"CREATE TABLE test.dict (id Integer, value Text);",
)
execute_mysql_query(
mysql_connection, "INSERT INTO test.dict VALUES (1, 'Value_1');"
)
query = instance.query
query(
f"""
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(MYSQL(
USER 'root'
PASSWORD 'clickhouse'
DB 'test'
QUERY $doc$SELECT * FROM test.dict;$doc$
BACKGROUND_RECONNECT 'true'
REPLICA(HOST 'mysql80' PORT 3306 PRIORITY 1)))
"""
)
result = query("SELECT value FROM dict WHERE id = 1")
assert result == "Value_1\n"
class MySQL_Instance:
pass
mysql_instance = MySQL_Instance()
mysql_instance.ip_address = started_cluster.mysql8_ip
with PartitionManager() as pm:
# Break connection to mysql server
pm.partition_instances(
instance, mysql_instance, action="REJECT --reject-with tcp-reset"
)
# Exhaust possible connection pool and initiate reconnection attempts
for _ in range(5):
try:
result = query("SELECT value FROM dict WHERE id = 1")
except Exception as e:
pass
counter = 0
# Based on bg_reconnect_mysql_dict_interval = 7000 in "configs/bg_reconnect.xml":
# connection should not be available for about 5-7 seconds
while counter <= 8:
try:
counter += 1
time.sleep(1)
result = query("SELECT value FROM dict WHERE id = 1")
break
except Exception as e:
pass
query("DROP DICTIONARY IF EXISTS dict;")
execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.dict;")
assert (
counter >= 4 and counter <= 8
), f"Connection reistablisher didn't meet anticipated time interval [4..8]: {counter}"

View File

@ -0,0 +1,3 @@
<clickhouse>
<bg_reconnect_mysql_dict_interval>7000</bg_reconnect_mysql_dict_interval>
</clickhouse>

View File

@ -7,6 +7,7 @@ import pytest
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.postgres_utility import get_postgres_conn
cluster = ClickHouseCluster(__file__)
@ -16,6 +17,7 @@ node1 = cluster.add_instance(
"configs/config.xml",
"configs/dictionaries/postgres_dict.xml",
"configs/named_collections.xml",
"configs/bg_reconnect.xml",
],
with_postgres=True,
with_postgres_cluster=True,
@ -588,6 +590,86 @@ def test_named_collection_from_ddl(started_cluster):
)
def test_background_dictionary_reconnect(started_cluster):
postgres_conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
database=True,
port=started_cluster.postgres_port,
)
postgres_conn.cursor().execute("DROP TABLE IF EXISTS dict")
postgres_conn.cursor().execute(
f"""
CREATE TABLE dict (
id integer NOT NULL, value text NOT NULL, PRIMARY KEY (id))
"""
)
postgres_conn.cursor().execute("INSERT INTO dict VALUES (1, 'Value_1')")
query = node1.query
query(
f"""
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(POSTGRESQL(
USER 'postgres'
PASSWORD 'mysecretpassword'
DB 'postgres_database'
QUERY $doc$SELECT * FROM dict;$doc$
BACKGROUND_RECONNECT 'true'
REPLICA(HOST '{started_cluster.postgres_ip}' PORT {started_cluster.postgres_port} PRIORITY 1)))
"""
)
result = query("SELECT value FROM dict WHERE id = 1")
assert result == "Value_1\n"
class PostgreSQL_Instance:
pass
postgres_instance = PostgreSQL_Instance()
postgres_instance.ip_address = started_cluster.postgres_ip
with PartitionManager() as pm:
# Break connection to postgres server
pm.partition_instances(
node1, postgres_instance, action="REJECT --reject-with tcp-reset"
)
# Exhaust possible connection pool and initiate reconnection attempts
for _ in range(5):
try:
result = query("SELECT value FROM dict WHERE id = 1")
except Exception as e:
pass
counter = 0
# Based on bg_reconnect_mysql_dict_interval = 7000 in "configs/bg_reconnect.xml":
# connection should not be available for about 5-7 seconds
while counter <= 8:
try:
counter += 1
time.sleep(1)
result = query("SELECT value FROM dict WHERE id = 1")
break
except Exception as e:
pass
query("DROP DICTIONARY IF EXISTS dict;")
postgres_conn.cursor().execute("DROP TABLE IF EXISTS dict")
assert (
counter >= 4 and counter <= 8
), f"Connection reistablisher didn't meet anticipated time interval [4..8]: {counter}"
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")