Update test.py

This commit is contained in:
Kseniia Sumarokova 2023-10-15 14:19:44 +02:00 committed by GitHub
parent b26115eabe
commit e0668e9ea0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -719,51 +719,6 @@ def test_too_many_parts(started_cluster):
pg_manager2.drop_materialized_db()
def test_replica_consumer(started_cluster):
table = "test_replica_consumer"
pg_manager_replica = PostgresManager()
pg_manager_replica.init(
instance2,
cluster.postgres_ip,
cluster.postgres_port,
default_database="postgres_database",
postgres_db_exists=True,
)
for pm in [pg_manager, pg_manager_replica]:
pm.create_and_fill_postgres_table(table)
pm.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
"materialized_postgresql_use_unique_replication_consumer_identifier = 1",
],
)
assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_replica.get_default_database()
)
assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
pg_manager_replica.clear()
def test_toast(started_cluster):
table = "test_toast"
pg_manager.execute(
@ -794,6 +749,58 @@ VALUES (1, (SELECT array_to_string(ARRAY(SELECT chr((100 + round(random() * 25))
)
def test_replica_consumer(started_cluster):
table = "test_replica_consumer"
pg_manager_replica = PostgresManager()
pg_manager_replica.init(
instance2,
cluster.postgres_ip,
cluster.postgres_port,
default_database="postgres_database",
postgres_db_exists=True,
)
for pm in [pg_manager, pg_manager_replica]:
pm.create_and_fill_postgres_table(table)
pm.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
"materialized_postgresql_use_unique_replication_consumer_identifier = 1",
],
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_replica.get_default_database()
)
assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_replica.get_default_database()
)
assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
pg_manager_replica.clear()
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")