import os.path as p
import random
import threading
import time
import uuid
from random import randrange

import psycopg2
import pytest
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from helpers.cluster import ClickHouseCluster
from helpers.postgres_utility import (
    PostgresManager,
    assert_nested_table_is_created,
    assert_number_of_columns,
    check_several_tables_are_synchronized,
    check_tables_are_synchronized,
    create_postgres_schema,
    create_postgres_table,
    create_postgres_table_with_schema,
    create_replication_slot,
    drop_postgres_schema,
    drop_postgres_table,
    drop_postgres_table_with_schema,
    drop_replication_slot,
    get_postgres_conn,
    postgres_table_template,
    postgres_table_template_2,
    postgres_table_template_3,
    postgres_table_template_4,
    postgres_table_template_5,
    postgres_table_template_6,
    queries,
)
from helpers.test_tools import TSV, assert_eq_with_retry

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    "instance",
    main_configs=["configs/log_conf.xml"],
    user_configs=["configs/users.xml"],
    with_postgres=True,
    stay_alive=True,
)
instance2 = cluster.add_instance(
    "instance2",
    main_configs=["configs/log_conf.xml", "configs/merge_tree_too_many_parts.xml"],
    user_configs=["configs/users.xml"],
    with_postgres=True,
    stay_alive=True,
)

pg_manager = PostgresManager()
pg_manager2 = PostgresManager()
pg_manager_instance2 = PostgresManager()


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        pg_manager.init(
            instance,
            cluster.postgres_ip,
            cluster.postgres_port,
            default_database="postgres_database",
        )
        pg_manager_instance2.init(
            instance2,
            cluster.postgres_ip,
            cluster.postgres_port,
            default_database="postgres_database",
            postgres_db_exists=True,
        )
        pg_manager2.init(
            instance2, cluster.postgres_ip, cluster.postgres_port, "postgres_database2"
        )

        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def setup_teardown():
    print("PostgreSQL is available - running test")
    yield  # run test
    pg_manager.restart()


def test_add_new_table_to_replication(started_cluster):
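    # A table created in PostgreSQL after the MaterializedPostgreSQL database already
    # exists is not picked up automatically: it has to be added with ATTACH TABLE,
    # which also extends materialized_postgresql_tables_list in SHOW CREATE DATABASE.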
    NUM_TABLES = 5

    pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
    )
    check_several_tables_are_synchronized(instance, NUM_TABLES)

    result = instance.query("SHOW TABLES FROM test_database")
    assert (
        result
        == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    table_name = "postgresql_replica_5"
    pg_manager.create_and_fill_postgres_table(table_name)

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )  # Check without ip
    assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"

    result = instance.query_and_get_error(
        "ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'"
    )
    assert (
        "Changing setting `materialized_postgresql_tables_list` is not allowed"
        in result
    )

    result = instance.query_and_get_error(
        "ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'"
    )
    assert "Database engine MaterializedPostgreSQL does not support setting" in result

    instance.query(f"ATTACH TABLE test_database.{table_name}")

    result = instance.query("SHOW TABLES FROM test_database")
    assert (
        result
        == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n"
    )

    check_tables_are_synchronized(instance, table_name)
    instance.query(
        f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
    )
    check_tables_are_synchronized(instance, table_name)

    result = instance.query_and_get_error(f"ATTACH TABLE test_database.{table_name}")
    assert "Table test_database.postgresql_replica_5 already exists" in result

    result = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table")
    assert "PostgreSQL table unknown_table does not exist" in result

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert (
        result[-180:]
        == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n"
    )

    table_name = "postgresql_replica_6"
    pg_manager.create_postgres_table(table_name)
    instance.query(
        "INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
            table_name
        )
    )
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    instance.restart_clickhouse()

    table_name = "postgresql_replica_7"
    pg_manager.create_postgres_table(table_name)
    instance.query(
        "INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
            table_name
        )
    )
    instance.query(f"ATTACH TABLE test_database.{table_name}")

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert (
        result[-222:]
        == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n"
    )

    instance.query(
        f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
    )

    result = instance.query("SHOW TABLES FROM test_database")
    assert (
        result
        == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n"
    )
    check_several_tables_are_synchronized(instance, NUM_TABLES + 3)


def test_remove_table_from_replication(started_cluster):
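    # DETACH TABLE ... PERMANENTLY removes a table from the replication list; it can be
    # re-added later with ATTACH TABLE, and detaching a table that was already dropped
    # in PostgreSQL must not fail.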
    NUM_TABLES = 5
    pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
    )
    check_several_tables_are_synchronized(instance, NUM_TABLES)

    result = instance.query("SHOW TABLES FROM test_database")
    assert (
        result
        == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert result[-51:] == "\\'postgres_database\\', \\'postgres\\', \\'[HIDDEN]\\')\n"

    table_name = "postgresql_replica_4"
    instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
    result = instance.query_and_get_error(f"SELECT * FROM test_database.{table_name}")
    assert "UNKNOWN_TABLE" in result

    result = instance.query("SHOW TABLES FROM test_database")
    assert (
        result
        == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n"
    )

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert (
        result[-138:]
        == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n"
    )

    instance.query(f"ATTACH TABLE test_database.{table_name}")
    check_tables_are_synchronized(instance, table_name)
    check_several_tables_are_synchronized(instance, NUM_TABLES)
    instance.query(
        f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers(10000, 10000)"
    )
    check_tables_are_synchronized(instance, table_name)

    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert (
        result[-159:]
        == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"
    )

    table_name = "postgresql_replica_1"
    instance.query(f"DETACH TABLE test_database.{table_name} PERMANENTLY")
    result = instance.query("SHOW CREATE DATABASE test_database")
    assert (
        result[:63]
        == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    )
    assert (
        result[-138:]
        == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"
    )

    pg_manager.execute(f"drop table if exists postgresql_replica_0;")

    # Removing from replication table which does not exist in PostgreSQL must be ok.
    instance.query("DETACH TABLE test_database.postgresql_replica_0 PERMANENTLY")
    assert instance.contains_in_log(
        "from publication, because table does not exist in PostgreSQL"
    )


def test_predefined_connection_configuration(started_cluster):
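    # The connection parameters come from the predefined `postgres1` connection in the
    # server configuration instead of being passed to the engine explicitly.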
    pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
    pg_manager.execute(
        f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
    )
    pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")

    instance.query(
        "CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
    )
    check_tables_are_synchronized(instance, "test_table")
    pg_manager.drop_materialized_db()


insert_counter = 0


def test_database_with_single_non_default_schema(started_cluster):
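    # All replicated tables live in one non-default PostgreSQL schema, selected with
    # materialized_postgresql_schema; table names in ClickHouse stay unqualified.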
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 5
    schema_name = "test_schema"
    materialized_db = "test_database"
    clickhouse_postgres_db = "postgres_database_with_schema"
    global insert_counter
    insert_counter = 0

    def insert_into_tables():
        global insert_counter
        clickhouse_postgres_db = "postgres_database_with_schema"
        for i in range(NUM_TABLES):
            table_name = f"postgresql_replica_{i}"
            instance.query(
                f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
            )
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query("SHOW TABLES FROM test_database")
        assert result == expected
        print("assert show tables Ok")

    def check_all_tables_are_synchronized():
        for i in range(NUM_TABLES):
            print("checking table", i)
            check_tables_are_synchronized(
                instance,
                f"postgresql_replica_{i}",
                postgres_database=clickhouse_postgres_db,
            )
        print("synchronization Ok")

    create_postgres_schema(cursor, schema_name)
    pg_manager.create_clickhouse_postgres_db(
        database_name=clickhouse_postgres_db,
        schema_name=schema_name,
        postgres_database="postgres_database",
    )

    for i in range(NUM_TABLES):
        create_postgres_table_with_schema(
            cursor, schema_name, f"postgresql_replica_{i}"
        )

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_schema = '{schema_name}'",
        ],
    )

    insert_into_tables()
    check_all_tables_are_synchronized()
    assert_show_tables(
        "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    instance.restart_clickhouse()
    check_all_tables_are_synchronized()
    assert_show_tables(
        "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    insert_into_tables()
    check_all_tables_are_synchronized()

    altered_table = random.randint(0, NUM_TABLES - 1)
    pg_manager.execute(
        "ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
            altered_table
        )
    )

    instance.query(
        f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
    )

    assert instance.wait_for_log_line(
        f"Table postgresql_replica_{altered_table} is skipped from replication stream"
    )
    instance.query(
        f"DETACH TABLE test_database.postgresql_replica_{altered_table} PERMANENTLY"
    )
    assert not instance.contains_in_log(
        "from publication, because table does not exist in PostgreSQL"
    )
    instance.query(f"ATTACH TABLE test_database.postgresql_replica_{altered_table}")
    check_tables_are_synchronized(
        instance,
        f"postgresql_replica_{altered_table}",
        postgres_database=clickhouse_postgres_db,
    )


def test_database_with_multiple_non_default_schemas_1(started_cluster):
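    # Tables from a non-default schema are listed schema-qualified in
    # materialized_postgresql_tables_list (with materialized_postgresql_tables_list_with_schema=1),
    # so ClickHouse-side names take the form `test_schema.postgresql_replica_N`.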
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 5
    schema_name = "test_schema"
    clickhouse_postgres_db = "postgres_database_with_schema"
    materialized_db = "test_database"
    publication_tables = ""
    global insert_counter
    insert_counter = 0

    def insert_into_tables():
        global insert_counter
        clickhouse_postgres_db = "postgres_database_with_schema"
        for i in range(NUM_TABLES):
            table_name = f"postgresql_replica_{i}"
            instance.query(
                f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
            )
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query("SHOW TABLES FROM test_database")
        assert result == expected
        print("assert show tables Ok")

    def check_all_tables_are_synchronized():
        for i in range(NUM_TABLES):
            print("checking table", i)
            check_tables_are_synchronized(
                instance,
                "postgresql_replica_{}".format(i),
                schema_name=schema_name,
                postgres_database=clickhouse_postgres_db,
            )
        print("synchronization Ok")

    create_postgres_schema(cursor, schema_name)
    pg_manager.create_clickhouse_postgres_db(
        database_name=clickhouse_postgres_db,
        schema_name=schema_name,
        postgres_database="postgres_database",
    )

    for i in range(NUM_TABLES):
        table_name = "postgresql_replica_{}".format(i)
        create_postgres_table_with_schema(cursor, schema_name, table_name)
        if publication_tables != "":
            publication_tables += ","
        publication_tables += schema_name + "." + table_name

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{publication_tables}'",
            "materialized_postgresql_tables_list_with_schema=1",
        ],
    )

    check_all_tables_are_synchronized()
    assert_show_tables(
        "test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
    )

    instance.restart_clickhouse()
    check_all_tables_are_synchronized()
    assert_show_tables(
        "test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
    )

    insert_into_tables()
    check_all_tables_are_synchronized()

    altered_table = random.randint(0, NUM_TABLES - 1)
    pg_manager.execute(
        "ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
            altered_table
        )
    )

    instance.query(
        f"INSERT INTO {clickhouse_postgres_db}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(5000, 1000)"
    )

    assert instance.wait_for_log_line(
        f"Table test_schema.postgresql_replica_{altered_table} is skipped from replication stream"
    )

    altered_materialized_table = (
        f"{materialized_db}.`test_schema.postgresql_replica_{altered_table}`"
    )
    instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
    assert not instance.contains_in_log(
        "from publication, because table does not exist in PostgreSQL"
    )
    instance.query(f"ATTACH TABLE {altered_materialized_table}")

    assert_show_tables(
        "test_schema.postgresql_replica_0\ntest_schema.postgresql_replica_1\ntest_schema.postgresql_replica_2\ntest_schema.postgresql_replica_3\ntest_schema.postgresql_replica_4\n"
    )
    check_tables_are_synchronized(
        instance,
        f"postgresql_replica_{altered_table}",
        schema_name=schema_name,
        postgres_database=clickhouse_postgres_db,
    )


def test_database_with_multiple_non_default_schemas_2(started_cluster):
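    # Whole schemas are replicated via materialized_postgresql_schema_list; tables show
    # up in ClickHouse as `schemaN.postgresql_replica_M`.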
    cursor = pg_manager.get_db_cursor()
    NUM_TABLES = 2
    schemas_num = 2
    schema_list = "schema0, schema1"
    materialized_db = "test_database"
    global insert_counter
    insert_counter = 0

    def check_all_tables_are_synchronized():
        for i in range(schemas_num):
            schema_name = f"schema{i}"
            clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
            for ti in range(NUM_TABLES):
                table_name = f"postgresql_replica_{ti}"
                print(f"checking table {schema_name}.{table_name}")
                check_tables_are_synchronized(
                    instance,
                    f"{table_name}",
                    schema_name=schema_name,
                    postgres_database=clickhouse_postgres_db,
                )
        print("synchronized Ok")

    def insert_into_tables():
        global insert_counter
        for i in range(schemas_num):
            clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
            for ti in range(NUM_TABLES):
                table_name = f"postgresql_replica_{ti}"
                instance.query(
                    f"INSERT INTO {clickhouse_postgres_db}.{table_name} SELECT number, number from numbers(1000 * {insert_counter}, 1000)"
                )
        insert_counter += 1

    def assert_show_tables(expected):
        result = instance.query("SHOW TABLES FROM test_database")
        assert result == expected
        print("assert show tables Ok")

    for i in range(schemas_num):
        schema_name = f"schema{i}"
        clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
        create_postgres_schema(cursor, schema_name)
        pg_manager.create_clickhouse_postgres_db(
            database_name=clickhouse_postgres_db,
            schema_name=schema_name,
            postgres_database="postgres_database",
        )
        for ti in range(NUM_TABLES):
            table_name = f"postgresql_replica_{ti}"
            create_postgres_table_with_schema(cursor, schema_name, table_name)

    insert_into_tables()
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_schema_list = '{schema_list}'",
        ],
    )

    check_all_tables_are_synchronized()
    insert_into_tables()
    assert_show_tables(
        "schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
    )

    instance.restart_clickhouse()
    assert_show_tables(
        "schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
    )
    check_all_tables_are_synchronized()

    insert_into_tables()
    check_all_tables_are_synchronized()

    print("ALTER")
    altered_schema = random.randint(0, schemas_num - 1)
    altered_table = random.randint(0, NUM_TABLES - 1)
    clickhouse_postgres_db = f"clickhouse_postgres_db{altered_schema}"
    pg_manager.execute(
        f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer"
    )

    instance.query(
        f"INSERT INTO clickhouse_postgres_db{altered_schema}.postgresql_replica_{altered_table} SELECT number, number, number from numbers(1000 * {insert_counter}, 1000)"
    )

    assert instance.wait_for_log_line(
        f"Table schema{altered_schema}.postgresql_replica_{altered_table} is skipped from replication stream"
    )

    altered_materialized_table = (
        f"{materialized_db}.`schema{altered_schema}.postgresql_replica_{altered_table}`"
    )
    instance.query(f"DETACH TABLE {altered_materialized_table} PERMANENTLY")
    assert not instance.contains_in_log(
        "from publication, because table does not exist in PostgreSQL"
    )
    instance.query(f"ATTACH TABLE {altered_materialized_table}")

    assert_show_tables(
        "schema0.postgresql_replica_0\nschema0.postgresql_replica_1\nschema1.postgresql_replica_0\nschema1.postgresql_replica_1\n"
    )
    check_tables_are_synchronized(
        instance,
        f"postgresql_replica_{altered_table}",
        schema_name=f"schema{altered_schema}",
        postgres_database=clickhouse_postgres_db,
    )


def test_table_override(started_cluster):
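    # TABLE OVERRIDE redefines the column types and adds PARTITION BY to the
    # ReplacingMergeTree table that backs the replicated PostgreSQL table.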
    table_name = "table_override"
    materialized_database = "test_database"
    pg_manager.create_postgres_table(table_name, template=postgres_table_template_6)
    instance.query(
        f"insert into postgres_database.{table_name} select number, 'test' from numbers(10)"
    )
    table_overrides = f"TABLE OVERRIDE {table_name} (COLUMNS (key Int32, value String) PARTITION BY key)"
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[f"materialized_postgresql_tables_list = '{table_name}'"],
        materialized_database=materialized_database,
        table_overrides=table_overrides,
    )
    check_tables_are_synchronized(
        instance, table_name, postgres_database=pg_manager.get_default_database()
    )

    assert 10 == int(
        instance.query(f"SELECT count() FROM {materialized_database}.{table_name}")
    )

    expected = "CREATE TABLE test_database.table_override\\n(\\n    `key` Int32,\\n    `value` String,\\n    `_sign` Int8 MATERIALIZED 1,\\n    `_version` UInt64 MATERIALIZED 1\\n)\\nENGINE = ReplacingMergeTree(_version)\\nPARTITION BY key\\nORDER BY tuple(key)"
    assert (
        expected
        == instance.query(
            f"show create table {materialized_database}.{table_name}"
        ).strip()
    )

    assert (
        "test"
        == instance.query(
            f"SELECT value FROM {materialized_database}.{table_name} WHERE key = 2"
        ).strip()
    )

    conn = get_postgres_conn(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        database_name="postgres_database",
        database=True,
        auto_commit=True,
    )
    cursor = conn.cursor()
    cursor.execute(f"SELECT count(*) FROM {table_name}")
    assert 10 == cursor.fetchall()[0][0]

    pg_manager.execute(f"UPDATE {table_name} SET value='kek' WHERE key=2")
    cursor.execute(f"SELECT value FROM {table_name} WHERE key=2")
    assert "kek" == cursor.fetchall()[0][0]

    pg_manager.execute(f"DELETE FROM {table_name} WHERE key=2")
    cursor.execute(f"SELECT count(*) FROM {table_name}")
    assert 9 == cursor.fetchall()[0][0]
    conn.close()

    check_tables_are_synchronized(
        instance, table_name, postgres_database=pg_manager.get_default_database()
    )
    assert (
        ""
        == instance.query(
            f"SELECT value FROM {materialized_database}.{table_name} WHERE key = 2"
        ).strip()
    )


def test_materialized_view(started_cluster):
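    # A regular MergeTree materialized view built on top of a replicated table keeps
    # receiving rows that arrive through replication.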
    pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
    pg_manager.execute(
        f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
    )
    pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")

    instance.query("DROP DATABASE IF EXISTS test_database")
    instance.query(
        "CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
    )
    check_tables_are_synchronized(instance, "test_table")

    instance.query("DROP TABLE IF EXISTS mv")
    instance.query(
        "CREATE MATERIALIZED VIEW mv ENGINE=MergeTree ORDER BY tuple() POPULATE AS SELECT * FROM test_database.test_table"
    )
    assert "1\t2" == instance.query("SELECT * FROM mv").strip()

    pg_manager.execute(f"INSERT INTO test_table SELECT 3, 4")
    check_tables_are_synchronized(instance, "test_table")
    assert "1\t2\n3\t4" == instance.query("SELECT * FROM mv ORDER BY 1, 2").strip()

    instance.query("DROP VIEW mv")
    pg_manager.drop_materialized_db()


def test_too_many_parts(started_cluster):
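    # instance2 is configured (merge_tree_too_many_parts.xml) with a low parts limit;
    # with merges stopped, single-row inserts must eventually hit "Too many parts",
    # and replication must catch up again after SYSTEM START MERGES.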
    table = "test_table"
    pg_manager2.create_and_fill_postgres_table(table)
    pg_manager2.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = 'test_table', materialized_postgresql_backoff_min_ms = 100, materialized_postgresql_backoff_max_ms = 100"
        ],
    )
    check_tables_are_synchronized(
        instance2, "test_table", postgres_database=pg_manager2.get_default_database()
    )
    assert (
        "50" == instance2.query("SELECT count() FROM test_database.test_table").strip()
    )

    instance2.query("SYSTEM STOP MERGES")
    num = 50
    for i in range(10):
        instance2.query(
            f"""
            INSERT INTO {pg_manager2.get_default_database()}.test_table SELECT {num}, {num};
            """
        )
        num = num + 1
        for i in range(30):
            if num == int(
                instance2.query("SELECT count() FROM test_database.test_table")
            ) or instance2.contains_in_log("DB::Exception: Too many parts"):
                break
            time.sleep(1)
            print(f"wait sync try {i}")

        instance2.query("SYSTEM FLUSH LOGS")

        if instance2.contains_in_log("DB::Exception: Too many parts"):
            break

        assert num == int(
            instance2.query("SELECT count() FROM test_database.test_table")
        ) or num - 1 == int(
            instance2.query("SELECT count() FROM test_database.test_table")
        )

    assert instance2.contains_in_log("DB::Exception: Too many parts")
    print(num)
    assert num == int(
        instance2.query("SELECT count() FROM test_database.test_table")
    ) or num - 1 == int(instance2.query("SELECT count() FROM test_database.test_table"))

    instance2.query("SYSTEM START MERGES")
    check_tables_are_synchronized(
        instance2, "test_table", postgres_database=pg_manager2.get_default_database()
    )

    # assert "200" == instance.query("SELECT count FROM test_database.test_table").strip()
    pg_manager2.drop_materialized_db()


def test_toast(started_cluster):
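    # A ~30000-character text value forces PostgreSQL to TOAST the row; the value must
    # still be replicated correctly (rows compared ordered by id).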
    table = "test_toast"
    pg_manager.create_postgres_table(
        table,
        "",
        """CREATE TABLE "{}" (id integer PRIMARY KEY, txt text, other text)""",
    )
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )
    pg_manager.execute(
        f"""\
INSERT INTO {table} (id, txt)\
VALUES (1, (SELECT array_to_string(ARRAY(SELECT chr((100 + round(random() * 25))::integer) FROM generate_series(1, 30000) as t(i)), '')))
"""
    )
    check_tables_are_synchronized(
        instance,
        table,
        postgres_database=pg_manager.get_default_database(),
        order_by="id",
    )


def test_replica_consumer(started_cluster):
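    # Two ClickHouse instances replicate the same PostgreSQL table at the same time;
    # materialized_postgresql_use_unique_replication_consumer_identifier = 1 lets each
    # database use its own replication consumer so both stay in sync.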
    table = "test_replica_consumer"
    pg_manager_instance2.restart()

    pg_manager.create_postgres_table(table)
    instance.query(
        f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
    )

    for pm in [pg_manager, pg_manager_instance2]:
        pm.create_materialized_db(
            ip=started_cluster.postgres_ip,
            port=started_cluster.postgres_port,
            settings=[
                f"materialized_postgresql_tables_list = '{table}'",
                "materialized_postgresql_backoff_min_ms = 100",
                "materialized_postgresql_backoff_max_ms = 100",
                "materialized_postgresql_use_unique_replication_consumer_identifier = 1",
            ],
        )

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )
    check_tables_are_synchronized(
        instance2, table, postgres_database=pg_manager_instance2.get_default_database()
    )

    assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
    assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))

    instance.query(
        f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
    )

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )
    check_tables_are_synchronized(
        instance2, table, postgres_database=pg_manager_instance2.get_default_database()
    )

    assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
    assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))

    for pm in [pg_manager, pg_manager_instance2]:
        pm.drop_materialized_db()
    pg_manager_instance2.clear()


def test_bad_connection_options(started_cluster):
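    # A misspelled user ("postrges") must not break the server: the database is created
    # empty and the connection failure is only reported in the log.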
    table = "test_bad_connection_options"
    pg_manager.create_postgres_table(table)
    instance.query(
        f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
    )

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
        user="postrges",
        password="kek",
    )

    instance.wait_for_log_line('role "postrges" does not exist')
    assert instance.contains_in_log(
        "<Error> void DB::DatabaseMaterializedPostgreSQL::startSynchronization(): std::exception. Code: 1001, type: pqxx::broken_connection"
    )
    assert "test_database" in instance.query("SHOW DATABASES")
    assert "" == instance.query("show tables from test_database").strip()
    pg_manager.drop_materialized_db("test_database")


def test_failed_load_from_snapshot(started_cluster):
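    # Loading the initial snapshot into a table with a deliberately wrong structure
    # (Int32 columns for text data) must fail with a conversion error.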
    if instance.is_built_with_sanitizer() or instance.is_debug_build():
        pytest.skip(
            "Skipped under sanitizers and in debug builds, because this test throws a logical error"
        )

    table = "failed_load"
    pg_manager.create_postgres_table(
        table,
        template="""
        CREATE TABLE IF NOT EXISTS "{}" (
        key text NOT NULL, value text[], PRIMARY KEY(key))
        """,
    )
    instance.query(
        f"INSERT INTO postgres_database.{table} SELECT number, [1, 2] from numbers(0, 1000000)"
    )

    # Create a table with a wrong table structure
    assert "Could not convert string to i" in instance.query_and_get_error(
        f"""
        SET allow_experimental_materialized_postgresql_table = 1;
        CREATE TABLE {table} (a Int32, b Int32) ENGINE = MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY a
        """
    )


def test_symbols_in_publication_name(started_cluster):
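    # The materialized database name contains a UUID (with '-' characters), which ends
    # up in the generated publication name; replication must still work for such names.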
    id = uuid.uuid4()
    db = f"test_{id}"
    table = f"test_symbols_in_publication_name"

    pg_manager3 = PostgresManager()
    pg_manager3.init(
        instance,
        cluster.postgres_ip,
        cluster.postgres_port,
        default_database=db,
    )

    pg_manager3.create_postgres_table(table)
    instance.query(
        f"INSERT INTO `{db}`.`{table}` SELECT number, number from numbers(0, 50)"
    )

    pg_manager3.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        materialized_database=db,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )
    check_tables_are_synchronized(
        instance, table, materialized_database=db, postgres_database=db
    )
    pg_manager3.drop_materialized_db(db)
    pg_manager3.execute(f'drop table "{table}"')


def test_generated_columns(started_cluster):
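    # The PostgreSQL table has a GENERATED ALWAYS ... STORED column and a dropped
    # column; inserts made before and after the materialized database is created must
    # still synchronize.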
    table = "test_generated_columns"

    pg_manager.create_postgres_table(
        table,
        "",
        f"""CREATE TABLE {table} (
             key integer PRIMARY KEY,
             x integer DEFAULT 0,
             temp integer DEFAULT 0,
             y integer GENERATED ALWAYS AS (x * 2) STORED,
             z text DEFAULT 'z');
        """,
    )

    pg_manager.execute(f"alter table {table} drop column temp;")
    pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )

    pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'3');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'4');")

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )

    pg_manager.execute(f"insert into {table} (key, x, z) values (5,5,'5');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (6,6,'6');")

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )


def test_generated_columns_with_sequence(started_cluster):
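    # Same as test_generated_columns, but the primary key also takes its default value
    # from a sequence.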
    table = "test_generated_columns_with_sequence"

    pg_manager.create_postgres_table(
        table,
        "",
        f"""CREATE TABLE {table} (
             key integer PRIMARY KEY,
             x integer,
             y integer GENERATED ALWAYS AS (x * 2) STORED,
             z text);
        """,
    )

    pg_manager.execute(
        f"create sequence {table}_id_seq increment by 1 minvalue 1 start 1;"
    )
    pg_manager.execute(
        f"alter table {table} alter key set default nextval('{table}_id_seq');"
    )
    pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )


def test_default_columns(started_cluster):
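    # Columns with DEFAULT values are omitted from the INSERTs; the defaults filled in
    # by PostgreSQL must be replicated.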
    table = "test_default_columns"

    pg_manager.create_postgres_table(
        table,
        "",
        f"""CREATE TABLE {table} (
             key integer PRIMARY KEY,
             x integer,
             y text DEFAULT 'y1',
             z integer,
             a text DEFAULT 'a1',
             b integer);
        """,
    )

    pg_manager.execute(f"insert into {table} (key, x, z, b) values (1,1,1,1);")
    pg_manager.execute(f"insert into {table} (key, x, z, b) values (2,2,2,2);")

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )

    pg_manager.execute(f"insert into {table} (key, x, z, b) values (3,3,3,3);")
    pg_manager.execute(f"insert into {table} (key, x, z, b) values (4,4,4,4);")

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )

    pg_manager.execute(f"insert into {table} (key, x, z, b) values (5,5,5,5);")
    pg_manager.execute(f"insert into {table} (key, x, z, b) values (6,6,6,6);")

    check_tables_are_synchronized(
        instance, table, postgres_database=pg_manager.get_default_database()
    )


def test_dependent_loading(started_cluster):
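    # A standalone MaterializedPostgreSQL table creates a hidden `<uuid>_nested` table;
    # after a restart the outer table must be loaded after its nested dependency
    # (load order is checked via the dependency log line and system.text_log timestamps).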
    table = "test_dependent_loading"

    pg_manager.create_postgres_table(table)
    instance.query(
        f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
    )

    instance.query(
        f"""
        SET allow_experimental_materialized_postgresql_table = 1;
        CREATE TABLE {table} (key Int32, value Int32)
        ENGINE = MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY key
        """
    )

    check_tables_are_synchronized(
        instance,
        table,
        postgres_database=pg_manager.get_default_database(),
        materialized_database="default",
    )
    assert 50 == int(instance.query(f"SELECT count() FROM {table}"))

    instance.restart_clickhouse()

    check_tables_are_synchronized(
        instance,
        table,
        postgres_database=pg_manager.get_default_database(),
        materialized_database="default",
    )
    assert 50 == int(instance.query(f"SELECT count() FROM {table}"))

    uuid = instance.query(
        f"SELECT uuid FROM system.tables WHERE name='{table}' and database='default' limit 1"
    ).strip()
    nested_table = f"default.`{uuid}_nested`"
    instance.contains_in_log(
        f"Table default.{table} has 1 dependencies: {nested_table} (level 1)"
    )

    instance.query("SYSTEM FLUSH LOGS")
    nested_time = instance.query(
        f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{uuid}_nested' and message not like '%like%'"
    ).strip()
    time = (
        instance.query(
            f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
        )
        .strip()
        .split("\n")[-1]
    )
    instance.query(
        f"SELECT toDateTime64('{nested_time}', 6) < toDateTime64('{time}', 6)"
    )

    instance.query(f"DROP TABLE {table} SYNC")


def test_partial_table(started_cluster):
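    # Only the column subset given as `table(z, key)` in materialized_postgresql_tables_list
    # is replicated; synchronization is checked on those columns only.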
    table = "test_partial_table"
    pg_manager.create_postgres_table(
        table,
        "",
        f"""CREATE TABLE {table} (
             key integer PRIMARY KEY,
             x integer DEFAULT 0,
             y integer,
             z text DEFAULT 'z');
        """,
    )
    pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'a');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'b');")
    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table}(z, key)'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )
    check_tables_are_synchronized(
        instance,
        table,
        postgres_database=pg_manager.get_default_database(),
        columns=["key", "z"],
    )
    pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'c');")
    pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'d');")
    check_tables_are_synchronized(
        instance,
        table,
        postgres_database=pg_manager.get_default_database(),
        columns=["key", "z"],
    )


# It is important to check the case when a table name with a subset of columns is a
# substring of a table name with the full set of columns.
@pytest.mark.parametrize(
    "table_list",
    [
        "{}(key, x, z), {}_full, {}_full1",
        "{}_full, {}(key, x, z), {}_full1",
        "{}_full, {}(key, x, z), {}_full1",
        "{}_full, {}_full1, {}(key, x, z)",
    ],
)
def test_partial_and_full_table(started_cluster, table_list):
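    # One table is replicated with a column subset while the "_full"/"_full1" tables are
    # replicated whole; the parametrization varies the order of the tables_list entries.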
    table = "test_partial_and_full_table"
    table_list = table_list.format(table, table, table)
    print(table_list)
    pg_manager.create_postgres_table(
        table,
        "",
        f"""CREATE TABLE {table} (
             key integer PRIMARY KEY,
             x integer DEFAULT 0,
             y integer,
             z text DEFAULT 'z');
        """,
    )
    pg_manager.execute(f"insert into {table} (key, x, y, z) values (1,1,1,'1');")
    pg_manager.execute(f"insert into {table} (key, x, y, z) values (2,2,2,'2');")
    pg_manager.create_postgres_table(
        table + "_full",
        "",
        f"""CREATE TABLE {table}_full (
             key integer PRIMARY KEY,
             x integer DEFAULT 0,
             y integer,
             z text DEFAULT 'z');
        """,
    )
    pg_manager.execute(f"insert into {table}_full (key, x, y, z) values (3,3,3,'3');")
    pg_manager.execute(f"insert into {table}_full (key, x, y, z) values (4,4,4,'4');")
    pg_manager.create_postgres_table(
        table + "_full1",
        "",
        f"""CREATE TABLE {table}_full1 (
             key integer PRIMARY KEY,
             x integer DEFAULT 0,
             y integer,
             z text DEFAULT 'z');
        """,
    )
    pg_manager.execute(f"insert into {table}_full1 (key, x, y, z) values (5,5,5,'5');")
    pg_manager.execute(f"insert into {table}_full1 (key, x, y, z) values (6,6,6,'6');")

    pg_manager.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        settings=[
            f"materialized_postgresql_tables_list = '{table_list}'",
            "materialized_postgresql_backoff_min_ms = 100",
            "materialized_postgresql_backoff_max_ms = 100",
        ],
    )
    check_tables_are_synchronized(
        instance,
        f"{table}",
        postgres_database=pg_manager.get_default_database(),
        columns=["key", "x", "z"],
    )
    check_tables_are_synchronized(
        instance, f"{table}_full", postgres_database=pg_manager.get_default_database()
    )
    check_tables_are_synchronized(
        instance, f"{table}_full1", postgres_database=pg_manager.get_default_database()
    )

    pg_manager.execute(f"insert into {table} (key, x, z) values (7,7,'7');")
    pg_manager.execute(f"insert into {table}_full (key, x, z) values (8,8,'8');")
    pg_manager.execute(f"insert into {table}_full1 (key, x, z) values (9,9,'9');")

    check_tables_are_synchronized(
        instance,
        f"{table}",
        postgres_database=pg_manager.get_default_database(),
        columns=["key", "x", "z"],
    )
    check_tables_are_synchronized(
        instance, f"{table}_full", postgres_database=pg_manager.get_default_database()
    )
    check_tables_are_synchronized(
        instance, f"{table}_full1", postgres_database=pg_manager.get_default_database()
    )


def test_quoting_publication(started_cluster):
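    # Database and table names containing '-' must be quoted consistently (publication,
    # SHOW TABLES, DETACH/ATTACH) and survive a server restart.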
    postgres_database = "postgres-postgres"
    pg_manager3 = PostgresManager()
    pg_manager3.init(
        instance,
        cluster.postgres_ip,
        cluster.postgres_port,
        default_database=postgres_database,
    )
    NUM_TABLES = 5
    materialized_database = "test-database"
    pg_manager3.create_and_fill_postgres_tables(NUM_TABLES, 10000)

    check_table_name_1 = "postgresql-replica-5"
    pg_manager3.create_and_fill_postgres_table(check_table_name_1)

    pg_manager3.create_materialized_db(
        ip=started_cluster.postgres_ip,
        port=started_cluster.postgres_port,
        materialized_database=materialized_database,
    )
    check_several_tables_are_synchronized(
        instance,
        NUM_TABLES,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )

    result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
    assert (
        result
        == "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    check_tables_are_synchronized(
        instance,
        check_table_name_1,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )
    instance.query(
        f"INSERT INTO `{postgres_database}`.`{check_table_name_1}` SELECT number, number from numbers(10000, 10000)"
    )
    check_tables_are_synchronized(
        instance,
        check_table_name_1,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )

    check_table_name_2 = "postgresql-replica-6"
    pg_manager3.create_and_fill_postgres_table(check_table_name_2)
    instance.query(f"ATTACH TABLE `{materialized_database}`.`{check_table_name_2}`")

    result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
    assert (
        result
        == "postgresql-replica-5\npostgresql-replica-6\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )

    check_tables_are_synchronized(
        instance,
        check_table_name_2,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )
    instance.query(
        f"INSERT INTO `{postgres_database}`.`{check_table_name_2}` SELECT number, number from numbers(10000, 10000)"
    )
    check_tables_are_synchronized(
        instance,
        check_table_name_2,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )

    instance.restart_clickhouse()
    check_tables_are_synchronized(
        instance,
        check_table_name_1,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )
    check_tables_are_synchronized(
        instance,
        check_table_name_2,
        materialized_database=materialized_database,
        postgres_database=postgres_database,
    )

    instance.query(
        f"DETACH TABLE `{materialized_database}`.`{check_table_name_2}` PERMANENTLY"
    )
    time.sleep(5)
    result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
    assert (
        result
        == "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
    )


if __name__ == "__main__":
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()