Improve tests part 2

This commit is contained in:
kssenii 2022-01-08 15:26:29 +03:00
parent 64538cf20f
commit 55430feac9
3 changed files with 105 additions and 299 deletions

View File

@ -18,6 +18,10 @@ postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value UUID, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
@ -84,6 +88,7 @@ class PostgresManager:
self.instance = instance
self.ip = ip
self.port = port
self.conn = get_postgres_conn(ip=self.ip, port=self.port)
self.prepare()
def restart(self):
@ -101,6 +106,8 @@ class PostgresManager:
self.create_clickhouse_postgres_db(ip=self.ip, port=self.port)
def clear(self):
if self.conn.closed == 0:
self.conn.close()
for db in self.created_materialized_postgres_db_list.copy():
self.drop_materialized_db(db);
for db in self.created_ch_postgres_db_list.copy():
@ -111,6 +118,10 @@ class PostgresManager:
for db in self.created_postgres_db_list.copy():
self.drop_postgres_db(cursor, db)
def get_db_cursor(self):
self.conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
return self.conn.cursor()
def create_postgres_db(self, cursor, name='postgres_database'):
self.drop_postgres_db(cursor, name)
self.created_postgres_db_list.add(name)
@ -141,7 +152,8 @@ class PostgresManager:
def create_materialized_db(self, ip, port,
materialized_database='test_database', postgres_database='postgres_database', settings=[]):
materialized_database='test_database', postgres_database='postgres_database',
settings=[], table_overrides=''):
self.created_materialized_postgres_db_list.add(materialized_database)
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
@ -152,6 +164,7 @@ class PostgresManager:
if i != 0:
create_query += ', '
create_query += settings[i]
create_query += table_overrides
self.instance.query(create_query)
assert materialized_database in self.instance.query('SHOW DATABASES')
@ -164,6 +177,9 @@ class PostgresManager:
def create_and_fill_postgres_table(self, table_name):
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
cursor = conn.cursor()
self.create_and_fill_postgres_table_from_cursor(cursor, table_name)
def create_and_fill_postgres_table_from_cursor(self, cursor, table_name):
create_postgres_table(cursor, table_name);
self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)")
@ -242,7 +258,7 @@ def check_tables_are_synchronized(instance, table_name, order_by='key', postgres
expected = instance.query(f'select * from {postgres_database}.{table_name} order by {order_by};')
result = instance.query(result_query)
for _ in range(120):
for _ in range(30):
if result == expected:
break
else:

View File

@ -558,12 +558,12 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
for thread in threads:
thread.join() # Join here because it takes time for data to reach wal
time.sleep(1)
time.sleep(2)
started_cluster.pause_container('postgres1')
for i in range(NUM_TABLES):
result = instance.query(f"SELECT count() FROM test_database.postgresql_replica_{i}")
print(result) # Just debug
# for i in range(NUM_TABLES):
# result = instance.query(f"SELECT count() FROM test_database.postgresql_replica_{i}")
# print(result) # Just debug
started_cluster.unpause_container('postgres1')
check_several_tables_are_synchronized(instance, NUM_TABLES)

View File

@ -12,235 +12,62 @@ from helpers.test_tools import TSV
from random import randrange
import threading
from helpers.postgres_utility import get_postgres_conn
from helpers.postgres_utility import PostgresManager
from helpers.postgres_utility import create_replication_slot, drop_replication_slot
from helpers.postgres_utility import create_postgres_schema, drop_postgres_schema
from helpers.postgres_utility import create_postgres_table, drop_postgres_table
from helpers.postgres_utility import create_postgres_table_with_schema, drop_postgres_table_with_schema
from helpers.postgres_utility import check_tables_are_synchronized
from helpers.postgres_utility import check_several_tables_are_synchronized
from helpers.postgres_utility import assert_nested_table_is_created
from helpers.postgres_utility import assert_number_of_columns
from helpers.postgres_utility import postgres_table_template, postgres_table_template_2, postgres_table_template_3, postgres_table_template_4, postgres_table_template_5
from helpers.postgres_utility import queries
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs = ['configs/log_conf.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
postgres_table_template = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
"""
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS "{}" (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS "{}"."{}" (
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
"""
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS "{}" (
key Integer NOT NULL, value UUID, PRIMARY KEY(key))
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
if replication:
conn_string += " replication='database'"
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
result = cursor.fetchall()
print(result[0][0]) # slot name
print(result[0][1]) # start lsn
print(result[0][2]) # snapshot
return result[0][2]
def drop_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))
def create_postgres_db(cursor, name='postgres_database'):
cursor.execute("CREATE DATABASE {}".format(name))
def drop_postgres_db(cursor, name='postgres_database'):
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
def drop_postgres_schema(cursor, schema_name):
cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name))
def create_postgres_schema(cursor, schema_name):
drop_postgres_schema(cursor, schema_name)
cursor.execute('CREATE SCHEMA {}'.format(schema_name))
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
drop_clickhouse_postgres_db(name)
if len(schema_name) == 0:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
else:
instance.query('''
CREATE DATABASE {}
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
def drop_clickhouse_postgres_db(name='postgres_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[], table_overrides=''):
instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
if len(settings) > 0:
create_query += " SETTINGS "
for i in range(len(settings)):
if i != 0:
create_query += ', '
create_query += settings[i]
create_query += table_overrides
instance.query(create_query)
assert materialized_database in instance.query('SHOW DATABASES')
def drop_materialized_db(materialized_database='test_database'):
instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database))
assert materialized_database not in instance.query('SHOW DATABASES')
def drop_postgres_table(cursor, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
drop_postgres_table(cursor, table_name)
cursor.execute(template.format(table_name))
if replica_identity_full:
cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
def create_postgres_table_with_schema(cursor, schema_name, table_name):
drop_postgres_table_with_schema(cursor, schema_name, table_name)
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
queries = [
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
"UPDATE postgresql_replica_{} SET key=key+10000000",
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
]
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
if len(schema_name) == 0:
table = table_name
else:
table = schema_name + "." + table_name
print(f'Checking table {table} exists in {materialized_database}')
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
while table not in database_tables:
time.sleep(0.2)
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
assert(table in database_tables)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
print('Number of columns ok')
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
assert_nested_table_is_created(table_name, materialized_database, schema_name)
print(f"Checking table is synchronized. Table name: {table_name}, table schema: {schema_name}")
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
try_num = 0
while result != expected:
time.sleep(0.5)
if len(schema_name) == 0:
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
else:
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
try_num += 1
if try_num > 30:
break
assert(result == expected)
pg_manager = PostgresManager()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
cursor = conn.cursor()
create_postgres_db(cursor, 'postgres_database')
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port)
instance.query("DROP DATABASE IF EXISTS test_database")
pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def setup_teardown():
print("PostgreSQL is available - running test")
yield # run test
pg_manager.restart()
def test_add_new_table_to_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
cursor.execute('DROP TABLE IF EXISTS test_table')
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
pg_manager.create_and_fill_postgres_tables_from_cursor(cursor, NUM_TABLES, 10000)
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
table_name = 'postgresql_replica_5'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
pg_manager.create_and_fill_postgres_table_from_cursor(cursor, table_name)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(") # Check without ip
@ -252,16 +79,16 @@ def test_add_new_table_to_replication(started_cluster):
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
assert('Database engine MaterializedPostgreSQL does not support setting' in result)
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")
check_tables_are_synchronized(table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
check_tables_are_synchronized(instance, table_name);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)")
check_tables_are_synchronized(instance, table_name);
result = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name));
result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}");
assert('Table test_database.postgresql_replica_5 already exists' in result)
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table");
@ -274,14 +101,14 @@ def test_add_new_table_to_replication(started_cluster):
table_name = 'postgresql_replica_6'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
instance.restart_clickhouse()
table_name = 'postgresql_replica_7'
create_postgres_table(cursor, table_name)
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
instance.query("ATTACH TABLE test_database.{}".format(table_name));
instance.query(f"ATTACH TABLE test_database.{table_name}");
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
@ -289,33 +116,14 @@ def test_add_new_table_to_replication(started_cluster):
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")
check_several_tables_are_synchronized(instance, NUM_TABLES + 3)
for i in range(NUM_TABLES + 3):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES + 3):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_remove_table_from_replication(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS test_table')
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query("SHOW TABLES FROM test_database")
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
@ -325,8 +133,8 @@ def test_remove_table_from_replication(started_cluster):
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
table_name = 'postgresql_replica_4'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
result = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
instance.query(f'DETACH TABLE test_database.{table_name}');
result = instance.query_and_get_error(f'SELECT * FROM test_database.{table_name}')
assert("doesn't exist" in result)
result = instance.query("SHOW TABLES FROM test_database")
@ -336,52 +144,42 @@ def test_remove_table_from_replication(started_cluster):
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")
instance.query('ATTACH TABLE test_database.{}'.format(table_name));
check_tables_are_synchronized(table_name);
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
check_tables_are_synchronized(table_name);
instance.query(f'ATTACH TABLE test_database.{table_name}');
check_tables_are_synchronized(instance, table_name);
check_several_tables_are_synchronized(instance, NUM_TABLES)
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
table_name = 'postgresql_replica_1'
instance.query('DETACH TABLE test_database.{}'.format(table_name));
instance.query(f'DETACH TABLE test_database.{table_name}');
result = instance.query('SHOW CREATE DATABASE test_database')
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
for i in range(NUM_TABLES):
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
cursor = pg_manager.get_db_cursor()
cursor.execute(f'drop table if exists postgresql_replica_0;')
# Removing from replication table which does not exist in PostgreSQL must be ok.
instance.query('DETACH TABLE test_database.postgresql_replica_0');
assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
drop_materialized_db()
def test_predefined_connection_configuration(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_table')
cursor.execute(f'CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
cursor.execute(f'INSERT INTO test_table SELECT 1, 2')
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'")
check_tables_are_synchronized("test_table");
drop_materialized_db()
cursor.execute('DROP TABLE IF EXISTS test_table')
check_tables_are_synchronized(instance, "test_table");
pg_manager.drop_materialized_db()
insert_counter = 0
def test_database_with_single_non_default_schema(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES=5
schema_name = 'test_schema'
materialized_db = 'test_database'
@ -405,18 +203,17 @@ def test_database_with_single_non_default_schema(started_cluster):
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, f"postgresql_replica_{i}", postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
create_postgres_table_with_schema(cursor, schema_name, table_name);
create_postgres_table_with_schema(cursor, schema_name, f'postgresql_replica_{i}');
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
insert_into_tables()
@ -434,22 +231,19 @@ def test_database_with_single_non_default_schema(started_cluster):
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
instance.query(f"DETACH TABLE {materialized_db}.{detached_table_name}")
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.{detached_table_name}")
check_tables_are_synchronized(detached_table_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, detached_table_name, postgres_database=clickhouse_postgres_db);
def test_database_with_multiple_non_default_schemas_1(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES = 5
schema_name = 'test_schema'
@ -475,11 +269,11 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
def check_all_tables_are_synchronized():
for i in range(NUM_TABLES):
print('checking table', i)
check_tables_are_synchronized("postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, "postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronization Ok')
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
@ -489,7 +283,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
publication_tables += schema_name + '.' + table_name
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{publication_tables}'", "materialized_postgresql_tables_list_with_schema=1", "materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
@ -507,8 +301,8 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
assert_number_of_columns(3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'{schema_name}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
@ -516,15 +310,11 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
check_tables_are_synchronized(detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db);
def test_database_with_multiple_non_default_schemas_2(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
NUM_TABLES = 2
schemas_num = 2
schema_list = 'schema0, schema1'
@ -539,7 +329,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
print(f'checking table {schema_name}.{table_name}')
check_tables_are_synchronized(f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
check_tables_are_synchronized(instance, f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
print('synchronized Ok')
def insert_into_tables():
@ -560,14 +350,16 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
schema_name = f'schema{i}'
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
create_postgres_schema(cursor, schema_name)
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
for ti in range(NUM_TABLES):
table_name = f'postgresql_replica_{ti}'
create_postgres_table_with_schema(cursor, schema_name, table_name);
insert_into_tables()
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema_list = '{schema_list}'", "materialized_postgresql_allow_automatic_update = 1"])
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_schema_list = '{schema_list}'",
"materialized_postgresql_allow_automatic_update = 1"])
check_all_tables_are_synchronized()
insert_into_tables()
@ -586,8 +378,8 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")
instance.query(f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
assert_number_of_columns(3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db);
assert_number_of_columns(instance, 3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db);
print('DETACH-ATTACH')
detached_table_name = "postgresql_replica_1"
@ -597,23 +389,22 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
instance.query(f"ATTACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
check_tables_are_synchronized(f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db);
drop_materialized_db()
check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db);
def test_table_override(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
cursor = conn.cursor()
cursor = pg_manager.get_db_cursor()
table_name = 'table_override'
materialized_database = 'test_database'
create_postgres_table(cursor, table_name, template=postgres_table_template_5);
instance.query(f"create table {table_name}(key Int32, value UUID) engine = PostgreSQL (postgres1, table={table_name})")
instance.query(f"insert into {table_name} select number, generateUUIDv4() from numbers(10)")
table_overrides = f" TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value UUID))"
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{table_name}'"], table_overrides=table_overrides)
assert_nested_table_is_created(table_name, materialized_database)
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
settings=[f"materialized_postgresql_tables_list = '{table_name}'"],
table_overrides=table_overrides)
assert_nested_table_is_created(instance, table_name, materialized_database)
result = instance.query(f"show create table {materialized_database}.{table_name}")
print(result)
expected = "CREATE TABLE test_database.table_override\\n(\\n `key` Int32,\\n `value` UUID,\\n `_sign` Int8() MATERIALIZED 1,\\n `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)"
@ -621,9 +412,8 @@ def test_table_override(started_cluster):
time.sleep(5)
query = f"select * from {materialized_database}.{table_name} order by key"
expected = instance.query(f"select * from {table_name} order by key")
instance.query(f"drop table {table_name} no delay")
assert_eq_with_retry(instance, query, expected)
drop_materialized_db()
drop_postgres_table(cursor, table_name)
if __name__ == '__main__':