diff --git a/tests/integration/helpers/postgres_utility.py b/tests/integration/helpers/postgres_utility.py
new file mode 100644
index 00000000000..eac1d6712f2
--- /dev/null
+++ b/tests/integration/helpers/postgres_utility.py
@@ -0,0 +1,257 @@
+import psycopg2
+import time
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+
+postgres_table_template = """
+    CREATE TABLE IF NOT EXISTS "{}" (
+    key Integer NOT NULL, value Integer, PRIMARY KEY(key))
+    """
+postgres_table_template_2 = """
+    CREATE TABLE IF NOT EXISTS "{}" (
+    key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
+    """
+postgres_table_template_3 = """
+    CREATE TABLE IF NOT EXISTS "{}" (
+    key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
+    """
+postgres_table_template_4 = """
+    CREATE TABLE IF NOT EXISTS "{}"."{}" (
+    key Integer NOT NULL, value Integer, PRIMARY KEY(key))
+    """
+
+def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
+    if database:
+        conn_string = f"host={ip} port={port} dbname='{database_name}' user='postgres' password='mysecretpassword'"
+    else:
+        conn_string = f"host={ip} port={port} user='postgres' password='mysecretpassword'"
+
+    if replication:
+        conn_string += " replication='database'"
+
+    conn = psycopg2.connect(conn_string)
+    if auto_commit:
+        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+        conn.autocommit = True
+    return conn
+
+def create_replication_slot(conn, slot_name='user_slot'):
+    cursor = conn.cursor()
+    cursor.execute(f'CREATE_REPLICATION_SLOT {slot_name} LOGICAL pgoutput EXPORT_SNAPSHOT')
+    result = cursor.fetchall()
+    print(result[0][0]) # slot name
+    print(result[0][1]) # start lsn
+    print(result[0][2]) # snapshot
+    return result[0][2]
+
+def drop_replication_slot(conn, slot_name='user_slot'):
+    cursor = conn.cursor()
+    cursor.execute(f"select pg_drop_replication_slot('{slot_name}')")
+
+
+def create_postgres_schema(cursor, schema_name):
+    drop_postgres_schema(cursor, schema_name)
+    cursor.execute(f'CREATE SCHEMA {schema_name}')
+
+def drop_postgres_schema(cursor, schema_name):
+    cursor.execute(f'DROP SCHEMA IF EXISTS {schema_name} CASCADE')
+
+
+def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
+    drop_postgres_table(cursor, table_name)
+    cursor.execute(template.format(table_name))
+    if replica_identity_full:
+        cursor.execute(f'ALTER TABLE {table_name} REPLICA IDENTITY FULL;')
+
+def drop_postgres_table(cursor, table_name):
+    cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}" """)
+
+
+def create_postgres_table_with_schema(cursor, schema_name, table_name):
+    drop_postgres_table_with_schema(cursor, schema_name, table_name)
+    cursor.execute(postgres_table_template_4.format(schema_name, table_name))
+
+def drop_postgres_table_with_schema(cursor, schema_name, table_name):
+    cursor.execute(f"""DROP TABLE IF EXISTS "{schema_name}"."{table_name}" """)
+
+
+class PostgresManager:
+    def __init__(self):
+        self.created_postgres_db_list = set()
+        self.created_materialized_postgres_db_list = set()
+        self.created_ch_postgres_db_list = set()
+
+    def init(self, instance, ip, port):
+        self.instance = instance
+        self.ip = ip
+        self.port = port
+        self.prepare()
+
+    def restart(self):
+        try:
+            self.clear()
+            self.prepare()
+        except Exception:
+            self.prepare()
+            raise
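+
+    # prepare() recreates the default databases most tests assume: a
+    # 'postgres_database' on the PostgreSQL side and a matching
+    # PostgreSQL-engine database in ClickHouse.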
+    def prepare(self):
+        conn = get_postgres_conn(ip=self.ip, port=self.port)
+        cursor = conn.cursor()
+        self.create_postgres_db(cursor, 'postgres_database')
+        self.create_clickhouse_postgres_db(ip=self.ip, port=self.port)
+
+    def clear(self):
+        for db in self.created_materialized_postgres_db_list.copy():
+            self.drop_materialized_db(db)
+        for db in self.created_ch_postgres_db_list.copy():
+            self.drop_clickhouse_postgres_db(db)
+        if len(self.created_postgres_db_list) > 0:
+            conn = get_postgres_conn(ip=self.ip, port=self.port)
+            cursor = conn.cursor()
+            for db in self.created_postgres_db_list.copy():
+                self.drop_postgres_db(cursor, db)
+
+    def create_postgres_db(self, cursor, name='postgres_database'):
+        self.drop_postgres_db(cursor, name)
+        self.created_postgres_db_list.add(name)
+        cursor.execute(f"CREATE DATABASE {name}")
+
+    def drop_postgres_db(self, cursor, name='postgres_database'):
+        cursor.execute(f"DROP DATABASE IF EXISTS {name}")
+        if name in self.created_postgres_db_list:
+            self.created_postgres_db_list.remove(name)
+
+    def create_clickhouse_postgres_db(self, ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
+        self.drop_clickhouse_postgres_db(name)
+        self.created_ch_postgres_db_list.add(name)
+
+        if len(schema_name) == 0:
+            self.instance.query(f'''
+                CREATE DATABASE {name}
+                ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword')''')
+        else:
+            self.instance.query(f'''
+                CREATE DATABASE {name}
+                ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')''')
+
+    def drop_clickhouse_postgres_db(self, name='postgres_database'):
+        self.instance.query(f'DROP DATABASE IF EXISTS {name}')
+        if name in self.created_ch_postgres_db_list:
+            self.created_ch_postgres_db_list.remove(name)
+
+    def create_materialized_db(self, ip, port,
+                               materialized_database='test_database', postgres_database='postgres_database', settings=[]):
+        self.created_materialized_postgres_db_list.add(materialized_database)
+        self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
+
+        create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
+        if len(settings) > 0:
+            create_query += " SETTINGS " + ', '.join(settings)
+        self.instance.query(create_query)
+        assert materialized_database in self.instance.query('SHOW DATABASES')
+
+    def drop_materialized_db(self, materialized_database='test_database'):
+        self.instance.query(f'DROP DATABASE IF EXISTS {materialized_database} NO DELAY')
+        if materialized_database in self.created_materialized_postgres_db_list:
+            self.created_materialized_postgres_db_list.remove(materialized_database)
+        assert materialized_database not in self.instance.query('SHOW DATABASES')
+
+    def create_and_fill_postgres_table(self, table_name):
+        conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
+        cursor = conn.cursor()
+        create_postgres_table(cursor, table_name)
+        self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)")
+
+    def create_and_fill_postgres_tables(self, tables_num, numbers=50):
+        conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
+        cursor = conn.cursor()
+        self.create_and_fill_postgres_tables_from_cursor(cursor, tables_num, numbers=numbers)
+
+    def create_and_fill_postgres_tables_from_cursor(self, cursor, tables_num, numbers=50):
+        for i in range(tables_num):
+            table_name = f'postgresql_replica_{i}'
+            create_postgres_table(cursor, table_name)
+            if numbers > 0:
+                self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers({numbers})")
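+
+
+# Typical usage from a test module (sketch; `instance` and `cluster` are the
+# fixtures defined by the importing test, as in the file updated below):
+#
+#   pg_manager = PostgresManager()
+#   pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port)
+#   pg_manager.create_and_fill_postgres_tables(tables_num=5)
+#   pg_manager.create_materialized_db(ip=cluster.postgres_ip, port=cluster.postgres_port)
+#   check_several_tables_are_synchronized(instance, 5)
+
+# A canned DML workload: the concurrency tests below format each entry with a
+# table/thread index, i.e. queries[j].format(i) targets postgresql_replica_i.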
+queries = [
+    'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
+    'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
+    'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
+    "UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
+    'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
+    'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
+    'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
+    "UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
+    'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
+    'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
+    'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
+    'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
+    'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
+    "UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
+    'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
+    'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
+    "UPDATE postgresql_replica_{} SET key=key+10000000",
+    'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
+    'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
+    ]
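+
+
+# Waiting helpers: replication into the MaterializedPostgreSQL nested tables is
+# asynchronous, so each helper below polls for the expected state.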
+def assert_nested_table_is_created(instance, table_name, materialized_database='test_database', schema_name=''):
+    if len(schema_name) == 0:
+        table = table_name
+    else:
+        table = schema_name + "." + table_name
+
+    print(f'Checking table {table} exists in {materialized_database}')
+    database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
+
+    while table not in database_tables:
+        time.sleep(0.2)
+        database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
+
+    assert(table in database_tables)
+
+
+def assert_number_of_columns(instance, expected, table_name, database_name='test_database'):
+    query = f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')"
+    result = instance.query(query)
+    while int(result) != expected:
+        time.sleep(1)
+        result = instance.query(query)
+    print('Number of columns ok')
+
+
+def check_tables_are_synchronized(instance, table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
+    assert_nested_table_is_created(instance, table_name, materialized_database, schema_name)
+
+    if len(schema_name) == 0:
+        table_path = f'{materialized_database}.{table_name}'
+    else:
+        table_path = f'{materialized_database}.`{schema_name}.{table_name}`'
+
+    print(f"Checking table is synchronized: {table_path}")
+    result_query = f'select * from {table_path} order by {order_by};'
+
+    expected = instance.query(f'select * from {postgres_database}.{table_name} order by {order_by};')
+    result = instance.query(result_query)
+
+    for _ in range(120):
+        if result == expected:
+            break
+        time.sleep(0.5)
+        result = instance.query(result_query)
+
+    assert(result == expected)
+
+
+def check_several_tables_are_synchronized(instance, tables_num, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
+    for i in range(tables_num):
+        check_tables_are_synchronized(instance, f'postgresql_replica_{i}', order_by, postgres_database, materialized_database, schema_name)
diff --git a/tests/integration/test_postgresql_replica_database_engine_1/test.py b/tests/integration/test_postgresql_replica_database_engine_1/test.py
index cba9e93c056..a51ad6bcf07 100644
--- a/tests/integration/test_postgresql_replica_database_engine_1/test.py
+++ b/tests/integration/test_postgresql_replica_database_engine_1/test.py
@@ -1,245 +1,67 @@
 import pytest
 import time
-import psycopg2
 import os.path as p
 import random
 
 from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import assert_eq_with_retry
-from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 from helpers.test_tools import TSV
 
 from random import randrange
 import threading
 
+from helpers.postgres_utility import get_postgres_conn
+from helpers.postgres_utility import PostgresManager
+
+from helpers.postgres_utility import create_replication_slot, drop_replication_slot
+from helpers.postgres_utility import create_postgres_schema, drop_postgres_schema
+from helpers.postgres_utility import create_postgres_table, drop_postgres_table
+from helpers.postgres_utility import check_tables_are_synchronized
+from helpers.postgres_utility import check_several_tables_are_synchronized
+from helpers.postgres_utility import assert_nested_table_is_created
+from helpers.postgres_utility import assert_number_of_columns
+from helpers.postgres_utility import postgres_table_template, postgres_table_template_2, postgres_table_template_3, postgres_table_template_4
+from helpers.postgres_utility import queries
+
+
 cluster = ClickHouseCluster(__file__)
 instance =
cluster.add_instance('instance', main_configs = ['configs/log_conf.xml'], user_configs = ['configs/users.xml'], with_postgres=True, stay_alive=True) -postgres_table_template = """ - CREATE TABLE IF NOT EXISTS "{}" ( - key Integer NOT NULL, value Integer, PRIMARY KEY(key)) - """ -postgres_table_template_2 = """ - CREATE TABLE IF NOT EXISTS "{}" ( - key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key)) - """ -postgres_table_template_3 = """ - CREATE TABLE IF NOT EXISTS "{}" ( - key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL) - """ -postgres_table_template_4 = """ - CREATE TABLE IF NOT EXISTS "{}"."{}" ( - key Integer NOT NULL, value Integer, PRIMARY KEY(key)) - """ - -def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False): - if database == True: - conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name) - else: - conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port) - - if replication: - conn_string += " replication='database'" - - conn = psycopg2.connect(conn_string) - if auto_commit: - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - conn.autocommit = True - return conn - -def create_replication_slot(conn, slot_name='user_slot'): - cursor = conn.cursor() - cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name)) - result = cursor.fetchall() - print(result[0][0]) # slot name - print(result[0][1]) # start lsn - print(result[0][2]) # snapshot - return result[0][2] - -def drop_replication_slot(conn, slot_name='user_slot'): - cursor = conn.cursor() - cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name)) - -def create_postgres_db(cursor, name='postgres_database'): - cursor.execute("CREATE DATABASE {}".format(name)) - -def drop_postgres_db(cursor, name='postgres_database'): - cursor.execute("DROP DATABASE IF EXISTS {}".format(name)) - -def drop_postgres_schema(cursor, schema_name): - cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name)) - -def create_postgres_schema(cursor, schema_name): - drop_postgres_schema(cursor, schema_name) - cursor.execute('CREATE SCHEMA {}'.format(schema_name)) - -def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''): - drop_clickhouse_postgres_db(name) - if len(schema_name) == 0: - instance.query(''' - CREATE DATABASE {} - ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name)) - else: - instance.query(''' - CREATE DATABASE {} - ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name)) - -def drop_clickhouse_postgres_db(name='postgres_database'): - instance.query('DROP DATABASE IF EXISTS {}'.format(name)) - -def create_materialized_db(ip, port, - materialized_database='test_database', - postgres_database='postgres_database', - settings=[]): - instance.query(f"DROP DATABASE IF EXISTS {materialized_database}") - create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')" - if len(settings) > 0: - create_query += " SETTINGS " - for i in range(len(settings)): - if i != 0: - create_query += ', ' - create_query += settings[i] - instance.query(create_query) - assert 
materialized_database in instance.query('SHOW DATABASES') - -def drop_materialized_db(materialized_database='test_database'): - instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database)) - assert materialized_database not in instance.query('SHOW DATABASES') - -def drop_postgres_table(cursor, table_name): - cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name)) - -def drop_postgres_table_with_schema(cursor, schema_name, table_name): - cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name)) - -def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template): - drop_postgres_table(cursor, table_name) - cursor.execute(template.format(table_name)) - if replica_identity_full: - cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name)) - -def create_postgres_table_with_schema(cursor, schema_name, table_name): - drop_postgres_table_with_schema(cursor, schema_name, table_name) - cursor.execute(postgres_table_template_4.format(schema_name, table_name)) - -queries = [ - 'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);', - 'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;', - 'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;', - "UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0", - 'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);', - 'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;', - 'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;', - "UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1", - 'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;', - 'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;', - 'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);', - 'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;', - 'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;', - "UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1", - 'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);', - 'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;', - "UPDATE postgresql_replica_{} SET key=key+10000000", - 'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;', - 'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;' - ] - - -def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''): - if len(schema_name) == 0: - table = table_name - else: - table = schema_name + "." 
+ table_name - print(f'Checking table {table} exists in {materialized_database}') - database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database)) - while table not in database_tables: - time.sleep(0.2) - database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database)) - assert(table in database_tables) - - -def assert_number_of_columns(expected, table_name, database_name='test_database'): - result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')") - while (int(result) != expected): - time.sleep(1) - result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')") - print('Number of columns ok') - - -@pytest.mark.timeout(320) -def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''): - assert_nested_table_is_created(table_name, materialized_database, schema_name) - - print("Checking table is synchronized:", table_name) - expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by)) - if len(schema_name) == 0: - result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by)) - else: - result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by)) - - while result != expected: - time.sleep(0.5) - if len(schema_name) == 0: - result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by)) - else: - result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by)) - - assert(result == expected) +pg_manager = PostgresManager() @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() - conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port) - cursor = conn.cursor() - create_postgres_db(cursor, 'postgres_database') - create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port) - - instance.query("DROP DATABASE IF EXISTS test_database") + pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port) yield cluster finally: cluster.shutdown() +@pytest.fixture(autouse=True) +def setup_teardown(): + print("PostgreSQL is available - running test") + yield # run test + pg_manager.restart() + + def test_load_and_sync_all_database_tables(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() NUM_TABLES = 5 - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - create_postgres_table(cursor, table_name); - instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name)) - - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) - assert 'test_database' in instance.query('SHOW DATABASES') - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - check_tables_are_synchronized(table_name); - cursor.execute('drop table {};'.format(table_name)) - - result = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''') + 
pg_manager.create_and_fill_postgres_tables(NUM_TABLES) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + check_several_tables_are_synchronized(instance, NUM_TABLES) + result = instance.query("SELECT count() FROM system.tables WHERE database = 'test_database';") assert(int(result) == NUM_TABLES) - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) - def test_replicating_dml(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -248,41 +70,26 @@ def test_replicating_dml(started_cluster): create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i)) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for i in range(NUM_TABLES): instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(i, i)) - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - check_tables_are_synchronized(table_name); + check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} WHERE key < 50;'.format(i, i, i)) cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} * {} WHERE key >= 50;'.format(i, i, i, i)) - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); + check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): cursor.execute('DELETE FROM postgresql_replica_{} WHERE (value*value + {}) % 2 = 0;'.format(i, i)) cursor.execute('UPDATE postgresql_replica_{} SET value = value - (value % 7) WHERE key > 128 AND key < 512;'.format(i)) cursor.execute('DELETE FROM postgresql_replica_{} WHERE key % 7 = 1;'.format(i, i)) - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) - - drop_materialized_db() + check_several_tables_are_synchronized(instance, NUM_TABLES) def test_different_data_types(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() cursor.execute('drop table if exists test_data_types;') @@ -309,15 +116,14 @@ def test_different_data_types(started_cluster): k Char(2)[] -- Nullable(String) )''') - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for i in range(10): instance.query(''' INSERT INTO postgres_database.test_data_types VALUES ({}, -32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12.012345', '2000-05-12', 0.2, 0.2)'''.format(i)) - 
check_tables_are_synchronized('test_data_types', 'id'); + check_tables_are_synchronized(instance, 'test_data_types', 'id'); result = instance.query('SELECT * FROM test_database.test_data_types ORDER BY id LIMIT 1;') assert(result == '0\t-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12.012345\t2000-05-12\t0.2\t0.2\n') @@ -326,7 +132,7 @@ def test_different_data_types(started_cluster): cursor.execute('UPDATE test_data_types SET {} = {};'.format(col, i)) cursor.execute('''UPDATE test_data_types SET i = '2020-12-12';'''.format(col, i)) - check_tables_are_synchronized('test_data_types', 'id'); + check_tables_are_synchronized(instance, 'test_data_types', 'id'); instance.query("INSERT INTO postgres_database.test_array_data_type " "VALUES (" @@ -357,44 +163,35 @@ def test_different_data_types(started_cluster): "[]\n" ) - check_tables_are_synchronized('test_array_data_type'); + check_tables_are_synchronized(instance, 'test_array_data_type'); result = instance.query('SELECT * FROM test_database.test_array_data_type ORDER BY key;') assert(result == expected) - drop_materialized_db() + pg_manager.drop_materialized_db() cursor.execute('drop table if exists test_data_types;') cursor.execute('drop table if exists test_array_data_type;') def test_load_and_sync_subset_of_database_tables(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() NUM_TABLES = 10 + pg_manager.create_and_fill_postgres_tables(NUM_TABLES) publication_tables = '' for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(50)".format(i)) - if i < int(NUM_TABLES/2): if publication_tables != '': publication_tables += ', ' - publication_tables += table_name + publication_tables += f'postgresql_replica_{i}' - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - settings=["materialized_postgresql_tables_list = '{}'".format(publication_tables)]) - assert 'test_database' in instance.query('SHOW DATABASES') + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + settings=["materialized_postgresql_tables_list = '{}'".format(publication_tables)]) time.sleep(1) for i in range(int(NUM_TABLES/2)): - table_name = 'postgresql_replica_{}'.format(i) - assert_nested_table_is_created(table_name) + table_name = f'postgresql_replica_{i}' + assert_nested_table_is_created(instance, table_name) result = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''') assert(int(result) == int(NUM_TABLES/2)) @@ -409,69 +206,40 @@ def test_load_and_sync_subset_of_database_tables(started_cluster): instance.query("INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format(table_name, i)) for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) + table_name = f'postgresql_replica_{i}' if i < int(NUM_TABLES/2): - check_tables_are_synchronized(table_name); - - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + check_tables_are_synchronized(instance, table_name); def test_changing_replica_identity_value(started_cluster): - drop_materialized_db() - 
conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() create_postgres_table(cursor, 'postgresql_replica'); instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)") - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)") - check_tables_are_synchronized('postgresql_replica'); + check_tables_are_synchronized(instance, 'postgresql_replica'); cursor.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ") - check_tables_are_synchronized('postgresql_replica'); - - drop_materialized_db() - cursor.execute('drop table if exists postgresql_replica;') + check_tables_are_synchronized(instance, 'postgresql_replica'); def test_clickhouse_restart(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() NUM_TABLES = 5 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i)) - - instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')") - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - check_tables_are_synchronized(table_name); + pg_manager.create_and_fill_postgres_tables(NUM_TABLES) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(50000)".format(i, i)) instance.restart_clickhouse() - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + check_several_tables_are_synchronized(instance, NUM_TABLES) def test_replica_identity_index(started_cluster): - drop_materialized_db() conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() @@ -480,27 +248,22 @@ def test_replica_identity_index(started_cluster): cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx") instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)") - create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)") - check_tables_are_synchronized('postgresql_replica', order_by='key1'); + check_tables_are_synchronized(instance, 'postgresql_replica', order_by='key1'); cursor.execute("UPDATE postgresql_replica SET key1=key1-25 
WHERE key1<100 ") cursor.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ") cursor.execute("UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 ") cursor.execute("UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 ") - check_tables_are_synchronized('postgresql_replica', order_by='key1'); + check_tables_are_synchronized(instance, 'postgresql_replica', order_by='key1'); cursor.execute('DELETE FROM postgresql_replica WHERE key2<75;') - check_tables_are_synchronized('postgresql_replica', order_by='key1'); - - drop_materialized_db() - cursor.execute('drop table if exists postgresql_replica;') + check_tables_are_synchronized(instance, 'postgresql_replica', order_by='key1'); def test_table_schema_changes(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() NUM_TABLES = 5 @@ -509,15 +272,14 @@ def test_table_schema_changes(started_cluster): create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2); instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i)) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - settings=["materialized_postgresql_allow_automatic_update = 1"]) + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + settings=["materialized_postgresql_allow_automatic_update = 1"]) for i in range(NUM_TABLES): instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i)) - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); + check_several_tables_are_synchronized(instance, NUM_TABLES) expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key"); @@ -530,13 +292,12 @@ def test_table_schema_changes(started_cluster): cursor.execute(f"UPDATE {altered_table} SET value3 = 12 WHERE key%2=0") time.sleep(2) - assert_nested_table_is_created(altered_table) - assert_number_of_columns(3, altered_table) - check_tables_are_synchronized(altered_table) + assert_nested_table_is_created(instance, altered_table) + assert_number_of_columns(instance, 3, altered_table) + check_tables_are_synchronized(instance, altered_table) print('check1 OK') - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); + check_several_tables_are_synchronized(instance, NUM_TABLES) for i in range(NUM_TABLES): if i != altered_idx: @@ -544,32 +305,12 @@ def test_table_schema_changes(started_cluster): else: instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i)) - check_tables_are_synchronized(altered_table); + check_tables_are_synchronized(instance, altered_table); print('check2 OK') - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - cursor.execute('drop table postgresql_replica_{};'.format(i)) - - instance.query("DROP DATABASE test_database") - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + check_several_tables_are_synchronized(instance, NUM_TABLES) def 
test_many_concurrent_queries(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() - NUM_TABLES = 5 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(10000)'.format(i)) - n = [10000] - query_pool = ['DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;', 'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;', 'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;', @@ -582,6 +323,13 @@ def test_many_concurrent_queries(started_cluster): 'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;', 'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'] + NUM_TABLES = 5 + + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + database=True) + cursor = conn.cursor() + pg_manager.create_and_fill_postgres_tables_from_cursor(cursor, NUM_TABLES, numbers=10000) + def attack(thread_id): print('thread {}'.format(thread_id)) k = 10000 @@ -606,13 +354,14 @@ def test_many_concurrent_queries(started_cluster): cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(thread_id, i+1, i+1)) print("update primary key {} ok".format(thread_id)) + n = [10000] + threads = [] threads_num = 16 for i in range(threads_num): threads.append(threading.Thread(target=attack, args=(i,))) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for thread in threads: time.sleep(random.uniform(0, 1)) @@ -628,108 +377,91 @@ def test_many_concurrent_queries(started_cluster): thread.join() for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); + check_tables_are_synchronized(instance, 'postgresql_replica_{}'.format(i)); count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i)) count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i)) assert(int(count1) == int(count2)) print(count1, count2) - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) - def test_single_transaction(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False) cursor = conn.cursor() - create_postgres_table(cursor, 'postgresql_replica_0'); + table_name = 'postgresql_replica_0' + create_postgres_table(cursor, table_name); conn.commit() - create_materialized_db(ip=started_cluster.postgres_ip, + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) - assert_nested_table_is_created('postgresql_replica_0') + assert_nested_table_is_created(instance, table_name) for query in queries: print('query {}'.format(query)) cursor.execute(query.format(0)) time.sleep(5) - result = instance.query("select count() from test_database.postgresql_replica_0") + result = instance.query(f"select count() from test_database.{table_name}") # no commit yet assert(int(result) == 0) conn.commit() - 
check_tables_are_synchronized('postgresql_replica_0'); - - drop_materialized_db() - cursor.execute('drop table if exists postgresql_replica_0;') + check_tables_are_synchronized(instance, table_name); def test_virtual_columns(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) cursor = conn.cursor() - create_postgres_table(cursor, 'postgresql_replica_0'); + table_name = 'postgresql_replica_0' + create_postgres_table(cursor, table_name); - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - settings=["materialized_postgresql_allow_automatic_update = 1"]) - assert_nested_table_is_created('postgresql_replica_0') - instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)") - check_tables_are_synchronized('postgresql_replica_0'); + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + settings=["materialized_postgresql_allow_automatic_update = 1"]) + + assert_nested_table_is_created(instance, table_name) + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10)") + check_tables_are_synchronized(instance, table_name); # just check that it works, no check with `expected` because _version is taken as LSN, which will be different each time. - result = instance.query('SELECT key, value, _sign, _version FROM test_database.postgresql_replica_0;') + result = instance.query(f'SELECT key, value, _sign, _version FROM test_database.{table_name};') print(result) - cursor.execute("ALTER TABLE postgresql_replica_0 ADD COLUMN value2 integer") - instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(10, 10)") - assert_number_of_columns(3, 'postgresql_replica_0') - check_tables_are_synchronized('postgresql_replica_0'); + cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value2 integer") + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number from numbers(10, 10)") + assert_number_of_columns(instance, 3, table_name) + check_tables_are_synchronized(instance, table_name); result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;') print(result) - instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(20, 10)") - check_tables_are_synchronized('postgresql_replica_0'); + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number from numbers(20, 10)") + check_tables_are_synchronized(instance, table_name); - result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;') + result = instance.query(f'SELECT key, value, value2, _sign, _version FROM test_database.{table_name};') print(result) - drop_materialized_db() - cursor.execute('drop table if exists postgresql_replica_0;') - def test_multiple_databases(started_cluster): - drop_materialized_db('test_database_1') - drop_materialized_db('test_database_2') NUM_TABLES = 5 - - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=False) cursor = conn.cursor() - 
create_postgres_db(cursor, 'postgres_database_1') - create_postgres_db(cursor, 'postgres_database_2') + pg_manager.create_postgres_db(cursor, 'postgres_database_1') + pg_manager.create_postgres_db(cursor, 'postgres_database_2') - conn1 = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn1 = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, database_name='postgres_database_1') - conn2 = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn2 = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, database_name='postgres_database_2') cursor1 = conn1.cursor() cursor2 = conn2.cursor() - create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_1', 'postgres_database_1') - create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_2', 'postgres_database_2') + pg_manager.create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_1', 'postgres_database_1') + pg_manager.create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_2', 'postgres_database_2') cursors = [cursor1, cursor2] for cursor_id in range(len(cursors)): @@ -740,9 +472,9 @@ def test_multiple_databases(started_cluster): print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';''')) print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';''')) - create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port, + pg_manager.create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port, 'test_database_1', 'postgres_database_1') - create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port, + pg_manager.create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port, 'test_database_2', 'postgres_database_2') cursors = [cursor1, cursor2] @@ -754,289 +486,186 @@ def test_multiple_databases(started_cluster): for cursor_id in range(len(cursors)): for i in range(NUM_TABLES): table_name = 'postgresql_replica_{}'.format(i) - check_tables_are_synchronized( + check_tables_are_synchronized(instance, table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1)); - for i in range(NUM_TABLES): - cursor1.execute('drop table if exists postgresql_replica_{};'.format(i)) - for i in range(NUM_TABLES): - cursor2.execute('drop table if exists postgresql_replica_{};'.format(i)) - - drop_clickhouse_postgres_db('postgres_database_1') - drop_clickhouse_postgres_db('postgres_database_2') - - drop_materialized_db('test_database_1') - drop_materialized_db('test_database_2') - def test_concurrent_transactions(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() - NUM_TABLES = 6 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - def transaction(thread_id): conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False) - cursor_ = conn.cursor() + cursor = conn.cursor() for query in queries: - cursor_.execute(query.format(thread_id)) + cursor.execute(query.format(thread_id)) 
print('thread {}, query {}'.format(thread_id, query)) conn.commit() + NUM_TABLES = 6 + pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=0) + threads = [] threads_num = 6 for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() + for thread in threads: thread.join() for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i)) - count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i)) + check_tables_are_synchronized(instance, f'postgresql_replica_{i}'); + count1 = instance.query(f'SELECT count() FROM postgres_database.postgresql_replica_{i}') + count2 = instance.query(f'SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{i})') print(int(count1), int(count2), sep=' ') assert(int(count1) == int(count2)) - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) - def test_abrupt_connection_loss_while_heavy_replication(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() - NUM_TABLES = 6 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - def transaction(thread_id): if thread_id % 2: - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=True) else: - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False) - cursor_ = conn.cursor() + cursor = conn.cursor() for query in queries: - cursor_.execute(query.format(thread_id)) + cursor.execute(query.format(thread_id)) print('thread {}, query {}'.format(thread_id, query)) if thread_id % 2 == 0: conn.commit() - threads = [] + NUM_TABLES = 6 + pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=0) + threads_num = 6 + threads = [] for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() - # Join here because it takes time for data to reach wal for thread in threads: - thread.join() + thread.join() # Join here because it takes time for data to reach wal + time.sleep(1) started_cluster.pause_container('postgres1') for i in range(NUM_TABLES): - result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)) + result = instance.query(f"SELECT count() FROM test_database.postgresql_replica_{i}") print(result) # Just debug started_cluster.unpause_container('postgres1') - - for i in range(NUM_TABLES): - 
check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)) - print(result) # Just debug - - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + check_several_tables_are_synchronized(instance, NUM_TABLES) def test_drop_database_while_replication_startup_not_finished(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() NUM_TABLES = 5 - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - create_postgres_table(cursor, table_name); - instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name)) - + pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 100000) for i in range(6): - create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) time.sleep(0.5 * i) - drop_materialized_db() - - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + pg_manager.drop_materialized_db() def test_restart_server_while_replication_startup_not_finished(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() NUM_TABLES = 5 - - for i in range(NUM_TABLES): - table_name = 'postgresql_replica_{}'.format(i) - create_postgres_table(cursor, table_name); - instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name)) - - create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) - time.sleep(0.5) + pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 100000) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + time.sleep(1) instance.restart_clickhouse() - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table postgresql_replica_{};'.format(i)) + check_several_tables_are_synchronized(instance, NUM_TABLES) def test_abrupt_server_restart_while_heavy_replication(started_cluster): - drop_materialized_db() - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() - NUM_TABLES = 6 - - for i in range(NUM_TABLES): - create_postgres_table(cursor, 'postgresql_replica_{}'.format(i)); - def transaction(thread_id): if thread_id % 2: - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=True) else: - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, + conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True, auto_commit=False) - cursor_ = conn.cursor() + cursor = conn.cursor() for query in queries: - cursor_.execute(query.format(thread_id)) + cursor.execute(query.format(thread_id)) print('thread {}, query 
{}'.format(thread_id, query)) if thread_id % 2 == 0: conn.commit() + NUM_TABLES = 6 + pg_manager.create_and_fill_postgres_tables(tables_num=NUM_TABLES, numbers=0) + threads = [] threads_num = 6 for i in range(threads_num): threads.append(threading.Thread(target=transaction, args=(i,))) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) for thread in threads: time.sleep(random.uniform(0, 0.5)) thread.start() - # Join here because it takes time for data to reach wal for thread in threads: - thread.join() + thread.join() # Join here because it takes time for data to reach wal + instance.restart_clickhouse() - - for i in range(NUM_TABLES): - result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)) - print(result) # Just debug - - for i in range(NUM_TABLES): - check_tables_are_synchronized('postgresql_replica_{}'.format(i)); - - for i in range(NUM_TABLES): - result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)) - print(result) # Just debug - - drop_materialized_db() - for i in range(NUM_TABLES): - cursor.execute('drop table if exists postgresql_replica_{};'.format(i)) + check_several_tables_are_synchronized(instance, NUM_TABLES) def test_quoting_1(started_cluster): - conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) - cursor = conn.cursor() table_name = 'user' - create_postgres_table(cursor, table_name); - instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)") - create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) - check_tables_are_synchronized(table_name); - drop_materialized_db() - drop_postgres_table(cursor, table_name) + pg_manager.create_and_fill_postgres_table(table_name) + pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port) + check_tables_are_synchronized(instance, table_name); def test_quoting_2(started_cluster): - conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True) - cursor = conn.cursor() table_name = 'user' - create_postgres_table(cursor, table_name); - instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)") - create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, - settings=[f"materialized_postgresql_tables_list = '{table_name}'"]) - check_tables_are_synchronized(table_name); - drop_materialized_db() - drop_postgres_table(cursor, table_name) + pg_manager.create_and_fill_postgres_table(table_name) + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + settings=[f"materialized_postgresql_tables_list = '{table_name}'"]) + check_tables_are_synchronized(instance, table_name); def test_user_managed_slots(started_cluster): - conn = get_postgres_conn(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - database=True) - cursor = conn.cursor() - table_name = 'test_table' - create_postgres_table(cursor, table_name); - instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name)) - slot_name = 'user_slot' - replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, - 
database=True, replication=True, auto_commit=True) + table_name = 'test_table' + pg_manager.create_and_fill_postgres_table(table_name) + + replication_connection = get_postgres_conn( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + database=True, replication=True, auto_commit=True) snapshot = create_replication_slot(replication_connection, slot_name=slot_name) - create_materialized_db(ip=started_cluster.postgres_ip, - port=started_cluster.postgres_port, - settings=["materialized_postgresql_replication_slot = '{}'".format(slot_name), - "materialized_postgresql_snapshot = '{}'".format(snapshot)]) - check_tables_are_synchronized(table_name); + + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + settings=[f"materialized_postgresql_replication_slot = '{slot_name}'", + f"materialized_postgresql_snapshot = '{snapshot}'"]) + check_tables_are_synchronized(instance, table_name); + instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name)) - check_tables_are_synchronized(table_name); + check_tables_are_synchronized(instance, table_name); + instance.restart_clickhouse() + instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format(table_name)) - check_tables_are_synchronized(table_name); - drop_postgres_table(cursor, table_name) - drop_materialized_db() + check_tables_are_synchronized(instance, table_name); + + pg_manager.drop_materialized_db() drop_replication_slot(replication_connection, slot_name) - cursor.execute('DROP TABLE IF EXISTS test_table') + replication_connection.close() if __name__ == '__main__':