From ae64a2484463c6ce9d51c683ed63d63861c9f1fd Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 15 Mar 2021 14:37:31 +0000 Subject: [PATCH] Add connection pool --- .../PostgreSQLBlockInputStream.cpp | 8 +- src/DataStreams/PostgreSQLBlockInputStream.h | 8 +- src/Databases/DatabaseFactory.cpp | 6 +- .../PostgreSQL/DatabasePostgreSQL.cpp | 18 ++-- src/Databases/PostgreSQL/DatabasePostgreSQL.h | 8 +- .../PostgreSQL/PostgreSQLConnection.cpp | 49 ++++++---- .../PostgreSQL/PostgreSQLConnection.h | 32 ++++--- .../PostgreSQL/PostgreSQLConnectionPool.cpp | 95 +++++++++++++++++++ .../PostgreSQL/PostgreSQLConnectionPool.h | 62 ++++++++++++ .../PostgreSQLReplicaConnection.cpp | 19 ++-- .../PostgreSQL/PostgreSQLReplicaConnection.h | 9 +- src/Storages/StoragePostgreSQL.cpp | 23 +++-- src/Storages/StoragePostgreSQL.h | 9 +- .../TableFunctionPostgreSQL.cpp | 6 +- src/TableFunctions/TableFunctionPostgreSQL.h | 6 +- 15 files changed, 269 insertions(+), 89 deletions(-) create mode 100644 src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp create mode 100644 src/Storages/PostgreSQL/PostgreSQLConnectionPool.h diff --git a/src/DataStreams/PostgreSQLBlockInputStream.cpp b/src/DataStreams/PostgreSQLBlockInputStream.cpp index 8350dc86849..e597dce6983 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.cpp +++ b/src/DataStreams/PostgreSQLBlockInputStream.cpp @@ -28,13 +28,14 @@ namespace ErrorCodes } PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( - ConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const std::string & query_str_, const Block & sample_block, const UInt64 max_block_size_) : query_str(query_str_) , max_block_size(max_block_size_) - , connection(connection_) + , connection_pool(connection_pool_) + , connection(connection_pool->get()) { description.init(sample_block); for (const auto idx : ext::range(0, description.sample_block.columns())) @@ -111,6 +112,9 @@ void PostgreSQLBlockInputStream::readSuffix() stream->complete(); tx->commit(); } + + if (connection->is_open()) + connection_pool->put(connection); } diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index b88c81cca0a..e5946265e76 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -9,18 +9,17 @@ #include #include #include -#include +#include namespace DB { -using ConnectionPtr = std::shared_ptr; class PostgreSQLBlockInputStream : public IBlockInputStream { public: PostgreSQLBlockInputStream( - ConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_); @@ -47,7 +46,8 @@ private: const UInt64 max_block_size; ExternalResultDescription description; - ConnectionPtr connection; + PostgreSQLConnectionPoolPtr connection_pool; + PostgreSQLConnection::ConnectionPtr connection; std::unique_ptr tx; std::unique_ptr stream; diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index cd0143556c9..5f36921edba 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -36,7 +36,7 @@ #if USE_LIBPQXX #include // Y_IGNORE -#include +#include #endif namespace DB @@ -246,11 +246,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String auto parsed_host_port = parseAddress(host_port, 5432); /// no connection is made here - auto connection = std::make_shared( + auto connection_pool = std::make_shared( postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password); return std::make_shared( - context, metadata_path, engine_define, database_name, postgres_database_name, connection, use_table_cache); + context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache); } #endif diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp index 9b10ed9680e..a116dea3ace 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -17,6 +16,7 @@ #include #include #include +#include namespace DB @@ -40,14 +40,14 @@ DatabasePostgreSQL::DatabasePostgreSQL( const ASTStorage * database_engine_define_, const String & dbname_, const String & postgres_dbname, - PostgreSQLConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const bool cache_tables_) : IDatabase(dbname_) , global_context(context.getGlobalContext()) , metadata_path(metadata_path_) , database_engine_define(database_engine_define_->clone()) , dbname(postgres_dbname) - , connection(std::move(connection_)) + , connection_pool(std::move(connection_pool_)) , cache_tables(cache_tables_) { cleaner_task = context.getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); }); @@ -90,11 +90,13 @@ std::unordered_set DatabasePostgreSQL::fetchTablesList() const std::unordered_set tables; std::string query = "SELECT tablename FROM pg_catalog.pg_tables " "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"; - pqxx::read_transaction tx(*connection->conn()); + auto connection = connection_pool->get(); + pqxx::read_transaction tx(*connection); for (auto table_name : tx.stream(query)) tables.insert(std::get<0>(table_name)); + connection_pool->put(connection); return tables; } @@ -108,7 +110,8 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const "PostgreSQL table name cannot contain single quote or backslash characters, passed {}", table_name); } - pqxx::nontransaction tx(*connection->conn()); + auto connection = connection_pool->get(); + pqxx::nontransaction tx(*connection); try { @@ -129,6 +132,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const throw; } + connection_pool->put(connection); return true; } @@ -163,13 +167,13 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte return StoragePtr{}; auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls; - auto columns = fetchPostgreSQLTableStructure(connection->conn(), doubleQuoteString(table_name), use_nulls); + auto columns = fetchPostgreSQLTableStructure(connection_pool->get(), table_name, use_nulls); if (!columns) return StoragePtr{}; auto storage = StoragePostgreSQL::create( - StorageID(database_name, table_name), table_name, std::make_shared(*connection), + StorageID(database_name, table_name), table_name, std::make_shared(*connection_pool), ColumnsDescription{*columns}, ConstraintsDescription{}, context); if (cache_tables) diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.h b/src/Databases/PostgreSQL/DatabasePostgreSQL.h index 56ea6645f15..7be34df2123 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.h @@ -15,8 +15,8 @@ namespace DB { class Context; -class PostgreSQLConnection; -using PostgreSQLConnectionPtr = std::shared_ptr; +class PostgreSQLConnectionPool; +using PostgreSQLConnectionPoolPtr = std::shared_ptr; /** Real-time access to table list and table structure from remote PostgreSQL. @@ -34,7 +34,7 @@ public: const ASTStorage * database_engine_define, const String & dbname_, const String & postgres_dbname, - PostgreSQLConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const bool cache_tables_); String getEngineName() const override { return "PostgreSQL"; } @@ -72,7 +72,7 @@ private: String metadata_path; ASTPtr database_engine_define; String dbname; - PostgreSQLConnectionPtr connection; + PostgreSQLConnectionPoolPtr connection_pool; const bool cache_tables; mutable Tables cached_tables; diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp index 58eb7192eb9..bce37a632d2 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp @@ -3,19 +3,31 @@ #endif #if USE_LIBPQXX -#include -#include -#include +#include "PostgreSQLConnection.h" #include +#include namespace DB { -PostgreSQLConnection::PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password) +PostgreSQLConnection::PostgreSQLConnection( + const String & connection_str_, + const String & address_) + : connection_str(connection_str_) + , address(address_) +{ +} + + +PostgreSQLConnection::PostgreSQLConnection( + ConnectionPtr connection_, + const String & connection_str_, + const String & address_) + : connection(std::move(connection_)) + , connection_str(connection_str_) + , address(address_) { - address = host + ':' + std::to_string(port); - connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password)); } @@ -26,17 +38,25 @@ PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other) } -PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::conn() +PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get() { connect(); return connection; } +PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet() +{ + if (tryConnect()) + return connection; + return nullptr; +} + + void PostgreSQLConnection::connect() { if (!connection || !connection->is_open()) - connection = std::make_unique(connection_str); + connection = std::make_shared(connection_str); } @@ -62,19 +82,6 @@ bool PostgreSQLConnection::tryConnect() return true; } - -std::string PostgreSQLConnection::formatConnectionString( - std::string dbname, std::string host, UInt16 port, std::string user, std::string password) -{ - WriteBufferFromOwnString out; - out << "dbname=" << quote << dbname - << " host=" << quote << host - << " port=" << port - << " user=" << quote << user - << " password=" << quote << password; - return out.str(); -} - } #endif diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.h b/src/Storages/PostgreSQL/PostgreSQLConnection.h index f23308ddef9..f14d5b40d3a 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.h +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.h @@ -12,39 +12,45 @@ namespace DB { -/// Tiny connection class to make it more convenient to use. -/// Connection is not made until actually used. + class PostgreSQLConnection { + public: using ConnectionPtr = std::shared_ptr; - PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password); + PostgreSQLConnection( + const String & connection_str_, + const String & address_); + + PostgreSQLConnection( + ConnectionPtr connection_, + const String & connection_str_, + const String & address_); PostgreSQLConnection(const PostgreSQLConnection & other); - PostgreSQLConnection operator =(const PostgreSQLConnection &) = delete; - - bool tryConnect(); - - ConnectionPtr conn(); - const std::string & getAddress() { return address; } - std::string & conn_str() { return connection_str; } + ConnectionPtr get(); + + ConnectionPtr tryGet(); + + bool connected() { return tryConnect(); } private: void connect(); - static std::string formatConnectionString( - std::string dbname, std::string host, UInt16 port, std::string user, std::string password); + bool tryConnect(); ConnectionPtr connection; - std::string connection_str, address; + std::string connection_str; + std::string address; }; using PostgreSQLConnectionPtr = std::shared_ptr; } + #endif diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp new file mode 100644 index 00000000000..7f2b1e96438 --- /dev/null +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp @@ -0,0 +1,95 @@ +#if !defined(ARCADIA_BUILD) +#include "config_core.h" +#endif + +#if USE_LIBPQXX +#include +#include +#include "PostgreSQLConnectionPool.h" +#include "PostgreSQLConnection.h" + + +namespace DB +{ + +PostgreSQLConnectionPool::PostgreSQLConnectionPool( + std::string dbname, std::string host, UInt16 port, std::string user, std::string password) + : pool(POSTGRESQL_POOL_DEFAULT_SIZE) +{ + address = host + ':' + std::to_string(port); + connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password)); + + /// No connection is made, just fill pool with non-connected connection objects. + for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i) + pool.push(std::make_shared(connection_str, address)); +} + + +PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other) + : connection_str(other.connection_str) + , address(other.address) + , pool(POSTGRESQL_POOL_DEFAULT_SIZE) +{ +} + + +std::string PostgreSQLConnectionPool::formatConnectionString( + std::string dbname, std::string host, UInt16 port, std::string user, std::string password) +{ + WriteBufferFromOwnString out; + out << "dbname=" << quote << dbname + << " host=" << quote << host + << " port=" << port + << " user=" << quote << user + << " password=" << quote << password; + return out.str(); +} + + +PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::get() +{ + PostgreSQLConnectionPtr connection = popConnection(); + return connection->get(); +} + + +PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::tryGet() +{ + PostgreSQLConnectionPtr connection = popConnection(); + return connection->tryGet(); +} + + +PostgreSQLConnectionPtr PostgreSQLConnectionPool::popConnection() +{ + PostgreSQLConnectionPtr connection; + if (pool.tryPop(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS)) + return connection; + + return std::make_shared(connection_str, address); +} + + +void PostgreSQLConnectionPool::put(PostgreSQLConnection::ConnectionPtr connection) +{ + pushConnection(std::make_shared(connection, connection_str, address)); +} + + +void PostgreSQLConnectionPool::pushConnection(PostgreSQLConnectionPtr connection) +{ + pool.tryPush(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS); +} + + +bool PostgreSQLConnectionPool::connected() +{ + PostgreSQLConnectionPtr connection = popConnection(); + bool result = connection->connected(); + pushConnection(connection); + return result; +} + +} + +#endif diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h new file mode 100644 index 00000000000..cace5d4c433 --- /dev/null +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h @@ -0,0 +1,62 @@ +#pragma once + +#if !defined(ARCADIA_BUILD) +#include "config_core.h" +#endif + +#if USE_LIBPQXX +#include +#include "PostgreSQLConnection.h" +#include // Y_IGNORE + +namespace DB +{ + +class PostgreSQLConnectionPool +{ + +public: + + PostgreSQLConnectionPool(std::string dbname, std::string host, UInt16 port, std::string user, std::string password); + + PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other); + + PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete; + + /// Will throw if unable to setup connection. + PostgreSQLConnection::ConnectionPtr get(); + + /// Will return nullptr if connection was not established. + PostgreSQLConnection::ConnectionPtr tryGet(); + + void put(PostgreSQLConnection::ConnectionPtr connection); + + bool connected(); + + std::string & conn_str() { return connection_str; } + +private: + static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16; + static constexpr inline auto POSTGRESQL_POOL_WAIT_POP_PUSH_MS = 100; + + using Pool = ConcurrentBoundedQueue; + + static std::string formatConnectionString( + std::string dbname, std::string host, UInt16 port, std::string user, std::string password); + + /// Try get connection from connection pool with timeout. + /// If pool is empty after timeout, make a new connection. + PostgreSQLConnectionPtr popConnection(); + + /// Put connection object back into pool. + void pushConnection(PostgreSQLConnectionPtr connection); + + std::string connection_str, address; + Pool pool; +}; + +using PostgreSQLConnectionPoolPtr = std::shared_ptr; + +} + +#endif diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp index 0c1efc16e05..f605e95ee39 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.cpp @@ -1,5 +1,6 @@ #include "PostgreSQLReplicaConnection.h" -#include +#include "PostgreSQLConnection.h" +#include namespace DB @@ -15,8 +16,7 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection( const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const size_t num_retries_) - : log(&Poco::Logger::get("PostgreSQLConnection")) - , num_retries(num_retries_) + : num_retries(num_retries_) { auto db = config.getString(config_prefix + ".db", ""); auto host = config.getString(config_prefix + ".host", ""); @@ -41,33 +41,32 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection( auto replica_user = config.getString(replica_name + ".user", user); auto replica_password = config.getString(replica_name + ".password", password); - replicas[priority] = std::make_shared(db, replica_host, replica_port, replica_user, replica_password); + replicas[priority] = std::make_shared(db, replica_host, replica_port, replica_user, replica_password); } } } else { - replicas[0] = std::make_shared(db, host, port, user, password); + replicas[0] = std::make_shared(db, host, port, user, password); } } PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) - : log(&Poco::Logger::get("PostgreSQLConnection")) - , replicas(other.replicas) + : replicas(other.replicas) , num_retries(other.num_retries) { } -PostgreSQLConnection::ConnectionPtr PostgreSQLReplicaConnection::get() +PostgreSQLConnectionPoolPtr PostgreSQLReplicaConnection::get() { for (size_t i = 0; i < num_retries; ++i) { for (auto & replica : replicas) { - if (replica.second->tryConnect()) - return replica.second->conn(); + if (replica.second->connected()) + return replica.second; } } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h index e58d4bc8100..ca36c9f5435 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConnection.h @@ -1,9 +1,9 @@ #pragma once -#include "PostgreSQLConnection.h" #include #include -#include +#include "PostgreSQLConnectionPool.h" + namespace DB { @@ -21,14 +21,13 @@ public: PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other); - PostgreSQLConnection::ConnectionPtr get(); + PostgreSQLConnectionPoolPtr get(); private: /// Highest priority is 0, the bigger the number in map, the less the priority - using ReplicasByPriority = std::map; + using ReplicasByPriority = std::map; - Poco::Logger * log; ReplicasByPriority replicas; size_t num_retries; }; diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index ac1a5569293..be82a9dd204 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -42,7 +42,7 @@ namespace ErrorCodes StoragePostgreSQL::StoragePostgreSQL( const StorageID & table_id_, const String & remote_table_name_, - PostgreSQLConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_, @@ -51,7 +51,7 @@ StoragePostgreSQL::StoragePostgreSQL( , remote_table_name(remote_table_name_) , remote_table_schema(remote_table_schema_) , global_context(context_) - , connection(std::move(connection_)) + , connection_pool(std::move(connection_pool_)) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -88,7 +88,7 @@ Pipe StoragePostgreSQL::read( } return Pipe(std::make_shared( - std::make_shared(connection->conn(), query, sample_block, max_block_size_))); + std::make_shared(connection_pool, query, sample_block, max_block_size_))); } @@ -97,10 +97,11 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream public: explicit PostgreSQLBlockOutputStream( const StorageMetadataPtr & metadata_snapshot_, - ConnectionPtr connection_, + PostgreSQLConnectionPoolPtr connection_pool_, const std::string & remote_table_name_) : metadata_snapshot(metadata_snapshot_) - , connection(connection_) + , connection_pool(connection_pool_) + , connection(connection_pool->get()) , remote_table_name(remote_table_name_) { } @@ -166,6 +167,9 @@ public: stream_inserter->complete(); work->commit(); } + + if (connection->is_open()) + connection_pool->put(connection); } @@ -276,7 +280,8 @@ public: private: StorageMetadataPtr metadata_snapshot; - ConnectionPtr connection; + PostgreSQLConnectionPoolPtr connection_pool; + PostgreSQLConnection::ConnectionPtr connection; std::string remote_table_name; std::unique_ptr work; @@ -287,7 +292,7 @@ private: BlockOutputStreamPtr StoragePostgreSQL::write( const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */) { - return std::make_shared(metadata_snapshot, connection->conn(), remote_table_name); + return std::make_shared(metadata_snapshot, connection_pool, remote_table_name); } @@ -312,7 +317,7 @@ void registerStoragePostgreSQL(StorageFactory & factory) if (engine_args.size() == 6) remote_table_schema = engine_args[5]->as().value.safeGet(); - auto connection = std::make_shared( + auto connection_pool = std::make_shared( engine_args[1]->as().value.safeGet(), parsed_host_port.first, parsed_host_port.second, @@ -320,7 +325,7 @@ void registerStoragePostgreSQL(StorageFactory & factory) engine_args[4]->as().value.safeGet()); return StoragePostgreSQL::create( - args.table_id, remote_table, connection, args.columns, args.constraints, args.context, remote_table_schema); + args.table_id, remote_table, connection_pool, args.columns, args.constraints, args.context, remote_table_schema); }, { .source_access_type = AccessType::POSTGRES, diff --git a/src/Storages/StoragePostgreSQL.h b/src/Storages/StoragePostgreSQL.h index 0d574c9e98e..fb80352f58d 100644 --- a/src/Storages/StoragePostgreSQL.h +++ b/src/Storages/StoragePostgreSQL.h @@ -9,14 +9,13 @@ #include #include #include +#include #include namespace DB { -class PostgreSQLConnection; -using PostgreSQLConnectionPtr = std::shared_ptr; class StoragePostgreSQL final : public ext::shared_ptr_helper, public IStorage { @@ -24,8 +23,8 @@ class StoragePostgreSQL final : public ext::shared_ptr_helper public: StoragePostgreSQL( const StorageID & table_id_, - const std::string & remote_table_name_, - PostgreSQLConnectionPtr connection_, + const String & remote_table_name_, + PostgreSQLConnectionPoolPtr connection_pool_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_, @@ -50,7 +49,7 @@ private: String remote_table_name; String remote_table_schema; Context global_context; - PostgreSQLConnectionPtr connection; + PostgreSQLConnectionPoolPtr connection_pool; }; } diff --git a/src/TableFunctions/TableFunctionPostgreSQL.cpp b/src/TableFunctions/TableFunctionPostgreSQL.cpp index 9b00f86a565..f20aae11648 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.cpp +++ b/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -30,7 +30,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/, auto columns = getActualTableStructure(context); auto result = std::make_shared( StorageID(getDatabaseName(), table_name), remote_table_name, - connection, columns, ConstraintsDescription{}, context, remote_table_schema); + connection_pool, columns, ConstraintsDescription{}, context, remote_table_schema); result->startup(); return result; @@ -41,7 +41,7 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(const Contex { const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls; auto columns = fetchPostgreSQLTableStructure( - connection->conn(), + connection_pool->get(), remote_table_schema.empty() ? doubleQuoteString(remote_table_name) : doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name), use_nulls); @@ -73,7 +73,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const if (args.size() == 6) remote_table_schema = args[5]->as().value.safeGet(); - connection = std::make_shared( + connection_pool = std::make_shared( args[1]->as().value.safeGet(), parsed_host_port.first, parsed_host_port.second, diff --git a/src/TableFunctions/TableFunctionPostgreSQL.h b/src/TableFunctions/TableFunctionPostgreSQL.h index 92e061e18ca..601b2a090b2 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.h +++ b/src/TableFunctions/TableFunctionPostgreSQL.h @@ -10,8 +10,8 @@ namespace DB { -class PostgreSQLConnection; -using PostgreSQLConnectionPtr = std::shared_ptr; +class PostgreSQLConnectionPool; +using PostgreSQLConnectionPoolPtr = std::shared_ptr; class TableFunctionPostgreSQL : public ITableFunction { @@ -31,7 +31,7 @@ private: String connection_str; String remote_table_name, remote_table_schema; - PostgreSQLConnectionPtr connection; + PostgreSQLConnectionPoolPtr connection_pool; }; }