Add connection pool

This commit is contained in:
kssenii 2021-03-15 14:37:31 +00:00
parent 2303f1788c
commit ae64a24844
15 changed files with 269 additions and 89 deletions

View File

@ -28,13 +28,14 @@ namespace ErrorCodes
} }
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
ConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_) const UInt64 max_block_size_)
: query_str(query_str_) : query_str(query_str_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, connection(connection_) , connection_pool(connection_pool_)
, connection(connection_pool->get())
{ {
description.init(sample_block); description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns())) for (const auto idx : ext::range(0, description.sample_block.columns()))
@ -111,6 +112,9 @@ void PostgreSQLBlockInputStream::readSuffix()
stream->complete(); stream->complete();
tx->commit(); tx->commit();
} }
if (connection->is_open())
connection_pool->put(connection);
} }

View File

@ -9,18 +9,17 @@
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <pqxx/pqxx> #include <Storages/PostgreSQL/PostgreSQLConnectionPool.h>
namespace DB namespace DB
{ {
using ConnectionPtr = std::shared_ptr<pqxx::connection>;
class PostgreSQLBlockInputStream : public IBlockInputStream class PostgreSQLBlockInputStream : public IBlockInputStream
{ {
public: public:
PostgreSQLBlockInputStream( PostgreSQLBlockInputStream(
ConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_); const UInt64 max_block_size_);
@ -47,7 +46,8 @@ private:
const UInt64 max_block_size; const UInt64 max_block_size;
ExternalResultDescription description; ExternalResultDescription description;
ConnectionPtr connection; PostgreSQLConnectionPoolPtr connection_pool;
PostgreSQLConnection::ConnectionPtr connection;
std::unique_ptr<pqxx::read_transaction> tx; std::unique_ptr<pqxx::read_transaction> tx;
std::unique_ptr<pqxx::stream_from> stream; std::unique_ptr<pqxx::stream_from> stream;

View File

@ -36,7 +36,7 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE #include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Storages/PostgreSQL/PostgreSQLConnection.h> #include <Storages/PostgreSQL/PostgreSQLConnectionPool.h>
#endif #endif
namespace DB namespace DB
@ -246,11 +246,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto parsed_host_port = parseAddress(host_port, 5432); auto parsed_host_port = parseAddress(host_port, 5432);
/// no connection is made here /// no connection is made here
auto connection = std::make_shared<PostgreSQLConnection>( auto connection_pool = std::make_shared<PostgreSQLConnectionPool>(
postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password); postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
return std::make_shared<DatabasePostgreSQL>( return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection, use_table_cache); context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache);
} }
#endif #endif

View File

@ -5,7 +5,6 @@
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <Storages/StoragePostgreSQL.h> #include <Storages/StoragePostgreSQL.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -17,6 +16,7 @@
#include <Poco/File.h> #include <Poco/File.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h> #include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Storages/PostgreSQL/PostgreSQLConnectionPool.h>
namespace DB namespace DB
@ -40,14 +40,14 @@ DatabasePostgreSQL::DatabasePostgreSQL(
const ASTStorage * database_engine_define_, const ASTStorage * database_engine_define_,
const String & dbname_, const String & dbname_,
const String & postgres_dbname, const String & postgres_dbname,
PostgreSQLConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const bool cache_tables_) const bool cache_tables_)
: IDatabase(dbname_) : IDatabase(dbname_)
, global_context(context.getGlobalContext()) , global_context(context.getGlobalContext())
, metadata_path(metadata_path_) , metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone()) , database_engine_define(database_engine_define_->clone())
, dbname(postgres_dbname) , dbname(postgres_dbname)
, connection(std::move(connection_)) , connection_pool(std::move(connection_pool_))
, cache_tables(cache_tables_) , cache_tables(cache_tables_)
{ {
cleaner_task = context.getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); }); cleaner_task = context.getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
@ -90,11 +90,13 @@ std::unordered_set<std::string> DatabasePostgreSQL::fetchTablesList() const
std::unordered_set<std::string> tables; std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables " std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"; "WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
pqxx::read_transaction tx(*connection->conn()); auto connection = connection_pool->get();
pqxx::read_transaction tx(*connection);
for (auto table_name : tx.stream<std::string>(query)) for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name)); tables.insert(std::get<0>(table_name));
connection_pool->put(connection);
return tables; return tables;
} }
@ -108,7 +110,8 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
"PostgreSQL table name cannot contain single quote or backslash characters, passed {}", table_name); "PostgreSQL table name cannot contain single quote or backslash characters, passed {}", table_name);
} }
pqxx::nontransaction tx(*connection->conn()); auto connection = connection_pool->get();
pqxx::nontransaction tx(*connection);
try try
{ {
@ -129,6 +132,7 @@ bool DatabasePostgreSQL::checkPostgresTable(const String & table_name) const
throw; throw;
} }
connection_pool->put(connection);
return true; return true;
} }
@ -163,13 +167,13 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, const Conte
return StoragePtr{}; return StoragePtr{};
auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls; auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchPostgreSQLTableStructure(connection->conn(), doubleQuoteString(table_name), use_nulls); auto columns = fetchPostgreSQLTableStructure(connection_pool->get(), table_name, use_nulls);
if (!columns) if (!columns)
return StoragePtr{}; return StoragePtr{};
auto storage = StoragePostgreSQL::create( auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnection>(*connection), StorageID(database_name, table_name), table_name, std::make_shared<PostgreSQLConnectionPool>(*connection_pool),
ColumnsDescription{*columns}, ConstraintsDescription{}, context); ColumnsDescription{*columns}, ConstraintsDescription{}, context);
if (cache_tables) if (cache_tables)

View File

@ -15,8 +15,8 @@ namespace DB
{ {
class Context; class Context;
class PostgreSQLConnection; class PostgreSQLConnectionPool;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;
/** Real-time access to table list and table structure from remote PostgreSQL. /** Real-time access to table list and table structure from remote PostgreSQL.
@ -34,7 +34,7 @@ public:
const ASTStorage * database_engine_define, const ASTStorage * database_engine_define,
const String & dbname_, const String & dbname_,
const String & postgres_dbname, const String & postgres_dbname,
PostgreSQLConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const bool cache_tables_); const bool cache_tables_);
String getEngineName() const override { return "PostgreSQL"; } String getEngineName() const override { return "PostgreSQL"; }
@ -72,7 +72,7 @@ private:
String metadata_path; String metadata_path;
ASTPtr database_engine_define; ASTPtr database_engine_define;
String dbname; String dbname;
PostgreSQLConnectionPtr connection; PostgreSQLConnectionPoolPtr connection_pool;
const bool cache_tables; const bool cache_tables;
mutable Tables cached_tables; mutable Tables cached_tables;

View File

@ -3,19 +3,31 @@
#endif #endif
#if USE_LIBPQXX #if USE_LIBPQXX
#include <Storages/PostgreSQL/PostgreSQLConnection.h> #include "PostgreSQLConnection.h"
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <IO/Operators.h>
namespace DB namespace DB
{ {
PostgreSQLConnection::PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password) PostgreSQLConnection::PostgreSQLConnection(
const String & connection_str_,
const String & address_)
: connection_str(connection_str_)
, address(address_)
{
}
PostgreSQLConnection::PostgreSQLConnection(
ConnectionPtr connection_,
const String & connection_str_,
const String & address_)
: connection(std::move(connection_))
, connection_str(connection_str_)
, address(address_)
{ {
address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
} }
@ -26,17 +38,25 @@ PostgreSQLConnection::PostgreSQLConnection(const PostgreSQLConnection & other)
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::conn() PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get()
{ {
connect(); connect();
return connection; return connection;
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet()
{
if (tryConnect())
return connection;
return nullptr;
}
void PostgreSQLConnection::connect() void PostgreSQLConnection::connect()
{ {
if (!connection || !connection->is_open()) if (!connection || !connection->is_open())
connection = std::make_unique<pqxx::connection>(connection_str); connection = std::make_shared<pqxx::connection>(connection_str);
} }
@ -62,19 +82,6 @@ bool PostgreSQLConnection::tryConnect()
return true; return true;
} }
std::string PostgreSQLConnection::formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{
WriteBufferFromOwnString out;
out << "dbname=" << quote << dbname
<< " host=" << quote << host
<< " port=" << port
<< " user=" << quote << user
<< " password=" << quote << password;
return out.str();
}
} }
#endif #endif

View File

@ -12,39 +12,45 @@
namespace DB namespace DB
{ {
/// Tiny connection class to make it more convenient to use.
/// Connection is not made until actually used.
class PostgreSQLConnection class PostgreSQLConnection
{ {
public: public:
using ConnectionPtr = std::shared_ptr<pqxx::connection>; using ConnectionPtr = std::shared_ptr<pqxx::connection>;
PostgreSQLConnection(std::string dbname, std::string host, UInt16 port, std::string user, std::string password); PostgreSQLConnection(
const String & connection_str_,
const String & address_);
PostgreSQLConnection(
ConnectionPtr connection_,
const String & connection_str_,
const String & address_);
PostgreSQLConnection(const PostgreSQLConnection & other); PostgreSQLConnection(const PostgreSQLConnection & other);
PostgreSQLConnection operator =(const PostgreSQLConnection &) = delete;
bool tryConnect();
ConnectionPtr conn();
const std::string & getAddress() { return address; } const std::string & getAddress() { return address; }
std::string & conn_str() { return connection_str; } ConnectionPtr get();
ConnectionPtr tryGet();
bool connected() { return tryConnect(); }
private: private:
void connect(); void connect();
static std::string formatConnectionString( bool tryConnect();
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
ConnectionPtr connection; ConnectionPtr connection;
std::string connection_str, address; std::string connection_str;
std::string address;
}; };
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
} }
#endif #endif

View File

@ -0,0 +1,95 @@
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include "PostgreSQLConnectionPool.h"
#include "PostgreSQLConnection.h"
namespace DB
{
PostgreSQLConnectionPool::PostgreSQLConnectionPool(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
: pool(POSTGRESQL_POOL_DEFAULT_SIZE)
{
address = host + ':' + std::to_string(port);
connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password));
/// No connection is made, just fill pool with non-connected connection objects.
for (size_t i = 0; i < POSTGRESQL_POOL_DEFAULT_SIZE; ++i)
pool.push(std::make_shared<PostgreSQLConnection>(connection_str, address));
}
PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other)
: connection_str(other.connection_str)
, address(other.address)
, pool(POSTGRESQL_POOL_DEFAULT_SIZE)
{
}
std::string PostgreSQLConnectionPool::formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password)
{
WriteBufferFromOwnString out;
out << "dbname=" << quote << dbname
<< " host=" << quote << host
<< " port=" << port
<< " user=" << quote << user
<< " password=" << quote << password;
return out.str();
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::get()
{
PostgreSQLConnectionPtr connection = popConnection();
return connection->get();
}
PostgreSQLConnection::ConnectionPtr PostgreSQLConnectionPool::tryGet()
{
PostgreSQLConnectionPtr connection = popConnection();
return connection->tryGet();
}
PostgreSQLConnectionPtr PostgreSQLConnectionPool::popConnection()
{
PostgreSQLConnectionPtr connection;
if (pool.tryPop(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS))
return connection;
return std::make_shared<PostgreSQLConnection>(connection_str, address);
}
void PostgreSQLConnectionPool::put(PostgreSQLConnection::ConnectionPtr connection)
{
pushConnection(std::make_shared<PostgreSQLConnection>(connection, connection_str, address));
}
void PostgreSQLConnectionPool::pushConnection(PostgreSQLConnectionPtr connection)
{
pool.tryPush(connection, POSTGRESQL_POOL_WAIT_POP_PUSH_MS);
}
bool PostgreSQLConnectionPool::connected()
{
PostgreSQLConnectionPtr connection = popConnection();
bool result = connection->connected();
pushConnection(connection);
return result;
}
}
#endif

View File

@ -0,0 +1,62 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Common/ConcurrentBoundedQueue.h>
#include "PostgreSQLConnection.h"
#include <pqxx/pqxx> // Y_IGNORE
namespace DB
{
class PostgreSQLConnectionPool
{
public:
PostgreSQLConnectionPool(std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other);
PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete;
/// Will throw if unable to setup connection.
PostgreSQLConnection::ConnectionPtr get();
/// Will return nullptr if connection was not established.
PostgreSQLConnection::ConnectionPtr tryGet();
void put(PostgreSQLConnection::ConnectionPtr connection);
bool connected();
std::string & conn_str() { return connection_str; }
private:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_POP_PUSH_MS = 100;
using Pool = ConcurrentBoundedQueue<PostgreSQLConnectionPtr>;
static std::string formatConnectionString(
std::string dbname, std::string host, UInt16 port, std::string user, std::string password);
/// Try get connection from connection pool with timeout.
/// If pool is empty after timeout, make a new connection.
PostgreSQLConnectionPtr popConnection();
/// Put connection object back into pool.
void pushConnection(PostgreSQLConnectionPtr connection);
std::string connection_str, address;
Pool pool;
};
using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;
}
#endif

View File

@ -1,5 +1,6 @@
#include "PostgreSQLReplicaConnection.h" #include "PostgreSQLReplicaConnection.h"
#include <Poco/Util/AbstractConfiguration.h> #include "PostgreSQLConnection.h"
#include <Common/Exception.h>
namespace DB namespace DB
@ -15,8 +16,7 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
const size_t num_retries_) const size_t num_retries_)
: log(&Poco::Logger::get("PostgreSQLConnection")) : num_retries(num_retries_)
, num_retries(num_retries_)
{ {
auto db = config.getString(config_prefix + ".db", ""); auto db = config.getString(config_prefix + ".db", "");
auto host = config.getString(config_prefix + ".host", ""); auto host = config.getString(config_prefix + ".host", "");
@ -41,33 +41,32 @@ PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(
auto replica_user = config.getString(replica_name + ".user", user); auto replica_user = config.getString(replica_name + ".user", user);
auto replica_password = config.getString(replica_name + ".password", password); auto replica_password = config.getString(replica_name + ".password", password);
replicas[priority] = std::make_shared<PostgreSQLConnection>(db, replica_host, replica_port, replica_user, replica_password); replicas[priority] = std::make_shared<PostgreSQLConnectionPool>(db, replica_host, replica_port, replica_user, replica_password);
} }
} }
} }
else else
{ {
replicas[0] = std::make_shared<PostgreSQLConnection>(db, host, port, user, password); replicas[0] = std::make_shared<PostgreSQLConnectionPool>(db, host, port, user, password);
} }
} }
PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other) PostgreSQLReplicaConnection::PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other)
: log(&Poco::Logger::get("PostgreSQLConnection")) : replicas(other.replicas)
, replicas(other.replicas)
, num_retries(other.num_retries) , num_retries(other.num_retries)
{ {
} }
PostgreSQLConnection::ConnectionPtr PostgreSQLReplicaConnection::get() PostgreSQLConnectionPoolPtr PostgreSQLReplicaConnection::get()
{ {
for (size_t i = 0; i < num_retries; ++i) for (size_t i = 0; i < num_retries; ++i)
{ {
for (auto & replica : replicas) for (auto & replica : replicas)
{ {
if (replica.second->tryConnect()) if (replica.second->connected())
return replica.second->conn(); return replica.second;
} }
} }

View File

@ -1,9 +1,9 @@
#pragma once #pragma once
#include "PostgreSQLConnection.h"
#include <Core/Types.h> #include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h> #include "PostgreSQLConnectionPool.h"
namespace DB namespace DB
{ {
@ -21,14 +21,13 @@ public:
PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other); PostgreSQLReplicaConnection(const PostgreSQLReplicaConnection & other);
PostgreSQLConnection::ConnectionPtr get(); PostgreSQLConnectionPoolPtr get();
private: private:
/// Highest priority is 0, the bigger the number in map, the less the priority /// Highest priority is 0, the bigger the number in map, the less the priority
using ReplicasByPriority = std::map<size_t, PostgreSQLConnectionPtr>; using ReplicasByPriority = std::map<size_t, PostgreSQLConnectionPoolPtr>;
Poco::Logger * log;
ReplicasByPriority replicas; ReplicasByPriority replicas;
size_t num_retries; size_t num_retries;
}; };

View File

@ -42,7 +42,7 @@ namespace ErrorCodes
StoragePostgreSQL::StoragePostgreSQL( StoragePostgreSQL::StoragePostgreSQL(
const StorageID & table_id_, const StorageID & table_id_,
const String & remote_table_name_, const String & remote_table_name_,
PostgreSQLConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const Context & context_, const Context & context_,
@ -51,7 +51,7 @@ StoragePostgreSQL::StoragePostgreSQL(
, remote_table_name(remote_table_name_) , remote_table_name(remote_table_name_)
, remote_table_schema(remote_table_schema_) , remote_table_schema(remote_table_schema_)
, global_context(context_) , global_context(context_)
, connection(std::move(connection_)) , connection_pool(std::move(connection_pool_))
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_); storage_metadata.setColumns(columns_);
@ -88,7 +88,7 @@ Pipe StoragePostgreSQL::read(
} }
return Pipe(std::make_shared<SourceFromInputStream>( return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream>(connection->conn(), query, sample_block, max_block_size_))); std::make_shared<PostgreSQLBlockInputStream>(connection_pool, query, sample_block, max_block_size_)));
} }
@ -97,10 +97,11 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream
public: public:
explicit PostgreSQLBlockOutputStream( explicit PostgreSQLBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
ConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const std::string & remote_table_name_) const std::string & remote_table_name_)
: metadata_snapshot(metadata_snapshot_) : metadata_snapshot(metadata_snapshot_)
, connection(connection_) , connection_pool(connection_pool_)
, connection(connection_pool->get())
, remote_table_name(remote_table_name_) , remote_table_name(remote_table_name_)
{ {
} }
@ -166,6 +167,9 @@ public:
stream_inserter->complete(); stream_inserter->complete();
work->commit(); work->commit();
} }
if (connection->is_open())
connection_pool->put(connection);
} }
@ -276,7 +280,8 @@ public:
private: private:
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
ConnectionPtr connection; PostgreSQLConnectionPoolPtr connection_pool;
PostgreSQLConnection::ConnectionPtr connection;
std::string remote_table_name; std::string remote_table_name;
std::unique_ptr<pqxx::work> work; std::unique_ptr<pqxx::work> work;
@ -287,7 +292,7 @@ private:
BlockOutputStreamPtr StoragePostgreSQL::write( BlockOutputStreamPtr StoragePostgreSQL::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */) const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /* context */)
{ {
return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection->conn(), remote_table_name); return std::make_shared<PostgreSQLBlockOutputStream>(metadata_snapshot, connection_pool, remote_table_name);
} }
@ -312,7 +317,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
if (engine_args.size() == 6) if (engine_args.size() == 6)
remote_table_schema = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>(); remote_table_schema = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
auto connection = std::make_shared<PostgreSQLConnection>( auto connection_pool = std::make_shared<PostgreSQLConnectionPool>(
engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(), engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(),
parsed_host_port.first, parsed_host_port.first,
parsed_host_port.second, parsed_host_port.second,
@ -320,7 +325,7 @@ void registerStoragePostgreSQL(StorageFactory & factory)
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>()); engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
return StoragePostgreSQL::create( return StoragePostgreSQL::create(
args.table_id, remote_table, connection, args.columns, args.constraints, args.context, remote_table_schema); args.table_id, remote_table, connection_pool, args.columns, args.constraints, args.context, remote_table_schema);
}, },
{ {
.source_access_type = AccessType::POSTGRES, .source_access_type = AccessType::POSTGRES,

View File

@ -9,14 +9,13 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Storages/PostgreSQL/PostgreSQLConnectionPool.h>
#include <pqxx/pqxx> #include <pqxx/pqxx>
namespace DB namespace DB
{ {
class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>, public IStorage class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>, public IStorage
{ {
@ -24,8 +23,8 @@ class StoragePostgreSQL final : public ext::shared_ptr_helper<StoragePostgreSQL>
public: public:
StoragePostgreSQL( StoragePostgreSQL(
const StorageID & table_id_, const StorageID & table_id_,
const std::string & remote_table_name_, const String & remote_table_name_,
PostgreSQLConnectionPtr connection_, PostgreSQLConnectionPoolPtr connection_pool_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
const Context & context_, const Context & context_,
@ -50,7 +49,7 @@ private:
String remote_table_name; String remote_table_name;
String remote_table_schema; String remote_table_schema;
Context global_context; Context global_context;
PostgreSQLConnectionPtr connection; PostgreSQLConnectionPoolPtr connection_pool;
}; };
} }

View File

@ -30,7 +30,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
auto columns = getActualTableStructure(context); auto columns = getActualTableStructure(context);
auto result = std::make_shared<StoragePostgreSQL>( auto result = std::make_shared<StoragePostgreSQL>(
StorageID(getDatabaseName(), table_name), remote_table_name, StorageID(getDatabaseName(), table_name), remote_table_name,
connection, columns, ConstraintsDescription{}, context, remote_table_schema); connection_pool, columns, ConstraintsDescription{}, context, remote_table_schema);
result->startup(); result->startup();
return result; return result;
@ -41,7 +41,7 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(const Contex
{ {
const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls; const bool use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
auto columns = fetchPostgreSQLTableStructure( auto columns = fetchPostgreSQLTableStructure(
connection->conn(), connection_pool->get(),
remote_table_schema.empty() ? doubleQuoteString(remote_table_name) remote_table_schema.empty() ? doubleQuoteString(remote_table_name)
: doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name), : doubleQuoteString(remote_table_schema) + '.' + doubleQuoteString(remote_table_name),
use_nulls); use_nulls);
@ -73,7 +73,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const
if (args.size() == 6) if (args.size() == 6)
remote_table_schema = args[5]->as<ASTLiteral &>().value.safeGet<String>(); remote_table_schema = args[5]->as<ASTLiteral &>().value.safeGet<String>();
connection = std::make_shared<PostgreSQLConnection>( connection_pool = std::make_shared<PostgreSQLConnectionPool>(
args[1]->as<ASTLiteral &>().value.safeGet<String>(), args[1]->as<ASTLiteral &>().value.safeGet<String>(),
parsed_host_port.first, parsed_host_port.first,
parsed_host_port.second, parsed_host_port.second,

View File

@ -10,8 +10,8 @@
namespace DB namespace DB
{ {
class PostgreSQLConnection; class PostgreSQLConnectionPool;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>; using PostgreSQLConnectionPoolPtr = std::shared_ptr<PostgreSQLConnectionPool>;
class TableFunctionPostgreSQL : public ITableFunction class TableFunctionPostgreSQL : public ITableFunction
{ {
@ -31,7 +31,7 @@ private:
String connection_str; String connection_str;
String remote_table_name, remote_table_schema; String remote_table_name, remote_table_schema;
PostgreSQLConnectionPtr connection; PostgreSQLConnectionPoolPtr connection_pool;
}; };
} }