From 22b515fbc9fdf638dbe4e7ed796ddad5a0b02e07 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 27 Mar 2021 20:14:02 +0000 Subject: [PATCH] Add namespace, simplify names --- contrib/amqp | 1 + .../PostgreSQLBlockInputStream.cpp | 2 +- src/DataStreams/PostgreSQLBlockInputStream.h | 4 +- src/Databases/DatabaseFactory.cpp | 2 +- .../PostgreSQL/DatabasePostgreSQL.cpp | 2 +- src/Databases/PostgreSQL/DatabasePostgreSQL.h | 4 +- .../fetchPostgreSQLTableStructure.cpp | 2 +- .../fetchPostgreSQLTableStructure.h | 2 +- .../PostgreSQLDictionarySource.cpp | 2 +- src/Dictionaries/PostgreSQLDictionarySource.h | 2 +- .../PostgreSQL/PostgreSQLConnection.cpp | 14 +++---- .../PostgreSQL/PostgreSQLConnection.h | 37 ++++++++++--------- .../PostgreSQL/PostgreSQLConnectionPool.cpp | 34 ++++++++--------- .../PostgreSQL/PostgreSQLConnectionPool.h | 35 +++++++++--------- .../PostgreSQL/PostgreSQLPoolWithFailover.cpp | 24 ++++++------ .../PostgreSQL/PostgreSQLPoolWithFailover.h | 18 ++++----- src/Storages/StorageExternalDistributed.cpp | 2 +- src/Storages/StoragePostgreSQL.cpp | 10 ++--- src/Storages/StoragePostgreSQL.h | 4 +- .../TableFunctionPostgreSQL.cpp | 2 +- src/TableFunctions/TableFunctionPostgreSQL.h | 6 +-- 21 files changed, 106 insertions(+), 103 deletions(-) create mode 160000 contrib/amqp diff --git a/contrib/amqp b/contrib/amqp new file mode 160000 index 00000000000..03781aaff0f --- /dev/null +++ b/contrib/amqp @@ -0,0 +1 @@ +Subproject commit 03781aaff0f10ef41f902b8cf865fe0067180c10 diff --git a/src/DataStreams/PostgreSQLBlockInputStream.cpp b/src/DataStreams/PostgreSQLBlockInputStream.cpp index abe63520729..8b8bfa5753a 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.cpp +++ b/src/DataStreams/PostgreSQLBlockInputStream.cpp @@ -28,7 +28,7 @@ namespace ErrorCodes } PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( - PostgreSQLConnectionHolderPtr connection_, + postgres::ConnectionHolderPtr connection_, const std::string & query_str_, const Block & sample_block, const UInt64 max_block_size_) diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index 7795cc2d36a..b172cae5b62 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -19,7 +19,7 @@ class PostgreSQLBlockInputStream : public IBlockInputStream { public: PostgreSQLBlockInputStream( - PostgreSQLConnectionHolderPtr connection_, + postgres::ConnectionHolderPtr connection_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_); @@ -46,7 +46,7 @@ private: const UInt64 max_block_size; ExternalResultDescription description; - PostgreSQLConnectionHolderPtr connection; + postgres::ConnectionHolderPtr connection; std::unique_ptr tx; std::unique_ptr stream; diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index c8cad344a87..06929e1378c 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -248,7 +248,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String auto parsed_host_port = parseAddress(host_port, 5432); /// no connection is made here - auto connection_pool = std::make_shared( + auto connection_pool = std::make_shared( postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password, diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp index 3e2067b0334..1134912aeac 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.cpp @@ -40,7 +40,7 @@ DatabasePostgreSQL::DatabasePostgreSQL( const ASTStorage * database_engine_define_, const String & dbname_, const String & postgres_dbname, - PostgreSQLPoolWithFailoverPtr connection_pool_, + postgres::PoolWithFailoverPtr connection_pool_, const bool cache_tables_) : IDatabase(dbname_) , global_context(context.getGlobalContext()) diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQL.h b/src/Databases/PostgreSQL/DatabasePostgreSQL.h index 44e1f46d184..6cb978d7cf1 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabasePostgreSQL.h @@ -33,7 +33,7 @@ public: const ASTStorage * database_engine_define, const String & dbname_, const String & postgres_dbname, - PostgreSQLPoolWithFailoverPtr connection_pool_, + postgres::PoolWithFailoverPtr connection_pool_, const bool cache_tables_); String getEngineName() const override { return "PostgreSQL"; } @@ -71,7 +71,7 @@ private: String metadata_path; ASTPtr database_engine_define; String dbname; - PostgreSQLPoolWithFailoverPtr connection_pool; + postgres::PoolWithFailoverPtr connection_pool; const bool cache_tables; mutable Tables cached_tables; diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index 4887b47d98a..1269413b9b4 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -94,7 +94,7 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl std::shared_ptr fetchPostgreSQLTableStructure( - PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls) + postgres::ConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls) { auto columns = NamesAndTypesList(); diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h index 96305349062..f40929aa91d 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h @@ -12,7 +12,7 @@ namespace DB { std::shared_ptr fetchPostgreSQLTableStructure( - PostgreSQLConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls); + postgres::ConnectionHolderPtr connection, const String & postgres_table_name, bool use_nulls); } diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index 0e78a0b09cf..a952781db66 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -31,7 +31,7 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource( const Block & sample_block_) : dict_struct{dict_struct_} , sample_block(sample_block_) - , connection(std::make_shared(config_, config_prefix)) + , connection(std::make_shared(config_, config_prefix)) , log(&Poco::Logger::get("PostgreSQLDictionarySource")) , db(config_.getString(fmt::format("{}.db", config_prefix), "")) , table(config_.getString(fmt::format("{}.table", config_prefix), "")) diff --git a/src/Dictionaries/PostgreSQLDictionarySource.h b/src/Dictionaries/PostgreSQLDictionarySource.h index 662df0b57d3..f1520a37a79 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.h +++ b/src/Dictionaries/PostgreSQLDictionarySource.h @@ -51,7 +51,7 @@ private: const DictionaryStructure dict_struct; Block sample_block; - PostgreSQLPoolWithFailoverPtr connection; + postgres::PoolWithFailoverPtr connection; Poco::Logger * log; const std::string db; diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp index 61caba8ac81..6e485bdca69 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.cpp @@ -8,10 +8,10 @@ #include -namespace DB +namespace postgres { -PostgreSQLConnection::PostgreSQLConnection( +Connection::Connection( const String & connection_str_, const String & address_) : connection_str(connection_str_) @@ -20,14 +20,14 @@ PostgreSQLConnection::PostgreSQLConnection( } -PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::get() +pqxx::ConnectionPtr Connection::get() { connectIfNeeded(); return connection; } -PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet() +pqxx::ConnectionPtr Connection::tryGet() { if (tryConnectIfNeeded()) return connection; @@ -35,17 +35,17 @@ PostgreSQLConnection::ConnectionPtr PostgreSQLConnection::tryGet() } -void PostgreSQLConnection::connectIfNeeded() +void Connection::connectIfNeeded() { if (!connection || !connection->is_open()) { LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", getAddress()); - connection = std::make_shared(connection_str); + connection = std::make_unique(connection_str); } } -bool PostgreSQLConnection::tryConnectIfNeeded() +bool Connection::tryConnectIfNeeded() { try { diff --git a/src/Storages/PostgreSQL/PostgreSQLConnection.h b/src/Storages/PostgreSQL/PostgreSQLConnection.h index c8e1c3dcc91..488f45a068d 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnection.h +++ b/src/Storages/PostgreSQL/PostgreSQLConnection.h @@ -10,24 +10,27 @@ #include -namespace DB +namespace pqxx +{ + using ConnectionPtr = std::shared_ptr; +} + +namespace postgres { -class PostgreSQLConnection +class Connection { -using ConnectionPtr = std::shared_ptr; - public: - PostgreSQLConnection( + Connection( const String & connection_str_, const String & address_); - PostgreSQLConnection(const PostgreSQLConnection & other) = delete; + Connection(const Connection & other) = delete; - ConnectionPtr get(); + pqxx::ConnectionPtr get(); - ConnectionPtr tryGet(); + pqxx::ConnectionPtr tryGet(); bool isConnected() { return tryConnectIfNeeded(); } @@ -38,40 +41,40 @@ private: const std::string & getAddress() { return address; } - ConnectionPtr connection; + pqxx::ConnectionPtr connection; std::string connection_str, address; }; -using PostgreSQLConnectionPtr = std::shared_ptr; +using ConnectionPtr = std::shared_ptr; -class PostgreSQLConnectionHolder +class ConnectionHolder { -using Pool = ConcurrentBoundedQueue; +using Pool = ConcurrentBoundedQueue; static constexpr inline auto POSTGRESQL_POOL_WAIT_MS = 50; public: - PostgreSQLConnectionHolder(PostgreSQLConnectionPtr connection_, Pool & pool_) + ConnectionHolder(ConnectionPtr connection_, Pool & pool_) : connection(std::move(connection_)) , pool(pool_) { } - PostgreSQLConnectionHolder(const PostgreSQLConnectionHolder & other) = delete; + ConnectionHolder(const ConnectionHolder & other) = delete; - ~PostgreSQLConnectionHolder() { pool.tryPush(connection, POSTGRESQL_POOL_WAIT_MS); } + ~ConnectionHolder() { pool.tryPush(connection, POSTGRESQL_POOL_WAIT_MS); } pqxx::connection & conn() const { return *connection->get(); } bool isConnected() { return connection->isConnected(); } private: - PostgreSQLConnectionPtr connection; + ConnectionPtr connection; Pool & pool; }; -using PostgreSQLConnectionHolderPtr = std::shared_ptr; +using ConnectionHolderPtr = std::shared_ptr; } diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp index 659877b6b49..42c716dcf14 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.cpp @@ -10,10 +10,10 @@ #include -namespace DB +namespace postgres { -PostgreSQLConnectionPool::PostgreSQLConnectionPool( +ConnectionPool::ConnectionPool( std::string dbname, std::string host, UInt16 port, @@ -37,7 +37,7 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool( } -PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other) +ConnectionPool::ConnectionPool(const ConnectionPool & other) : pool(std::make_shared(other.pool_size)) , connection_str(other.connection_str) , address(other.address) @@ -49,46 +49,46 @@ PostgreSQLConnectionPool::PostgreSQLConnectionPool(const PostgreSQLConnectionPoo } -void PostgreSQLConnectionPool::initialize() +void ConnectionPool::initialize() { /// No connection is made, just fill pool with non-connected connection objects. for (size_t i = 0; i < pool_size; ++i) - pool->push(std::make_shared(connection_str, address)); + pool->push(std::make_shared(connection_str, address)); } -std::string PostgreSQLConnectionPool::formatConnectionString( +std::string ConnectionPool::formatConnectionString( std::string dbname, std::string host, UInt16 port, std::string user, std::string password) { - WriteBufferFromOwnString out; - out << "dbname=" << quote << dbname - << " host=" << quote << host + DB::WriteBufferFromOwnString out; + out << "dbname=" << DB::quote << dbname + << " host=" << DB::quote << host << " port=" << port - << " user=" << quote << user - << " password=" << quote << password; + << " user=" << DB::quote << user + << " password=" << DB::quote << password; return out.str(); } -PostgreSQLConnectionHolderPtr PostgreSQLConnectionPool::get() +ConnectionHolderPtr ConnectionPool::get() { - PostgreSQLConnectionPtr connection; + ConnectionPtr connection; /// Always blocks by default. if (block_on_empty_pool) { /// pop to ConcurrentBoundedQueue will block until it is non-empty. pool->pop(connection); - return std::make_shared(connection, *pool); + return std::make_shared(connection, *pool); } if (pool->tryPop(connection, pool_wait_timeout)) { - return std::make_shared(connection, *pool); + return std::make_shared(connection, *pool); } - connection = std::make_shared(connection_str, address); - return std::make_shared(connection, *pool); + connection = std::make_shared(connection_str, address); + return std::make_shared(connection, *pool); } } diff --git a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h index 5b993339052..f1239fc78b5 100644 --- a/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h +++ b/src/Storages/PostgreSQL/PostgreSQLConnectionPool.h @@ -8,42 +8,41 @@ #include "PostgreSQLConnection.h" -namespace DB +namespace postgres { -class PostgreSQLPoolWithFailover; - +class PoolWithFailover; /// Connection pool size is defined by user with setting `postgresql_connection_pool_size` (default 16). /// If pool is empty, it will block until there are available connections. /// If setting `connection_pool_wait_timeout` is defined, it will not block on empty pool and will /// wait until the timeout and then create a new connection. (only for storage/db engine) -class PostgreSQLConnectionPool +class ConnectionPool { -friend class PostgreSQLPoolWithFailover; +friend class PoolWithFailover; static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16; public: - PostgreSQLConnectionPool( - std::string dbname, - std::string host, - UInt16 port, - std::string user, - std::string password, - size_t pool_size_ = POSTGRESQL_POOL_DEFAULT_SIZE, - int64_t pool_wait_timeout_ = -1); + ConnectionPool( + std::string dbname, + std::string host, + UInt16 port, + std::string user, + std::string password, + size_t pool_size_ = POSTGRESQL_POOL_DEFAULT_SIZE, + int64_t pool_wait_timeout_ = -1); - PostgreSQLConnectionPool(const PostgreSQLConnectionPool & other); + ConnectionPool(const ConnectionPool & other); - PostgreSQLConnectionPool operator =(const PostgreSQLConnectionPool &) = delete; + ConnectionPool operator =(const ConnectionPool &) = delete; - PostgreSQLConnectionHolderPtr get(); + ConnectionHolderPtr get(); private: - using Pool = ConcurrentBoundedQueue; + using Pool = ConcurrentBoundedQueue; using PoolPtr = std::shared_ptr; static std::string formatConnectionString( @@ -58,7 +57,7 @@ private: bool block_on_empty_pool; }; -using PostgreSQLConnectionPoolPtr = std::shared_ptr; +using ConnectionPoolPtr = std::shared_ptr; } diff --git a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp index 137fdd9bb4f..8d453d0773f 100644 --- a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.cpp @@ -7,16 +7,18 @@ namespace DB { - namespace ErrorCodes { extern const int POSTGRESQL_CONNECTION_FAILURE; } +} +namespace postgres +{ -PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover( +PoolWithFailover::PoolWithFailover( const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, + const std::string & config_prefix, const size_t max_tries_) : max_tries(max_tries_) { @@ -43,18 +45,18 @@ PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover( auto replica_user = config.getString(replica_name + ".user", user); auto replica_password = config.getString(replica_name + ".password", password); - replicas_with_priority[priority].emplace_back(std::make_shared(db, replica_host, replica_port, replica_user, replica_password)); + replicas_with_priority[priority].emplace_back(std::make_shared(db, replica_host, replica_port, replica_user, replica_password)); } } } else { - replicas_with_priority[0].emplace_back(std::make_shared(db, host, port, user, password)); + replicas_with_priority[0].emplace_back(std::make_shared(db, host, port, user, password)); } } -PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover( +PoolWithFailover::PoolWithFailover( const std::string & database, const std::string & hosts_pattern, const uint16_t port, @@ -64,24 +66,24 @@ PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover( const size_t max_addresses) : max_tries(max_tries_) { - auto hosts = parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses); + auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses); for (const auto & host : hosts) { /// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch. - replicas_with_priority[0].emplace_back(std::make_shared(database, host, port, user, password)); + replicas_with_priority[0].emplace_back(std::make_shared(database, host, port, user, password)); LOG_TRACE(&Poco::Logger::get("PostgreSQLPoolWithFailover"), "Adding address {}:{} to pool", host, port); } } -PostgreSQLPoolWithFailover::PostgreSQLPoolWithFailover(const PostgreSQLPoolWithFailover & other) +PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) : replicas_with_priority(other.replicas_with_priority) , max_tries(other.max_tries) { } -PostgreSQLConnectionHolderPtr PostgreSQLPoolWithFailover::get() +ConnectionHolderPtr PoolWithFailover::get() { std::lock_guard lock(mutex); @@ -103,7 +105,7 @@ PostgreSQLConnectionHolderPtr PostgreSQLPoolWithFailover::get() } } - throw Exception(ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas"); + throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas"); } } diff --git a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h index 61b95cb9649..45d2657b34e 100644 --- a/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h +++ b/src/Storages/PostgreSQL/PostgreSQLPoolWithFailover.h @@ -5,22 +5,22 @@ #include "PostgreSQLConnectionPool.h" -namespace DB +namespace postgres { -class PostgreSQLPoolWithFailover +class PoolWithFailover { public: static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5; static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES = 5; - PostgreSQLPoolWithFailover( + PoolWithFailover( const Poco::Util::AbstractConfiguration & config, - const String & config_prefix, + const std::string & config_prefix, const size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); - PostgreSQLPoolWithFailover( + PoolWithFailover( const std::string & database, const std::string & host_pattern, uint16_t port, @@ -29,14 +29,14 @@ public: const size_t max_tries = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, const size_t max_addresses = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_ADDRESSES); - PostgreSQLPoolWithFailover(const PostgreSQLPoolWithFailover & other); + PoolWithFailover(const PoolWithFailover & other); - PostgreSQLConnectionHolderPtr get(); + ConnectionHolderPtr get(); private: /// Highest priority is 0, the bigger the number in map, the less the priority - using Replicas = std::vector; + using Replicas = std::vector; using ReplicasWithPriority = std::map; ReplicasWithPriority replicas_with_priority; @@ -44,6 +44,6 @@ private: std::mutex mutex; }; -using PostgreSQLPoolWithFailoverPtr = std::shared_ptr; +using PoolWithFailoverPtr = std::shared_ptr; } diff --git a/src/Storages/StorageExternalDistributed.cpp b/src/Storages/StorageExternalDistributed.cpp index c118b78492d..4c19dd5dad2 100644 --- a/src/Storages/StorageExternalDistributed.cpp +++ b/src/Storages/StorageExternalDistributed.cpp @@ -88,7 +88,7 @@ StorageExternalDistributed::StorageExternalDistributed( } else if (engine_name == "PostgreSQL") { - PostgreSQLPoolWithFailover pool( + postgres::PoolWithFailover pool( remote_database, parsed_shard_description.first, parsed_shard_description.second, diff --git a/src/Storages/StoragePostgreSQL.cpp b/src/Storages/StoragePostgreSQL.cpp index 26e5c7e18cf..be43319090e 100644 --- a/src/Storages/StoragePostgreSQL.cpp +++ b/src/Storages/StoragePostgreSQL.cpp @@ -41,7 +41,7 @@ namespace ErrorCodes StoragePostgreSQL::StoragePostgreSQL( const StorageID & table_id_, - const PostgreSQLPoolWithFailover & pool_, + const postgres::PoolWithFailover & pool_, const String & remote_table_name_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, @@ -51,7 +51,7 @@ StoragePostgreSQL::StoragePostgreSQL( , remote_table_name(remote_table_name_) , remote_table_schema(remote_table_schema_) , global_context(context_) - , pool(std::make_shared(pool_)) + , pool(std::make_shared(pool_)) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -97,7 +97,7 @@ class PostgreSQLBlockOutputStream : public IBlockOutputStream public: explicit PostgreSQLBlockOutputStream( const StorageMetadataPtr & metadata_snapshot_, - PostgreSQLConnectionHolderPtr connection_, + postgres::ConnectionHolderPtr connection_, const std::string & remote_table_name_) : metadata_snapshot(metadata_snapshot_) , connection(std::move(connection_)) @@ -276,7 +276,7 @@ public: private: StorageMetadataPtr metadata_snapshot; - PostgreSQLConnectionHolderPtr connection; + postgres::ConnectionHolderPtr connection; std::string remote_table_name; std::unique_ptr work; @@ -316,7 +316,7 @@ void registerStoragePostgreSQL(StorageFactory & factory) if (engine_args.size() == 6) remote_table_schema = engine_args[5]->as().value.safeGet(); - PostgreSQLPoolWithFailover pool( + postgres::PoolWithFailover pool( remote_database, parsed_host_port.first, parsed_host_port.second, diff --git a/src/Storages/StoragePostgreSQL.h b/src/Storages/StoragePostgreSQL.h index 86748488a6a..ec06a698c1f 100644 --- a/src/Storages/StoragePostgreSQL.h +++ b/src/Storages/StoragePostgreSQL.h @@ -23,7 +23,7 @@ class StoragePostgreSQL final : public ext::shared_ptr_helper public: StoragePostgreSQL( const StorageID & table_id_, - const PostgreSQLPoolWithFailover & pool_, + const postgres::PoolWithFailover & pool_, const String & remote_table_name_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, @@ -49,7 +49,7 @@ private: String remote_table_name; String remote_table_schema; Context global_context; - PostgreSQLPoolWithFailoverPtr pool; + postgres::PoolWithFailoverPtr pool; }; } diff --git a/src/TableFunctions/TableFunctionPostgreSQL.cpp b/src/TableFunctions/TableFunctionPostgreSQL.cpp index 5dfbc2267f2..77768b1afbe 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.cpp +++ b/src/TableFunctions/TableFunctionPostgreSQL.cpp @@ -73,7 +73,7 @@ void TableFunctionPostgreSQL::parseArguments(const ASTPtr & ast_function, const if (args.size() == 6) remote_table_schema = args[5]->as().value.safeGet(); - connection_pool = std::make_shared( + connection_pool = std::make_shared( args[1]->as().value.safeGet(), parsed_host_port.first, parsed_host_port.second, diff --git a/src/TableFunctions/TableFunctionPostgreSQL.h b/src/TableFunctions/TableFunctionPostgreSQL.h index ba5146939b2..96d46b4fe5a 100644 --- a/src/TableFunctions/TableFunctionPostgreSQL.h +++ b/src/TableFunctions/TableFunctionPostgreSQL.h @@ -5,14 +5,12 @@ #if USE_LIBPQXX #include +#include namespace DB { -class PostgreSQLPoolWithFailover; -using PostgreSQLPoolWithFailoverPtr = std::shared_ptr; - class TableFunctionPostgreSQL : public ITableFunction { public: @@ -31,7 +29,7 @@ private: String connection_str; String remote_table_name, remote_table_schema; - PostgreSQLPoolWithFailoverPtr connection_pool; + postgres::PoolWithFailoverPtr connection_pool; }; }