2021-11-30 12:26:57 +00:00
import pytest
import time
import psycopg2
import os . path as p
import random
from helpers . cluster import ClickHouseCluster
from helpers . test_tools import assert_eq_with_retry
from psycopg2 . extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers . test_tools import TSV
from random import randrange
import threading
2022-01-08 12:26:29 +00:00
from helpers . postgres_utility import get_postgres_conn
from helpers . postgres_utility import PostgresManager
from helpers . postgres_utility import create_replication_slot , drop_replication_slot
from helpers . postgres_utility import create_postgres_schema , drop_postgres_schema
from helpers . postgres_utility import create_postgres_table , drop_postgres_table
from helpers . postgres_utility import create_postgres_table_with_schema , drop_postgres_table_with_schema
from helpers . postgres_utility import check_tables_are_synchronized
from helpers . postgres_utility import check_several_tables_are_synchronized
from helpers . postgres_utility import assert_nested_table_is_created
from helpers . postgres_utility import assert_number_of_columns
from helpers . postgres_utility import postgres_table_template , postgres_table_template_2 , postgres_table_template_3 , postgres_table_template_4 , postgres_table_template_5
from helpers . postgres_utility import queries
2021-11-30 12:26:57 +00:00
# One ClickHouse node with a PostgreSQL container attached.
# NOTE(review): stay_alive=True is presumably what allows the tests below to
# call instance.restart_clickhouse() - confirm against helpers.cluster.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    main_configs=['configs/log_conf.xml'],
    user_configs=['configs/users.xml'],
    with_postgres=True,
    stay_alive=True,
)

# Shared PostgreSQL helper; it is bound to `instance` and to the PostgreSQL
# address by the `started_cluster` fixture once the cluster is running.
pg_manager = PostgresManager()
2021-11-30 12:26:57 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and yield it to the tests.

    After the cluster is up, `pg_manager` is initialised with the ClickHouse
    instance and the PostgreSQL ip/port. The cluster is shut down in the
    `finally` clause, so shutdown also runs when start-up itself fails.
    """
    try:
        cluster.start()
        pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port)
        yield cluster
    finally:
        cluster.shutdown()
2022-01-08 12:26:29 +00:00
@pytest.fixture(autouse=True)
def setup_teardown():
    """Per-test hook: announce the test, then restart `pg_manager` afterwards."""
    print("PostgreSQL is available - running test")
    yield  # run test
    # Runs after every test so the next one does not see state left behind.
    pg_manager.restart()
2021-11-30 12:26:57 +00:00
def test_add_new_table_to_replication(started_cluster):
    """ATTACH TABLE adds PostgreSQL tables to an existing materialized database.

    Steps:
      1. Replicate NUM_TABLES tables and verify `SHOW TABLES` / `SHOW CREATE DATABASE`.
      2. Verify the tables list cannot be changed through ALTER ... MODIFY SETTING.
      3. ATTACH a sixth table, check it is synchronized, and check that a second
         ATTACH and an ATTACH of an unknown table are rejected.
      4. ATTACH a seventh table, restart ClickHouse, ATTACH an eighth, and verify
         the stored tables list and synchronization of all eight tables.
    """
    cursor = pg_manager.get_db_cursor()
    cursor.execute('DROP TABLE IF EXISTS test_table')
    NUM_TABLES = 5

    # query() output keeps "\n" inside the statement escaped, hence the literal
    # backslashes in the expected fragments below.
    create_db_prefix = "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("

    def replica_names(count):
        # Names of the first `count` replicated tables.
        return [f'postgresql_replica_{i}' for i in range(count)]

    def show_tables_output(names):
        # Expected `SHOW TABLES` output: one table name per line.
        return ''.join(f'{name}\n' for name in names)

    def tables_list_suffix(names):
        # Expected tail of `SHOW CREATE DATABASE` once a tables list is stored.
        return ")\\nSETTINGS materialized_postgresql_tables_list = \\'" + ','.join(names) + "\\'\n"

    def assert_show_create_database(expected_suffix):
        # Compare only the head and the tail; the middle contains the PostgreSQL address.
        result = instance.query('SHOW CREATE DATABASE test_database')
        assert result.startswith(create_db_prefix)  # Check without ip
        assert result.endswith(expected_suffix)

    pg_manager.create_and_fill_postgres_tables_from_cursor(cursor, NUM_TABLES, 10000)
    pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
    check_several_tables_are_synchronized(instance, NUM_TABLES)

    result = instance.query("SHOW TABLES FROM test_database")
    assert result == show_tables_output(replica_names(5))

    table_name = 'postgresql_replica_5'
    pg_manager.create_and_fill_postgres_table_from_cursor(cursor, table_name)

    # The new PostgreSQL table is not attached yet, so no tables list is stored.
    assert_show_create_database("\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")

    result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
    assert 'Changing setting `materialized_postgresql_tables_list` is not allowed' in result

    result = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
    assert 'Database engine MaterializedPostgreSQL does not support setting' in result

    instance.query(f"ATTACH TABLE test_database.{table_name}")

    result = instance.query("SHOW TABLES FROM test_database")
    assert result == show_tables_output(replica_names(6))

    check_tables_are_synchronized(instance, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)")
    check_tables_are_synchronized(instance, table_name)

    result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}")
    assert 'Table test_database.postgresql_replica_5 already exists' in result

    result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table")
    assert 'PostgreSQL table unknown_table does not exist' in result

    assert_show_create_database(tables_list_suffix(replica_names(6)))

    table_name = 'postgresql_replica_6'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    instance.restart_clickhouse()

    table_name = 'postgresql_replica_7'
    create_postgres_table(cursor, table_name)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000)")
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    assert_show_create_database(tables_list_suffix(replica_names(8)))

    result = instance.query("SHOW TABLES FROM test_database")
    assert result == show_tables_output(replica_names(8))

    check_several_tables_are_synchronized(instance, NUM_TABLES + 3)
2021-11-30 12:26:57 +00:00
def test_remove_table_from_replication(started_cluster):
    """DETACH TABLE removes a table from replication and ATTACH brings it back.

    After each DETACH/ATTACH the stored `materialized_postgresql_tables_list`
    shown by `SHOW CREATE DATABASE` is checked. Finally a table that was already
    dropped in PostgreSQL is detached, which must succeed and be logged.
    """
    NUM_TABLES = 5

    # query() output keeps "\n" inside the statement escaped, hence the literal
    # backslashes in the expected fragments below.
    create_db_prefix = "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("

    def show_tables_output(indexes):
        # Expected `SHOW TABLES` output for the given table indexes.
        return ''.join(f'postgresql_replica_{i}\n' for i in indexes)

    def tables_list_suffix(indexes):
        # Expected tail of `SHOW CREATE DATABASE` once a tables list is stored.
        names = ','.join(f'postgresql_replica_{i}' for i in indexes)
        return ")\\nSETTINGS materialized_postgresql_tables_list = \\'" + names + "\\'\n"

    def assert_show_create_database(expected_suffix):
        # Compare only the head and the tail; the middle contains the PostgreSQL address.
        result = instance.query('SHOW CREATE DATABASE test_database')
        assert result.startswith(create_db_prefix)
        assert result.endswith(expected_suffix)

    pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
    pg_manager.create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
    check_several_tables_are_synchronized(instance, NUM_TABLES)

    result = instance.query("SHOW TABLES FROM test_database")
    assert result == show_tables_output([0, 1, 2, 3, 4])

    # No table has been detached or attached yet, so no tables list is stored.
    assert_show_create_database("\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n")

    table_name = 'postgresql_replica_4'
    instance.query(f'DETACH TABLE test_database.{table_name}')
    result = instance.query_and_get_error(f'SELECT * FROM test_database.{table_name}')
    assert "doesn't exist" in result

    result = instance.query("SHOW TABLES FROM test_database")
    assert result == show_tables_output([0, 1, 2, 3])

    assert_show_create_database(tables_list_suffix([0, 1, 2, 3]))

    instance.query(f'ATTACH TABLE test_database.{table_name}')
    check_tables_are_synchronized(instance, table_name)
    check_several_tables_are_synchronized(instance, NUM_TABLES)

    assert_show_create_database(tables_list_suffix([0, 1, 2, 3, 4]))

    table_name = 'postgresql_replica_1'
    instance.query(f'DETACH TABLE test_database.{table_name}')

    assert_show_create_database(tables_list_suffix([0, 2, 3, 4]))

    cursor = pg_manager.get_db_cursor()
    cursor.execute('drop table if exists postgresql_replica_0;')

    # Removing from replication table which does not exist in PostgreSQL must be ok.
    instance.query('DETACH TABLE test_database.postgresql_replica_0')
    assert instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
2021-11-30 12:26:57 +00:00
def test_predefined_connection_configuration(started_cluster):
    """Create the materialized database from the `postgres1` connection name.

    The engine receives only `postgres1` instead of explicit host/credentials
    (presumably a named collection from the instance configs - verify), and the
    single replicated table must end up synchronized.
    """
    cursor = pg_manager.get_db_cursor()
    cursor.execute('DROP TABLE IF EXISTS test_table')
    cursor.execute('CREATE TABLE test_table (key integer PRIMARY KEY, value integer)')
    cursor.execute('INSERT INTO test_table SELECT 1, 2')

    instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'")
    check_tables_are_synchronized(instance, "test_table")
    pg_manager.drop_materialized_db()
2021-11-30 12:26:57 +00:00
# Number of insert rounds done so far by the schema tests below. Each of them
# resets it to 0 on entry and uses it as the offset (1000 * insert_counter) of
# the next batch of generated keys.
insert_counter = 0
def test_database_with_single_non_default_schema(started_cluster):
    """Replicate tables that all live in one non-default PostgreSQL schema.

    The materialized database is created with `materialized_postgresql_schema`,
    so the replicated tables appear without a schema prefix. Synchronization is
    checked after the initial load, after a ClickHouse restart, after ADD COLUMN
    on a random table, and after DETACH/ATTACH of one table.
    """
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 5
    schema_name = 'test_schema'
    materialized_db = 'test_database'
    clickhouse_postgres_db = 'postgres_database_with_schema'
    expected_tables = ''.join(f'postgresql_replica_{i}\n' for i in range(NUM_TABLES))

    global insert_counter
    insert_counter = 0

    def insert_into_tables():
        # Insert the next 1000 keys into every table. The counter advances once
        # per round (not per table), so later fixed-offset inserts do not collide.
        global insert_counter
        for i in range(NUM_TABLES):
            table_name = f'postgresql_replica_{i}'
            instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query('SHOW TABLES FROM test_database')
        assert result == expected
        print('assert show tables Ok')

    def check_all_tables_are_synchronized():
        for i in range(NUM_TABLES):
            print('checking table', i)
            check_tables_are_synchronized(instance, f"postgresql_replica_{i}", postgres_database=clickhouse_postgres_db)
        print('synchronization Ok')

    create_postgres_schema(cursor, schema_name)
    pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)

    for i in range(NUM_TABLES):
        create_postgres_table_with_schema(cursor, schema_name, f'postgresql_replica_{i}')

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
        settings=[f"materialized_postgresql_schema = '{schema_name}'", "materialized_postgresql_allow_automatic_update = 1"])

    insert_into_tables()
    check_all_tables_are_synchronized()
    assert_show_tables(expected_tables)

    instance.restart_clickhouse()
    check_all_tables_are_synchronized()
    assert_show_tables(expected_tables)
    insert_into_tables()
    check_all_tables_are_synchronized()

    print('ALTER')
    altered_table = random.randint(0, NUM_TABLES - 1)
    cursor.execute(f"ALTER TABLE {schema_name}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")

    instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
    assert_number_of_columns(instance, 3, f'postgresql_replica_{altered_table}')
    check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", postgres_database=clickhouse_postgres_db)

    print('DETACH-ATTACH')
    detached_table_name = "postgresql_replica_1"
    instance.query(f"DETACH TABLE {materialized_db}.{detached_table_name}")
    # The table still exists in PostgreSQL, so the "does not exist" path must not be taken.
    assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
    instance.query(f"ATTACH TABLE {materialized_db}.{detached_table_name}")
    check_tables_are_synchronized(instance, detached_table_name, postgres_database=clickhouse_postgres_db)
2021-11-30 12:26:57 +00:00
def test_database_with_multiple_non_default_schemas_1(started_cluster):
    """Replicate schema-qualified tables listed in `materialized_postgresql_tables_list`.

    With `materialized_postgresql_tables_list_with_schema=1` the replicated
    tables are named `<schema>.<table>`. Synchronization is checked after the
    initial load, after a ClickHouse restart, after ADD COLUMN on a random
    table, and after DETACH/ATTACH of one table.
    """
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 5
    schema_name = 'test_schema'
    clickhouse_postgres_db = 'postgres_database_with_schema'
    materialized_db = 'test_database'
    table_names = [f'postgresql_replica_{i}' for i in range(NUM_TABLES)]
    expected_tables = ''.join(f'{schema_name}.{name}\n' for name in table_names)

    global insert_counter
    insert_counter = 0

    def insert_into_tables():
        # Insert the next 1000 keys into every table. The counter advances once
        # per round (not per table), so later fixed-offset inserts do not collide.
        global insert_counter
        for table_name in table_names:
            instance.query(f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)")
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query('SHOW TABLES FROM test_database')
        assert result == expected
        print('assert show tables Ok')

    def check_all_tables_are_synchronized():
        for i in range(NUM_TABLES):
            print('checking table', i)
            check_tables_are_synchronized(instance, f"postgresql_replica_{i}", schema_name=schema_name, postgres_database=clickhouse_postgres_db)
        print('synchronization Ok')

    create_postgres_schema(cursor, schema_name)
    pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)

    for table_name in table_names:
        create_postgres_table_with_schema(cursor, schema_name, table_name)
    # Schema-qualified, comma-separated list for materialized_postgresql_tables_list.
    publication_tables = ', '.join(f'{schema_name}.{name}' for name in table_names)

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
        settings=[f"materialized_postgresql_tables_list = '{publication_tables}'", "materialized_postgresql_tables_list_with_schema=1", "materialized_postgresql_allow_automatic_update = 1"])

    check_all_tables_are_synchronized()
    assert_show_tables(expected_tables)

    instance.restart_clickhouse()
    check_all_tables_are_synchronized()
    assert_show_tables(expected_tables)

    insert_into_tables()
    check_all_tables_are_synchronized()

    print('ALTER')
    altered_table = random.randint(0, NUM_TABLES - 1)
    cursor.execute(f"ALTER TABLE {schema_name}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")

    instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)")
    assert_number_of_columns(instance, 3, f'{schema_name}.postgresql_replica_{altered_table}')
    check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=schema_name, postgres_database=clickhouse_postgres_db)

    print('DETACH-ATTACH')
    detached_table_name = "postgresql_replica_1"
    instance.query(f"DETACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
    # The table still exists in PostgreSQL, so the "does not exist" path must not be taken.
    assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
    instance.query(f"ATTACH TABLE {materialized_db}.`{schema_name}.{detached_table_name}`")
    assert_show_tables(expected_tables)
    check_tables_are_synchronized(instance, detached_table_name, schema_name=schema_name, postgres_database=clickhouse_postgres_db)
2021-11-30 12:26:57 +00:00
def test_database_with_multiple_non_default_schemas_2(started_cluster):
    """Replicate every table of several schemas via `materialized_postgresql_schema_list`.

    Two schemas with two tables each are replicated; tables appear as
    `<schema>.<table>`. Synchronization is checked after the initial load, after
    a ClickHouse restart, after ADD COLUMN on a random table, and after
    DETACH/ATTACH of `schema0.postgresql_replica_1`.
    """
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 2
    schemas_num = 2
    schema_list = 'schema0, schema1'
    materialized_db = 'test_database'
    expected_tables = "schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"

    global insert_counter
    insert_counter = 0

    def check_all_tables_are_synchronized():
        for i in range(schemas_num):
            schema_name = f'schema{i}'
            clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
            for ti in range(NUM_TABLES):
                table_name = f'postgresql_replica_{ti}'
                print(f'checking table {schema_name}.{table_name}')
                check_tables_are_synchronized(instance, f'{table_name}', schema_name=schema_name, postgres_database=clickhouse_postgres_db)
        print('synchronized Ok')

    def insert_into_tables():
        # Insert the next 1000 keys into every table of every schema. The counter
        # advances once per round so successive rounds use disjoint key ranges.
        global insert_counter
        for i in range(schemas_num):
            clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
            for ti in range(NUM_TABLES):
                table_name = f'postgresql_replica_{ti}'
                instance.query(f'INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)')
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query('SHOW TABLES FROM test_database')
        assert result == expected
        print('assert show tables Ok')

    for i in range(schemas_num):
        schema_name = f'schema{i}'
        clickhouse_postgres_db = f'clickhouse_postgres_db{i}'
        create_postgres_schema(cursor, schema_name)
        pg_manager.create_clickhouse_postgres_db(ip=cluster.postgres_ip, port=cluster.postgres_port, name=clickhouse_postgres_db, schema_name=schema_name)
        for ti in range(NUM_TABLES):
            table_name = f'postgresql_replica_{ti}'
            create_postgres_table_with_schema(cursor, schema_name, table_name)

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
        settings=[f"materialized_postgresql_schema_list = '{schema_list}'",
                  "materialized_postgresql_allow_automatic_update = 1"])

    check_all_tables_are_synchronized()
    insert_into_tables()
    assert_show_tables(expected_tables)

    instance.restart_clickhouse()
    assert_show_tables(expected_tables)
    check_all_tables_are_synchronized()
    insert_into_tables()
    check_all_tables_are_synchronized()

    print('ALTER')
    altered_schema = random.randint(0, schemas_num - 1)
    altered_table = random.randint(0, NUM_TABLES - 1)
    clickhouse_postgres_db = f'clickhouse_postgres_db{altered_schema}'
    cursor.execute(f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer")

    instance.query(f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)")
    assert_number_of_columns(instance, 3, f'schema{altered_schema}.postgresql_replica_{altered_table}')
    check_tables_are_synchronized(instance, f"postgresql_replica_{altered_table}", schema_name=f"schema{altered_schema}", postgres_database=clickhouse_postgres_db)

    print('DETACH-ATTACH')
    detached_table_name = "postgresql_replica_1"
    detached_table_schema = "schema0"
    clickhouse_postgres_db = 'clickhouse_postgres_db0'
    instance.query(f"DETACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
    # The table still exists in PostgreSQL, so the "does not exist" path must not be taken.
    assert not instance.contains_in_log("from publication, because table does not exist in PostgreSQL")
    instance.query(f"ATTACH TABLE {materialized_db}.`{detached_table_schema}.{detached_table_name}`")
    assert_show_tables(expected_tables)
    # Verify the table that was actually detached and re-attached. Checking the
    # randomly altered table here would cover the re-attached one only by chance.
    check_tables_are_synchronized(instance, detached_table_name, schema_name=detached_table_schema, postgres_database=clickhouse_postgres_db)
2021-11-30 12:26:57 +00:00
2021-12-14 13:53:47 +00:00
def test_table_override(started_cluster):
    """A TABLE OVERRIDE clause sets the column types of the replicated table.

    A PostgreSQL-engine table in ClickHouse is used to fill the source table and
    to read the reference data; the materialized table must be created with the
    overridden columns and eventually contain the same rows.
    """
    cursor = pg_manager.get_db_cursor()
    table_name = 'table_override'
    materialized_database = 'test_database'

    create_postgres_table(cursor, table_name, template=postgres_table_template_5)
    instance.query(f"create table {table_name} (key Int32, value UUID) engine = PostgreSQL (postgres1, table={table_name})")
    instance.query(f"insert into {table_name} select number, generateUUIDv4() from numbers(10)")

    # Leading space kept on purpose: it is harmless and keeps the clause separated
    # in case the helper appends it to the CREATE DATABASE query verbatim.
    table_overrides = f" TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value UUID))"
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
        settings=[f"materialized_postgresql_tables_list = '{table_name}'"],
        table_overrides=table_overrides)
    assert_nested_table_is_created(instance, table_name, materialized_database)

    result = instance.query(f"show create table {materialized_database}.{table_name}")
    print(result)
    # NOTE(review): whitespace runs inside this literal were collapsed in the copy
    # under review; the four-space column indent follows ClickHouse's usual
    # SHOW CREATE TABLE formatting - confirm against real server output.
    expected_create = "CREATE TABLE test_database.table_override\\n(\\n    `key` Int32,\\n    `value` UUID,\\n    `_sign` Int8() MATERIALIZED 1,\\n    `_version` UInt64() MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nORDER BY tuple(key)"
    assert result.strip() == expected_create

    time.sleep(5)
    query = f"select * from {materialized_database}.{table_name} order by key"
    # Capture the reference rows before the PostgreSQL-engine table is dropped.
    expected_rows = instance.query(f"select * from {table_name} order by key")
    instance.query(f"drop table {table_name} no delay")
    assert_eq_with_retry(instance, query, expected_rows)
2021-12-14 13:53:47 +00:00
2021-12-26 17:49:33 +00:00
def test_table_schema_changes_2(started_cluster):
    """Replication keeps up with repeated DROP/ADD COLUMN on the source table.

    Columns are dropped and re-added with different types in several rounds,
    including one round across a ClickHouse restart; after every round new rows
    matching the current column set are inserted and the table must be
    synchronized.
    """
    cursor = pg_manager.get_db_cursor()
    table_name = "test_table"

    def alter_table(*clauses):
        # Run each clause as its own `ALTER TABLE <table_name> <clause>` statement, in order.
        for clause in clauses:
            cursor.execute(f"ALTER TABLE {table_name} {clause}")

    create_postgres_table(cursor, table_name, template=postgres_table_template_2)
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25)")

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
        settings=["materialized_postgresql_allow_automatic_update = 1, materialized_postgresql_tables_list='test_table'"])

    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25, 25)")
    check_tables_are_synchronized(instance, table_name)

    alter_table(
        "DROP COLUMN value1",
        "DROP COLUMN value2",
        "ADD COLUMN value1 Text",
        "ADD COLUMN value2 Text",
        "DROP COLUMN value3",
        "ADD COLUMN value3 Text",
        "ADD COLUMN value4 Text",
    )
    cursor.execute(f"UPDATE {table_name} SET value3 = 'kek' WHERE key%2=0")
    check_tables_are_synchronized(instance, table_name)

    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(50, 25)")
    alter_table(
        "ADD COLUMN value5 Integer",
        "DROP COLUMN value2",
    )
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)")
    check_tables_are_synchronized(instance, table_name)

    instance.restart_clickhouse()
    check_tables_are_synchronized(instance, table_name)

    alter_table(
        "DROP COLUMN value5",
        "ADD COLUMN value5 Text",
    )
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(100, 25)")
    check_tables_are_synchronized(instance, table_name)

    alter_table(
        "ADD COLUMN value6 Text",
        "ADD COLUMN value7 Integer",
        "ADD COLUMN value8 Integer",
        "DROP COLUMN value5",
    )
    instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number), number, number from numbers(125, 25)")
    check_tables_are_synchronized(instance, table_name)
2021-12-26 17:49:33 +00:00
2021-11-30 12:26:57 +00:00
if __name__ == '__main__':
    # Manual mode: bring the cluster up and keep it until the user presses Enter.
    # try/finally mirrors the started_cluster fixture so the containers are also
    # torn down when start-up fails or the prompt is interrupted (Ctrl-C / EOF).
    try:
        cluster.start()
        input("Cluster created, press any key to destroy...")
    finally:
        cluster.shutdown()