Merge pull request #33179 from kssenii/materialized-postgresql-fix-detach-with-schema

This commit is contained in:
Vladimir C 2021-12-27 11:42:59 +03:00 committed by GitHub
commit ea5f867289
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 10 deletions

View File

@ -278,9 +278,8 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
{
postgres::Connection connection(connection_info);
pqxx::nontransaction tx(connection.getRef());
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, postgres_schema, true, true, true));
if (!table_structure)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get PostgreSQL table structure");
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true));
auto table_override = tryGetTableOverride(current_database_name, table_name);
return storage->getCreateNestedTableQuery(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
@ -516,17 +515,25 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
void PostgreSQLReplicationHandler::addTableToPublication(pqxx::nontransaction & ntx, const String & table_name)
{
std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteString(table_name));
std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Added table `{}` to publication `{}`", table_name, publication_name);
LOG_TRACE(log, "Added table {} to publication `{}`", doubleQuoteWithSchema(table_name), publication_name);
}
void PostgreSQLReplicationHandler::removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name)
{
std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteString(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Removed table `{}` from publication `{}`", table_name, publication_name);
try
{
std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteWithSchema(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Removed table `{}` from publication `{}`", doubleQuoteWithSchema(table_name), publication_name);
}
catch (const pqxx::undefined_table &)
{
/// Removing table from replication must succeed even if table does not exist in PostgreSQL.
LOG_WARNING(log, "Did not remove table {} from publication, because table does not exist in PostgreSQL", doubleQuoteWithSchema(table_name), publication_name);
}
}

View File

@ -178,7 +178,7 @@ def assert_number_of_columns(expected, table_name, database_name='test_database'
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print("Checking table is synchronized:", table_name)
print(f"Checking table is synchronized. Table name: {table_name}, table schema: {schema_name}")
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
@ -356,6 +356,11 @@ def test_remove_table_from_replication(started_cluster):
for i in range(NUM_TABLES):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
# Removing from replication table which does not exist in PostgreSQL must be ok.
instance.query('DETACH TABLE test_database.postgresql_replica_0');
assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
drop_materialized_db()
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
@ -379,6 +384,7 @@ def test_database_with_single_non_default_schema(started_cluster):
NUM_TABLES=5
schema_name = 'test_schema'
materialized_db = 'test_database'
clickhouse_postgres_db = 'postgres_database_with_schema'
global insert_counter
insert_counter = 0
@ -430,6 +436,14 @@ def test_database_with_single_non_default_schema(started_cluster):
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
instance.query(f"DETACH TABLE {materialized_db}.{detached_table_name}")
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.{detached_table_name}")
check_tables_are_synchronized(detached_table_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
@ -440,6 +454,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
NUM_TABLES = 5
schema_name = 'test_schema'
clickhouse_postgres_db = 'postgres_database_with_schema'
materialized_db = 'test_database'
publication_tables = ''
global insert_counter
insert_counter = 0
@ -494,6 +509,15 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
instance.query(f"DETACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
check_tables_are_synchronized(detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
@ -504,6 +528,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
NUM_TABLES = 2
schemas_num = 2
schema_list = 'schema0, schema1'
materialized_db = 'test_database'
global insert_counter
insert_counter = 0
@ -557,11 +582,23 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
print('ALTER')
altered_schema = random.randint(0, schemas_num-1)
altered_table = random.randint(0, NUM_TABLES-1)
clickhouse_postgres_db = f'clickhouse_postgres_db{altered_schema}'
cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")
instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
detached_table_schema = "schema0"
clickhouse_postgres_db = f'clickhouse_postgres_db0'
instance.query(f"DETACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db);
drop_materialized_db()