diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 984a9cdd47a..a406ae62693 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -278,9 +278,8 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ { postgres::Connection connection(connection_info); pqxx::nontransaction tx(connection.getRef()); - auto table_structure = std::make_unique(fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true)); - if (!table_structure) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure"); + auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name); + auto table_structure = std::make_unique(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true)); auto table_override = tryGetTableOverride(current_database_name, table_name); return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as() : nullptr); @@ -516,17 +515,25 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx) void PostgreSQLReplicationHandler::addTableToPublication(pqxx::nontransaction & ntx, const String & table_name) { - std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteString(table_name)); + std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name)); ntx.exec(query_str); - LOG_TRACE(log, "Added table `{}` to publication `{}`", table_name, publication_name); + LOG_TRACE(log, "Added table {} to publication `{}`", doubleQuoteWithSchema(table_name), publication_name); } void PostgreSQLReplicationHandler::removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name) { - std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteString(table_name)); - ntx.exec(query_str); - LOG_TRACE(log, "Removed table `{}` from publication `{}`", table_name, publication_name); + try + { + std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name)); + ntx.exec(query_str); + LOG_TRACE(log, "Removed table `{}` from publication `{}`", doubleQuoteWithSchema(table_name), publication_name); + } + catch (const pqxx::undefined_table &) + { + /// Removing table from replication must succeed even if table does not exist in PostgreSQL. + LOG_WARNING(log, "Did not remove table {} from publication, because table does not exist in PostgreSQL", doubleQuoteWithSchema(table_name), publication_name); + } } diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py index c8b63d8e667..725158920a1 100644 --- a/tests/integration/test_postgresql_replica_database_engine_2/test.py +++ b/tests/integration/test_postgresql_replica_database_engine_2/test.py @@ -178,7 +178,7 @@ def assert_number_of_columns(expected, table_name, database_name='test_database' def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''): assert_nested_table_is_created(table_name, materialized_database, schema_name) - print("Checking table is synchronized:", table_name) + print(f"Checking table is synchronized. Table name: {table_name}, table schema: {schema_name}") expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by)) if len(schema_name) == 0: result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by)) @@ -356,6 +356,11 @@ def test_remove_table_from_replication(started_cluster): for i in range(NUM_TABLES): cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + # Removing from replication table which does not exist in PostgreSQL must be ok. + instance.query('DETACH TABLE test_database.postgresql_replica_0'); + assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL") + drop_materialized_db() + def test_predefined_connection_configuration(started_cluster): drop_materialized_db() @@ -379,6 +384,7 @@ def test_database_with_single_non_default_schema(started_cluster): NUM_TABLES=5 schema_name = 'test_schema' + materialized_db = 'test_database' clickhouse_postgres_db = 'postgres_database_with_schema' global insert_counter insert_counter = 0 @@ -430,6 +436,14 @@ def test_database_with_single_non_default_schema(started_cluster): instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)") assert_number_of_columns(3, f'postgresql_replica_{altered_table}') check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db); + + print('DETACH-ATTACH') + detached_table_name = "postgresql_replica_1" + instance.query(f"DETACH TABLE {materialized_db}.{detached_table_name}") + assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL") == False + instance.query(f"ATTACH TABLE {materialized_db}.{detached_table_name}") + check_tables_are_synchronized(f"{detached_table_name}", postgres_database=clickhouse_postgres_db); + drop_materialized_db() @@ -440,6 +454,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster): NUM_TABLES = 5 schema_name = 'test_schema' clickhouse_postgres_db = 'postgres_database_with_schema' + materialized_db = 'test_database' publication_tables = '' global insert_counter insert_counter = 0 @@ -494,6 +509,15 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster): instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)") assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}') check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db); + + print('DETACH-ATTACH') + detached_table_name = "postgresql_replica_1" + instance.query(f"DETACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`") + assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL") == False + instance.query(f"ATTACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`") + assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n") + check_tables_are_synchronized(f"{detached_table_name}", schema_name=schema_name, postgres_database=clickhouse_postgres_db); + drop_materialized_db() @@ -504,6 +528,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster): NUM_TABLES = 2 schemas_num = 2 schema_list = 'schema0, schema1' + materialized_db = 'test_database' global insert_counter insert_counter = 0 @@ -557,11 +582,23 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster): print('ALTER') altered_schema = random.randint(0, schemas_num-1) altered_table = random.randint(0, NUM_TABLES-1) + clickhouse_postgres_db = f'clickhouse_postgres_db{altered_schema}' cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer") instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)") assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}') - check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db); + check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db); + + print('DETACH-ATTACH') + detached_table_name = "postgresql_replica_1" + detached_table_schema = "schema0" + clickhouse_postgres_db = f'clickhouse_postgres_db0' + instance.query(f"DETACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`") + assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL") == False + instance.query(f"ATTACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`") + assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n") + check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db); + drop_materialized_db()