2021-02-08 23:23:51 +00:00
|
|
|
import pytest
|
|
|
|
import time
|
|
|
|
import psycopg2
|
|
|
|
import os.path as p
|
2021-02-18 23:33:01 +00:00
|
|
|
import random
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
|
|
|
from helpers.test_tools import TSV
|
|
|
|
|
2021-05-10 11:31:06 +00:00
|
|
|
from random import randrange
|
|
|
|
import threading
|
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
cluster = ClickHouseCluster(__file__)
|
2021-04-08 20:39:56 +00:00
|
|
|
instance = cluster.add_instance('instance',
|
2021-04-10 14:42:45 +00:00
|
|
|
main_configs = ['configs/log_conf.xml'],
|
2021-04-08 20:39:56 +00:00
|
|
|
user_configs = ['configs/users.xml'],
|
|
|
|
with_postgres=True, stay_alive=True)
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
postgres_table_template = """
|
2021-08-31 20:58:00 +00:00
|
|
|
CREATE TABLE IF NOT EXISTS "{}" (
|
2021-02-11 21:59:58 +00:00
|
|
|
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
2021-02-08 23:23:51 +00:00
|
|
|
"""
|
2021-02-18 23:33:01 +00:00
|
|
|
postgres_table_template_2 = """
|
2021-08-31 20:58:00 +00:00
|
|
|
CREATE TABLE IF NOT EXISTS "{}" (
|
2021-02-18 23:33:01 +00:00
|
|
|
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
|
|
|
|
"""
|
2021-04-10 14:42:45 +00:00
|
|
|
postgres_table_template_3 = """
|
2021-08-31 20:58:00 +00:00
|
|
|
CREATE TABLE IF NOT EXISTS "{}" (
|
2021-04-10 14:42:45 +00:00
|
|
|
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
|
|
|
|
"""
|
2021-09-12 12:33:54 +00:00
|
|
|
postgres_table_template_4 = """
|
|
|
|
CREATE TABLE IF NOT EXISTS "{}"."{}" (
|
|
|
|
key Integer NOT NULL, value Integer, PRIMARY KEY(key))
|
|
|
|
"""
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-09-04 10:07:59 +00:00
|
|
|
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
|
2021-02-08 23:23:51 +00:00
|
|
|
if database == True:
|
2021-06-03 19:45:27 +00:00
|
|
|
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
|
2021-02-08 23:23:51 +00:00
|
|
|
else:
|
2021-06-03 19:45:27 +00:00
|
|
|
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
|
|
|
|
|
2021-09-04 10:07:59 +00:00
|
|
|
if replication:
|
|
|
|
conn_string += " replication='database'"
|
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
conn = psycopg2.connect(conn_string)
|
2021-05-10 14:51:17 +00:00
|
|
|
if auto_commit:
|
|
|
|
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
|
|
|
conn.autocommit = True
|
2021-02-08 23:23:51 +00:00
|
|
|
return conn
|
|
|
|
|
2021-09-04 10:07:59 +00:00
|
|
|
def create_replication_slot(conn, slot_name='user_slot'):
|
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
|
|
|
|
result = cursor.fetchall()
|
|
|
|
print(result[0][0]) # slot name
|
|
|
|
print(result[0][1]) # start lsn
|
|
|
|
print(result[0][2]) # snapshot
|
|
|
|
return result[0][2]
|
|
|
|
|
2021-09-04 20:55:59 +00:00
|
|
|
def drop_replication_slot(conn, slot_name='user_slot'):
|
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
def create_postgres_db(cursor, name='postgres_database'):
|
2021-02-08 23:23:51 +00:00
|
|
|
cursor.execute("CREATE DATABASE {}".format(name))
|
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
def drop_postgres_db(cursor, name='postgres_database'):
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def drop_postgres_schema(cursor, schema_name):
|
|
|
|
cursor.execute('DROP SCHEMA IF EXISTS {} CASCADE'.format(schema_name))
|
|
|
|
|
|
|
|
def create_postgres_schema(cursor, schema_name):
|
|
|
|
drop_postgres_schema(cursor, schema_name)
|
|
|
|
cursor.execute('CREATE SCHEMA {}'.format(schema_name))
|
|
|
|
|
2021-09-12 12:33:54 +00:00
|
|
|
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
|
|
|
|
drop_clickhouse_postgres_db(name)
|
|
|
|
if len(schema_name) == 0:
|
|
|
|
instance.query('''
|
|
|
|
CREATE DATABASE {}
|
|
|
|
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
|
|
|
|
else:
|
|
|
|
instance.query('''
|
|
|
|
CREATE DATABASE {}
|
|
|
|
ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
|
2021-05-23 12:09:20 +00:00
|
|
|
|
|
|
|
def drop_clickhouse_postgres_db(name='postgres_database'):
|
2021-09-12 12:33:54 +00:00
|
|
|
instance.query('DROP DATABASE IF EXISTS {}'.format(name))
|
2021-06-03 19:45:27 +00:00
|
|
|
|
|
|
|
def create_materialized_db(ip, port,
|
|
|
|
materialized_database='test_database',
|
|
|
|
postgres_database='postgres_database',
|
|
|
|
settings=[]):
|
2021-06-27 19:09:17 +00:00
|
|
|
create_query = "CREATE DATABASE {} ENGINE = MaterializedPostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database)
|
2021-06-03 19:45:27 +00:00
|
|
|
if len(settings) > 0:
|
|
|
|
create_query += " SETTINGS "
|
|
|
|
for i in range(len(settings)):
|
|
|
|
if i != 0:
|
|
|
|
create_query += ', '
|
|
|
|
create_query += settings[i]
|
|
|
|
instance.query(create_query)
|
2021-05-23 12:09:20 +00:00
|
|
|
assert materialized_database in instance.query('SHOW DATABASES')
|
|
|
|
|
2021-07-04 08:54:41 +00:00
|
|
|
def drop_materialized_db(materialized_database='test_database'):
|
|
|
|
instance.query('DROP DATABASE IF EXISTS {}'.format(materialized_database))
|
2021-05-23 12:09:20 +00:00
|
|
|
assert materialized_database not in instance.query('SHOW DATABASES')
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-08-31 20:58:00 +00:00
|
|
|
def drop_postgres_table(cursor, table_name):
|
2021-09-01 14:32:09 +00:00
|
|
|
cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
|
2021-08-31 20:58:00 +00:00
|
|
|
|
2021-09-12 12:33:54 +00:00
|
|
|
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
|
|
|
|
cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
|
|
|
|
|
2021-02-18 23:33:01 +00:00
|
|
|
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
|
2021-08-31 20:58:00 +00:00
|
|
|
drop_postgres_table(cursor, table_name)
|
2021-02-18 23:33:01 +00:00
|
|
|
cursor.execute(template.format(table_name))
|
2021-02-18 18:14:05 +00:00
|
|
|
if replica_identity_full:
|
|
|
|
cursor.execute('ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-09-12 12:33:54 +00:00
|
|
|
def create_postgres_table_with_schema(cursor, schema_name, table_name):
|
|
|
|
drop_postgres_table_with_schema(cursor, schema_name, table_name)
|
|
|
|
cursor.execute(postgres_table_template_4.format(schema_name, table_name))
|
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
queries = [
|
|
|
|
'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
|
|
|
|
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
|
|
|
|
"UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
|
|
|
|
'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
|
|
|
|
'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
|
|
|
|
"UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
|
|
|
|
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
|
|
|
|
'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
|
|
|
|
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
|
|
|
|
"UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
|
|
|
|
'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
|
|
|
|
"UPDATE postgresql_replica_{} SET key=key+10000000",
|
|
|
|
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
|
2021-07-01 07:33:58 +00:00
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;'
|
|
|
|
]
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-09-12 12:33:54 +00:00
|
|
|
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
|
|
|
|
if len(schema_name) == 0:
|
|
|
|
table = table_name
|
|
|
|
else:
|
|
|
|
table = schema_name + "." + table_name
|
|
|
|
print('Checking table exists:', table)
|
2021-05-23 12:09:20 +00:00
|
|
|
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
|
2021-09-12 12:33:54 +00:00
|
|
|
while table not in database_tables:
|
2021-02-15 22:49:13 +00:00
|
|
|
time.sleep(0.2)
|
2021-05-23 12:09:20 +00:00
|
|
|
database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
|
2021-09-12 12:33:54 +00:00
|
|
|
assert(table in database_tables)
|
2021-02-15 22:49:13 +00:00
|
|
|
|
|
|
|
|
2021-08-11 12:29:03 +00:00
|
|
|
@pytest.mark.timeout(320)
|
2021-09-12 12:33:54 +00:00
|
|
|
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
|
|
|
|
assert_nested_table_is_created(table_name, materialized_database, schema_name)
|
2021-02-17 20:42:18 +00:00
|
|
|
|
2021-09-12 12:33:54 +00:00
|
|
|
print("Checking table is synchronized:", table_name)
|
2021-06-27 19:09:17 +00:00
|
|
|
expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))
|
2021-09-12 12:33:54 +00:00
|
|
|
if len(schema_name) == 0:
|
|
|
|
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
|
|
|
|
else:
|
|
|
|
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-06-27 19:09:17 +00:00
|
|
|
while result != expected:
|
|
|
|
time.sleep(0.5)
|
2021-09-12 12:33:54 +00:00
|
|
|
if len(schema_name) == 0:
|
|
|
|
result = instance.query('select * from {}.{} order by {};'.format(materialized_database, table_name, order_by))
|
|
|
|
else:
|
|
|
|
result = instance.query('select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-06-27 19:09:17 +00:00
|
|
|
assert(result == expected)
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def started_cluster():
|
|
|
|
try:
|
|
|
|
cluster.start()
|
2021-09-09 09:18:08 +00:00
|
|
|
conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
|
2021-02-08 23:23:51 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
create_postgres_db(cursor, 'postgres_database')
|
2021-09-09 09:18:08 +00:00
|
|
|
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port)
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
instance.query("DROP DATABASE IF EXISTS test_database")
|
|
|
|
yield cluster
|
|
|
|
|
|
|
|
finally:
|
|
|
|
cluster.shutdown()
|
|
|
|
|
|
|
|
|
2021-02-12 18:21:55 +00:00
|
|
|
def test_load_and_sync_all_database_tables(started_cluster):
|
2021-07-04 08:54:41 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-02-08 23:23:51 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-02-15 22:49:13 +00:00
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table(cursor, table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-02-08 23:23:51 +00:00
|
|
|
assert 'test_database' in instance.query('SHOW DATABASES')
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-02-15 22:49:13 +00:00
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
cursor.execute('drop table {};'.format(table_name))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
result = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''')
|
|
|
|
assert(int(result) == NUM_TABLES)
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_replicating_dml(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-02-08 23:23:51 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i))
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-02-11 21:59:58 +00:00
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(i, i))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-02-15 22:49:13 +00:00
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} WHERE key < 50;'.format(i, i, i))
|
|
|
|
cursor.execute('UPDATE postgresql_replica_{} SET value = {} * {} * {} WHERE key >= 50;'.format(i, i, i, i))
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('DELETE FROM postgresql_replica_{} WHERE (value*value + {}) % 2 = 0;'.format(i, i))
|
2021-02-12 10:05:13 +00:00
|
|
|
cursor.execute('UPDATE postgresql_replica_{} SET value = value - (value % 7) WHERE key > 128 AND key < 512;'.format(i))
|
|
|
|
cursor.execute('DELETE FROM postgresql_replica_{} WHERE key % 7 = 1;'.format(i, i))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-02-08 23:23:51 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-02-08 23:23:51 +00:00
|
|
|
|
|
|
|
|
2021-02-12 10:05:13 +00:00
|
|
|
def test_different_data_types(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-02-12 10:05:13 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('drop table if exists test_data_types;')
|
|
|
|
cursor.execute('drop table if exists test_array_data_type;')
|
|
|
|
|
|
|
|
cursor.execute(
|
|
|
|
'''CREATE TABLE test_data_types (
|
|
|
|
id integer PRIMARY KEY, a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
|
|
|
|
h timestamp, i date, j decimal(5, 5), k numeric(5, 5))''')
|
|
|
|
|
|
|
|
cursor.execute(
|
|
|
|
'''CREATE TABLE test_array_data_type
|
|
|
|
(
|
|
|
|
key Integer NOT NULL PRIMARY KEY,
|
|
|
|
a Date[] NOT NULL, -- Date
|
2021-07-14 00:51:14 +00:00
|
|
|
b Timestamp[] NOT NULL, -- DateTime64(6)
|
2021-02-12 10:05:13 +00:00
|
|
|
c real[][] NOT NULL, -- Float32
|
|
|
|
d double precision[][] NOT NULL, -- Float64
|
|
|
|
e decimal(5, 5)[][][] NOT NULL, -- Decimal32
|
|
|
|
f integer[][][] NOT NULL, -- Int32
|
|
|
|
g Text[][][][][] NOT NULL, -- String
|
|
|
|
h Integer[][][], -- Nullable(Int32)
|
|
|
|
i Char(2)[][][][], -- Nullable(String)
|
|
|
|
k Char(2)[] -- Nullable(String)
|
|
|
|
)''')
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-02-12 10:05:13 +00:00
|
|
|
|
|
|
|
for i in range(10):
|
|
|
|
instance.query('''
|
|
|
|
INSERT INTO postgres_database.test_data_types VALUES
|
2021-07-13 07:28:15 +00:00
|
|
|
({}, -32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12.012345', '2000-05-12', 0.2, 0.2)'''.format(i))
|
2021-02-12 10:05:13 +00:00
|
|
|
|
|
|
|
check_tables_are_synchronized('test_data_types', 'id');
|
|
|
|
result = instance.query('SELECT * FROM test_database.test_data_types ORDER BY id LIMIT 1;')
|
2021-08-16 02:00:39 +00:00
|
|
|
assert(result == '0\t-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12.012345\t2000-05-12\t0.2\t0.2\n')
|
2021-02-18 23:33:01 +00:00
|
|
|
|
|
|
|
for i in range(10):
|
|
|
|
col = random.choice(['a', 'b', 'c'])
|
|
|
|
cursor.execute('UPDATE test_data_types SET {} = {};'.format(col, i))
|
|
|
|
cursor.execute('''UPDATE test_data_types SET i = '2020-12-12';'''.format(col, i))
|
|
|
|
|
|
|
|
check_tables_are_synchronized('test_data_types', 'id');
|
2021-02-12 10:05:13 +00:00
|
|
|
|
|
|
|
instance.query("INSERT INTO postgres_database.test_array_data_type "
|
|
|
|
"VALUES ("
|
|
|
|
"0, "
|
|
|
|
"['2000-05-12', '2000-05-12'], "
|
2021-07-13 07:28:15 +00:00
|
|
|
"['2000-05-12 12:12:12.012345', '2000-05-12 12:12:12.012345'], "
|
2021-02-12 10:05:13 +00:00
|
|
|
"[[1.12345], [1.12345], [1.12345]], "
|
|
|
|
"[[1.1234567891], [1.1234567891], [1.1234567891]], "
|
|
|
|
"[[[0.11111, 0.11111]], [[0.22222, 0.22222]], [[0.33333, 0.33333]]], "
|
|
|
|
"[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
|
|
|
|
"[[[[['winx', 'winx', 'winx']]]]], "
|
|
|
|
"[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]], "
|
|
|
|
"[[[[NULL]]]], "
|
|
|
|
"[]"
|
|
|
|
")")
|
|
|
|
|
|
|
|
expected = (
|
|
|
|
"0\t" +
|
|
|
|
"['2000-05-12','2000-05-12']\t" +
|
2021-07-13 07:28:15 +00:00
|
|
|
"['2000-05-12 12:12:12.012345','2000-05-12 12:12:12.012345']\t" +
|
2021-02-12 10:05:13 +00:00
|
|
|
"[[1.12345],[1.12345],[1.12345]]\t" +
|
|
|
|
"[[1.1234567891],[1.1234567891],[1.1234567891]]\t" +
|
|
|
|
"[[[0.11111,0.11111]],[[0.22222,0.22222]],[[0.33333,0.33333]]]\t"
|
|
|
|
"[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
|
|
|
|
"[[[[['winx','winx','winx']]]]]\t"
|
|
|
|
"[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\t"
|
|
|
|
"[[[[NULL]]]]\t"
|
|
|
|
"[]\n"
|
|
|
|
)
|
|
|
|
|
|
|
|
check_tables_are_synchronized('test_array_data_type');
|
|
|
|
result = instance.query('SELECT * FROM test_database.test_array_data_type ORDER BY key;')
|
|
|
|
assert(result == expected)
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists test_data_types;')
|
|
|
|
cursor.execute('drop table if exists test_array_data_type;')
|
2021-02-12 10:05:13 +00:00
|
|
|
|
|
|
|
|
2021-02-12 18:21:55 +00:00
|
|
|
def test_load_and_sync_subset_of_database_tables(started_cluster):
|
2021-07-04 08:54:41 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-02-12 18:21:55 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 10
|
|
|
|
|
|
|
|
publication_tables = ''
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(50)".format(i))
|
|
|
|
|
2021-02-15 22:49:13 +00:00
|
|
|
if i < int(NUM_TABLES/2):
|
2021-02-12 18:21:55 +00:00
|
|
|
if publication_tables != '':
|
|
|
|
publication_tables += ', '
|
|
|
|
publication_tables += table_name
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
2021-06-27 19:09:17 +00:00
|
|
|
settings=["materialized_postgresql_tables_list = '{}'".format(publication_tables)])
|
2021-02-12 18:21:55 +00:00
|
|
|
assert 'test_database' in instance.query('SHOW DATABASES')
|
|
|
|
|
|
|
|
time.sleep(1)
|
|
|
|
|
2021-02-15 22:49:13 +00:00
|
|
|
for i in range(int(NUM_TABLES/2)):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
assert_nested_table_is_created(table_name)
|
|
|
|
|
2021-02-12 18:21:55 +00:00
|
|
|
result = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''')
|
2021-02-15 22:49:13 +00:00
|
|
|
assert(int(result) == int(NUM_TABLES/2))
|
2021-02-12 18:21:55 +00:00
|
|
|
|
|
|
|
database_tables = instance.query('SHOW TABLES FROM test_database')
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
2021-02-15 22:49:13 +00:00
|
|
|
if i < int(NUM_TABLES/2):
|
2021-02-12 18:21:55 +00:00
|
|
|
assert table_name in database_tables
|
|
|
|
else:
|
|
|
|
assert table_name not in database_tables
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format(table_name, i))
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
2021-02-15 22:49:13 +00:00
|
|
|
if i < int(NUM_TABLES/2):
|
2021-02-12 18:21:55 +00:00
|
|
|
check_tables_are_synchronized(table_name);
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-02-12 18:21:55 +00:00
|
|
|
|
|
|
|
|
2021-02-22 12:35:53 +00:00
|
|
|
def test_changing_replica_identity_value(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-02-22 12:35:53 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica');
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)")
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-02-22 12:35:53 +00:00
|
|
|
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)")
|
|
|
|
check_tables_are_synchronized('postgresql_replica');
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ")
|
|
|
|
check_tables_are_synchronized('postgresql_replica');
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists postgresql_replica;')
|
2021-02-22 12:35:53 +00:00
|
|
|
|
2021-02-18 23:33:01 +00:00
|
|
|
|
2021-04-08 20:39:56 +00:00
|
|
|
def test_clickhouse_restart(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-04-08 20:39:56 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i))
|
|
|
|
|
2021-06-27 19:09:17 +00:00
|
|
|
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
|
2021-04-08 20:39:56 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(50000)".format(i, i))
|
|
|
|
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-04-08 20:39:56 +00:00
|
|
|
|
|
|
|
|
2021-04-10 14:42:45 +00:00
|
|
|
def test_replica_identity_index(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-10-02 12:49:20 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
|
2021-04-10 14:42:45 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3);
|
|
|
|
cursor.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);")
|
|
|
|
cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)")
|
|
|
|
|
2021-10-02 12:49:20 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
2021-04-10 14:42:45 +00:00
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)")
|
|
|
|
check_tables_are_synchronized('postgresql_replica', order_by='key1');
|
|
|
|
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ")
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ")
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 ")
|
|
|
|
cursor.execute("UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 ")
|
|
|
|
check_tables_are_synchronized('postgresql_replica', order_by='key1');
|
|
|
|
|
|
|
|
cursor.execute('DELETE FROM postgresql_replica WHERE key2<75;')
|
|
|
|
check_tables_are_synchronized('postgresql_replica', order_by='key1');
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists postgresql_replica;')
|
2021-04-10 14:42:45 +00:00
|
|
|
|
|
|
|
|
2021-05-03 17:28:54 +00:00
|
|
|
def test_table_schema_changes(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-05-03 17:28:54 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2);
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i))
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
2021-06-27 19:09:17 +00:00
|
|
|
settings=["materialized_postgresql_allow_automatic_update = 1"])
|
2021-05-03 17:28:54 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key");
|
|
|
|
|
|
|
|
altered_table = random.randint(0, 4)
|
|
|
|
cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(altered_table))
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i))
|
|
|
|
cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i))
|
|
|
|
|
|
|
|
assert_nested_table_is_created('postgresql_replica_{}'.format(altered_table))
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(altered_table))
|
|
|
|
print('check1 OK')
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
if i != altered_table:
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i))
|
|
|
|
else:
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i))
|
|
|
|
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(altered_table));
|
|
|
|
print('check2 OK')
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table postgresql_replica_{};'.format(i))
|
|
|
|
|
|
|
|
instance.query("DROP DATABASE test_database")
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-05-03 17:28:54 +00:00
|
|
|
|
2021-05-02 11:50:29 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
def test_many_concurrent_queries(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-05-10 11:31:06 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(10000)'.format(i))
|
|
|
|
n = [10000]
|
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
query_pool = ['DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
|
2021-05-10 13:51:05 +00:00
|
|
|
'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
|
2021-05-10 11:31:06 +00:00
|
|
|
'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
|
2021-05-10 14:51:17 +00:00
|
|
|
'UPDATE postgresql_replica_{} SET value = value*5 WHERE key % 2 = 1;',
|
2021-05-10 11:31:06 +00:00
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
|
2021-05-10 13:51:05 +00:00
|
|
|
'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
|
2021-05-10 11:31:06 +00:00
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
|
2021-05-10 13:51:05 +00:00
|
|
|
'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
|
2021-05-10 11:31:06 +00:00
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
|
2021-05-10 13:51:05 +00:00
|
|
|
'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
|
|
|
|
'DELETE FROM postgresql_replica_{} WHERE value%5 = 0;']
|
2021-05-10 11:31:06 +00:00
|
|
|
|
|
|
|
def attack(thread_id):
|
|
|
|
print('thread {}'.format(thread_id))
|
|
|
|
k = 10000
|
2021-05-23 12:09:20 +00:00
|
|
|
for i in range(20):
|
|
|
|
query_id = random.randrange(0, len(query_pool)-1)
|
2021-05-10 11:31:06 +00:00
|
|
|
table_id = random.randrange(0, 5) # num tables
|
|
|
|
|
|
|
|
# random update / delete query
|
2021-05-23 12:09:20 +00:00
|
|
|
cursor.execute(query_pool[query_id].format(table_id))
|
2021-05-10 11:31:06 +00:00
|
|
|
print("table {} query {} ok".format(table_id, query_id))
|
|
|
|
|
|
|
|
# allow some thread to do inserts (not to violate key constraints)
|
|
|
|
if thread_id < 5:
|
2021-05-10 13:51:05 +00:00
|
|
|
print("try insert table {}".format(thread_id))
|
|
|
|
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {}*10000*({} + number), number from numbers(1000)'.format(i, thread_id, k))
|
2021-05-10 11:31:06 +00:00
|
|
|
k += 1
|
|
|
|
print("insert table {} ok".format(thread_id))
|
|
|
|
|
2021-05-10 13:51:05 +00:00
|
|
|
if i == 5:
|
|
|
|
# also change primary key value
|
|
|
|
print("try update primary key {}".format(thread_id))
|
|
|
|
cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(thread_id, i+1, i+1))
|
|
|
|
print("update primary key {} ok".format(thread_id))
|
|
|
|
|
2021-05-10 11:31:06 +00:00
|
|
|
threads = []
|
|
|
|
threads_num = 16
|
|
|
|
for i in range(threads_num):
|
|
|
|
threads.append(threading.Thread(target=attack, args=(i,)))
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-05-10 11:31:06 +00:00
|
|
|
for thread in threads:
|
|
|
|
time.sleep(random.uniform(0, 1))
|
|
|
|
thread.start()
|
|
|
|
|
|
|
|
n[0] = 50000
|
|
|
|
for table_id in range(NUM_TABLES):
|
|
|
|
n[0] += 1
|
|
|
|
instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {} + number, number from numbers(5000)'.format(table_id, n[0]))
|
2021-05-10 13:51:05 +00:00
|
|
|
#cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(table_id, table_id+1, table_id+1))
|
2021-05-10 11:31:06 +00:00
|
|
|
|
|
|
|
for thread in threads:
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
2021-05-23 12:09:20 +00:00
|
|
|
count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
|
|
|
|
count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
|
|
|
|
assert(int(count1) == int(count2))
|
|
|
|
print(count1, count2)
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-05-10 11:31:06 +00:00
|
|
|
|
|
|
|
|
2021-05-10 14:51:17 +00:00
|
|
|
def test_single_transaction(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=False)
|
2021-05-10 14:51:17 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_0');
|
|
|
|
conn.commit()
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-05-23 12:09:20 +00:00
|
|
|
assert_nested_table_is_created('postgresql_replica_0')
|
2021-05-10 14:51:17 +00:00
|
|
|
|
|
|
|
for query in queries:
|
|
|
|
print('query {}'.format(query))
|
|
|
|
cursor.execute(query.format(0))
|
|
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
result = instance.query("select count() from test_database.postgresql_replica_0")
|
|
|
|
# no commit yet
|
|
|
|
assert(int(result) == 0)
|
|
|
|
|
|
|
|
conn.commit()
|
2021-05-13 07:36:40 +00:00
|
|
|
check_tables_are_synchronized('postgresql_replica_0');
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists postgresql_replica_0;')
|
2021-05-13 07:36:40 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_virtual_columns(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-03 19:45:27 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-05-13 07:36:40 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_0');
|
|
|
|
|
2021-06-03 19:45:27 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
2021-06-27 19:09:17 +00:00
|
|
|
settings=["materialized_postgresql_allow_automatic_update = 1"])
|
2021-05-13 07:36:40 +00:00
|
|
|
assert_nested_table_is_created('postgresql_replica_0')
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)")
|
|
|
|
check_tables_are_synchronized('postgresql_replica_0');
|
|
|
|
|
2021-07-13 07:28:15 +00:00
|
|
|
# just check that it works, no check with `expected` because _version is taken as LSN, which will be different each time.
|
2021-05-13 07:36:40 +00:00
|
|
|
result = instance.query('SELECT key, value, _sign, _version FROM test_database.postgresql_replica_0;')
|
|
|
|
print(result)
|
|
|
|
|
|
|
|
cursor.execute("ALTER TABLE postgresql_replica_0 ADD COLUMN value2 integer")
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(10, 10)")
|
|
|
|
check_tables_are_synchronized('postgresql_replica_0');
|
|
|
|
|
|
|
|
result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;')
|
|
|
|
print(result)
|
|
|
|
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(20, 10)")
|
|
|
|
check_tables_are_synchronized('postgresql_replica_0');
|
|
|
|
|
|
|
|
result = instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;')
|
|
|
|
print(result)
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
cursor.execute('drop table if exists postgresql_replica_0;')
|
2021-05-13 07:36:40 +00:00
|
|
|
|
2021-05-23 12:09:20 +00:00
|
|
|
|
2021-06-26 22:05:20 +00:00
|
|
|
def test_multiple_databases(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db('test_database_1')
|
|
|
|
drop_materialized_db('test_database_2')
|
2021-06-26 22:05:20 +00:00
|
|
|
NUM_TABLES = 5
|
|
|
|
|
2021-06-27 16:15:28 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=False)
|
2021-06-26 22:05:20 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
create_postgres_db(cursor, 'postgres_database_1')
|
|
|
|
create_postgres_db(cursor, 'postgres_database_2')
|
|
|
|
|
2021-06-27 16:15:28 +00:00
|
|
|
conn1 = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, database_name='postgres_database_1')
|
|
|
|
conn2 = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, database_name='postgres_database_2')
|
2021-06-26 22:05:20 +00:00
|
|
|
|
|
|
|
cursor1 = conn1.cursor()
|
|
|
|
cursor2 = conn2.cursor()
|
|
|
|
|
2021-10-02 12:49:20 +00:00
|
|
|
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_1', 'postgres_database_1')
|
|
|
|
create_clickhouse_postgres_db(cluster.postgres_ip, cluster.postgres_port, 'postgres_database_2', 'postgres_database_2')
|
2021-06-26 22:05:20 +00:00
|
|
|
|
|
|
|
cursors = [cursor1, cursor2]
|
|
|
|
for cursor_id in range(len(cursors)):
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table(cursors[cursor_id], table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name))
|
|
|
|
print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';'''))
|
|
|
|
print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';'''))
|
|
|
|
|
2021-06-27 16:15:28 +00:00
|
|
|
create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port,
|
|
|
|
'test_database_1', 'postgres_database_1')
|
|
|
|
create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port,
|
|
|
|
'test_database_2', 'postgres_database_2')
|
2021-06-26 22:05:20 +00:00
|
|
|
|
|
|
|
cursors = [cursor1, cursor2]
|
|
|
|
for cursor_id in range(len(cursors)):
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name))
|
|
|
|
|
|
|
|
for cursor_id in range(len(cursors)):
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(
|
|
|
|
table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1));
|
|
|
|
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor1.execute('drop table if exists postgresql_replica_{};'.format(i))
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor2.execute('drop table if exists postgresql_replica_{};'.format(i))
|
|
|
|
|
2021-06-26 22:05:20 +00:00
|
|
|
drop_clickhouse_postgres_db('postgres_database_1')
|
|
|
|
drop_clickhouse_postgres_db('postgres_database_2')
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-06-26 22:05:20 +00:00
|
|
|
drop_materialized_db('test_database_1')
|
|
|
|
drop_materialized_db('test_database_2')
|
|
|
|
|
|
|
|
|
|
|
|
def test_concurrent_transactions(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-27 16:15:28 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-06-26 22:05:20 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 6
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
def transaction(thread_id):
|
2021-06-27 16:15:28 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=False)
|
2021-06-26 22:05:20 +00:00
|
|
|
cursor_ = conn.cursor()
|
|
|
|
for query in queries:
|
|
|
|
cursor_.execute(query.format(thread_id))
|
|
|
|
print('thread {}, query {}'.format(thread_id, query))
|
2021-06-27 16:15:28 +00:00
|
|
|
conn.commit()
|
2021-06-26 22:05:20 +00:00
|
|
|
|
|
|
|
threads = []
|
|
|
|
threads_num = 6
|
|
|
|
for i in range(threads_num):
|
|
|
|
threads.append(threading.Thread(target=transaction, args=(i,)))
|
|
|
|
|
2021-06-27 16:15:28 +00:00
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
2021-06-26 22:05:20 +00:00
|
|
|
|
|
|
|
for thread in threads:
|
|
|
|
time.sleep(random.uniform(0, 0.5))
|
|
|
|
thread.start()
|
|
|
|
for thread in threads:
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
|
|
|
|
count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
|
|
|
|
print(int(count1), int(count2), sep=' ')
|
|
|
|
assert(int(count1) == int(count2))
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-06-26 22:05:20 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-05-10 14:51:17 +00:00
|
|
|
|
|
|
|
|
2021-06-29 23:11:46 +00:00
|
|
|
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-29 23:11:46 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 6
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
def transaction(thread_id):
|
|
|
|
if thread_id % 2:
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=True)
|
|
|
|
else:
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=False)
|
|
|
|
cursor_ = conn.cursor()
|
|
|
|
for query in queries:
|
|
|
|
cursor_.execute(query.format(thread_id))
|
|
|
|
print('thread {}, query {}'.format(thread_id, query))
|
|
|
|
if thread_id % 2 == 0:
|
|
|
|
conn.commit()
|
|
|
|
|
|
|
|
threads = []
|
|
|
|
threads_num = 6
|
|
|
|
for i in range(threads_num):
|
|
|
|
threads.append(threading.Thread(target=transaction, args=(i,)))
|
|
|
|
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
|
|
|
|
|
|
|
for thread in threads:
|
|
|
|
time.sleep(random.uniform(0, 0.5))
|
|
|
|
thread.start()
|
|
|
|
|
|
|
|
# Join here because it takes time for data to reach wal
|
|
|
|
for thread in threads:
|
|
|
|
thread.join()
|
|
|
|
time.sleep(1)
|
|
|
|
started_cluster.pause_container('postgres1')
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
|
|
|
|
print(result) # Just debug
|
|
|
|
|
|
|
|
started_cluster.unpause_container('postgres1')
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
|
|
|
|
print(result) # Just debug
|
|
|
|
|
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-06-29 23:11:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_drop_database_while_replication_startup_not_finished(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-29 23:11:46 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table(cursor, table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))
|
|
|
|
|
|
|
|
for i in range(6):
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
|
|
|
time.sleep(0.5 * i)
|
|
|
|
drop_materialized_db()
|
|
|
|
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
|
|
|
|
2021-06-29 23:11:46 +00:00
|
|
|
|
|
|
|
def test_restart_server_while_replication_startup_not_finished(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-06-29 23:11:46 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table(cursor, table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))
|
|
|
|
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
|
|
|
time.sleep(0.5)
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
2021-07-05 03:53:48 +00:00
|
|
|
|
2021-06-29 23:11:46 +00:00
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table postgresql_replica_{};'.format(i))
|
2021-06-29 23:11:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
2021-07-03 13:35:11 +00:00
|
|
|
drop_materialized_db()
|
2021-07-01 07:33:58 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
2021-07-01 08:20:13 +00:00
|
|
|
NUM_TABLES = 6
|
2021-07-01 07:33:58 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
def transaction(thread_id):
|
2021-07-01 08:20:13 +00:00
|
|
|
if thread_id % 2:
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=True)
|
|
|
|
else:
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True, auto_commit=False)
|
2021-07-01 07:33:58 +00:00
|
|
|
cursor_ = conn.cursor()
|
|
|
|
for query in queries:
|
|
|
|
cursor_.execute(query.format(thread_id))
|
|
|
|
print('thread {}, query {}'.format(thread_id, query))
|
2021-07-01 08:20:13 +00:00
|
|
|
if thread_id % 2 == 0:
|
|
|
|
conn.commit()
|
2021-07-01 07:33:58 +00:00
|
|
|
|
|
|
|
threads = []
|
2021-07-01 08:20:13 +00:00
|
|
|
threads_num = 6
|
2021-07-01 07:33:58 +00:00
|
|
|
for i in range(threads_num):
|
|
|
|
threads.append(threading.Thread(target=transaction, args=(i,)))
|
|
|
|
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
|
|
|
|
|
|
|
for thread in threads:
|
|
|
|
time.sleep(random.uniform(0, 0.5))
|
|
|
|
thread.start()
|
|
|
|
|
|
|
|
# Join here because it takes time for data to reach wal
|
|
|
|
for thread in threads:
|
|
|
|
thread.join()
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
|
|
|
|
print(result) # Just debug
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
result = instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i))
|
|
|
|
print(result) # Just debug
|
|
|
|
|
|
|
|
drop_materialized_db()
|
2021-07-05 03:53:48 +00:00
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-06-29 23:11:46 +00:00
|
|
|
|
|
|
|
|
2021-08-31 20:58:00 +00:00
|
|
|
def test_quoting(started_cluster):
|
|
|
|
table_name = 'user'
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
create_postgres_table(cursor, table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name))
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
drop_postgres_table(cursor, table_name)
|
|
|
|
drop_materialized_db()
|
|
|
|
|
|
|
|
|
2021-09-04 10:07:59 +00:00
|
|
|
def test_user_managed_slots(started_cluster):
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
table_name = 'test_table'
|
|
|
|
create_postgres_table(cursor, table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
|
|
|
|
|
|
|
|
slot_name = 'user_slot'
|
|
|
|
replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
|
|
|
|
database=True, replication=True, auto_commit=True)
|
|
|
|
snapshot = create_replication_slot(replication_connection, slot_name=slot_name)
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
settings=["materialized_postgresql_replication_slot = '{}'".format(slot_name),
|
|
|
|
"materialized_postgresql_snapshot = '{}'".format(snapshot)])
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format(table_name))
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
drop_postgres_table(cursor, table_name)
|
|
|
|
drop_materialized_db()
|
2021-09-04 20:55:59 +00:00
|
|
|
drop_replication_slot(replication_connection, slot_name)
|
2021-09-04 10:07:59 +00:00
|
|
|
|
|
|
|
|
2021-09-08 19:49:45 +00:00
|
|
|
def test_add_new_table_to_replication(started_cluster):
|
2021-09-03 11:16:32 +00:00
|
|
|
drop_materialized_db()
|
2021-09-08 22:25:08 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
2021-09-03 11:16:32 +00:00
|
|
|
cursor = conn.cursor()
|
2021-08-27 12:50:45 +00:00
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
|
|
|
|
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
|
|
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
|
|
|
|
|
|
|
|
table_name = 'postgresql_replica_5'
|
|
|
|
create_postgres_table(cursor, table_name)
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(") # Check without ip
|
|
|
|
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
|
|
|
|
|
|
|
|
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
|
|
|
|
assert('Changing setting `materialized_postgresql_tables_list` is not allowed' in result)
|
|
|
|
|
|
|
|
result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
|
|
|
|
assert('Database engine MaterializedPostgreSQL does not support setting' in result)
|
|
|
|
|
|
|
|
instance.query("ATTACH TABLE test_database.{}".format(table_name));
|
|
|
|
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
|
|
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")
|
|
|
|
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
result = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name));
|
|
|
|
assert('Table test_database.postgresql_replica_5 already exists' in result)
|
|
|
|
|
|
|
|
result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table");
|
|
|
|
assert('PostgreSQL table unknown_table does not exist' in result)
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-180:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n")
|
|
|
|
|
|
|
|
table_name = 'postgresql_replica_6'
|
|
|
|
create_postgres_table(cursor, table_name)
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
|
|
|
|
instance.query("ATTACH TABLE test_database.{}".format(table_name));
|
|
|
|
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
|
|
|
|
table_name = 'postgresql_replica_7'
|
|
|
|
create_postgres_table(cursor, table_name)
|
|
|
|
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
|
|
|
|
instance.query("ATTACH TABLE test_database.{}".format(table_name));
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-222:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n")
|
2021-09-03 11:16:32 +00:00
|
|
|
|
2021-08-27 12:50:45 +00:00
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
|
|
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES + 3):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES + 3):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
|
|
|
|
2021-08-28 13:42:36 +00:00
|
|
|
def test_remove_table_from_replication(started_cluster):
|
2021-09-03 11:16:32 +00:00
|
|
|
drop_materialized_db()
|
2021-08-28 13:42:36 +00:00
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port,
|
|
|
|
database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
NUM_TABLES = 5
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
|
|
|
|
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(i, i))
|
|
|
|
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip,
|
|
|
|
port=started_cluster.postgres_port)
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
|
|
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")
|
|
|
|
|
|
|
|
table_name = 'postgresql_replica_4'
|
|
|
|
instance.query('DETACH TABLE test_database.{}'.format(table_name));
|
|
|
|
result = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
|
|
|
|
assert("doesn't exist" in result)
|
|
|
|
|
|
|
|
result = instance.query("SHOW TABLES FROM test_database")
|
|
|
|
assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n")
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")
|
|
|
|
|
|
|
|
instance.query('ATTACH TABLE test_database.{}'.format(table_name));
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
check_tables_are_synchronized(table_name);
|
|
|
|
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
|
|
|
|
|
|
|
|
table_name = 'postgresql_replica_1'
|
|
|
|
instance.query('DETACH TABLE test_database.{}'.format(table_name));
|
|
|
|
result = instance.query('SHOW CREATE DATABASE test_database')
|
|
|
|
assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
|
|
|
|
assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")
|
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
|
2021-09-03 11:16:32 +00:00
|
|
|
|
|
|
|
|
2021-09-22 15:10:25 +00:00
|
|
|
def test_predefined_connection_configuration(started_cluster):
|
|
|
|
drop_materialized_db()
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
|
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(f'DROP TABLE IF EXISTS test_table')
|
|
|
|
cursor.execute(f'CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
|
|
|
|
|
|
|
|
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1)")
|
|
|
|
check_tables_are_synchronized("test_table");
|
|
|
|
drop_materialized_db()
|
|
|
|
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_counter = 0
|
|
|
|
|
|
|
|
def test_database_with_multiple_non_default_schemas_1(started_cluster):
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
|
2021-09-12 12:33:54 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
NUM_TABLES = 5
|
2021-09-12 12:33:54 +00:00
|
|
|
schema_name = 'test_schema'
|
|
|
|
clickhouse_postgres_db = 'postgres_database_with_schema'
|
|
|
|
publication_tables = ''
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_counter = 0
|
|
|
|
|
|
|
|
def insert_into_tables():
|
|
|
|
global insert_counter
|
|
|
|
clickhouse_postgres_db = 'postgres_database_with_schema'
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = f'postgresql_replica_{i}'
|
|
|
|
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
|
|
|
|
insert_counter += 1
|
|
|
|
|
|
|
|
def assert_show_tables(expected):
|
|
|
|
result = instance.query('SHOW TABLES FROM test_database')
|
|
|
|
assert(result == expected)
|
|
|
|
print('assert show tables Ok')
|
|
|
|
|
|
|
|
def check_all_tables_are_synchronized():
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
print('checking table', i)
|
|
|
|
check_tables_are_synchronized("postgresql_replica_{}".format(i), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
|
|
|
print('synchronization Ok')
|
|
|
|
|
|
|
|
create_postgres_schema(cursor, schema_name)
|
|
|
|
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
|
2021-09-12 12:33:54 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table_with_schema(cursor, schema_name, table_name);
|
|
|
|
if publication_tables != '':
|
|
|
|
publication_tables += ', '
|
|
|
|
publication_tables += schema_name + '.' + table_name
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_into_tables()
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
|
|
|
|
settings=[f"materialized_postgresql_tables_list = '{publication_tables}'", "materialized_postgresql_allow_automatic_update = 1"])
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
instance.restart_clickhouse()
|
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
assert_show_tables("test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n")
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_into_tables()
|
|
|
|
check_all_tables_are_synchronized()
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
#altered_table = random.randint(0, NUM_TABLES-1)
|
|
|
|
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
#table_name = 'postgresql_replica_{}'.format(altered_table)
|
|
|
|
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
|
|
|
#print('Ok')
|
|
|
|
|
|
|
|
drop_materialized_db()
|
2021-09-12 12:33:54 +00:00
|
|
|
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def test_database_with_multiple_non_default_schemas_2(started_cluster):
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
|
|
|
|
cursor = conn.cursor()
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
NUM_TABLES = 2
|
|
|
|
schemas_num = 2
|
|
|
|
schema_list = 'schema0, schema1'
|
|
|
|
insert_counter = 0
|
|
|
|
|
|
|
|
def check_all_tables_are_synchronized():
|
|
|
|
for i in range(schemas_num):
|
|
|
|
schema_name = f'schema{i}'
|
|
|
|
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
|
|
|
|
for ti in range(NUM_TABLES):
|
|
|
|
table_name = f'postgresql_replica_{ti}'
|
|
|
|
print(f'checking table {schema_name}.{table_name}')
|
|
|
|
check_tables_are_synchronized(f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
|
|
|
print('synchronized Ok')
|
|
|
|
|
|
|
|
def insert_into_tables():
|
|
|
|
global insert_counter
|
|
|
|
for i in range(schemas_num):
|
|
|
|
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
|
|
|
|
for ti in range(NUM_TABLES):
|
|
|
|
table_name = f'postgresql_replica_{ti}'
|
|
|
|
instance.query(f'INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)')
|
|
|
|
insert_counter += 1
|
|
|
|
|
|
|
|
def assert_show_tables(expected):
|
|
|
|
result = instance.query('SHOW TABLES FROM test_database')
|
|
|
|
assert(result == expected)
|
|
|
|
print('assert show tables Ok')
|
|
|
|
|
|
|
|
for i in range(schemas_num):
|
|
|
|
schema_name = f'schema{i}'
|
|
|
|
clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
|
|
|
|
create_postgres_schema(cursor, schema_name)
|
|
|
|
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
|
|
|
|
for ti in range(NUM_TABLES):
|
|
|
|
table_name = f'postgresql_replica_{ti}'
|
|
|
|
create_postgres_table_with_schema(cursor, schema_name, table_name);
|
|
|
|
|
|
|
|
insert_into_tables()
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
|
|
|
|
settings=[f"materialized_postgresql_schema_list = '{schema_list}'", "materialized_postgresql_allow_automatic_update = 1"])
|
|
|
|
|
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
insert_into_tables()
|
|
|
|
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
|
|
|
|
|
|
|
|
instance.restart_clickhouse()
|
|
|
|
assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
|
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
insert_into_tables()
|
|
|
|
check_all_tables_are_synchronized()
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-01 15:54:01 +00:00
|
|
|
#altered_table = random.randint(0, NUM_TABLES-1)
|
|
|
|
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-01 15:54:01 +00:00
|
|
|
#table_name = 'postgresql_replica_{}'.format(altered_table)
|
|
|
|
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
|
|
|
|
#print('Ok')
|
2021-09-12 12:33:54 +00:00
|
|
|
|
|
|
|
drop_materialized_db()
|
|
|
|
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def test_database_with_single_non_default_schema(started_cluster):
|
|
|
|
conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
|
2021-09-12 12:33:54 +00:00
|
|
|
cursor = conn.cursor()
|
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
NUM_TABLES=5
|
2021-09-12 12:33:54 +00:00
|
|
|
schema_name = 'test_schema'
|
|
|
|
clickhouse_postgres_db = 'postgres_database_with_schema'
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_counter = 0
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def insert_into_tables():
|
|
|
|
global insert_counter
|
|
|
|
clickhouse_postgres_db = 'postgres_database_with_schema'
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
table_name = f'postgresql_replica_{i}'
|
|
|
|
instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
|
|
|
|
insert_counter += 1
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def assert_show_tables(expected):
|
|
|
|
result = instance.query('SHOW TABLES FROM test_database')
|
|
|
|
assert(result == expected)
|
|
|
|
print('assert show tables Ok')
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
def check_all_tables_are_synchronized():
|
|
|
|
for i in range(NUM_TABLES):
|
|
|
|
print('checking table', i)
|
|
|
|
check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db);
|
|
|
|
print('synchronization Ok')
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
create_postgres_schema(cursor, schema_name)
|
|
|
|
create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
|
2021-09-12 12:33:54 +00:00
|
|
|
|
|
|
|
for i in range(NUM_TABLES):
|
2021-10-02 11:11:18 +00:00
|
|
|
table_name = 'postgresql_replica_{}'.format(i)
|
|
|
|
create_postgres_table_with_schema(cursor, schema_name, table_name);
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_into_tables()
|
|
|
|
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
|
|
|
|
settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
insert_into_tables()
|
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-02 11:11:18 +00:00
|
|
|
instance.restart_clickhouse()
|
|
|
|
check_all_tables_are_synchronized()
|
|
|
|
assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
|
|
|
|
insert_into_tables()
|
|
|
|
check_all_tables_are_synchronized()
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-01 15:54:01 +00:00
|
|
|
#altered_table = random.randint(0, NUM_TABLES-1)
|
|
|
|
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);
|
2021-09-12 12:33:54 +00:00
|
|
|
|
2021-10-01 15:54:01 +00:00
|
|
|
#table_name = 'postgresql_replica_{}'.format(altered_table)
|
|
|
|
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
|
|
|
|
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);
|
|
|
|
#print('Ok')
|
2021-09-12 12:33:54 +00:00
|
|
|
|
|
|
|
drop_materialized_db()
|
|
|
|
|
|
|
|
|
2021-02-08 23:23:51 +00:00
|
|
|
if __name__ == '__main__':
|
|
|
|
cluster.start()
|
|
|
|
input("Cluster created, press any key to destroy...")
|
|
|
|
cluster.shutdown()
|