diff --git a/src/Core/PostgreSQL/Connection.cpp b/src/Core/PostgreSQL/Connection.cpp
index ad54bbe9dca..c423d75981e 100644
--- a/src/Core/PostgreSQL/Connection.cpp
+++ b/src/Core/PostgreSQL/Connection.cpp
@@ -23,6 +23,7 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
         {
             pqxx::nontransaction tx(getRef());
             exec(tx);
+            break;
         }
         catch (const pqxx::broken_connection & e)
         {
diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp
index 802d50d11c2..48b923c4756 100644
--- a/src/Databases/DatabaseFactory.cpp
+++ b/src/Databases/DatabaseFactory.cpp
@@ -293,7 +293,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
             postgresql_replica_settings->loadFromQuery(*engine_define);

         return std::make_shared<DatabaseMaterializedPostgreSQL>(
-            context, metadata_path, uuid, engine_define,
+            context, metadata_path, uuid, engine_define, create.attach,
             database_name, postgres_database_name, connection_info,
             std::move(postgresql_replica_settings));
     }
diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
index 256affc68c8..742eb28c7a4 100644
--- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
+++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp
@@ -37,12 +37,14 @@ DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
     const String & metadata_path_,
     UUID uuid_,
     const ASTStorage * database_engine_define_,
+    bool is_attach_,
     const String & database_name_,
     const String & postgres_database_name,
     const postgres::ConnectionInfo & connection_info_,
     std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
     : DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
     , database_engine_define(database_engine_define_->clone())
+    , is_attach(is_attach_)
     , remote_database_name(postgres_database_name)
     , connection_info(connection_info_)
     , settings(std::move(settings_))
@@ -58,6 +60,7 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
         database_name,
         connection_info,
         getContext(),
+        is_attach,
         settings->materialized_postgresql_max_block_size.value,
         settings->materialized_postgresql_allow_automatic_update,
         /* is_materialized_postgresql_database = */ true,
diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
index f998a0c54de..7ca84f079ed 100644
--- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
+++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h
@@ -33,6 +33,7 @@ public:
         const String & metadata_path_,
         UUID uuid_,
         const ASTStorage * database_engine_define_,
+        bool is_attach_,
         const String & database_name_,
         const String & postgres_database_name,
         const postgres::ConnectionInfo & connection_info,
@@ -63,6 +64,7 @@ private:
     void startSynchronization();

     ASTPtr database_engine_define;
+    bool is_attach;
     String remote_database_name;
     postgres::ConnectionInfo connection_info;
     std::unique_ptr<MaterializedPostgreSQLSettings> settings;
diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
index 67026d345eb..4c614d8fd5a 100644
--- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
+++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp
@@ -29,12 +29,14 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
     const String & current_database_name_,
     const postgres::ConnectionInfo & connection_info_,
     ContextPtr context_,
+    bool is_attach_,
     const size_t max_block_size_,
     bool allow_automatic_update_,
     bool is_materialized_postgresql_database_,
     const String tables_list_)
     : log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
     , context(context_)
+    , is_attach(is_attach_)
     , remote_database_name(remote_database_name_)
     , current_database_name(current_database_name_)
     , connection_info(connection_info_)
@@ -145,10 +147,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
     {
         initial_sync();
     }
-    /// Replication slot depends on publication, so if replication slot exists and new
-    /// publication was just created - drop that replication slot and start from scratch.
-    /// TODO: tests
-    else if (new_publication_created)
+    /// Always drop replication slot if it is a CREATE query and not ATTACH.
+    else if (!is_attach || new_publication)
     {
         dropReplicationSlot(tx);
         initial_sync();
@@ -285,22 +285,25 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
     std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
     pqxx::result result{tx.exec(query_str)};
     assert(!result.empty());
-    bool publication_exists = (result[0][0].as<std::string>() == "t");
-
-    if (publication_exists)
-        LOG_INFO(log, "Publication {} already exists. Using existing version", publication_name);
-
-    return publication_exists;
+    return result[0][0].as<std::string>() == "t";
 }


-void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check)
+void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
 {
-    /// For database engine a publication can be created earlier than in startReplication().
-    if (new_publication_created)
-        return;
+    auto publication_exists = isPublicationExist(tx);

-    if (create_without_check || !isPublicationExist(tx))
+    if (!is_attach && publication_exists)
+    {
+        /// This is a case for single Materialized storage. In case of database engine this check is done in advance.
+        LOG_WARNING(log,
+                    "Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
+                    publication_name);
+
+        connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
+    }
+
+    if (!is_attach || !publication_exists)
     {
         if (tables_list.empty())
         {
@@ -320,8 +323,8 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
         try
         {
             tx.exec(query_str);
-            new_publication_created = true;
             LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
+            new_publication = true;
         }
         catch (Exception & e)
         {
@@ -329,6 +332,10 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bo
             throw;
         }
     }
+    else
+    {
+        LOG_TRACE(log, "Using existing publication ({}) version", publication_name);
+    }
 }


@@ -401,6 +408,7 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
 {
     std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
     tx.exec(query_str);
+    LOG_TRACE(log, "Dropped publication: {}", publication_name);
 }


@@ -438,9 +446,11 @@ void PostgreSQLReplicationHandler::shutdownFinal()
 NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection & connection_)
 {
     pqxx::work tx(connection_.getRef());
-    bool publication_exists_before_startup = isPublicationExist(tx);
     NameSet result_tables;
+    bool publication_exists_before_startup = isPublicationExist(tx);
+    LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach);
+
     Strings expected_tables;
     if (!tables_list.empty())
     {
@@ -453,49 +463,58 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
     if (publication_exists_before_startup)
     {
-        if (tables_list.empty())
+        if (!is_attach)
         {
-            /// There is no tables list, but publication already exists, then the expected behaviour
-            /// is to replicate the whole database. But it could be a server restart, so we can't drop it.
             LOG_WARNING(log,
-                        "Publication {} already exists and tables list is empty. Assuming publication is correct",
+                        "Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
                         publication_name);

-            result_tables = fetchPostgreSQLTablesList(tx);
+            connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
         }
-        /// Check tables list from publication is the same as expected tables list.
-        /// If not - drop publication and return expected tables list.
         else
         {
-            result_tables = fetchTablesFromPublication(tx);
-            NameSet diff;
-            std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
-                                          result_tables.begin(), result_tables.end(),
-                                          std::inserter(diff, diff.begin()));
-            if (!diff.empty())
+            if (tables_list.empty())
             {
-                String diff_tables;
-                for (const auto & table_name : diff)
-                {
-                    if (!diff_tables.empty())
-                        diff_tables += ", ";
-                    diff_tables += table_name;
-                }
                 LOG_WARNING(log,
-                            "Publication {} already exists, but specified tables list differs from publication tables list in tables: {}",
-                            publication_name, diff_tables);
+                            "Publication {} already exists and tables list is empty. Assuming publication is correct.",
+                            publication_name);

-                connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
+                result_tables = fetchPostgreSQLTablesList(tx);
+            }
+            /// Check tables list from publication is the same as expected tables list.
+            /// If not - drop publication and return expected tables list.
+            else
+            {
+                result_tables = fetchTablesFromPublication(tx);
+                NameSet diff;
+                std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
+                                              result_tables.begin(), result_tables.end(),
+                                              std::inserter(diff, diff.begin()));
+                if (!diff.empty())
+                {
+                    String diff_tables;
+                    for (const auto & table_name : diff)
+                    {
+                        if (!diff_tables.empty())
+                            diff_tables += ", ";
+                        diff_tables += table_name;
+                    }
+
+                    LOG_WARNING(log,
+                                "Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.",
+                                publication_name, diff_tables);
+
+                    connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
+                }
             }
         }
     }
-    else
+
+    if (result_tables.empty())
     {
         if (!tables_list.empty())
         {
-            tx.commit();
-            return NameSet(expected_tables.begin(), expected_tables.end());
+            result_tables = NameSet(expected_tables.begin(), expected_tables.end());
         }
         else
         {
diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h
index 4b6321338b8..95ac12b3786 100644
--- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h
+++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h
@@ -24,6 +24,7 @@ public:
         const String & current_database_name_,
         const postgres::ConnectionInfo & connection_info_,
         ContextPtr context_,
+        bool is_attach_,
         const size_t max_block_size_,
         bool allow_automatic_update_,
         bool is_materialized_postgresql_database_,
@@ -54,7 +55,7 @@ private:
     bool isPublicationExist(pqxx::work & tx);

-    void createPublicationIfNeeded(pqxx::work & tx, bool create_without_check = false);
+    void createPublicationIfNeeded(pqxx::work & tx);

     NameSet fetchTablesFromPublication(pqxx::work & tx);

@@ -83,6 +84,12 @@ private:
     Poco::Logger * log;
     ContextPtr context;

+    /// If it is not an ATTACH query, i.e. a CREATE query, then if the publication already exists - always drop it.
+    bool is_attach;
+
+    /// If a new publication is created at startup - always drop the replication slot if it exists.
+    bool new_publication = false;
+
     const String remote_database_name, current_database_name;

     /// Connection string and address for logs.
@@ -113,11 +120,6 @@ private:

     std::atomic<bool> stop_synchronization = false;

-    /// For database engine there are 2 places where it is checked for publication:
-    /// 1. to fetch tables list from already created publication when database is loaded
-    /// 2. at replication startup
-    bool new_publication_created = false;
-
     /// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables.
     MaterializedStorages materialized_storages;
diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
index 252059f606d..78f7fefd5dc 100644
--- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
+++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
@@ -70,6 +70,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
         table_id_.database_name,
         connection_info,
         getContext(),
+        is_attach,
         replication_settings->materialized_postgresql_max_block_size.value,
         /* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
 }
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 2617a7ade40..162ffc53e20 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -1068,7 +1068,7 @@ class ClickHouseCluster:
         logging.error("Can't connect to MySQL:{}".format(errors))
         raise Exception("Cannot wait MySQL container")

-    def wait_postgres_to_start(self, timeout=180):
+    def wait_postgres_to_start(self, timeout=260):
        self.postgres_ip = self.get_instance_ip(self.postgres_host)
        start = time.time()
        while time.time() - start < timeout:
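
The behaviour introduced above is easiest to see end to end. The following is a minimal integration-test sketch, not part of this diff, illustrating what the new is_attach flag distinguishes: a plain CREATE DATABASE constructs the replication handler with is_attach == false and starts replication from scratch (dropping any leftover publication and replication slot), while the implicit ATTACH performed on server restart reuses them. The test name, the 'postgres1:5432' address, the 'postgres'/'mysecretpassword' credentials, and the experimental-setting toggle are assumptions modelled on the existing integration-test helpers, not something this change adds.

# Hypothetical sketch only: names and connection parameters are assumptions.
import psycopg2
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', with_postgres=True)


def test_create_then_restart_reuses_publication():
    cluster.start()
    try:
        # Assumed defaults of the integration-test postgres container.
        conn = psycopg2.connect(host=cluster.postgres_ip, port=5432,
                                user='postgres', password='mysecretpassword')
        conn.autocommit = True
        conn.cursor().execute('CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')

        # CREATE query: the handler is constructed with is_attach == false, so any
        # pre-existing publication/replication slot is dropped and replication
        # starts from a fresh snapshot.
        instance.query(
            "CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL("
            "'postgres1:5432', 'postgres', 'postgres', 'mysecretpassword')",
            settings={'allow_experimental_database_materialized_postgresql': 1})
        instance.query('SHOW TABLES FROM test_database')

        # Restart: the database is loaded again with create.attach == true, so the
        # existing publication and replication slot are kept and reused.
        instance.restart_clickhouse()
        instance.query('SHOW TABLES FROM test_database')
    finally:
        cluster.shutdown()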