2021-02-08 23:23:51 +00:00
import pytest
import time
import psycopg2
import os . path as p
2021-02-18 23:33:01 +00:00
import random
2021-02-08 23:23:51 +00:00
from helpers . cluster import ClickHouseCluster
from helpers . test_tools import assert_eq_with_retry
from psycopg2 . extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers . test_tools import TSV
2021-05-10 11:31:06 +00:00
from random import randrange
import threading
2021-02-08 23:23:51 +00:00
# Test cluster: a single ClickHouse instance with a PostgreSQL container
# (with_postgres=True). stay_alive=True is required by the restart test.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
                                main_configs=['configs/log_conf.xml'],
                                user_configs=['configs/users.xml'],
                                with_postgres=True, stay_alive=True)
# PostgreSQL DDL templates; "{}" is filled with the (double-quoted) table name.
# Template with a single value column and a primary key on `key`.
postgres_table_template = """
    CREATE TABLE IF NOT EXISTS "{}" (
    key Integer NOT NULL, value1 Integer, PRIMARY KEY(key))
    """
postgres_table_template = """
    CREATE TABLE IF NOT EXISTS "{}" (
    key Integer NOT NULL, value Integer, PRIMARY KEY(key))
    """
# Three value columns, primary key on `key`.
postgres_table_template_2 = """
    CREATE TABLE IF NOT EXISTS "{}" (
    key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
    """
# Two key columns and no PRIMARY KEY — used by the REPLICA IDENTITY INDEX test.
postgres_table_template_3 = """
    CREATE TABLE IF NOT EXISTS "{}" (
    key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
    """
# Takes two placeholders: schema name, then table name.
postgres_table_template_4 = """
    CREATE TABLE IF NOT EXISTS "{}"."{}" (
    key Integer NOT NULL, value Integer, PRIMARY KEY(key))
    """
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
    """Open a psycopg2 connection to the PostgreSQL container.

    :param ip, port:       address of the PostgreSQL server.
    :param database:       when true, connect directly to `database_name`;
                           otherwise connect at server level (e.g. to CREATE DATABASE).
    :param auto_commit:    enable autocommit (needed for CREATE/DROP DATABASE).
    :param database_name:  database to connect to when `database` is true.
    :param replication:    request a logical-replication connection
                           (required for CREATE_REPLICATION_SLOT).
    :return: open psycopg2 connection.
    """
    # Fix: `database == True` replaced with idiomatic truthiness check.
    if database:
        conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
    else:
        conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)

    if replication:
        conn_string += " replication='database'"

    conn = psycopg2.connect(conn_string)
    if auto_commit:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        conn.autocommit = True
    return conn
def create_replication_slot(conn, slot_name='user_slot'):
    """Create a logical (pgoutput) replication slot and return the exported snapshot id."""
    cursor = conn.cursor()
    cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
    row = cursor.fetchall()[0]
    print(row[0])  # slot name
    print(row[1])  # start lsn
    print(row[2])  # snapshot
    return row[2]
def drop_replication_slot(conn, slot_name='user_slot'):
    """Drop the named logical replication slot on the server."""
    conn.cursor().execute("select pg_drop_replication_slot('{}')".format(slot_name))
def create_postgres_db(cursor, name='postgres_database'):
    """Create a PostgreSQL database with the given name."""
    cursor.execute(f"CREATE DATABASE {name}")
def drop_postgres_db(cursor, name='postgres_database'):
    """Drop the PostgreSQL database if it exists."""
    cursor.execute(f"DROP DATABASE IF EXISTS {name}")
def drop_postgres_schema(cursor, schema_name):
    """Drop the PostgreSQL schema (CASCADE) if it exists."""
    cursor.execute(f'DROP SCHEMA IF EXISTS {schema_name} CASCADE')
def create_postgres_schema(cursor, schema_name):
    """Re-create the PostgreSQL schema from scratch (drop first)."""
    drop_postgres_schema(cursor, schema_name)
    cursor.execute(f'CREATE SCHEMA {schema_name}')
def create_clickhouse_postgres_db(ip, port, name='postgres_database', database_name='postgres_database', schema_name=''):
    """(Re)create a ClickHouse database with ENGINE = PostgreSQL.

    :param ip, port:       PostgreSQL server address.
    :param name:           ClickHouse database name to create.
    :param database_name:  source PostgreSQL database.
    :param schema_name:    optional PostgreSQL schema; '' means no schema argument.
    """
    drop_clickhouse_postgres_db(name)
    # Idiom fix: `len(schema_name) == 0` -> truthiness check; the optional
    # schema argument is appended only when a schema is given.
    if not schema_name:
        instance.query('''
            CREATE DATABASE {}
            ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')'''.format(name, ip, port, database_name))
    else:
        instance.query('''
            CREATE DATABASE {}
            ENGINE = PostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword', '{}')'''.format(name, ip, port, database_name, schema_name))
def drop_clickhouse_postgres_db(name='postgres_database'):
    """Drop the ClickHouse side PostgreSQL-engine database if it exists."""
    instance.query(f'DROP DATABASE IF EXISTS {name}')
def create_materialized_db(ip, port,
                           materialized_database='test_database',
                           postgres_database='postgres_database',
                           settings=None):
    """(Re)create a MaterializedPostgreSQL database in ClickHouse.

    :param ip, port:              PostgreSQL server address.
    :param materialized_database: ClickHouse database name to create.
    :param postgres_database:     source PostgreSQL database.
    :param settings:              optional list of "key = value" SETTINGS strings.

    Fixes: mutable default argument `settings=[]` replaced with None sentinel;
    manual comma-concatenation loop replaced with str.join.
    """
    instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
    create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', 'postgres', 'mysecretpassword')"
    if settings:
        create_query += " SETTINGS " + ', '.join(settings)
    instance.query(create_query)
    assert materialized_database in instance.query('SHOW DATABASES')
def drop_materialized_db(materialized_database='test_database'):
    """Drop the materialized database and verify it is gone."""
    instance.query(f'DROP DATABASE IF EXISTS {materialized_database}')
    assert materialized_database not in instance.query('SHOW DATABASES')
def drop_postgres_table(cursor, table_name):
    """Drop a (quoted) PostgreSQL table if it exists."""
    cursor.execute("""DROP TABLE IF EXISTS "{}" """.format(table_name))
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
    """Drop a schema-qualified (quoted) PostgreSQL table if it exists."""
    cursor.execute("""DROP TABLE IF EXISTS "{}"."{}" """.format(schema_name, table_name))
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
    """Drop and re-create a PostgreSQL table from a DDL template;
    optionally switch it to REPLICA IDENTITY FULL."""
    drop_postgres_table(cursor, table_name)
    cursor.execute(template.format(table_name))
    if replica_identity_full:
        cursor.execute(f'ALTER TABLE {table_name} REPLICA IDENTITY FULL;')
def create_postgres_table_with_schema(cursor, schema_name, table_name):
    """Drop and re-create a schema-qualified PostgreSQL table (template 4)."""
    drop_postgres_table_with_schema(cursor, schema_name, table_name)
    cursor.execute(postgres_table_template_4.format(schema_name, table_name))
# Mixed DML workload (INSERT/UPDATE/DELETE, including primary-key updates)
# used by the stress tests; each entry is a format string taking the
# postgresql_replica_{} table suffix.
queries = [
    'INSERT INTO postgresql_replica_{} select i, i from generate_series(0, 10000) as t(i);',
    'DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
    'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
    "UPDATE postgresql_replica_{} SET key=key+20000 WHERE key%2=0",
    'INSERT INTO postgresql_replica_{} select i, i from generate_series(40000, 50000) as t(i);',
    'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
    'UPDATE postgresql_replica_{} SET value = value + 101 WHERE key % 2 = 1;',
    "UPDATE postgresql_replica_{} SET key=key+80000 WHERE key%2=1",
    'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
    'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
    'INSERT INTO postgresql_replica_{} select i, i from generate_series(200000, 250000) as t(i);',
    'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
    'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
    "UPDATE postgresql_replica_{} SET key=key+500000 WHERE key%2=1",
    'INSERT INTO postgresql_replica_{} select i, i from generate_series(1000000, 1050000) as t(i);',
    'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
    "UPDATE postgresql_replica_{} SET key=key+10000000",
    'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
    'DELETE FROM postgresql_replica_{} WHERE value % 5 = 0;'
]
def assert_nested_table_is_created(table_name, materialized_database='test_database', schema_name=''):
    """Poll SHOW TABLES of the materialized database until the nested table appears.

    With a non-empty schema_name the nested table is named "<schema>.<table>".
    NOTE(review): there is no timeout here — relies on the caller/pytest timeout.
    """
    table = table_name if len(schema_name) == 0 else schema_name + "." + table_name
    print('Checking table exists:', table)
    database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
    while table not in database_tables:
        time.sleep(0.2)
        database_tables = instance.query('SHOW TABLES FROM {}'.format(materialized_database))
    assert(table in database_tables)
def assert_number_of_columns(expected, table_name, database_name='test_database'):
    """Poll system.columns until the table has `expected` non-internal columns
    (names starting with '_' are excluded)."""
    # Query string is loop-invariant, so build it once.
    count_query = f"select count() from system.columns where table = '{table_name}' and database = '{database_name}' and not startsWith(name, '_')"
    while int(instance.query(count_query)) != expected:
        time.sleep(1)
@pytest.mark.timeout(320)
def check_tables_are_synchronized(table_name, order_by='key', postgres_database='postgres_database', materialized_database='test_database', schema_name=''):
    """Poll until the materialized table's contents equal the source table's.

    :param table_name:            table to compare.
    :param order_by:              ORDER BY column for a deterministic comparison.
    :param postgres_database:     ClickHouse PostgreSQL-engine database (source of truth).
    :param materialized_database: MaterializedPostgreSQL database under test.
    :param schema_name:           optional PostgreSQL schema of the nested table.

    BUG FIX: `schema_name` was referenced in the body but never defined,
    raising NameError; it is now an explicit parameter defaulting to ''
    (backward compatible — all existing calls keep their behavior).
    """
    assert_nested_table_is_created(table_name, materialized_database, schema_name)

    print("Checking table is synchronized:", table_name)
    expected = instance.query('select * from {}.{} order by {};'.format(postgres_database, table_name, order_by))

    # With a schema, the nested table is named `<schema>.<table>` and must be
    # back-quoted; build the query once since it does not change while polling.
    if len(schema_name) == 0:
        result_query = 'select * from {}.{} order by {};'.format(materialized_database, table_name, order_by)
    else:
        result_query = 'select * from {}.`{}.{}` order by {};'.format(materialized_database, schema_name, table_name, order_by)

    result = instance.query(result_query)
    while result != expected:
        time.sleep(0.5)
        result = instance.query(result_query)

    assert(result == expected)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped cluster: start the containers, create the source
    PostgreSQL database and a ClickHouse PostgreSQL-engine database over it;
    shut everything down at module teardown."""
    try:
        cluster.start()
        conn = get_postgres_conn(ip=cluster.postgres_ip, port=cluster.postgres_port)
        cursor = conn.cursor()
        create_postgres_db(cursor, 'postgres_database')
        create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port)

        # Make sure no materialized database is left over from a previous run.
        instance.query("DROP DATABASE IF EXISTS test_database")
        yield cluster
    finally:
        cluster.shutdown()
def test_load_and_sync_all_database_tables(started_cluster):
    """All tables of the source database are loaded and kept in sync."""
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    NUM_TABLES = 5

    for idx in range(NUM_TABLES):
        name = f'postgresql_replica_{idx}'
        create_postgres_table(cursor, name)
        instance.query(f"INSERT INTO postgres_database.{name} SELECT number, number from numbers(50)")

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)
    assert 'test_database' in instance.query('SHOW DATABASES')

    for idx in range(NUM_TABLES):
        name = f'postgresql_replica_{idx}'
        check_tables_are_synchronized(name)
        cursor.execute(f'drop table {name};')

    result = instance.query("SELECT count() FROM system.tables WHERE database = 'test_database';")
    assert int(result) == NUM_TABLES

    drop_materialized_db()
    for idx in range(NUM_TABLES):
        cursor.execute(f'drop table if exists postgresql_replica_{idx};')
def test_replicating_dml(started_cluster):
    """INSERT/UPDATE/DELETE statements on the source tables are replicated."""
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    NUM_TABLES = 5

    for i in range(NUM_TABLES):
        create_postgres_table(cursor, f'postgresql_replica_{i}')
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i} from numbers(50)")

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    # More inserts after the initial snapshot.
    for i in range(NUM_TABLES):
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 50 + number, {i} from numbers(1000)")
    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    # Updates touching disjoint key ranges.
    for i in range(NUM_TABLES):
        cursor.execute(f'UPDATE postgresql_replica_{i} SET value = {i} * {i} WHERE key < 50;')
        cursor.execute(f'UPDATE postgresql_replica_{i} SET value = {i} * {i} * {i} WHERE key >= 50;')
    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    # Deletes mixed with further updates.
    for i in range(NUM_TABLES):
        cursor.execute(f'DELETE FROM postgresql_replica_{i} WHERE (value*value + {i}) % 2 = 0;')
        cursor.execute(f'UPDATE postgresql_replica_{i} SET value = value - (value % 7) WHERE key > 128 AND key < 512;')
        cursor.execute(f'DELETE FROM postgresql_replica_{i} WHERE key % 7 = 1;')
    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    for i in range(NUM_TABLES):
        cursor.execute(f'drop table if exists postgresql_replica_{i};')
    drop_materialized_db()
def test_different_data_types(started_cluster):
    """Scalar and (nested) array PostgreSQL types map to the expected ClickHouse
    values after replication."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    cursor.execute('drop table if exists test_data_types;')
    cursor.execute('drop table if exists test_array_data_type;')

    # Scalar types: int family, floats, serials, timestamp/date, decimals.
    cursor.execute(
        '''CREATE TABLE test_data_types (
        id integer PRIMARY KEY, a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
        h timestamp, i date, j decimal(5, 5), k numeric(5, 5))''')

    # Array types; trailing SQL comments note the expected ClickHouse type.
    cursor.execute(
        '''CREATE TABLE test_array_data_type
           (
                key Integer NOT NULL PRIMARY KEY,
                a Date[] NOT NULL,                          -- Date
                b Timestamp[] NOT NULL,                     -- DateTime64(6)
                c real[][] NOT NULL,                        -- Float32
                d double precision[][] NOT NULL,            -- Float64
                e decimal(5, 5)[][][] NOT NULL,             -- Decimal32
                f integer[][][] NOT NULL,                   -- Int32
                g Text[][][][][] NOT NULL,                  -- String
                h Integer[][][],                            -- Nullable(Int32)
                i Char(2)[][][][],                          -- Nullable(String)
                k Char(2)[]                                 -- Nullable(String)
           )''')

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    # Boundary values for every scalar column.
    for i in range(10):
        instance.query('''
            INSERT INTO postgres_database.test_data_types VALUES
            ({}, -32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12.012345', '2000-05-12', 0.2, 0.2)'''.format(i))

    check_tables_are_synchronized('test_data_types', 'id');
    result = instance.query('SELECT * FROM test_database.test_data_types ORDER BY id LIMIT 1;')
    assert(result == '0\t-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12.012345\t2000-05-12\t0.2\t0.2\n')

    # Random updates on scalar columns; the .format args on the second
    # statement are unused (the query has no placeholders).
    for i in range(10):
        col = random.choice(['a', 'b', 'c'])
        cursor.execute('UPDATE test_data_types SET {} = {};'.format(col, i))
        cursor.execute('''UPDATE test_data_types SET i = '2020-12-12';'''.format(col, i))

    check_tables_are_synchronized('test_data_types', 'id');

    instance.query("INSERT INTO postgres_database.test_array_data_type "
        "VALUES ("
        "0, "
        "['2000-05-12', '2000-05-12'], "
        "['2000-05-12 12:12:12.012345', '2000-05-12 12:12:12.012345'], "
        "[[1.12345], [1.12345], [1.12345]], "
        "[[1.1234567891], [1.1234567891], [1.1234567891]], "
        "[[[0.11111, 0.11111]], [[0.22222, 0.22222]], [[0.33333, 0.33333]]], "
        "[[[1, 1], [1, 1]], [[3, 3], [3, 3]], [[4, 4], [5, 5]]], "
        "[[[[['winx', 'winx', 'winx']]]]], "
        "[[[1, NULL], [NULL, 1]], [[NULL, NULL], [NULL, NULL]], [[4, 4], [5, 5]]], "
        "[[[[NULL]]]], "
        "[]"
        ")")

    # Expected TSV row for the array table as rendered by ClickHouse.
    expected = (
        "0\t" +
        "['2000-05-12','2000-05-12']\t" +
        "['2000-05-12 12:12:12.012345','2000-05-12 12:12:12.012345']\t" +
        "[[1.12345],[1.12345],[1.12345]]\t" +
        "[[1.1234567891],[1.1234567891],[1.1234567891]]\t" +
        "[[[0.11111,0.11111]],[[0.22222,0.22222]],[[0.33333,0.33333]]]\t"
        "[[[1,1],[1,1]],[[3,3],[3,3]],[[4,4],[5,5]]]\t"
        "[[[[['winx','winx','winx']]]]]\t"
        "[[[1,NULL],[NULL,1]],[[NULL,NULL],[NULL,NULL]],[[4,4],[5,5]]]\t"
        "[[[[NULL]]]]\t"
        "[]\n"
        )

    check_tables_are_synchronized('test_array_data_type');
    result = instance.query('SELECT * FROM test_database.test_array_data_type ORDER BY key;')
    assert(result == expected)

    drop_materialized_db()
    cursor.execute('drop table if exists test_data_types;')
    cursor.execute('drop table if exists test_array_data_type;')
def test_load_and_sync_subset_of_database_tables(started_cluster):
    """Only tables listed in materialized_postgresql_tables_list are replicated.

    Improvements: `int(NUM_TABLES / 2)` replaced with floor division; the
    publication list is built with str.join instead of manual concatenation.
    """
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 10

    for i in range(NUM_TABLES):
        table_name = 'postgresql_replica_{}'.format(i)
        create_postgres_table(cursor, table_name)
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(50)".format(i))

    # Only the first half of the tables goes into the replication list.
    publication_tables = ', '.join('postgresql_replica_{}'.format(i) for i in range(NUM_TABLES // 2))

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port,
                           settings=["materialized_postgresql_tables_list = '{}'".format(publication_tables)])
    assert 'test_database' in instance.query('SHOW DATABASES')
    time.sleep(1)

    for i in range(NUM_TABLES // 2):
        assert_nested_table_is_created('postgresql_replica_{}'.format(i))

    result = instance.query("SELECT count() FROM system.tables WHERE database = 'test_database';")
    assert int(result) == NUM_TABLES // 2

    # Listed tables must exist, unlisted ones must not; keep inserting into all.
    database_tables = instance.query('SHOW TABLES FROM test_database')
    for i in range(NUM_TABLES):
        table_name = 'postgresql_replica_{}'.format(i)
        if i < NUM_TABLES // 2:
            assert table_name in database_tables
        else:
            assert table_name not in database_tables
        instance.query("INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format(table_name, i))

    for i in range(NUM_TABLES // 2):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_changing_replica_identity_value(started_cluster):
    """Updates that change the replica-identity (key) column are replicated."""
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    table = 'postgresql_replica'
    create_postgres_table(cursor, table)
    instance.query(f"INSERT INTO postgres_database.{table} SELECT 50 + number, number from numbers(50)")

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    instance.query(f"INSERT INTO postgres_database.{table} SELECT 100 + number, number from numbers(50)")
    check_tables_are_synchronized(table)
    cursor.execute(f"UPDATE {table} SET key=key-25 WHERE key<100 ")
    check_tables_are_synchronized(table)

    drop_materialized_db()
    cursor.execute(f'drop table if exists {table};')
def test_clickhouse_restart(started_cluster):
    """Replication catches up after the ClickHouse server is restarted."""
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    NUM_TABLES = 5

    for i in range(NUM_TABLES):
        create_postgres_table(cursor, f'postgresql_replica_{i}')
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i} from numbers(50)")

    # Created directly (not via create_materialized_db) with the docker-network
    # address 'postgres1:5432'.
    instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")

    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    # Large inserts, then restart while they are being replicated.
    for i in range(NUM_TABLES):
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 50 + number, {i} from numbers(50000)")

    instance.restart_clickhouse()

    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute(f'drop table if exists postgresql_replica_{i};')
def test_replica_identity_index(started_cluster):
    """A table with REPLICA IDENTITY USING INDEX replicates updates and deletes."""
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = pg_conn.cursor()

    create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3)
    cursor.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);")
    cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)")

    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)

    instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)")
    check_tables_are_synchronized('postgresql_replica', order_by='key1')

    # Update both identity-index columns and both value columns.
    for statement in ("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ",
                      "UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ",
                      "UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 ",
                      "UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 "):
        cursor.execute(statement)
    check_tables_are_synchronized('postgresql_replica', order_by='key1')

    cursor.execute('DELETE FROM postgresql_replica WHERE key2<75;')
    check_tables_are_synchronized('postgresql_replica', order_by='key1')

    drop_materialized_db()
    cursor.execute('drop table if exists postgresql_replica;')
def test_table_schema_changes(started_cluster):
    """Dropping a column on one randomly chosen source table is picked up
    (automatic update enabled) and replication of all tables continues."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5

    # Tables use template 2 (key + value1..value3).
    for i in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2);
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i))

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port,
                           settings=["materialized_postgresql_allow_automatic_update = 1"])

    for i in range(NUM_TABLES):
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i));

    # NOTE(review): `expected` is captured but never compared below — looks
    # vestigial; confirm before removing.
    expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key");

    # Drop a column on one randomly chosen table.
    altered_idx = random.randint(0, 4)
    altered_table = f'postgresql_replica_{altered_idx}'
    cursor.execute(f"ALTER TABLE {altered_table} DROP COLUMN value2")

    for i in range(NUM_TABLES):
        cursor.execute(f"INSERT INTO postgresql_replica_{i} VALUES (50, {i}, {i})")
        cursor.execute(f"UPDATE {altered_table} SET value3 = 12 WHERE key%2=0")

    time.sleep(2)
    assert_nested_table_is_created(altered_table)
    # The altered table must now expose 3 non-internal columns.
    assert_number_of_columns(3, altered_table)
    check_tables_are_synchronized(altered_table)
    print('check1 OK')

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i));

    # Further inserts: 4 values for unaltered tables, 3 for the altered one.
    for i in range(NUM_TABLES):
        if i != altered_idx:
            instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i))
        else:
            instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i))

    check_tables_are_synchronized(altered_table);
    print('check2 OK')
    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i));

    for i in range(NUM_TABLES):
        cursor.execute('drop table postgresql_replica_{};'.format(i))

    instance.query("DROP DATABASE test_database")
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
def test_many_concurrent_queries(started_cluster):
    """Hammer the replicated tables with concurrent UPDATE/DELETE/INSERT
    traffic from many threads, then verify every table is synchronized and
    row counts match between postgres and the materialized database."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5
    for i in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(i))
        instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(10000)'.format(i))
    # Mutable one-element list so the main thread can bump the insert offset.
    n = [10000]

    query_pool = ['DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;',
                  'UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;',
                  'DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;',
                  'UPDATE postgresql_replica_{} SET value = value*5 WHERE key % 2 = 1;',
                  'DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;',
                  'UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;',
                  'DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;',
                  'UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;',
                  'DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;',
                  'UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;',
                  'DELETE FROM postgresql_replica_{} WHERE value % 5 = 0;']

    def attack(thread_id):
        print('thread {}'.format(thread_id))
        k = 10000
        for i in range(20):
            # FIX: randrange(0, len-1) excluded the last query in the pool;
            # randrange(len(...)) covers the whole pool.
            query_id = random.randrange(len(query_pool))
            table_id = random.randrange(0, 5)  # num tables

            # random update / delete query
            cursor.execute(query_pool[query_id].format(table_id))
            print("table {} query {} ok".format(table_id, query_id))

            # allow some threads to do inserts (not to violate key constraints)
            if thread_id < 5:
                print("try insert table {}".format(thread_id))
                # FIX: previously inserted into table `i` (0..19), but only
                # NUM_TABLES (5) tables exist, so iterations i >= 5 raised in
                # this worker thread (silently, as thread exceptions are not
                # propagated to join()).  Insert into this thread's own table,
                # which also keeps the key ranges disjoint per thread.
                # NOTE(review): for thread_id == 0 the key expression is 0 for
                # every row, which can still hit the primary-key constraint —
                # confirm intended key formula.
                instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {}*10000*({} + number), number from numbers(1000)'.format(thread_id, thread_id, k))
                k += 1
                print("insert table {} ok".format(thread_id))

                if i == 5:
                    # also change primary key value
                    print("try update primary key {}".format(thread_id))
                    cursor.execute("UPDATE postgresql_replica_{} SET key=key % 100000+100000* {} WHERE key % {} =0".format(thread_id, i + 1, i + 1))
                    print("update primary key {} ok".format(thread_id))

    threads = []
    threads_num = 16
    for i in range(threads_num):
        threads.append(threading.Thread(target=attack, args=(i,)))

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    n[0] = 50000
    for table_id in range(NUM_TABLES):
        n[0] += 1
        instance.query('INSERT INTO postgres_database.postgresql_replica_{} SELECT {} + number, number from numbers(5000)'.format(table_id, n[0]))

    for thread in threads:
        thread.join()

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))
        count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
        count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
        assert(int(count1) == int(count2))
        print(count1, count2)

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-05-10 11:31:06 +00:00
2021-05-10 14:51:17 +00:00
def test_single_transaction(started_cluster):
    """Nothing must become visible in ClickHouse before the postgres
    transaction holding the changes is committed."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True, auto_commit=False)
    cursor = conn.cursor()
    create_postgres_table(cursor, 'postgresql_replica_0')
    conn.commit()

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)
    assert_nested_table_is_created('postgresql_replica_0')

    # Run the whole query script inside one uncommitted transaction.
    for query in queries:
        print('query {}'.format(query))
        cursor.execute(query.format(0))

    time.sleep(5)
    # no commit yet
    result = instance.query("select count() from test_database.postgresql_replica_0")
    assert(int(result) == 0)

    conn.commit()
    check_tables_are_synchronized('postgresql_replica_0')

    drop_materialized_db()
    cursor.execute('drop table if exists postgresql_replica_0;')
2021-05-13 07:36:40 +00:00
def test_virtual_columns(started_cluster):
    """The materialized tables expose _sign and _version virtual columns;
    make sure they stay selectable across a schema change."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    create_postgres_table(cursor, 'postgresql_replica_0')

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port,
                           settings=["materialized_postgresql_allow_automatic_update = 1"])
    assert_nested_table_is_created('postgresql_replica_0')
    instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)")
    check_tables_are_synchronized('postgresql_replica_0')

    # just check that it works, no check with `expected` because _version is
    # taken as LSN, which will be different each time.
    print(instance.query('SELECT key, value, _sign, _version FROM test_database.postgresql_replica_0;'))

    cursor.execute("ALTER TABLE postgresql_replica_0 ADD COLUMN value2 integer")
    instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(10, 10)")
    assert_number_of_columns(3, 'postgresql_replica_0')
    check_tables_are_synchronized('postgresql_replica_0')
    print(instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;'))

    instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number, number from numbers(20, 10)")
    check_tables_are_synchronized('postgresql_replica_0')
    print(instance.query('SELECT key, value, value2, _sign, _version FROM test_database.postgresql_replica_0;'))

    drop_materialized_db()
    cursor.execute('drop table if exists postgresql_replica_0;')
2021-05-13 07:36:40 +00:00
2021-05-23 12:09:20 +00:00
2021-06-26 22:05:20 +00:00
def test_multiple_databases(started_cluster):
    """Replicate two independent postgres databases into two
    MaterializedPostgreSQL databases simultaneously and verify both
    stay synchronized."""
    drop_materialized_db('test_database_1')
    drop_materialized_db('test_database_2')
    NUM_TABLES = 5

    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=False)
    cursor = conn.cursor()
    create_postgres_db(cursor, 'postgres_database_1')
    create_postgres_db(cursor, 'postgres_database_2')

    conn1 = get_postgres_conn(ip=started_cluster.postgres_ip,
                              port=started_cluster.postgres_port,
                              database=True, database_name='postgres_database_1')
    conn2 = get_postgres_conn(ip=started_cluster.postgres_ip,
                              port=started_cluster.postgres_port,
                              database=True, database_name='postgres_database_2')
    cursor1 = conn1.cursor()
    cursor2 = conn2.cursor()

    # FIX(consistency): use the started_cluster fixture like the rest of this
    # test instead of the module-level `cluster` global (same object).
    create_clickhouse_postgres_db(started_cluster.postgres_ip, started_cluster.postgres_port, 'postgres_database_1', 'postgres_database_1')
    create_clickhouse_postgres_db(started_cluster.postgres_ip, started_cluster.postgres_port, 'postgres_database_2', 'postgres_database_2')

    cursors = [cursor1, cursor2]
    for cursor_id in range(len(cursors)):
        for i in range(NUM_TABLES):
            table_name = 'postgresql_replica_{}'.format(i)
            create_postgres_table(cursors[cursor_id], table_name)
            instance.query("INSERT INTO postgres_database_{}.{} SELECT number, number from numbers(50)".format(cursor_id + 1, table_name))
    print('database 1 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_1';'''))
    print('database 2 tables: ', instance.query('''SELECT name FROM system.tables WHERE database = 'postgres_database_2';'''))

    create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port,
                           'test_database_1', 'postgres_database_1')
    create_materialized_db(started_cluster.postgres_ip, started_cluster.postgres_port,
                           'test_database_2', 'postgres_database_2')

    # Insert a second batch after replication has started.
    cursors = [cursor1, cursor2]
    for cursor_id in range(len(cursors)):
        for i in range(NUM_TABLES):
            table_name = 'postgresql_replica_{}'.format(i)
            instance.query("INSERT INTO postgres_database_{}.{} SELECT 50 + number, number from numbers(50)".format(cursor_id + 1, table_name))

    for cursor_id in range(len(cursors)):
        for i in range(NUM_TABLES):
            table_name = 'postgresql_replica_{}'.format(i)
            check_tables_are_synchronized(
                table_name, 'key', 'postgres_database_{}'.format(cursor_id + 1), 'test_database_{}'.format(cursor_id + 1))

    for i in range(NUM_TABLES):
        cursor1.execute('drop table if exists postgresql_replica_{};'.format(i))
    for i in range(NUM_TABLES):
        cursor2.execute('drop table if exists postgresql_replica_{};'.format(i))

    drop_clickhouse_postgres_db('postgres_database_1')
    drop_clickhouse_postgres_db('postgres_database_2')

    drop_materialized_db('test_database_1')
    drop_materialized_db('test_database_2')
def test_concurrent_transactions(started_cluster):
    """Several threads each run the shared `queries` script inside its own
    transaction; after all commit, tables must match postgres row-for-row."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 6

    for table_id in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(table_id))

    def transaction(thread_id):
        # Each worker opens its own non-autocommit connection and commits once.
        worker_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                        port=started_cluster.postgres_port,
                                        database=True, auto_commit=False)
        worker_cursor = worker_conn.cursor()
        for query in queries:
            worker_cursor.execute(query.format(thread_id))
            print('thread {}, query {}'.format(thread_id, query))
        worker_conn.commit()

    threads_num = 6
    threads = [threading.Thread(target=transaction, args=(i,))
               for i in range(threads_num)]

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for thread in threads:
        time.sleep(random.uniform(0, 0.5))
        thread.start()
    for thread in threads:
        thread.join()

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))
        count1 = instance.query('SELECT count() FROM postgres_database.postgresql_replica_{}'.format(i))
        count2 = instance.query('SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})'.format(i))
        print(int(count1), int(count2), sep=' ')
        assert(int(count1) == int(count2))

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-05-10 14:51:17 +00:00
2021-06-29 23:11:46 +00:00
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
    """Pause the postgres container while replication is under heavy load and
    verify the tables converge after the connection comes back."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 6
    for table_id in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(table_id))

    def transaction(thread_id):
        # Odd threads autocommit each statement; even threads commit once.
        autocommit = bool(thread_id % 2)
        worker_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                        port=started_cluster.postgres_port,
                                        database=True, auto_commit=autocommit)
        worker_cursor = worker_conn.cursor()
        for query in queries:
            worker_cursor.execute(query.format(thread_id))
            print('thread {}, query {}'.format(thread_id, query))
        if thread_id % 2 == 0:
            worker_conn.commit()

    threads_num = 6
    threads = [threading.Thread(target=transaction, args=(i,))
               for i in range(threads_num)]

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for thread in threads:
        time.sleep(random.uniform(0, 0.5))
        thread.start()
    # Join here because it takes time for data to reach wal
    for thread in threads:
        thread.join()
    time.sleep(1)

    started_cluster.pause_container('postgres1')
    for i in range(NUM_TABLES):
        print(instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)))  # Just debug
    started_cluster.unpause_container('postgres1')

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))
    for i in range(NUM_TABLES):
        print(instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)))  # Just debug

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-06-29 23:11:46 +00:00
def test_drop_database_while_replication_startup_not_finished(started_cluster):
    """Create and drop the materialized database repeatedly, with growing
    delays, while the initial sync of large tables is still in flight."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5
    for i in range(NUM_TABLES):
        table_name = 'postgresql_replica_{}'.format(i)
        create_postgres_table(cursor, table_name)
        instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))

    for attempt in range(6):
        create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
        time.sleep(0.5 * attempt)
        drop_materialized_db()

    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-06-29 23:11:46 +00:00
def test_restart_server_while_replication_startup_not_finished(started_cluster):
    """Restart ClickHouse shortly after replication starts (initial sync of
    large tables not yet finished) and check everything converges."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5
    for i in range(NUM_TABLES):
        table_name = 'postgresql_replica_{}'.format(i)
        create_postgres_table(cursor, table_name)
        instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(100000)".format(table_name))

    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
    time.sleep(0.5)
    instance.restart_clickhouse()

    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))

    drop_materialized_db()
    for i in range(NUM_TABLES):
        # FIX: use `if exists` like every other test in this file — the plain
        # `drop table` failed on reruns when a table was already gone.
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-06-29 23:11:46 +00:00
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
    """Restart ClickHouse while replication is under heavy concurrent load
    and verify all tables converge afterwards."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 6
    for table_id in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(table_id))

    def transaction(thread_id):
        # Odd threads autocommit each statement; even threads commit once.
        autocommit = bool(thread_id % 2)
        worker_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                        port=started_cluster.postgres_port,
                                        database=True, auto_commit=autocommit)
        worker_cursor = worker_conn.cursor()
        for query in queries:
            worker_cursor.execute(query.format(thread_id))
            print('thread {}, query {}'.format(thread_id, query))
        if thread_id % 2 == 0:
            worker_conn.commit()

    threads_num = 6
    threads = [threading.Thread(target=transaction, args=(i,))
               for i in range(threads_num)]

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for thread in threads:
        time.sleep(random.uniform(0, 0.5))
        thread.start()
    # Join here because it takes time for data to reach wal
    for thread in threads:
        thread.join()

    instance.restart_clickhouse()

    for i in range(NUM_TABLES):
        print(instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)))  # Just debug
    for i in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(i))
    for i in range(NUM_TABLES):
        print(instance.query("SELECT count() FROM test_database.postgresql_replica_{}".format(i)))  # Just debug

    drop_materialized_db()
    for i in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(i))
2021-06-29 23:11:46 +00:00
2021-08-31 20:58:00 +00:00
def test_quoting(started_cluster):
    """A table named after a reserved word ("user") must be quoted properly
    throughout the replication pipeline."""
    table_name = 'user'
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    create_postgres_table(cursor, table_name)
    instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name))
    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
    check_tables_are_synchronized(table_name)
    drop_postgres_table(cursor, table_name)
    drop_materialized_db()
2021-09-04 10:07:59 +00:00
def test_user_managed_slots(started_cluster):
    """Replication through a user-provided slot/snapshot must survive
    inserts and a ClickHouse restart."""
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    table_name = 'test_table'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")

    slot_name = 'user_slot'
    replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
                                               database=True, replication=True, auto_commit=True)
    snapshot = create_replication_slot(replication_connection, slot_name=slot_name)
    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port,
                           settings=[f"materialized_postgresql_replication_slot = '{slot_name}'",
                                     f"materialized_postgresql_snapshot = '{snapshot}'"])
    check_tables_are_synchronized(table_name)

    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)")
    check_tables_are_synchronized(table_name)

    instance.restart_clickhouse()

    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(20000, 10000)")
    check_tables_are_synchronized(table_name)

    drop_postgres_table(cursor, table_name)
    drop_materialized_db()
    drop_replication_slot(replication_connection, slot_name)
    cursor.execute('DROP TABLE IF EXISTS test_table')
2021-09-04 10:07:59 +00:00
2021-09-08 19:49:45 +00:00
def test_add_new_table_to_replication(started_cluster):
    """ATTACH new postgres tables into an existing MaterializedPostgreSQL
    database and check the tables_list setting is kept up to date, also
    across a server restart."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5
    for i in range(NUM_TABLES):
        create_postgres_table(cursor, f'postgresql_replica_{i}')
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i} from numbers(10000)")

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    result = instance.query("SHOW TABLES FROM test_database")
    assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")

    table_name = 'postgresql_replica_5'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")  # Check without ip
    assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")

    result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
    assert('Changing setting `materialized_postgresql_tables_list` is not allowed' in result)

    result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
    assert('Database engine MaterializedPostgreSQL does not support setting' in result)

    instance.query(f"ATTACH TABLE test_database.{table_name}")

    result = instance.query("SHOW TABLES FROM test_database")
    assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n")

    check_tables_are_synchronized(table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)")
    check_tables_are_synchronized(table_name)

    result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}")
    assert('Table test_database.postgresql_replica_5 already exists' in result)

    result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table")
    assert('PostgreSQL table unknown_table does not exist' in result)

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-180:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n")

    table_name = 'postgresql_replica_6'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    instance.restart_clickhouse()

    table_name = 'postgresql_replica_7'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-222:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n")

    result = instance.query("SHOW TABLES FROM test_database")
    assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n")

    for i in range(NUM_TABLES + 3):
        check_tables_are_synchronized(f'postgresql_replica_{i}')
    for i in range(NUM_TABLES + 3):
        cursor.execute(f'drop table if exists postgresql_replica_{i};')
2021-08-28 13:42:36 +00:00
def test_remove_table_from_replication(started_cluster):
    """DETACH a table from an existing MaterializedPostgreSQL database,
    re-ATTACH it, and check the tables_list setting tracks the changes."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    NUM_TABLES = 5
    for i in range(NUM_TABLES):
        create_postgres_table(cursor, f'postgresql_replica_{i}')
        instance.query(f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i} from numbers(10000)")

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    result = instance.query("SHOW TABLES FROM test_database")
    assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")

    table_name = 'postgresql_replica_4'
    instance.query(f'DETACH TABLE test_database.{table_name}')
    result = instance.query_and_get_error(f'SELECT * FROM test_database.{table_name}')
    assert("doesn't exist" in result)

    result = instance.query("SHOW TABLES FROM test_database")
    assert(result == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n")

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n")

    instance.query(f'ATTACH TABLE test_database.{table_name}')
    check_tables_are_synchronized(table_name)
    for i in range(NUM_TABLES):
        check_tables_are_synchronized(f'postgresql_replica_{i}')

    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")

    table_name = 'postgresql_replica_1'
    instance.query(f'DETACH TABLE test_database.{table_name}')
    result = instance.query('SHOW CREATE DATABASE test_database')
    assert(result[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL(")
    assert(result[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n")

    for i in range(NUM_TABLES):
        cursor.execute(f'drop table if exists postgresql_replica_{i};')
2021-09-03 11:16:32 +00:00
2021-09-22 15:10:25 +00:00
def test_predefined_connection_configuration(started_cluster):
    """The database engine must work with a named connection collection
    (`postgres1`) from the server configuration instead of explicit args."""
    drop_materialized_db()
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = conn.cursor()
    cursor.execute('DROP TABLE IF EXISTS test_table')
    cursor.execute('CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
    instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1)")
    check_tables_are_synchronized("test_table")
    drop_materialized_db()
    cursor.execute('DROP TABLE IF EXISTS test_table')
2021-09-22 15:10:25 +00:00
2021-10-02 11:11:18 +00:00
# Module-level batch counter shared (via `global`) by the insert helpers in
# the non-default-schema tests below; each insert round uses it as the row
# offset (numbers(1000 * insert_counter, 1000)).
insert_counter = 0
def test_database_with_multiple_non_default_schemas_1 ( started_cluster ) :
conn = get_postgres_conn ( ip = started_cluster . postgres_ip , port = started_cluster . postgres_port , database = True )
2021-09-12 12:33:54 +00:00
cursor = conn . cursor ( )
2021-10-02 11:11:18 +00:00
NUM_TABLES = 5
2021-09-12 12:33:54 +00:00
schema_name = ' test_schema '
clickhouse_postgres_db = ' postgres_database_with_schema '
publication_tables = ' '
2021-10-02 11:11:18 +00:00
insert_counter = 0
def insert_into_tables ( ) :
global insert_counter
clickhouse_postgres_db = ' postgres_database_with_schema '
for i in range ( NUM_TABLES ) :
table_name = f ' postgresql_replica_ { i } '
instance . query ( f " INSERT INTO { clickhouse_postgres_db } . { table_name } SELECT number, number from numbers(1000 * { insert_counter } , 1000) " )
insert_counter + = 1
def assert_show_tables ( expected ) :
result = instance . query ( ' SHOW TABLES FROM test_database ' )
assert ( result == expected )
print ( ' assert show tables Ok ' )
def check_all_tables_are_synchronized ( ) :
for i in range ( NUM_TABLES ) :
print ( ' checking table ' , i )
check_tables_are_synchronized ( " postgresql_replica_ {} " . format ( i ) , schema_name = schema_name , postgres_database = clickhouse_postgres_db ) ;
print ( ' synchronization Ok ' )
create_postgres_schema ( cursor , schema_name )
create_clickhouse_postgres_db ( ip = cluster . postgres_ip , port = cluster . postgres_port , name = clickhouse_postgres_db , schema_name = schema_name )
2021-09-12 12:33:54 +00:00
for i in range ( NUM_TABLES ) :
table_name = ' postgresql_replica_ {} ' . format ( i )
create_postgres_table_with_schema ( cursor , schema_name , table_name ) ;
if publication_tables != ' ' :
publication_tables + = ' , '
publication_tables + = schema_name + ' . ' + table_name
2021-10-02 11:11:18 +00:00
insert_into_tables ( )
create_materialized_db ( ip = started_cluster . postgres_ip , port = started_cluster . postgres_port ,
2021-10-03 08:55:50 +00:00
settings = [ f " materialized_postgresql_tables_list = ' { publication_tables } ' " , " materialized_postgresql_tables_list_with_schema=1 " , " materialized_postgresql_allow_automatic_update = 1 " ] )
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
check_all_tables_are_synchronized ( )
assert_show_tables ( " test_schema.postgresql_replica_0 \n test_schema.postgresql_replica_1 \n test_schema.postgresql_replica_2 \n test_schema.postgresql_replica_3 \n test_schema.postgresql_replica_4 \n " )
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
instance . restart_clickhouse ( )
check_all_tables_are_synchronized ( )
assert_show_tables ( " test_schema.postgresql_replica_0 \n test_schema.postgresql_replica_1 \n test_schema.postgresql_replica_2 \n test_schema.postgresql_replica_3 \n test_schema.postgresql_replica_4 \n " )
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
insert_into_tables ( )
check_all_tables_are_synchronized ( )
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
#altered_table = random.randint(0, NUM_TABLES-1)
#cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
#table_name = 'postgresql_replica_{}'.format(altered_table)
#instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
#check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
#print('Ok')
drop_materialized_db ( )
2021-09-12 12:33:54 +00:00
2021-10-02 11:11:18 +00:00
def test_database_with_multiple_non_default_schemas_2(started_cluster):
    """Replicate two postgres schemas at once via the
    `materialized_postgresql_schema_list` setting and verify that every table
    in every schema stays synchronized across inserts and a server restart.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = conn.cursor()

    NUM_TABLES = 2
    schemas_num = 2
    schema_list = 'schema0, schema1'  # passed verbatim into the DB engine setting
    insert_counter = 0

    def check_all_tables_are_synchronized():
        # Every table in every schema must match its postgres source.
        for i in range(schemas_num):
            schema_name = f'schema{i}'
            clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
            for ti in range(NUM_TABLES):
                table_name = f'postgresql_replica_{ti}'
                print(f'checking table {schema_name}.{table_name}')
                check_tables_are_synchronized(f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db)
        print('synchronized Ok')

    def insert_into_tables():
        # `nonlocal`, not `global`: insert_counter is a local of the enclosing
        # test function, so `global` would raise NameError on the increment.
        nonlocal insert_counter
        for i in range(schemas_num):
            clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
            for ti in range(NUM_TABLES):
                table_name = f'postgresql_replica_{ti}'
                # Bump the counter per insert so every batch gets a fresh,
                # non-overlapping key range.
                instance.query(f'INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)')
                insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query('SHOW TABLES FROM test_database')
        assert result == expected
        print('assert show tables Ok')

    # Create the postgres schemas/tables and one clickhouse PostgreSQL
    # database per schema (used for direct inserts and sync checks).
    for i in range(schemas_num):
        schema_name = f'schema{i}'
        clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
        create_postgres_schema(cursor, schema_name)
        create_clickhouse_postgres_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
        for ti in range(NUM_TABLES):
            table_name = f'postgresql_replica_{ti}'
            create_postgres_table_with_schema(cursor, schema_name, table_name)

    insert_into_tables()
    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
                           settings=[f"materialized_postgresql_schema_list = '{schema_list}'", "materialized_postgresql_allow_automatic_update = 1"])

    check_all_tables_are_synchronized()
    insert_into_tables()
    assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")

    # Replicated tables and their data must survive a server restart.
    instance.restart_clickhouse()
    assert_show_tables("schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n")
    check_all_tables_are_synchronized()
    insert_into_tables()
    check_all_tables_are_synchronized()

    # NOTE(review): ALTER TABLE / automatic-update check deliberately disabled
    # upstream; kept for reference.
    #altered_table = random.randint(0, NUM_TABLES-1)
    #cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
    #check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);

    #table_name = 'postgresql_replica_{}'.format(altered_table)
    #instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
    #check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), schema_name=schema_name, postgres_database=clickhouse_postgres_db);
    #print('Ok')

    drop_materialized_db()
2021-10-02 11:11:18 +00:00
def test_database_with_single_non_default_schema(started_cluster):
    """Replicate tables living in a single non-default postgres schema via the
    `materialized_postgresql_schema` setting and verify synchronization across
    inserts and a server restart.
    """
    conn = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, database=True)
    cursor = conn.cursor()

    NUM_TABLES = 5
    schema_name = 'test_schema'
    clickhouse_postgres_db = 'postgres_database_with_schema'
    insert_counter = 0

    def insert_into_tables():
        # `nonlocal`, not `global`: insert_counter is a local of the enclosing
        # test function, so `global` would raise NameError on the increment.
        nonlocal insert_counter
        for i in range(NUM_TABLES):
            table_name = f'postgresql_replica_{i}'
            # Bump the counter per insert so every batch gets a fresh,
            # non-overlapping key range.
            instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
            insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query('SHOW TABLES FROM test_database')
        assert result == expected
        print('assert show tables Ok')

    def check_all_tables_are_synchronized():
        # Every replicated table must match its postgres source.
        for i in range(NUM_TABLES):
            print('checking table', i)
            check_tables_are_synchronized("postgresql_replica_{}".format(i), postgres_database=clickhouse_postgres_db)
        print('synchronization Ok')

    create_postgres_schema(cursor, schema_name)
    create_clickhouse_postgres_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)

    for i in range(NUM_TABLES):
        table_name = 'postgresql_replica_{}'.format(i)
        create_postgres_table_with_schema(cursor, schema_name, table_name)

    insert_into_tables()
    create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
                           settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])

    insert_into_tables()
    check_all_tables_are_synchronized()
    assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")

    # Replicated tables and their data must survive a server restart.
    instance.restart_clickhouse()
    check_all_tables_are_synchronized()
    assert_show_tables("postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n")
    insert_into_tables()
    check_all_tables_are_synchronized()

    # NOTE(review): ALTER TABLE / automatic-update check deliberately disabled
    # upstream; kept for reference.
    #altered_table = random.randint(0, NUM_TABLES-1)
    #cursor.execute("ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(altered_table))
    #check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);

    #table_name = 'postgresql_replica_{}'.format(altered_table)
    #instance.query("INSERT INTO {}.{} SELECT number, number, number from numbers(5000, 1000)".format(clickhouse_postgres_db, table_name))
    #check_tables_are_synchronized("postgresql_replica_{}".format(altered_table), postgres_database=clickhouse_postgres_db);
    #print('Ok')

    drop_materialized_db()
2021-02-08 23:23:51 +00:00
if __name__ == '__main__':
    # Manual debugging entry point: bring the docker cluster up and keep it
    # alive until the operator presses a key, then tear it down.
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()