This commit is contained in:
kssenii 2022-04-08 13:50:24 +02:00
parent 846faa51d8
commit fc3e3251b9

View File

@ -50,6 +50,7 @@ queries = [
def check_tables_are_synchronized( def check_tables_are_synchronized(
table_name, order_by="key", postgres_database="postgres_database" table_name, order_by="key", postgres_database="postgres_database"
): ):
while True:
expected = instance.query( expected = instance.query(
"select * from {}.{} order by {};".format( "select * from {}.{} order by {};".format(
postgres_database, table_name, order_by postgres_database, table_name, order_by
@ -58,12 +59,8 @@ def check_tables_are_synchronized(
result = instance.query( result = instance.query(
"select * from test.{} order by {};".format(table_name, order_by) "select * from test.{} order by {};".format(table_name, order_by)
) )
if result == expected:
while result != expected: break
time.sleep(0.5)
result = instance.query(
"select * from test.{} order by {};".format(table_name, order_by)
)
assert result == expected assert result == expected
@ -103,7 +100,7 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"):
) )
def create_materialized_table(ip, port, table_name='postgresql_replica'): def create_materialized_table(ip, port, table_name="postgresql_replica"):
instance.query( instance.query(
f""" f"""
CREATE TABLE test.{table_name} (key Int64, value Int64) CREATE TABLE test.{table_name} (key Int64, value Int64)
@ -709,12 +706,12 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
table_name = "postgresql_replica_697" table_name = "postgresql_replica_697"
create_postgres_table(cursor, table_name) create_postgres_table(cursor, table_name)
instance.query( instance.query(f"INSERT INTO postgres_database.{table_name} SELECT -1, 1")
f"INSERT INTO postgres_database.{table_name} SELECT -1, 1"
)
instance.query(f"DROP TABLE IF EXISTS test.{table_name} NO DELAY") instance.query(f"DROP TABLE IF EXISTS test.{table_name} NO DELAY")
create_materialized_table( create_materialized_table(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
table_name=table_name,
) )
n = 1 n = 1
@ -722,7 +719,7 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
sleep(1) sleep(1)
n += 1 n += 1
if n > 10: if n > 10:
break; break
for query in queries: for query in queries:
cursor.execute(query.format(table_name)) cursor.execute(query.format(table_name))