Fix remaining tests

This commit is contained in:
kssenii 2023-02-16 17:22:29 +01:00
parent 75d62ee24a
commit 417052e4b2
4 changed files with 27 additions and 45 deletions

View File

@ -21,6 +21,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
extern const int BAD_ARGUMENTS;
}
MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(

View File

@ -389,7 +389,6 @@ def test_table_schema_changes(started_cluster):
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialized_postgresql_allow_automatic_update = 1"],
)
for i in range(NUM_TABLES):
@ -407,37 +406,16 @@ def test_table_schema_changes(started_cluster):
altered_idx = random.randint(0, 4)
altered_table = f"postgresql_replica_{altered_idx}"
cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")
prev_count = int(instance.query(f"SELECT count() FROM test_database.{altered_table}"))
cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")
for i in range(NUM_TABLES):
cursor.execute(f"INSERT INTO postgresql_replica_{i} VALUES (50, {i}, {i})")
cursor.execute(f"UPDATE {altered_table} SET value3 = 12 WHERE key%2=0")
time.sleep(2)
assert_nested_table_is_created(instance, altered_table)
assert_number_of_columns(instance, 3, altered_table)
check_tables_are_synchronized(instance, altered_table)
print("check1 OK")
check_several_tables_are_synchronized(instance, NUM_TABLES)
for i in range(NUM_TABLES):
if i != altered_idx:
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(
i, i, i, i
)
)
else:
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(
i, i, i
)
)
check_tables_are_synchronized(instance, altered_table)
print("check2 OK")
check_several_tables_are_synchronized(instance, NUM_TABLES)
assert instance.wait_for_log_line(
f"Table postgresql_replica_{altered_idx} is skipped from replication stream"
)
assert prev_count == int(instance.query(f"SELECT count() FROM test_database.{altered_table}"))
def test_many_concurrent_queries(started_cluster):
@ -585,7 +563,6 @@ def test_virtual_columns(started_cluster):
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialized_postgresql_allow_automatic_update = 1"],
)
assert_nested_table_is_created(instance, table_name)

View File

@ -15,7 +15,6 @@
<host>postgres1</host>
<port>5432</port>
<database>postgres_database</database>
<table>test_table</table>
</postgres1>
<postgres2>
<user>postgres</user>
@ -23,7 +22,6 @@
<host>postgres1</host>
<port>1111</port>
<database>postgres_database</database>
<table>test_table</table>
</postgres2>
</named_collections>
</clickhouse>

View File

@ -92,10 +92,7 @@ def test_add_new_table_to_replication(started_cluster):
result[:63]
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
) # Check without ip
assert (
result[-59:]
== "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"
)
assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"
result = instance.query_and_get_error(
"ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'"
@ -201,10 +198,7 @@ def test_remove_table_from_replication(started_cluster):
result[:63]
== "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
)
assert (
result[-59:]
== "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"
)
assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"
table_name = "postgresql_replica_4"
instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
@ -363,8 +357,12 @@ def test_database_with_single_non_default_schema(started_cluster):
f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
)
assert instance.wait_for_log_line(f"Table postgresql_replica_{altered_table} is skipped from replication stream")
instance.query(f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY")
assert instance.wait_for_log_line(
f"Table postgresql_replica_{altered_table} is skipped from replication stream"
)
instance.query(
f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY"
)
assert not instance.contains_in_log(
"from publication, because table does not exist in PostgreSQL"
)
@ -464,8 +462,12 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
)
assert instance.wait_for_log_line(f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream")
altered_materialized_table = f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
assert instance.wait_for_log_line(
f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream"
)
altered_materialized_table = (
f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
)
instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
assert not instance.contains_in_log(
"from publication, because table does not exist in PostgreSQL"
@ -572,9 +574,13 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)"
)
assert instance.wait_for_log_line(f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream")
assert instance.wait_for_log_line(
f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream"
)
altered_materialized_table = f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
altered_materialized_table = (
f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
)
instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
assert not instance.contains_in_log(
"from publication, because table does not exist in PostgreSQL"