import pytest import time import os.path as p import random from helpers.cluster import ClickHouseCluster from helpers.test_tools import assert_eq_with_retry from helpers.test_tools import TSV from random import randrange import threading from helpers.postgres_utility import get_postgres_conn from helpers.postgres_utility import PostgresManager from helpers.postgres_utility import create_replication_slot, drop_replication_slot from helpers.postgres_utility import create_postgres_schema, drop_postgres_schema from helpers.postgres_utility import create_postgres_table, drop_postgres_table from helpers.postgres_utility import check_tables_are_synchronized from helpers.postgres_utility import check_several_tables_are_synchronized from helpers.postgres_utility import assert_nested_table_is_created from helpers.postgres_utility import assert_number_of_columns from helpers.postgres_utility import ( postgres_table_template, postgres_table_template_2, postgres_table_template_3, postgres_table_template_4, ) from helpers.postgres_utility import queries cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", main_configs=["configs/log_conf.xml"], user_configs=["configs/users.xml"], with_postgres=True, stay_alive=True, ) pg_manager = PostgresManager() @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() pg_manager.init( instance, cluster.postgres_ip, cluster.postgres_port, default_database="postgres_database", ) yield cluster finally: cluster.shutdown() @pytest.fixture(autouse=True) def setup_teardown(): print("PostgreSQL is available - running test") yield # run test pg_manager.restart() def test_load_and_sync_all_database_tables(started_cluster): NUM_TABLES = 5 pg_manager.create_and_fill_postgres_tables(NUM_TABLES) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) check_several_tables_are_synchronized(instance, NUM_TABLES) result = instance.query( "SELECT count() FROM system.tables WHERE database = 'test_database';" ) assert int(result) == NUM_TABLES def test_replicating_dml(started_cluster): NUM_TABLES = 5 for i in range(NUM_TABLES): pg_manager.create_postgres_table(f"postgresql_replica_{i}") instance.query( "INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format( i, i ) ) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for i in range(NUM_TABLES): instance.query( f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 50 + number, {i} from numbers(1000)" ) check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): pg_manager.execute( f"UPDATE postgresql_replica_{i} SET value = {i} * {i} WHERE key < 50;" ) pg_manager.execute( f"UPDATE postgresql_replica_{i} SET value = {i} * {i} * {i} WHERE key >= 50;" ) check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): pg_manager.execute( f"DELETE FROM postgresql_replica_{i} WHERE (value*value + {i}) % 2 = 0;" ) pg_manager.execute( f"UPDATE postgresql_replica_{i} SET value = value - (value % 7) WHERE key > 128 AND key < 512;" ) pg_manager.execute(f"DELETE FROM postgresql_replica_{i} WHERE key % 7 = 1;") check_several_tables_are_synchronized(instance, NUM_TABLES) def test_different_data_types(started_cluster): conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, ) cursor = conn.cursor() cursor.execute("drop table if exists test_data_types;") cursor.execute("drop table if exists test_array_data_type;") cursor.execute( """CREATE TABLE test_data_types ( id integer PRIMARY KEY, a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial, h timestamp, i date, j decimal(5, 5), k numeric(5, 5))""" ) cursor.execute( """CREATE TABLE test_array_data_type ( key Integer NOT NULL PRIMARY KEY, a Date[] NOT NULL, -- Date b Timestamp[] NOT NULL, -- DateTime64(6) c real[][] NOT NULL, -- Float32 d double precision[][] NOT NULL, -- Float64 e decimal(5, 5)[][][] NOT NULL, -- Decimal32 f integer[][][] NOT NULL, -- Int32 g Text[][][][][] NOT NULL, -- String h Integer[][][], -- Nullable(Int32) i Char(2)[][][][], -- Nullable(String) k Char(2)[] -- Nullable(String) )""" ) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for i in range(10): instance.query( """ INSERT INTO postgres_database.test_data_types VALUES ({}, -32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12.012345', '2000-05-12', 0.2, 0.2)""".format( i ) ) check_tables_are_synchronized(instance, "test_data_types", "id") result = instance.query( "SELECT * FROM test_database.test_data_types ORDER BY id LIMIT 1;" ) assert ( result == "0\t-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12.012345\t2000-05-12\t0.2\t0.2\n" ) for i in range(10): col = random.choice(["a", "b", "c"]) cursor.execute("UPDATE test_data_types SET {} = {};".format(col, i)) cursor.execute("UPDATE test_data_types SET i = '2020-12-12';") check_tables_are_synchronized(instance, "test_data_types", "id") instance.query( "INSERT INTO postgres_database.test_array_data_type " "VALUES (" "0, " "['2000-05-12', '2000-05-12'], " "['2000-05-12 12:12:12.012345', '2000-05-12 12:12:12.012345'], " "[[1.12345], [1.12345], [1.12345]], " "[[1.1234567891], [1.1234567891], [1.1234567891]], " "[[[0.11111, 0.11111]], [[0.22222, 0.22222]], [[0.33333, 0.33333]]], " "[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], " "[[[[['winx', 'winx', 'winx']]]]], " "[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]], " "[[[[NULL]]]], " "[]" ")" ) expected = ( "0\t" + "['2000-05-12','2000-05-12']\t" + "['2000-05-12 12:12:12.012345','2000-05-12 12:12:12.012345']\t" + "[[1.12345],[1.12345],[1.12345]]\t" + "[[1.1234567891],[1.1234567891],[1.1234567891]]\t" + "[[[0.11111,0.11111]],[[0.22222,0.22222]],[[0.33333,0.33333]]]\t" "[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t" "[[[[['winx','winx','winx']]]]]\t" "[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\t" "[[[[NULL]]]]\t" "[]\n" ) check_tables_are_synchronized(instance, "test_array_data_type") result = instance.query( "SELECT * FROM test_database.test_array_data_type ORDER BY key;" ) assert result == expected pg_manager.drop_materialized_db() cursor.execute("drop table if exists test_data_types;") cursor.execute("drop table if exists test_array_data_type;") def test_load_and_sync_subset_of_database_tables(started_cluster): NUM_TABLES = 10 pg_manager.create_and_fill_postgres_tables(NUM_TABLES) publication_tables = "" for i in range(NUM_TABLES): if i < int(NUM_TABLES / 2): if publication_tables != "": publication_tables += ", " publication_tables += f"postgresql_replica_{i}" pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, settings=[ "materialized_postgresql_tables_list = '{}'".format(publication_tables) ], ) time.sleep(1) for i in range(int(NUM_TABLES / 2)): table_name = f"postgresql_replica_{i}" assert_nested_table_is_created(instance, table_name) result = instance.query( """SELECT count() FROM system.tables WHERE database = 'test_database';""" ) assert int(result) == int(NUM_TABLES / 2) database_tables = instance.query("SHOW TABLES FROM test_database") for i in range(NUM_TABLES): table_name = "postgresql_replica_{}".format(i) if i < int(NUM_TABLES / 2): assert table_name in database_tables else: assert table_name not in database_tables instance.query( "INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format( table_name, i ) ) for i in range(NUM_TABLES): table_name = f"postgresql_replica_{i}" if i < int(NUM_TABLES / 2): check_tables_are_synchronized(instance, table_name) def test_changing_replica_identity_value(started_cluster): pg_manager.create_postgres_table("postgresql_replica") instance.query( "INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)" ) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) instance.query( "INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)" ) check_tables_are_synchronized(instance, "postgresql_replica") pg_manager.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ") check_tables_are_synchronized(instance, "postgresql_replica") def test_clickhouse_restart(started_cluster): NUM_TABLES = 5 pg_manager.create_and_fill_postgres_tables(NUM_TABLES) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): instance.query( "INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(50000)".format( i, i ) ) instance.restart_clickhouse() check_several_tables_are_synchronized(instance, NUM_TABLES) def test_replica_identity_index(started_cluster): pg_manager.create_postgres_table( "postgresql_replica", template=postgres_table_template_3 ) pg_manager.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);") pg_manager.execute( "ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx" ) instance.query( "INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)" ) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) instance.query( "INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)" ) check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1") pg_manager.execute("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ") pg_manager.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ") pg_manager.execute( "UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 " ) pg_manager.execute( "UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 " ) check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1") pg_manager.execute("DELETE FROM postgresql_replica WHERE key2<75;") check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1") def test_table_schema_changes(started_cluster): NUM_TABLES = 5 for i in range(NUM_TABLES): pg_manager.create_postgres_table( f"postgresql_replica_{i}", template=postgres_table_template_2 ) instance.query( f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i}, {i}, {i} from numbers(25)" ) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, ) for i in range(NUM_TABLES): instance.query( f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 25 + number, {i}, {i}, {i} from numbers(25)" ) check_several_tables_are_synchronized(instance, NUM_TABLES) expected = instance.query( "SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key" ) altered_idx = random.randint(0, 4) altered_table = f"postgresql_replica_{altered_idx}" prev_count = int( instance.query(f"SELECT count() FROM test_database.{altered_table}") ) pg_manager.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2") for i in range(NUM_TABLES): pg_manager.execute(f"INSERT INTO postgresql_replica_{i} VALUES (50, {i}, {i})") assert instance.wait_for_log_line( f"Table postgresql_replica_{altered_idx} is skipped from replication stream" ) assert prev_count == int( instance.query(f"SELECT count() FROM test_database.{altered_table}") ) def test_many_concurrent_queries(started_cluster): table = "test_many_conc" query_pool = [ "DELETE FROM {} WHERE (value*value) % 3 = 0;", "UPDATE {} SET value = value - 125 WHERE key % 2 = 0;", "DELETE FROM {} WHERE key % 10 = 0;", "UPDATE {} SET value = value*5 WHERE key % 2 = 1;", "DELETE FROM {} WHERE value % 2 = 0;", "UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;", "DELETE FROM {} WHERE value % 3 = 0;", "UPDATE {} SET value = value * 2 WHERE key % 3 = 0;", "DELETE FROM {} WHERE value % 9 = 2;", "UPDATE {} SET value = value + 2 WHERE key % 3 = 1;", "DELETE FROM {} WHERE value%5 = 0;", ] NUM_TABLES = 5 conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, ) cursor = conn.cursor() pg_manager.create_and_fill_postgres_tables( NUM_TABLES, numbers=10000, table_name_base=table ) def attack(thread_id): print("thread {}".format(thread_id)) k = 10000 for i in range(20): query_id = random.randrange(0, len(query_pool) - 1) table_id = random.randrange(0, 5) # num tables random_table_name = f"{table}_{table_id}" table_name = f"{table}_{thread_id}" # random update / delete query cursor.execute(query_pool[query_id].format(random_table_name)) print( "Executing for table {} query: {}".format( random_table_name, query_pool[query_id] ) ) # allow some thread to do inserts (not to violate key constraints) if thread_id < 5: print("try insert table {}".format(thread_id)) instance.query( "INSERT INTO postgres_database.{} SELECT {}*10000*({} + number), number from numbers(1000)".format( table_name, thread_id, k ) ) k += 1 print("insert table {} ok".format(thread_id)) if i == 5: # also change primary key value print("try update primary key {}".format(thread_id)) cursor.execute( "UPDATE {} SET key=key%100000+100000*{} WHERE key%{}=0".format( table_name, i + 1, i + 1 ) ) print("update primary key {} ok".format(thread_id)) n = [10000] threads = [] threads_num = 16 for i in range(threads_num): threads.append(threading.Thread(target=attack, args=(i,))) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for thread in threads: time.sleep(random.uniform(0, 1)) thread.start() n[0] = 50000 for table_id in range(NUM_TABLES): n[0] += 1 table_name = f"{table}_{table_id}" instance.query( "INSERT INTO postgres_database.{} SELECT {} + number, number from numbers(5000)".format( table_name, n[0] ) ) # cursor.execute("UPDATE {table}_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(table_id, table_id+1, table_id+1)) for thread in threads: thread.join() for i in range(NUM_TABLES): table_name = f"{table}_{i}" check_tables_are_synchronized(instance, table_name) count1 = instance.query( "SELECT count() FROM postgres_database.{}".format(table_name) ) count2 = instance.query( "SELECT count() FROM (SELECT * FROM test_database.{})".format(table_name) ) assert int(count1) == int(count2) print(count1, count2) def test_single_transaction(started_cluster): conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False, ) cursor = conn.cursor() table_name = "postgresql_replica_0" create_postgres_table(cursor, table_name) conn.commit() pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) assert_nested_table_is_created(instance, table_name) for query in queries: print("query {}".format(query)) cursor.execute(query.format(0)) time.sleep(5) result = instance.query(f"select count() from test_database.{table_name}") # no commit yet assert int(result) == 0 conn.commit() check_tables_are_synchronized(instance, table_name) def test_virtual_columns(started_cluster): conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, ) cursor = conn.cursor() table_name = "postgresql_replica_0" create_postgres_table(cursor, table_name) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, ) assert_nested_table_is_created(instance, table_name) instance.query( f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10)" ) check_tables_are_synchronized(instance, table_name) # just check that it works, no check with `expected` because _version is taken as LSN, which will be different each time. result = instance.query( f"SELECT key, value, _sign, _version FROM test_database.{table_name};" ) print(result) def test_multiple_databases(started_cluster): NUM_TABLES = 5 conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=False, ) pg_manager.create_postgres_db("postgres_database_1") pg_manager.create_postgres_db("postgres_database_2") conn1 = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, database_name="postgres_database_1", ) conn2 = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, database_name="postgres_database_2", ) cursor1 = conn1.cursor() cursor2 = conn2.cursor() pg_manager.create_clickhouse_postgres_db( "postgres_database_1", "", "postgres_database_1", ) pg_manager.create_clickhouse_postgres_db( "postgres_database_2", "", "postgres_database_2", ) cursors = [cursor1, cursor2] for cursor_id in range(len(cursors)): for i in range(NUM_TABLES): table_name = "postgresql_replica_{}".format(i) create_postgres_table(cursors[cursor_id], table_name) instance.query( "INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format( cursor_id + 1, table_name ) ) print( "database 1 tables: ", instance.query( """SELECT name FROM system.tables WHERE database = 'postgres_database_1';""" ), ) print( "database 2 tables: ", instance.query( """SELECT name FROM system.tables WHERE database = 'postgres_database_2';""" ), ) pg_manager.create_materialized_db( started_cluster.postgres_ip, started_cluster.postgres_port, "test_database_1", "postgres_database_1", ) pg_manager.create_materialized_db( started_cluster.postgres_ip, started_cluster.postgres_port, "test_database_2", "postgres_database_2", ) cursors = [cursor1, cursor2] for cursor_id in range(len(cursors)): for i in range(NUM_TABLES): table_name = "postgresql_replica_{}".format(i) instance.query( "INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format( cursor_id + 1, table_name ) ) for cursor_id in range(len(cursors)): for i in range(NUM_TABLES): table_name = "postgresql_replica_{}".format(i) check_tables_are_synchronized( instance, table_name, "key", "postgres_database_{}".format(cursor_id + 1), "test_database_{}".format(cursor_id + 1), ) def test_concurrent_transactions(started_cluster): def transaction(thread_id): conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False, ) cursor = conn.cursor() for query in queries: cursor.execute(query.format(thread_id)) print("thread {}, query {}".format(thread_id, query)) conn.commit() NUM_TABLES = 6 pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=0) threads = [] threads_num = 6 for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() for thread in threads: thread.join() for i in range(NUM_TABLES): check_tables_are_synchronized(instance, f"postgresql_replica_{i}") count1 = instance.query( f"SELECT count() FROM postgres_database.postgresql_replica_{i}" ) count2 = instance.query( f"SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{i})" ) print(int(count1), int(count2), sep=" ") assert int(count1) == int(count2) def test_abrupt_connection_loss_while_heavy_replication(started_cluster): def transaction(thread_id): if thread_id % 2: conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=True, ) else: conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False, ) cursor = conn.cursor() for query in queries: cursor.execute(query.format(thread_id)) print("thread {}, query {}".format(thread_id, query)) if thread_id % 2 == 0: conn.commit() NUM_TABLES = 6 pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=0) threads_num = 6 threads = [] for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() for thread in threads: thread.join() # Join here because it takes time for data to reach wal time.sleep(2) started_cluster.pause_container("postgres1") # for i in range(NUM_TABLES): # result = instance.query(f"SELECT count() FROM test_database.postgresql_replica_{i}") # print(result) # Just debug started_cluster.unpause_container("postgres1") check_several_tables_are_synchronized(instance, NUM_TABLES) def test_drop_database_while_replication_startup_not_finished(started_cluster): NUM_TABLES = 5 pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 100000) for i in range(6): pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) time.sleep(0.5 * i) pg_manager.drop_materialized_db() def test_restart_server_while_replication_startup_not_finished(started_cluster): NUM_TABLES = 5 pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 100000) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) time.sleep(1) instance.restart_clickhouse() check_several_tables_are_synchronized(instance, NUM_TABLES) def test_abrupt_server_restart_while_heavy_replication(started_cluster): def transaction(thread_id): if thread_id % 2: conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=True, ) else: conn = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False, ) cursor = conn.cursor() for query in queries: cursor.execute(query.format(thread_id)) print("thread {}, query {}".format(thread_id, query)) if thread_id % 2 == 0: conn.commit() NUM_TABLES = 6 pg_manager.create_and_fill_postgres_tables(tables_num=NUM_TABLES, numbers=0) threads = [] threads_num = 6 for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() for thread in threads: thread.join() # Join here because it takes time for data to reach wal instance.restart_clickhouse() check_several_tables_are_synchronized(instance, NUM_TABLES) def test_quoting_1(started_cluster): table_name = "user" pg_manager.create_and_fill_postgres_table(table_name) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) check_tables_are_synchronized(instance, table_name) def test_quoting_2(started_cluster): table_name = "user" pg_manager.create_and_fill_postgres_table(table_name) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, settings=[f"materialized_postgresql_tables_list = '{table_name}'"], ) check_tables_are_synchronized(instance, table_name) def test_user_managed_slots(started_cluster): slot_name = "user_slot" table_name = "test_table" pg_manager.create_and_fill_postgres_table(table_name) replication_connection = get_postgres_conn( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, replication=True, auto_commit=True, ) snapshot = create_replication_slot(replication_connection, slot_name=slot_name) pg_manager.create_materialized_db( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, settings=[ f"materialized_postgresql_replication_slot = '{slot_name}'", f"materialized_postgresql_snapshot = '{snapshot}'", ], ) check_tables_are_synchronized(instance, table_name) instance.query( "INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format( table_name ) ) check_tables_are_synchronized(instance, table_name) instance.restart_clickhouse() instance.query( "INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format( table_name ) ) check_tables_are_synchronized(instance, table_name) pg_manager.drop_materialized_db() drop_replication_slot(replication_connection, slot_name) replication_connection.close() if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...") cluster.shutdown()