2021-02-08 23:23:51 +00:00
|
|
|
import pytest
|
|
|
|
import time
|
|
|
|
import psycopg2
|
|
|
|
import os.path as p
|
2021-02-18 23:33:01 +00:00
|
|
|
import random
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
|
|
from helpers.test_tools import TSV
|
|
|
|
|
|
|
|
# Test cluster rooted at this file, with a single ClickHouse instance that is
# added with PostgreSQL support switched on (with_postgres=True).
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    main_configs=['configs/log_conf.xml'],
    user_configs=['configs/users.xml'],
    with_postgres=True,
)

# DDL template for a two-column table (key, value); `{}` receives the table name.
postgres_table_template = """
    CREATE TABLE IF NOT EXISTS {} (
    key Integer NOT NULL, value Integer, PRIMARY KEY(key))
    """

# DDL template for a four-column table (key, value1, value2, value3);
# `{}` receives the table name.
postgres_table_template_2 = """
    CREATE TABLE IF NOT EXISTS {} (
    key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
    """
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
def get_postgres_conn(database=False):
    """Open an autocommit psycopg2 connection to PostgreSQL on localhost.

    Args:
        database: when truthy, the connection string names the
            ``postgres_database`` database; otherwise no ``dbname`` is given.

    Returns:
        The open psycopg2 connection. The caller owns it and is responsible
        for closing it.
    """
    if database:
        conn_string = "host='localhost' dbname='postgres_database' user='postgres' password='mysecretpassword'"
    else:
        conn_string = "host='localhost' user='postgres' password='mysecretpassword'"

    conn = psycopg2.connect(conn_string)
    # Statements issued through this connection must take effect immediately
    # (no explicit commit is ever called by the tests), so autocommit is
    # enabled both through the isolation level and the connection attribute.
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    conn.autocommit = True
    return conn
|
|
|
|
|
|
|
|
|
|
|
|
def create_postgres_db(cursor, name):
    """Run ``CREATE DATABASE <name>`` through ``cursor``.

    Args:
        cursor: object exposing ``execute(sql)``.
        name: database name, inserted into the statement verbatim.
    """
    statement = "CREATE DATABASE {}".format(name)
    cursor.execute(statement)
|
|
|
|
|
|
|
|
|
2021-02-18 23:33:01 +00:00
|
|
|
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
    """Drop and re-create a PostgreSQL table from a DDL template.

    Args:
        cursor: object exposing ``execute(sql)``.
        table_name: table name, inserted into each statement verbatim.
        replica_identity_full: when truthy, additionally issue
            ``ALTER TABLE ... REPLICA IDENTITY FULL`` on the new table.
        template: DDL string with a single ``{}`` placeholder for the name.
    """
    # Start from a clean slate so a leftover table from an earlier test
    # cannot leak rows into this one.
    cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))

    create_statement = template.format(table_name)
    cursor.execute(create_statement)

    if not replica_identity_full:
        return
    cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
def assert_nested_table_is_created(table_name, timeout=30):
    """Wait until ``table_name`` is listed by ``SHOW TABLES FROM test_database``.

    The listing is polled every 0.2 seconds. A pytest timeout marker has no
    effect on a plain helper (only collected test items honour it), so the
    time limit is enforced here with an explicit deadline.

    Args:
        table_name: exact name of the table expected in ``test_database``.
        timeout: maximum number of seconds to keep polling.

    Raises:
        AssertionError: if the table is still missing once ``timeout`` expires.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Compare against whole lines: a substring test on the raw output
        # would accept e.g. 'replica_1' when only 'replica_10' exists.
        database_tables = instance.query('SHOW TABLES FROM test_database').splitlines()
        if table_name in database_tables:
            return
        if time.monotonic() >= deadline:
            raise AssertionError(
                "table {} did not appear in test_database within {} seconds; tables present: {}".format(
                    table_name, timeout, database_tables))
        time.sleep(0.2)
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
def check_tables_are_synchronized(table_name, order_by='key', timeout=30):
    """Wait until ``test_database.<table>`` equals ``postgres_database.<table>``.

    First waits for the table to exist in ``test_database`` (via
    ``assert_nested_table_is_created``), then reads the expected contents
    once from ``postgres_database`` and re-reads ``test_database`` every
    0.5 seconds until both results are identical.

    A pytest timeout marker has no effect on a plain helper (only collected
    test items honour it), so the time limit is enforced here with an
    explicit deadline.

    Args:
        table_name: table to compare in both databases.
        order_by: column used to order both result sets.
        timeout: maximum number of seconds to wait for the contents to match.

    Raises:
        AssertionError: if the contents still differ once ``timeout`` expires.
    """
    assert_nested_table_is_created(table_name)

    expected = instance.query('select * from postgres_database.{} order by {};'.format(table_name, order_by))
    replica_query = 'select * from test_database.{} order by {};'.format(table_name, order_by)

    deadline = time.monotonic() + timeout
    result = instance.query(replica_query)
    while result != expected:
        if time.monotonic() >= deadline:
            raise AssertionError(
                "test_database.{} did not match postgres_database.{} within {} seconds".format(
                    table_name, table_name, timeout))
        time.sleep(0.5)
        result = instance.query(replica_query)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster and prepare both databases.

    Creates ``postgres_database`` on the PostgreSQL side, drops any leftover
    ``test_database`` in ClickHouse and creates a ClickHouse
    ``postgres_database`` with the PostgreSQL engine pointing at it.

    Yields:
        The started cluster. It is shut down when the module's tests finish
        or when setup fails.
    """
    try:
        cluster.start()

        # The connection is only needed to create the database; close it
        # right away instead of leaking it for the lifetime of the module.
        conn = get_postgres_conn()
        try:
            cursor = conn.cursor()
            create_postgres_db(cursor, 'postgres_database')
        finally:
            conn.close()

        instance.query("DROP DATABASE IF EXISTS test_database")
        instance.query('''
            CREATE DATABASE postgres_database
            ENGINE = PostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')''')

        yield cluster

    finally:
        cluster.shutdown()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def postgresql_setup_teardown():
    """Autouse fixture: after each test, drop ``test.postgresql_replica`` if present."""
    # Nothing to set up; hand control straight to the test.
    yield
    cleanup_statement = 'DROP TABLE IF EXISTS test.postgresql_replica'
    instance.query(cleanup_statement)
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
@pytest.mark.timeout(120)
def test_load_and_sync_all_database_tables(started_cluster):
    """Every table present in PostgreSQL shows up, with its rows, in test_database.

    Five tables of 50 rows each are created before ``test_database`` exists;
    the test then checks each one is synchronized and that ``test_database``
    holds exactly five tables.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    pg_cursor = get_postgres_conn(True).cursor()

    NUM_TABLES = 5
    table_names = ['postgresql_replica_{}'.format(index) for index in range(NUM_TABLES)]

    # Populate PostgreSQL first, so the data is part of the initial load.
    for name in table_names:
        create_postgres_table(pg_cursor, name)
        instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(name))

    instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
    assert 'test_database' in instance.query('SHOW DATABASES')

    for name in table_names:
        check_tables_are_synchronized(name)
        pg_cursor.execute('drop table {};'.format(name))

    table_count = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''')
    assert int(table_count) == NUM_TABLES

    instance.query("DROP DATABASE test_database")
    assert 'test_database' not in instance.query('SHOW DATABASES')
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
@pytest.mark.timeout(120)
def test_replicating_dml(started_cluster):
    """INSERT, UPDATE and DELETE issued in PostgreSQL are reflected in test_database.

    After each round of modifications every table in ``test_database`` must
    again match its counterpart in ``postgres_database``.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    pg_cursor = get_postgres_conn(True).cursor()
    NUM_TABLES = 5
    table_ids = range(NUM_TABLES)

    def wait_for_all_tables():
        # Block until every test table matches its PostgreSQL counterpart.
        for table_id in table_ids:
            check_tables_are_synchronized('postgresql_replica_{}'.format(table_id))

    # Rows that exist before the materialized database is created.
    for n in table_ids:
        create_postgres_table(pg_cursor, 'postgresql_replica_{}'.format(n))
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(n, n))

    instance.query(
        "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")

    # Rows inserted after the materialized database is created.
    for n in table_ids:
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(n, n))

    wait_for_all_tables()

    # Updates touching every row.
    for n in table_ids:
        pg_cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} WHERE key < 50;'.format(n, n, n))
        pg_cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} * {} WHERE key >= 50;'.format(n, n, n, n))

    wait_for_all_tables()

    # A mix of deletes and updates.
    for n in table_ids:
        pg_cursor.execute('DELETE FROM postgresql_replica_{} WHERE (value*value + {}) % 2 = 0;'.format(n, n))
        pg_cursor.execute('UPDATE postgresql_replica_{} SET value = value - (value % 7) WHERE key > 128 AND key < 512;'.format(n))
        pg_cursor.execute('DELETE FROM postgresql_replica_{} WHERE key % 7 = 1;'.format(n))

    wait_for_all_tables()

    for n in table_ids:
        pg_cursor.execute('drop table postgresql_replica_{};'.format(n))

    instance.query("DROP DATABASE test_database")
    assert 'test_database' not in instance.query('SHOW DATABASES')
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
@pytest.mark.timeout(120)
def test_different_data_types(started_cluster):
    """Scalar and array column values read back from test_database as expected.

    ``test_data_types`` covers scalar columns and is also updated after the
    initial check; ``test_array_data_type`` covers (nested) array columns,
    including nullable element types.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    pg_cursor = get_postgres_conn(True).cursor()
    pg_cursor.execute('drop table if exists test_data_types;')
    pg_cursor.execute('drop table if exists test_array_data_type;')

    scalar_table_ddl = '''CREATE TABLE test_data_types (
        id integer PRIMARY KEY, a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
        h timestamp, i date, j decimal(5, 5), k numeric(5, 5))'''
    pg_cursor.execute(scalar_table_ddl)

    array_table_ddl = '''CREATE TABLE test_array_data_type
        (
        key Integer NOT NULL PRIMARY KEY,
        a Date[] NOT NULL, -- Date
        b Timestamp[] NOT NULL, -- DateTime
        c real[][] NOT NULL, -- Float32
        d double precision[][] NOT NULL, -- Float64
        e decimal(5, 5)[][][] NOT NULL, -- Decimal32
        f integer[][][] NOT NULL, -- Int32
        g Text[][][][][] NOT NULL, -- String
        h Integer[][][], -- Nullable(Int32)
        i Char(2)[][][][], -- Nullable(String)
        k Char(2)[] -- Nullable(String)
        )'''
    pg_cursor.execute(array_table_ddl)

    instance.query(
        "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")

    # Ten rows that differ only in their id.
    scalar_row_insert = '''
        INSERT INTO postgres_database.test_data_types VALUES
        ({}, -32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12', '2000-05-12', 0.2, 0.2)'''
    for row_id in range(10):
        instance.query(scalar_row_insert.format(row_id))

    check_tables_are_synchronized('test_data_types', 'id')
    first_row = instance.query('SELECT * FROM test_database.test_data_types ORDER BY id LIMIT 1;')
    assert first_row == '0\t-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12\t2000-05-12\t0.20000\t0.20000\n'

    # Overwrite a randomly chosen integer column, plus the date column, ten times.
    for new_value in range(10):
        column = random.choice(['a', 'b', 'c'])
        pg_cursor.execute('UPDATE test_data_types SET {} = {};'.format(column, new_value))
        pg_cursor.execute('''UPDATE test_data_types SET i = '2020-12-12';''')

    check_tables_are_synchronized('test_data_types', 'id')
    pg_cursor.execute('drop table test_data_types;')

    instance.query("INSERT INTO postgres_database.test_array_data_type "
        "VALUES ("
        "0, "
        "['2000-05-12', '2000-05-12'], "
        "['2000-05-12 12:12:12', '2000-05-12 12:12:12'], "
        "[[1.12345], [1.12345], [1.12345]], "
        "[[1.1234567891], [1.1234567891], [1.1234567891]], "
        "[[[0.11111, 0.11111]], [[0.22222, 0.22222]], [[0.33333, 0.33333]]], "
        "[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
        "[[[[['winx', 'winx', 'winx']]]]], "
        "[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]], "
        "[[[[NULL]]]], "
        "[]"
        ")")

    # One tab-separated row; adjacent literals are joined by the parser.
    expected = (
        "0\t"
        "['2000-05-12','2000-05-12']\t"
        "['2000-05-12 12:12:12','2000-05-12 12:12:12']\t"
        "[[1.12345],[1.12345],[1.12345]]\t"
        "[[1.1234567891],[1.1234567891],[1.1234567891]]\t"
        "[[[0.11111,0.11111]],[[0.22222,0.22222]],[[0.33333,0.33333]]]\t"
        "[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
        "[[[[['winx','winx','winx']]]]]\t"
        "[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\t"
        "[[[[NULL]]]]\t"
        "[]\n"
    )

    check_tables_are_synchronized('test_array_data_type')
    array_row = instance.query('SELECT * FROM test_database.test_array_data_type ORDER BY key;')
    # The database is dropped before the assertion so that a mismatch does
    # not leave test_database behind.
    instance.query("DROP DATABASE test_database")
    assert array_row == expected
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
@pytest.mark.timeout(120)
def test_load_and_sync_subset_of_database_tables(started_cluster):
    """Only tables named in ``postgresql_replica_tables_list`` appear in test_database.

    Ten tables are created in PostgreSQL, but only the first half is listed
    in the setting; the second half must stay absent from ``test_database``.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    pg_cursor = get_postgres_conn(True).cursor()
    NUM_TABLES = 10
    half = NUM_TABLES // 2

    for index in range(NUM_TABLES):
        create_postgres_table(pg_cursor, 'postgresql_replica_{}'.format(index))
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(50)".format(index))

    # Comma-separated names of the first half of the tables.
    publication_tables = ', '.join('postgresql_replica_{}'.format(index) for index in range(half))

    instance.query('''
        CREATE DATABASE test_database
        ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
        SETTINGS postgresql_replica_tables_list = '{}';
        '''.format(publication_tables))
    assert 'test_database' in instance.query('SHOW DATABASES')

    time.sleep(1)

    for index in range(half):
        assert_nested_table_is_created('postgresql_replica_{}'.format(index))

    table_count = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''')
    assert int(table_count) == half

    database_tables = instance.query('SHOW TABLES FROM test_database')
    for index in range(NUM_TABLES):
        name = 'postgresql_replica_{}'.format(index)
        is_listed = name in database_tables
        if index < half:
            assert is_listed
        else:
            assert not is_listed
        # Every table receives more rows, whether or not it is replicated.
        instance.query("INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format(name, index))

    for index in range(NUM_TABLES):
        name = 'postgresql_replica_{}'.format(index)
        if index < half:
            check_tables_are_synchronized(name)
        pg_cursor.execute('drop table {};'.format(name))

    instance.query("DROP DATABASE test_database")
    assert 'test_database' not in instance.query('SHOW DATABASES')
|
|
|
|
|
|
|
|
|
2021-03-20 15:27:13 +00:00
|
|
|
@pytest.mark.timeout(120)
def test_table_schema_changes(started_cluster):
    """Tables stay synchronized after a column is dropped from one of them.

    ``test_database`` is created with ``postgresql_replica_allow_minimal_ddl = 1``.
    One randomly chosen table loses its ``value2`` column; after further
    inserts and updates every table must still match its PostgreSQL
    counterpart.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    conn = get_postgres_conn(True)
    cursor = conn.cursor()
    NUM_TABLES = 5

    for i in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2)
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i))

    instance.query(
        """CREATE DATABASE test_database
        ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
        SETTINGS postgresql_replica_allow_minimal_ddl = 1;
        """)

    for i in range(NUM_TABLES):
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))

    # Pick the victim from the tables that actually exist rather than from
    # a literal range that silently goes stale if NUM_TABLES changes.
    altered_table = random.randint(0, NUM_TABLES - 1)
    cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(altered_table))

    for i in range(NUM_TABLES):
        # Three values are supplied to every table, including the ones that
        # still have four columns.
        cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i))
        cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i))

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))

    for i in range(NUM_TABLES):
        if i != altered_table:
            instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i))
        else:
            # The altered table has one column fewer.
            instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i))

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))

    for i in range(NUM_TABLES):
        cursor.execute('drop table postgresql_replica_{};'.format(i))

    instance.query("DROP DATABASE test_database")
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.timeout(120)
def test_changing_replica_identity_value(started_cluster):
    """Updating primary-key values in PostgreSQL is reflected in test_database.

    Rows with keys 50..99 exist before ``test_database`` is created and rows
    with keys 100..149 are added afterwards; the keys below 100 are then
    shifted down by 25 and the tables must match again.
    """
    instance.query("DROP DATABASE IF EXISTS test_database")
    conn = get_postgres_conn(True)
    cursor = conn.cursor()
    create_postgres_table(cursor, 'postgresql_replica')
    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)")

    instance.query(
        "CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")

    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)")
    check_tables_are_synchronized('postgresql_replica')

    cursor.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ")
    check_tables_are_synchronized('postgresql_replica')

    # Clean up like the other tests in this module, so that neither the
    # PostgreSQL table nor test_database outlives this test.
    cursor.execute('drop table postgresql_replica;')
    instance.query("DROP DATABASE test_database")
|
|
|
|
|
2021-02-18 23:33:01 +00:00
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
if __name__ == '__main__':
    # Manual mode: bring the cluster up and keep it running until the user
    # answers the prompt. The shutdown sits in ``finally`` so the cluster is
    # also torn down when start-up fails or the prompt is interrupted
    # (Ctrl-C / EOF).
    try:
        cluster.start()
        input("Cluster created, press any key to destroy...")
    finally:
        cluster.shutdown()
|