Add auto close for postgres connection

This commit is contained in:
kssenii 2022-06-23 23:44:59 +02:00
parent 65776b2b4e
commit 2c5aeaaa1a
13 changed files with 136 additions and 35 deletions

View File

@ -73,6 +73,7 @@ void Connection::connect()
if (!connection || !connection->is_open())
updateConnection();
}
}
#endif

View File

@ -32,7 +32,10 @@ struct ConnectionInfo
class Connection : private boost::noncopyable
{
public:
Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
/// Marked explicit: the two trailing parameters are defaulted, so without it this
/// constructor could be used for implicit conversion from ConnectionInfo.
/// NOTE(review): num_tries presumably bounds connection/retry attempts — confirm in Connection.cpp.
explicit Connection(
const ConnectionInfo & connection_info_,
bool replication_ = false,
size_t num_tries = 3);
void execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec);

View File

@ -7,6 +7,7 @@
#include <pqxx/pqxx>
#include <Core/Types.h>
#include <base/BorrowedObjectPool.h>
#include <Common/logger_useful.h>
#include "Connection.h"
@ -20,11 +21,20 @@ class ConnectionHolder
{
public:
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {}
/// Stores the pool handle together with a connection taken from that pool.
/// When auto_close_ is true, the destructor resets the connection before
/// handing it back to the pool.
/// Both by-value sink parameters are moved into the members to avoid a needless copy
/// (presumably PoolPtr is a shared pointer, which makes the copy a refcount bump — confirm).
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_, bool auto_close_)
    : pool(std::move(pool_))
    , connection(std::move(connection_))
    , auto_close(auto_close_)
{}
ConnectionHolder(const ConnectionHolder & other) = delete;
~ConnectionHolder() { pool->returnObject(std::move(connection)); }
/// Gives the connection back to the pool it was taken from.
/// With auto_close enabled the connection is reset first, so what the pool
/// receives is whatever `connection` holds after reset() (presumably an empty pointer).
~ConnectionHolder()
{
    if (auto_close)
    {
        connection.reset();
    }

    pool->returnObject(std::move(connection));
}
pqxx::connection & get()
{
@ -39,6 +49,7 @@ public:
private:
PoolPtr pool;
ConnectionPtr connection;
bool auto_close;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;

View File

@ -5,6 +5,7 @@
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -20,10 +21,14 @@ namespace postgres
{
PoolWithFailover::PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -40,10 +45,14 @@ PoolWithFailover::PoolWithFailover(
}
PoolWithFailover::PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -94,7 +103,9 @@ ConnectionHolderPtr PoolWithFailover::get()
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.connection_info.host_port << "` failed: " << pqxx_error.what() << "\n";
error_message << "Try " << try_idx + 1 << ". "
<< "Connection to " << DB::backQuote(replica.connection_info.host_port)
<< " failed with error: " << pqxx_error.what() << "\n";
replica.pool->returnObject(std::move(connection));
continue;
@ -105,7 +116,7 @@ ConnectionHolderPtr PoolWithFailover::get()
throw;
}
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection));
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);
/// Move all traversed replicas to the end.
if (replicas.size() > 1)

View File

@ -12,6 +12,10 @@
#include <Storages/ExternalDataSourceConfiguration.h>
/// Default number of connections in a pool; same value as the default of the
/// postgresql_connection_pool_size setting.
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
/// Default pool wait timeout; same value as the default of the
/// postgresql_connection_pool_wait_timeout setting (presumably milliseconds — confirm).
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
/// Value passed as max_tries_ to PoolWithFailover by callers that have no dedicated setting for it.
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
namespace postgres
{
@ -21,21 +25,19 @@ class PoolWithFailover
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_);
PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_);
PoolWithFailover(const PoolWithFailover & other) = delete;
@ -58,6 +60,7 @@ private:
ReplicasWithPriority replicas_with_priority;
size_t pool_wait_timeout;
size_t max_tries;
bool auto_close_connection;
std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("PostgreSQLConnectionPool");
};

View File

@ -428,6 +428,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
M(Bool, postgresql_connection_pool_auto_close_connection, false, "Close connection before returning connection to the pool.", 0) \
M(UInt64, glob_expansion_max_elements, 1000, "Maximum number of allowed addresses (For external storages, table functions, etc).", 0) \
M(UInt64, odbc_bridge_connection_pool_size, 16, "Connection pool size for each connection settings string in ODBC bridge.", 0) \
M(Bool, odbc_bridge_use_connection_pooling, true, "Use connection pooling in ODBC bridge. If set to false, a new connection is created every time", 0) \

View File

@ -341,9 +341,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_args.size() >= 6)
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);

View File

@ -191,10 +191,13 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const auto settings_config_prefix = config_prefix + ".postgresql";
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
configuration.replicas_configurations,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{

View File

@ -95,10 +95,13 @@ StorageExternalDistributed::StorageExternalDistributed(
postgres_conf.set(configuration);
postgres_conf.addresses = addresses;
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
postgres_conf,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
shard = std::make_shared<StoragePostgreSQL>(table_id_, std::move(pool), configuration.table, columns_, constraints_, String{});
break;

View File

@ -449,9 +449,12 @@ void registerStoragePostgreSQL(StorageFactory & factory)
factory.registerStorage("PostgreSQL", [](const StorageFactory::Arguments & args)
{
auto configuration = StoragePostgreSQL::getConfiguration(args.engine_args, args.getLocalContext());
const auto & settings = args.getContext()->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
args.getContext()->getSettingsRef().postgresql_connection_pool_size,
args.getContext()->getSettingsRef().postgresql_connection_pool_wait_timeout);
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<StoragePostgreSQL>(
args.table_id,

View File

@ -62,9 +62,13 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, Contex
throw Exception("Table function 'PostgreSQL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
configuration.emplace(StoragePostgreSQL::getConfiguration(func_args.arguments->children, context));
connection_pool = std::make_shared<postgres::PoolWithFailover>(*configuration,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
const auto & settings = context->getSettingsRef();
connection_pool = std::make_shared<postgres::PoolWithFailover>(
*configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
}

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<!-- Test profile: turns on postgresql_connection_pool_auto_close_connection for the default profile. -->
<clickhouse>
<profiles>
<default>
<postgresql_connection_pool_auto_close_connection>1</postgresql_connection_pool_auto_close_connection>
</default>
</profiles>
</clickhouse>

View File

@ -10,7 +10,10 @@ node1 = cluster.add_instance(
"node1", main_configs=["configs/named_collections.xml"], with_postgres=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/named_collections.xml"], with_postgres_cluster=True
"node2",
main_configs=["configs/named_collections.xml"],
user_configs=["configs/settings.xml"],
with_postgres_cluster=True,
)
@ -19,6 +22,7 @@ def started_cluster():
try:
cluster.start()
node1.query("CREATE DATABASE test")
node2.query("CREATE DATABASE test")
yield cluster
finally:
@ -625,6 +629,48 @@ def test_uuid(started_cluster):
assert result.strip() == "Nullable(UUID)"
def test_auto_close_connection(started_cluster):
    """Check that pooled PostgreSQL connections are closed after use on node2.

    node2 is started with the extra user config ``configs/settings.xml``, which is
    expected to enable ``postgresql_connection_pool_auto_close_connection``.
    After an INSERT and a SELECT through a PostgreSQL-engine table, the number of
    backends connected to the test database must not include any leftover
    connections from those finished queries.
    """
    conn = get_postgres_conn(
        started_cluster.postgres_ip, started_cluster.postgres_port, database=False
    )
    cursor = conn.cursor()
    database_name = "auto_close_connection_test"

    cursor.execute(f"DROP DATABASE IF EXISTS {database_name}")
    cursor.execute(f"CREATE DATABASE {database_name}")

    # Reconnect to the freshly created database; this connection stays open
    # for the rest of the test and is counted by numbackends below.
    conn = get_postgres_conn(
        started_cluster.postgres_ip,
        started_cluster.postgres_port,
        database=True,
        database_name=database_name,
    )
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE test_table (key integer, value integer)")

    node2.query(
        f"""
        CREATE TABLE test.test_table (key UInt32, value UInt32)
        ENGINE = PostgreSQL(postgres1, database='{database_name}', table='test_table')
        """
    )

    # Exercise the pool with a write and a read; the results themselves are not needed.
    node2.query(
        "INSERT INTO test.test_table SELECT number, number FROM numbers(1000)",
        user="default",
    )
    node2.query("SELECT * FROM test.test_table LIMIT 100", user="default")

    # Declare only the two needed columns explicitly instead of relying on
    # structure inference for the pg_stat_database view.
    node2.query(
        f"""
        CREATE TABLE test.stat (numbackends UInt32, datname String)
        ENGINE = PostgreSQL(postgres1, database='{database_name}', table='pg_stat_database')
        """
    )

    count = int(
        node2.query(
            f"SELECT numbackends FROM test.stat WHERE datname = '{database_name}'"
        )
    )

    # One backend is the python connection above; the other is the connection
    # serving this very statistics query. Anything above 2 would mean the
    # connections used by the earlier INSERT/SELECT were not closed.
    assert count == 2
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")