This commit is contained in:
kssenii 2021-10-08 23:52:15 +03:00
parent f0e3122507
commit e42a687b80

View File

@ -77,7 +77,8 @@ def create_materialized_db(ip, port,
materialized_database='test_database', materialized_database='test_database',
postgres_database='postgres_database', postgres_database='postgres_database',
settings=[]): settings=[]):
create_query = "CREATE DATABASE {} ENGINE = MaterializedPostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database) instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
if len(settings) > 0: if len(settings) > 0:
create_query += " SETTINGS " create_query += " SETTINGS "
for i in range(len(settings)): for i in range(len(settings)):
@ -131,6 +132,14 @@ def assert_nested_table_is_created(table_name, materialized_database='test_datab
assert(table_name in database_tables) assert(table_name in database_tables)
@pytest.mark.timeout(320)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
while (int(result) != expected):
time.sleep(1)
result = instance.query(f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')")
@pytest.mark.timeout(320) @pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database'): def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database'):
assert_nested_table_is_created(table_name, materialized_database) assert_nested_table_is_created(table_name, materialized_database)
@ -479,27 +488,30 @@ def test_table_schema_changes(started_cluster):
expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key"); expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key");
altered_table = random.randint(0, 4) altered_idx = random.randint(0, 4)
cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(altered_table)) altered_table = f'postgresql_replica_{altered_idx}'
cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")
for i in range(NUM_TABLES): for i in range(NUM_TABLES):
cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i)) cursor.execute(f"INSERT INTO postgresql_replica_{i} VALUES (50, {i}, {i})")
cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i)) cursor.execute(f"UPDATE {altered_table} SET value3 = 12 WHERE key%2=0")
assert_nested_table_is_created('postgresql_replica_{}'.format(altered_table)) time.sleep(2)
check_tables_are_synchronized('postgresql_replica_{}'.format(altered_table)) assert_nested_table_is_created(altered_table)
assert_number_of_columns(3, altered_table)
check_tables_are_synchronized(altered_table)
print('check1 OK') print('check1 OK')
for i in range(NUM_TABLES): for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i)); check_tables_are_synchronized('postgresql_replica_{}'.format(i));
for i in range(NUM_TABLES): for i in range(NUM_TABLES):
if i != altered_table: if i != altered_idx:
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i)) instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i))
else: else:
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i)) instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i))
check_tables_are_synchronized('postgresql_replica_{}'.format(altered_table)); check_tables_are_synchronized(altered_table);
print('check2 OK') print('check2 OK')
for i in range(NUM_TABLES): for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i)); check_tables_are_synchronized('postgresql_replica_{}'.format(i));
@ -645,6 +657,7 @@ def test_virtual_columns(started_cluster):
cursor.execute("ALTER TABLE postgresql_replica_0 ADD COLUMN value2 integer") cursor.execute("ALTER TABLE postgresql_replica_0 ADD COLUMN value2 integer")
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(10, 10)") instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(10, 10)")
assert_number_of_columns(3, 'postgresql_replica_0')
check_tables_are_synchronized('postgresql_replica_0'); check_tables_are_synchronized('postgresql_replica_0');
result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;') result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;')