diff --git a/src/Core/PostgreSQL/PostgreSQLConnection.cpp b/src/Core/PostgreSQL/PostgreSQLConnection.cpp index 12f0232f326..b6128e909ef 100644 --- a/src/Core/PostgreSQL/PostgreSQLConnection.cpp +++ b/src/Core/PostgreSQL/PostgreSQLConnection.cpp @@ -1,9 +1,8 @@ #include "PostgreSQLConnection.h" #if USE_LIBPQXX -#include -#include #include +#include namespace DB @@ -17,11 +16,41 @@ namespace ErrorCodes namespace postgres { -Connection::Connection( - const String & connection_str_, - const String & address_) - : connection_str(connection_str_) - , address(address_) +ConnectionInfo formatConnectionString( + std::string dbname, std::string host, UInt16 port, std::string user, std::string password) +{ + DB::WriteBufferFromOwnString out; + out << "dbname=" << DB::quote << dbname + << " host=" << DB::quote << host + << " port=" << port + << " user=" << DB::quote << user + << " password=" << DB::quote << password; + return std::make_pair(out.str(), host + ':' + DB::toString(port)); +} + + +ConnectionPtr createReplicationConnection(const ConnectionInfo & connection_info) +{ + auto new_connection_info = std::make_pair( + fmt::format("{} replication=database", connection_info.first), + connection_info.second); + + auto connection = std::make_shared(new_connection_info); + connection->get()->set_variable("default_transaction_isolation", "'repeatable read'"); + + return connection; +} + + +template +std::shared_ptr createTransaction(pqxx::connection & connection) +{ + return std::make_shared(connection); +} + + +Connection::Connection(const ConnectionInfo & connection_info_) + : connection_info(connection_info_) { } @@ -54,8 +83,8 @@ void Connection::connectIfNeeded() { if (!connection || !connection->is_open()) { - LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", getAddress()); - connection = std::make_shared(connection_str); + connection = std::make_shared(connection_info.first); + LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", connection_info.second); } } @@ -70,8 +99,7 @@ bool Connection::tryConnectIfNeeded() { LOG_ERROR( &Poco::Logger::get("PostgreSQLConnection"), - "Unable to setup connection to {}, reason: {}", - getAddress(), pqxx_error.what()); + "Unable to setup connection to {}, reason: {}", connection_info.second, pqxx_error.what()); return false; } catch (...) diff --git a/src/Core/PostgreSQL/PostgreSQLConnection.h b/src/Core/PostgreSQL/PostgreSQLConnection.h index 94bb0635914..dfed426b462 100644 --- a/src/Core/PostgreSQL/PostgreSQLConnection.h +++ b/src/Core/PostgreSQL/PostgreSQLConnection.h @@ -8,6 +8,7 @@ #include // Y_IGNORE #include #include +#include namespace pqxx @@ -20,13 +21,40 @@ namespace pqxx namespace postgres { +class Connection; +using ConnectionPtr = std::shared_ptr; + + +/// Connection string and address without login/password (for error logs) +using ConnectionInfo = std::pair; + +ConnectionInfo formatConnectionString( + std::string dbname, std::string host, UInt16 port, std::string user, std::string password); + +ConnectionPtr createReplicationConnection(const ConnectionInfo & connection_info); + + +template +class Transaction +{ +public: + Transaction(pqxx::connection & connection) : transaction(connection) {} + + ~Transaction() { transaction.commit(); } + + T & getRef() { return transaction; } + + void exec(const String & query) { transaction.exec(query); } + +private: + T transaction; +}; + + class Connection { - public: - Connection( - const String & connection_str_, - const String & address_); + Connection(const ConnectionInfo & connection_info_); Connection(const Connection & other) = delete; @@ -38,20 +66,17 @@ public: bool isConnected() { return tryConnectIfNeeded(); } - const String & getConnectionString() { return connection_str; } + const ConnectionInfo & getConnectionInfo() { return connection_info; } private: void connectIfNeeded(); bool tryConnectIfNeeded(); - const std::string & getAddress() { return address; } - pqxx::ConnectionPtr connection; - std::string connection_str, address; + ConnectionInfo connection_info; }; -using ConnectionPtr = std::shared_ptr; class ConnectionHolder { diff --git a/src/Core/PostgreSQL/PostgreSQLConnectionPool.cpp b/src/Core/PostgreSQL/PostgreSQLConnectionPool.cpp index 42c716dcf14..f4a1c7f08f2 100644 --- a/src/Core/PostgreSQL/PostgreSQLConnectionPool.cpp +++ b/src/Core/PostgreSQL/PostgreSQLConnectionPool.cpp @@ -3,8 +3,6 @@ #endif #if USE_LIBPQXX -#include -#include #include "PostgreSQLConnectionPool.h" #include "PostgreSQLConnection.h" #include @@ -31,16 +29,14 @@ ConnectionPool::ConnectionPool( "New connection pool. Size: {}, blocks on empty pool: {}", pool_size, block_on_empty_pool); - address = host + ':' + std::to_string(port); - connection_str = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password)); + connection_info = formatConnectionString(std::move(dbname), std::move(host), port, std::move(user), std::move(password)); initialize(); } ConnectionPool::ConnectionPool(const ConnectionPool & other) : pool(std::make_shared(other.pool_size)) - , connection_str(other.connection_str) - , address(other.address) + , connection_info(other.connection_info) , pool_size(other.pool_size) , pool_wait_timeout(other.pool_wait_timeout) , block_on_empty_pool(other.block_on_empty_pool) @@ -53,20 +49,7 @@ void ConnectionPool::initialize() { /// No connection is made, just fill pool with non-connected connection objects. for (size_t i = 0; i < pool_size; ++i) - pool->push(std::make_shared(connection_str, address)); -} - - -std::string ConnectionPool::formatConnectionString( - std::string dbname, std::string host, UInt16 port, std::string user, std::string password) -{ - DB::WriteBufferFromOwnString out; - out << "dbname=" << DB::quote << dbname - << " host=" << DB::quote << host - << " port=" << port - << " user=" << DB::quote << user - << " password=" << DB::quote << password; - return out.str(); + pool->push(std::make_shared(connection_info)); } @@ -87,7 +70,7 @@ ConnectionHolderPtr ConnectionPool::get() return std::make_shared(connection, *pool); } - connection = std::make_shared(connection_str, address); + connection = std::make_shared(connection_info); return std::make_shared(connection, *pool); } diff --git a/src/Core/PostgreSQL/PostgreSQLConnectionPool.h b/src/Core/PostgreSQL/PostgreSQLConnectionPool.h index b9b2a50aa48..01ae21703d9 100644 --- a/src/Core/PostgreSQL/PostgreSQLConnectionPool.h +++ b/src/Core/PostgreSQL/PostgreSQLConnectionPool.h @@ -41,9 +41,6 @@ public: ConnectionHolderPtr get(); - static std::string formatConnectionString( - std::string dbname, std::string host, UInt16 port, std::string user, std::string password); - private: using Pool = ConcurrentBoundedQueue; using PoolPtr = std::shared_ptr; @@ -51,7 +48,7 @@ private: void initialize(); PoolPtr pool; - std::string connection_str, address; + ConnectionInfo connection_info; size_t pool_size; int64_t pool_wait_timeout; bool block_on_empty_pool; diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 6b9f90c5500..06b04a22b9f 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -287,7 +287,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const auto & password = safeGetLiteralValue(engine_args[3], engine_name); auto parsed_host_port = parseAddress(host_port, 5432); - auto connection_string = postgres::ConnectionPool::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password); + auto connection_info = postgres::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password); auto postgresql_replica_settings = std::make_unique(); @@ -296,7 +296,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String return std::make_shared( context, metadata_path, uuid, engine_define, - database_name, postgres_database_name, connection_string, + database_name, postgres_database_name, connection_info, std::move(postgresql_replica_settings)); } diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp index d6a02ca2cc9..7f808007ebc 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp @@ -41,12 +41,12 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL( const ASTStorage * database_engine_define_, const String & database_name_, const String & postgres_database_name, - const String & connection_string, + const postgres::ConnectionInfo & connection_info, std::unique_ptr settings_) : DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializePostgreSQL (" + database_name_ + ")", context_) , database_engine_define(database_engine_define_->clone()) , remote_database_name(postgres_database_name) - , connection(std::make_shared(connection_string, "")) + , connection(std::make_shared(connection_info)) , settings(std::move(settings_)) { } @@ -56,7 +56,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization() { replication_handler = std::make_unique( remote_database_name, - connection->getConnectionString(), + connection->getConnectionInfo(), metadata_path + METADATA_SUFFIX, global_context, settings->postgresql_replica_max_block_size.value, diff --git a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h index b80ff4c5974..405bfd80283 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h @@ -36,7 +36,7 @@ public: const ASTStorage * database_engine_define_, const String & database_name_, const String & postgres_database_name, - const String & connection_string, + const postgres::ConnectionInfo & connection_info, std::unique_ptr settings_); String getEngineName() const override { return "MaterializePostgreSQL"; } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 1cca362ca35..cce84892ed6 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -19,7 +19,7 @@ static const auto reschedule_ms = 500; PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( const std::string & database_name_, - const std::string & conn_str, + const postgres::ConnectionInfo & connection_info_, const std::string & metadata_path_, const Context & context_, const size_t max_block_size_, @@ -29,13 +29,13 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( : log(&Poco::Logger::get("PostgreSQLReplicationHandler")) , context(context_) , database_name(database_name_) - , connection_str(conn_str) , metadata_path(metadata_path_) + , connection_info(connection_info_) , max_block_size(max_block_size_) , allow_minimal_ddl(allow_minimal_ddl_) , is_postgresql_replica_database_engine(is_postgresql_replica_database_engine_) , tables_list(tables_list_) - , connection(std::make_shared(conn_str, "")) + , connection(std::make_shared(connection_info_)) { replication_slot = fmt::format("{}_ch_replication_slot", database_name); publication_name = fmt::format("{}_ch_publication", database_name); @@ -63,14 +63,11 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart() { /// Will throw pqxx::broken_connection if no connection at the moment connection->get(); - startSynchronization(); } catch (const pqxx::broken_connection & pqxx_error) { - LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", - pqxx_error.what()); - + LOG_ERROR(log, "Unable to set up connection. Reconnection attempt will continue. Error message: {}", pqxx_error.what()); startup_task->scheduleAfter(reschedule_ms); } catch (...) @@ -92,20 +89,19 @@ void PostgreSQLReplicationHandler::startSynchronization() { createPublicationIfNeeded(connection->getRef()); - auto replication_connection = std::make_shared(fmt::format("{} replication=database", connection->getConnectionString()), ""); - replication_connection->get()->set_variable("default_transaction_isolation", "'repeatable read'"); - auto tx = std::make_shared(replication_connection->getRef()); + auto replication_connection = postgres::createReplicationConnection(connection_info); + postgres::Transaction tx(replication_connection->getRef()); std::string snapshot_name, start_lsn; auto initial_sync = [&]() { - createReplicationSlot(tx, start_lsn, snapshot_name); + createReplicationSlot(tx.getRef(), start_lsn, snapshot_name); loadFromSnapshot(snapshot_name, storages); }; /// Replication slot should be deleted with drop table only and created only once, reused after detach. - if (!isReplicationSlotExist(tx, replication_slot)) + if (!isReplicationSlotExist(tx.getRef(), replication_slot)) { initial_sync(); } @@ -114,12 +110,12 @@ void PostgreSQLReplicationHandler::startSynchronization() /// In case of some failure, the following cases are possible (since publication and replication slot are reused): /// 1. If replication slot exists and metadata file (where last synced version is written) does not exist, it is not ok. /// 2. If created a new publication and replication slot existed before it was created, it is not ok. - dropReplicationSlot(tx); + dropReplicationSlot(tx.getRef()); initial_sync(); } else { - LOG_TRACE(log, "Restoring tables..."); + LOG_TRACE(log, "Restoring {} tables...", storages.size()); for (const auto & [table_name, storage] : storages) { try @@ -135,8 +131,6 @@ void PostgreSQLReplicationHandler::startSynchronization() } } - tx->commit(); - consumer = std::make_shared( context, connection, @@ -226,10 +220,10 @@ void PostgreSQLReplicationHandler::consumerFunc() } -bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr tx) +bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx) { std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name); - pqxx::result result{tx->exec(query_str)}; + pqxx::result result{tx.exec(query_str)}; assert(!result.empty()); bool publication_exists = (result[0][0].as() == "t"); @@ -245,9 +239,9 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection & if (new_publication_created) return; - auto tx = std::make_shared(connection_); + postgres::Transaction tx(connection_); - if (!isPublicationExist(tx)) + if (!isPublicationExist(tx.getRef())) { if (tables_list.empty()) { @@ -263,7 +257,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection & std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list); try { - tx->exec(query_str); + tx.exec(query_str); new_publication_created = true; LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list); } @@ -273,15 +267,13 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection & throw; } } - - tx->commit(); } -bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr tx, std::string & slot_name) +bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name) { std::string query_str = fmt::format("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name); - pqxx::result result{tx->exec(query_str)}; + pqxx::result result{tx.exec(query_str)}; /// Replication slot does not exist if (result.empty()) @@ -296,7 +288,7 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr tx, void PostgreSQLReplicationHandler::createReplicationSlot( - NontransactionPtr tx, std::string & start_lsn, std::string & snapshot_name, bool temporary) + pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary) { std::string query_str; @@ -310,7 +302,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot( try { - pqxx::result result{tx->exec(query_str)}; + pqxx::result result{tx.exec(query_str)}; start_lsn = result[0][1].as(); snapshot_name = result[0][2].as(); LOG_TRACE(log, "Created replication slot: {}, start lsn: {}", replication_slot, start_lsn); @@ -323,7 +315,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot( } -void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr tx, bool temporary) +void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx, bool temporary) { std::string slot_name; if (temporary) @@ -333,15 +325,15 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr tx, boo std::string query_str = fmt::format("SELECT pg_drop_replication_slot('{}')", slot_name); - tx->exec(query_str); + tx.exec(query_str); LOG_TRACE(log, "Dropped replication slot: {}", slot_name); } -void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr tx) +void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx) { std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name); - tx->exec(query_str); + tx.exec(query_str); } @@ -350,14 +342,12 @@ void PostgreSQLReplicationHandler::shutdownFinal() if (Poco::File(metadata_path).exists()) Poco::File(metadata_path).remove(); - connection = std::make_shared(connection_str, ""); - auto tx = std::make_shared(connection->getRef()); + connection = std::make_shared(connection_info); + postgres::Transaction tx(connection->getRef()); - dropPublication(tx); - if (isReplicationSlotExist(tx, replication_slot)) - dropReplicationSlot(tx); - - tx->commit(); + dropPublication(tx.getRef()); + if (isReplicationSlotExist(tx.getRef(), replication_slot)) + dropReplicationSlot(tx.getRef()); } @@ -379,9 +369,9 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connectio { std::string query = fmt::format("SELECT tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name); std::unordered_set tables; - pqxx::read_transaction tx(connection_); + postgres::Transaction tx(connection_); - for (auto table_name : tx.stream(query)) + for (auto table_name : tx.getRef().stream(query)) tables.insert(std::get<0>(table_name)); return tables; @@ -405,7 +395,6 @@ std::unordered_map PostgreSQLReplicationHandler::reloadFromSnapsh std::unordered_map tables_start_lsn; try { - auto tx = std::make_shared(connection->getRef()); Storages sync_storages; for (const auto & relation : relation_data) { @@ -414,17 +403,14 @@ std::unordered_map PostgreSQLReplicationHandler::reloadFromSnapsh sync_storages[table_name] = storage; storage->dropNested(); } - tx->commit(); - auto replication_connection = std::make_shared(fmt::format("{} replication=database", connection_str), ""); - replication_connection->get()->set_variable("default_transaction_isolation", "'repeatable read'"); + auto replication_connection = postgres::createReplicationConnection(connection_info); + postgres::Transaction tx(replication_connection->getRef()); - auto r_tx = std::make_shared(replication_connection->getRef()); std::string snapshot_name, start_lsn; - createReplicationSlot(r_tx, start_lsn, snapshot_name, true); + createReplicationSlot(tx.getRef(), start_lsn, snapshot_name, true); /// This snapshot is valid up to the end of the transaction, which exported it. auto success_tables = loadFromSnapshot(snapshot_name, sync_storages); - r_tx->commit(); for (const auto & relation : relation_data) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 0aa165bd183..147c0c7b114 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -28,7 +28,7 @@ class PostgreSQLReplicationHandler public: PostgreSQLReplicationHandler( const std::string & database_name_, - const std::string & conn_str_, + const postgres::ConnectionInfo & connection_info_, const std::string & metadata_path_, const Context & context_, const size_t max_block_size_, @@ -38,29 +38,31 @@ public: void startup(); + /// Stop replication without cleanup. void shutdown(); + /// Clean up replication: remove publication and replication slots. void shutdownFinal(); void addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage); + /// Fetch list of tables which are going to be replicated. Used for database engine. NameSet fetchRequiredTables(pqxx::connection & connection_); private: - using NontransactionPtr = std::shared_ptr; using Storages = std::unordered_map; - bool isPublicationExist(std::shared_ptr tx); - - bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name); - void createPublicationIfNeeded(pqxx::connection & connection_); - void createReplicationSlot(NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false); + bool isPublicationExist(pqxx::work & tx); - void dropReplicationSlot(NontransactionPtr tx, bool temporary = false); + bool isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name); - void dropPublication(NontransactionPtr ntx); + void createReplicationSlot(pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false); + + void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false); + + void dropPublication(pqxx::nontransaction & ntx); void waitConnectionAndStart(); @@ -78,19 +80,48 @@ private: Poco::Logger * log; const Context & context; - const std::string database_name, connection_str, metadata_path; + + /// Remote database name. + const String database_name; + + /// Path for replication metadata. + const String metadata_path; + + /// Connection string and address for logs. + postgres::ConnectionInfo connection_info; + + /// max_block_size for replication stream. const size_t max_block_size; - bool allow_minimal_ddl, is_postgresql_replica_database_engine; - std::string tables_list, replication_slot, publication_name; + + /// Table structure changes are always tracked. By default, table with changed schema will get into a skip list. + bool allow_minimal_ddl = false; + + /// To distinguish whether current replication handler belongs to a MaterializePostgreSQL database engine or single storage. + bool is_postgresql_replica_database_engine; + + /// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated. + String tables_list; + + String replication_slot, publication_name; postgres::ConnectionPtr connection; + + /// Replication consumer. Manages deconding of replication stream and syncing into tables. std::shared_ptr consumer; BackgroundSchedulePool::TaskHolder startup_task, consumer_task; - std::atomic tables_loaded = false, stop_synchronization = false; + + std::atomic stop_synchronization = false; + + /// For database engine there are 2 places where it is checked for publication: + /// 1. to fetch tables list from already created publication when database is loaded + /// 2. at replication startup bool new_publication_created = false; + /// MaterializePostgreSQL tables. Used for managing all operations with its internal nested tables. Storages storages; + + /// List of nested tables, which is passed to replication consumer. std::unordered_map nested_storages; }; diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp index 7002d7d8c99..65b38086db9 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp @@ -42,7 +42,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( const StorageID & table_id_, const String & remote_database_name, const String & remote_table_name_, - const String & connection_str, + const postgres::ConnectionInfo & connection_info, const StorageInMemoryMetadata & storage_metadata, const Context & context_, std::unique_ptr replication_settings_) @@ -60,7 +60,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( replication_handler = std::make_unique( remote_database_name, - connection_str, + connection_info, metadata_path, global_context, replication_settings->postgresql_replica_max_block_size.value, @@ -445,7 +445,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory) const String & remote_database = engine_args[1]->as().value.safeGet(); /// No connection is made here, see Storages/PostgreSQL/PostgreSQLConnection.cpp - auto connection_string = postgres::ConnectionPool::formatConnectionString( + auto connection_info = postgres::formatConnectionString( remote_database, parsed_host_port.first, parsed_host_port.second, @@ -453,7 +453,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory) engine_args[4]->as().value.safeGet()); return StorageMaterializePostgreSQL::create( - args.table_id, remote_database, remote_table, connection_string, + args.table_id, remote_database, remote_table, connection_info, metadata, args.context, std::move(postgresql_replication_settings)); }; diff --git a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h index feba216b4c4..5bbea64133a 100644 --- a/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h @@ -76,7 +76,7 @@ protected: const StorageID & table_id_, const String & remote_database_name, const String & remote_table_name, - const String & connection_str, + const postgres::ConnectionInfo & connection_info, const StorageInMemoryMetadata & storage_metadata, const Context & context_, std::unique_ptr replication_settings_);