diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
index 35222080776..9c6eeceb605 100644
--- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
+++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
@@ -21,6 +21,7 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
+    extern const int BAD_ARGUMENTS;
 }

 MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
diff --git a/tests/integration/test_postgresql_replica_database_engine_1/test.py b/tests/integration/test_postgresql_replica_database_engine_1/test.py
index 0b3fb68708a..f7d33143a82 100644
--- a/tests/integration/test_postgresql_replica_database_engine_1/test.py
+++ b/tests/integration/test_postgresql_replica_database_engine_1/test.py
@@ -389,7 +389,6 @@ def test_table_schema_changes(started_cluster):
     pg_manager.create_materialized_db(
         ip=started_cluster.postgres_ip,
         port=started_cluster.postgres_port,
-        settings=["materialized_postgresql_allow_automatic_update = 1"],
     )

     for i in range(NUM_TABLES):
@@ -407,37 +406,16 @@ def test_table_schema_changes(started_cluster):
     altered_idx = random.randint(0, 4)
     altered_table = f"postgresql_replica_{altered_idx}"

-    cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")
+    prev_count = int(instance.query(f"SELECT count() FROM test_database.{altered_table}"))
+    cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")

     for i in range(NUM_TABLES):
         cursor.execute(f"INSERT INTO postgresql_replica_{i} VALUES (50, {i}, {i})")
-        cursor.execute(f"UPDATE {altered_table} SET value3 = 12 WHERE key%2=0")

-    time.sleep(2)
-    assert_nested_table_is_created(instance, altered_table)
-    assert_number_of_columns(instance, 3, altered_table)
-    check_tables_are_synchronized(instance, altered_table)
-    print("check1 OK")
-
-    check_several_tables_are_synchronized(instance, NUM_TABLES)
-
-    for i in range(NUM_TABLES):
-        if i != altered_idx:
-            instance.query(
-                "INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(
-                    i, i, i, i
-                )
-            )
-        else:
-            instance.query(
-                "INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(
-                    i, i, i
-                )
-            )
-
-    check_tables_are_synchronized(instance, altered_table)
-    print("check2 OK")
-    check_several_tables_are_synchronized(instance, NUM_TABLES)
+    assert instance.wait_for_log_line(
+        f"Table postgresql_replica_{altered_idx} is skipped from replication stream"
+    )
+    assert prev_count == int(instance.query(f"SELECT count() FROM test_database.{altered_table}"))


 def test_many_concurrent_queries(started_cluster):
@@ -585,7 +563,6 @@ def test_virtual_columns(started_cluster):
     pg_manager.create_materialized_db(
         ip=started_cluster.postgres_ip,
         port=started_cluster.postgres_port,
-        settings=["materialized_postgresql_allow_automatic_update = 1"],
     )

     assert_nested_table_is_created(instance, table_name)
diff --git a/tests/integration/test_postgresql_replica_database_engine_2/configs/log_conf.xml b/tests/integration/test_postgresql_replica_database_engine_2/configs/log_conf.xml
index c42a3aba833..6cc1128e130 100644
--- a/tests/integration/test_postgresql_replica_database_engine_2/configs/log_conf.xml
+++ b/tests/integration/test_postgresql_replica_database_engine_2/configs/log_conf.xml
@@ -15,7 +15,6 @@
             <host>postgres1</host>
             <port>5432</port>
             <database>postgres_database</database>
-            <table>test_table</table>
         </postgres1>
         <postgres2>
             <user>postgres</user>
@@ -23,7 +22,6 @@
             <host>postgres1</host>
             <port>1111</port>
             <database>postgres_database</database>
-            <table>test_table</table>
         </postgres2>
     </named_collections>
 </clickhouse>
diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py
index 6452803c36e..68c7cb96b71 100644
--- a/tests/integration/test_postgresql_replica_database_engine_2/test.py
+++ b/tests/integration/test_postgresql_replica_database_engine_2/test.py
@@ -92,10 +92,7 @@ def test_add_new_table_to_replication(started_cluster):
         result[:63]
         == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
     )  # Check without ip
-    assert (
-        result[-59:]
-        == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"
-    )
+    assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"
     result = instance.query_and_get_error(
         "ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'"
     )
@@ -201,10 +198,7 @@ def test_remove_table_from_replication(started_cluster):
         result[:63]
         == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
     )
-    assert (
-        result[-59:]
-        == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"
-    )
+    assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"

     table_name = "postgresql_replica_4"
     instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
@@ -363,8 +357,12 @@ def test_database_with_single_non_default_schema(started_cluster):
         f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
     )

-    assert instance.wait_for_log_line(f"Table postgresql_replica_{altered_table} is skipped from replication stream")
-    instance.query(f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY")
+    assert instance.wait_for_log_line(
+        f"Table postgresql_replica_{altered_table} is skipped from replication stream"
+    )
+    instance.query(
+        f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY"
+    )
     assert not instance.contains_in_log(
         "from publication, because table does not exist in PostgreSQL"
     )
@@ -464,8 +462,12 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
         f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
     )
-    assert instance.wait_for_log_line(f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream")
-    altered_materialized_table = f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
+    assert instance.wait_for_log_line(
+        f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream"
+    )
+    altered_materialized_table = (
+        f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
+    )
     instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
     assert not instance.contains_in_log(
         "from publication, because table does not exist in PostgreSQL"
     )
@@ -572,9 +574,13 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
         f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)"
     )

-    assert instance.wait_for_log_line(f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream")
+    assert instance.wait_for_log_line(
+        f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream"
+    )

-    altered_materialized_table = f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
+    altered_materialized_table = (
+        f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
+    )
     instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
     assert not instance.contains_in_log(
         "from publication, because table does not exist in PostgreSQL"