mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Improve tests
This commit is contained in:
parent
90972e3752
commit
b0530682e4
257
tests/integration/helpers/postgres_utility.py
Normal file
257
tests/integration/helpers/postgres_utility.py
Normal file
@ -0,0 +1,257 @@
|
||||
import psycopg2
|
||||
import time
|
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||
|
||||
postgres_table_template = """
|
||||
CREATE TABLE IF NOT EXISTS "{}" (
|
||||
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
||||
"""
|
||||
postgres_table_template_2 = """
|
||||
CREATE TABLE IF NOT EXISTS "{}" (
|
||||
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
|
||||
"""
|
||||
postgres_table_template_3 = """
|
||||
CREATE TABLE IF NOT EXISTS "{}" (
|
||||
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
|
||||
"""
|
||||
postgres_table_template_4 = """
|
||||
CREATE TABLE IF NOT EXISTS "{}"."{}" (
|
||||
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
||||
"""
|
||||
|
||||
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
|
||||
if database == True:
|
||||
conn_string = f"host={ip} port={port} dbname='{database_name}' user='postgres' password='mysecretpassword'"
|
||||
else:
|
||||
conn_string = f"host={ip} port={port} user='postgres' password='mysecretpassword'"
|
||||
|
||||
if replication:
|
||||
conn_string += " replication='database'"
|
||||
|
||||
conn = psycopg2.connect(conn_string)
|
||||
if auto_commit:
|
||||
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
conn.autocommit = True
|
||||
return conn
|
||||
|
||||
def create_replication_slot(conn, slot_name='user_slot'):
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f'CREATE_REPLICATION_SLOT {slot_name} LOGICAL pgoutput EXPORT_SNAPSHOT')
|
||||
result = cursor.fetchall()
|
||||
print(result[0][0]) # slot name
|
||||
print(result[0][1]) # start lsn
|
||||
print(result[0][2]) # snapshot
|
||||
return result[0][2]
|
||||
|
||||
def drop_replication_slot(conn, slot_name='user_slot'):
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(f"select pg_drop_replication_slot('{slot_name}')")
|
||||
|
||||
|
||||
def create_postgres_schema(cursor, schema_name):
|
||||
drop_postgres_schema(cursor, schema_name)
|
||||
cursor.execute(f'CREATE SCHEMA {schema_name}')
|
||||
|
||||
def drop_postgres_schema(cursor, schema_name):
|
||||
cursor.execute(f'DROP SCHEMA IF EXISTS {schema_name} CASCADE')
|
||||
|
||||
|
||||
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
|
||||
drop_postgres_table(cursor, table_name)
|
||||
cursor.execute(template.format(table_name))
|
||||
if replica_identity_full:
|
||||
cursor.execute(f'ALTER TABLE {table_name} REPLICA IDENTITY FULL;')
|
||||
|
||||
def drop_postgres_table(cursor, table_name):
|
||||
cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}" """)
|
||||
|
||||
|
||||
def create_postgres_table_with_schema(cursor, schema_name, table_name):
|
||||
drop_postgres_table_with_schema(cursor, schema_name, table_name)
|
||||
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
|
||||
|
||||
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
|
||||
cursor.execute(f"""DROP TABLE IF EXISTS "{schema_name}"."{table_name}" """)
|
||||
|
||||
|
||||
class PostgresManager:
|
||||
def __init__(self):
|
||||
self.created_postgres_db_list = set()
|
||||
self.created_materialized_postgres_db_list = set()
|
||||
self.created_ch_postgres_db_list = set()
|
||||
|
||||
def init(self, instance, ip, port):
|
||||
self.instance = instance
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.prepare()
|
||||
|
||||
def restart(self):
|
||||
try:
|
||||
self.clear()
|
||||
self.prepare()
|
||||
except Exception as ex:
|
||||
self.prepare()
|
||||
raise ex
|
||||
|
||||
def prepare(self):
|
||||
conn = get_postgres_conn(ip=self.ip, port=self.port)
|
||||
cursor = conn.cursor()
|
||||
self.create_postgres_db(cursor, 'postgres_database')
|
||||
self.create_clickhouse_postgres_db(ip=self.ip, port=self.port)
|
||||
|
||||
def clear(self):
|
||||
for db in self.created_materialized_postgres_db_list.copy():
|
||||
self.drop_materialized_db(db);
|
||||
for db in self.created_ch_postgres_db_list.copy():
|
||||
self.drop_clickhouse_postgres_db(db)
|
||||
if len(self.created_postgres_db_list) > 0:
|
||||
conn = get_postgres_conn(ip=self.ip, port=self.port)
|
||||
cursor = conn.cursor()
|
||||
for db in self.created_postgres_db_list.copy():
|
||||
self.drop_postgres_db(cursor, db)
|
||||
|
||||
def create_postgres_db(self, cursor, name='postgres_database'):
|
||||
self.drop_postgres_db(cursor, name)
|
||||
self.created_postgres_db_list.add(name)
|
||||
cursor.execute(f"CREATE DATABASE {name}")
|
||||
|
||||
def drop_postgres_db(self, cursor, name='postgres_database'):
|
||||
cursor.execute(f"DROP DATABASE IF EXISTS {name}")
|
||||
if name in self.created_postgres_db_list:
|
||||
self.created_postgres_db_list.remove(name)
|
||||
|
||||
def create_clickhouse_postgres_db(self, ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
|
||||
self.drop_clickhouse_postgres_db(name)
|
||||
self.created_ch_postgres_db_list.add(name)
|
||||
|
||||
if len(schema_name) == 0:
|
||||
self.instance.query(f'''
|
||||
CREATE DATABASE {name}
|
||||
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword')''')
|
||||
else:
|
||||
self.instance.query(f'''
|
||||
CREATE DATABASE {name}
|
||||
ENGINE = PostgreSQL('{ip}:{port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')''')
|
||||
|
||||
def drop_clickhouse_postgres_db(self, name='postgres_database'):
|
||||
self.instance.query(f'DROP DATABASE IF EXISTS {name}')
|
||||
if name in self.created_ch_postgres_db_list:
|
||||
self.created_ch_postgres_db_list.remove(name)
|
||||
|
||||
|
||||
def create_materialized_db(self, ip, port,
|
||||
materialized_database='test_database', postgres_database='postgres_database', settings=[]):
|
||||
self.created_materialized_postgres_db_list.add(materialized_database)
|
||||
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
|
||||
|
||||
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
|
||||
if len(settings) > 0:
|
||||
create_query += " SETTINGS "
|
||||
for i in range(len(settings)):
|
||||
if i != 0:
|
||||
create_query += ', '
|
||||
create_query += settings[i]
|
||||
self.instance.query(create_query)
|
||||
assert materialized_database in self.instance.query('SHOW DATABASES')
|
||||
|
||||
def drop_materialized_db(self, materialized_database='test_database'):
|
||||
self.instance.query(f'DROP DATABASE IF EXISTS {materialized_database} NO DELAY')
|
||||
if materialized_database in self.created_materialized_postgres_db_list:
|
||||
self.created_materialized_postgres_db_list.remove(materialized_database)
|
||||
assert materialized_database not in self.instance.query('SHOW DATABASES')
|
||||
|
||||
def create_and_fill_postgres_table(self, table_name):
|
||||
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
|
||||
cursor = conn.cursor()
|
||||
create_postgres_table(cursor, table_name);
|
||||
self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(50)")
|
||||
|
||||
def create_and_fill_postgres_tables(self, tables_num, numbers=50):
|
||||
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
|
||||
cursor = conn.cursor()
|
||||
self.create_and_fill_postgres_tables_from_cursor(cursor, tables_num, numbers=numbers)
|
||||
|
||||
def create_and_fill_postgres_tables_from_cursor(self, cursor, tables_num, numbers=50):
|
||||
for i in range(tables_num):
|
||||
table_name = f'postgresql_replica_{i}'
|
||||
create_postgres_table(cursor, table_name);
|
||||
if numbers > 0:
|
||||
self.instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers({numbers})")
|
||||
|
||||
|
||||
queries = [
|
||||
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
|
||||
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
|
||||
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
|
||||
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
|
||||
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
|
||||
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
|
||||
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
|
||||
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
|
||||
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
|
||||
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
|
||||
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
|
||||
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
|
||||
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
|
||||
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
|
||||
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
|
||||
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
|
||||
"UPDATE postgresql_replica_{} SET key=key+10000000",
|
||||
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
|
||||
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
|
||||
]
|
||||
|
||||
|
||||
def assert_nested_table_is_created(instance, table_name, materialized_database='test_database', schema_name=''):
|
||||
if len(schema_name) == 0:
|
||||
table = table_name
|
||||
else:
|
||||
table = schema_name + "." + table_name
|
||||
|
||||
print(f'Checking table {table} exists in {materialized_database}')
|
||||
database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
|
||||
|
||||
while table not in database_tables:
|
||||
time.sleep(0.2)
|
||||
database_tables = instance.query(f'SHOW TABLES FROM {materialized_database}')
|
||||
|
||||
assert(table in database_tables)
|
||||
|
||||
|
||||
def assert_number_of_columns(instance, expected, table_name, database_name='test_database'):
|
||||
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
|
||||
while (int(result) != expected):
|
||||
time.sleep(1)
|
||||
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
|
||||
print('Number of columns ok')
|
||||
|
||||
|
||||
def check_tables_are_synchronized(instance, table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
|
||||
assert_nested_table_is_created(instance, table_name, materialized_database, schema_name)
|
||||
|
||||
table_path = ''
|
||||
if len(schema_name) == 0:
|
||||
table_path = f'{materialized_database}.{table_name}'
|
||||
else:
|
||||
table_path = f'{materialized_database}.`{schema_name}.{table_name}`'
|
||||
|
||||
print(f"Checking table is synchronized: {table_path}")
|
||||
result_query = f'select * from {table_path} order by {order_by};'
|
||||
|
||||
expected = instance.query(f'select * from {postgres_database}.{table_name} order by {order_by};')
|
||||
result = instance.query(result_query)
|
||||
|
||||
for _ in range(120):
|
||||
if result == expected:
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
result = instance.query(result_query)
|
||||
|
||||
assert(result == expected)
|
||||
|
||||
|
||||
def check_several_tables_are_synchronized(instance, tables_num, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
|
||||
for i in range(tables_num):
|
||||
check_tables_are_synchronized(instance, f'postgresql_replica_{i}');
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user