2022-01-07 21:27:46 +00:00
import psycopg2
import time
from psycopg2 . extensions import ISOLATION_LEVEL_AUTOCOMMIT
# CREATE TABLE templates. Each `{}` placeholder is filled with str.format by the
# helper functions in this file.

# One placeholder (table name): integer primary key `key` plus one integer `value` column.
postgres_table_template = """
CREATE TABLE IF NOT EXISTS " {} " (
key Integer NOT NULL , value Integer , PRIMARY KEY ( key ) )
"""
# One placeholder (table name): integer primary key `key` plus three integer value columns.
postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS " {} " (
key Integer NOT NULL , value1 Integer , value2 Integer , value3 Integer , PRIMARY KEY ( key ) )
"""
# One placeholder (table name): two NOT NULL key columns and two value columns;
# this template declares no PRIMARY KEY.
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS " {} " (
key1 Integer NOT NULL , value1 Integer , key2 Integer NOT NULL , value2 Integer NOT NULL )
"""
# Two placeholders (schema name, then table name): same columns as
# postgres_table_template, but the table is schema-qualified.
postgres_table_template_4 = """
CREATE TABLE IF NOT EXISTS " {} " . " {} " (
key Integer NOT NULL , value Integer , PRIMARY KEY ( key ) )
"""
2022-01-08 12:26:29 +00:00
# One placeholder (table name): integer primary key `key` plus a UUID `value` column.
postgres_table_template_5 = """
CREATE TABLE IF NOT EXISTS " {} " (
key Integer NOT NULL , value UUID , PRIMARY KEY ( key ) )
"""
2022-01-07 21:27:46 +00:00
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name=' postgres_database ', replication=False):
    """Open a psycopg2 connection to the PostgreSQL server at ``ip``/``port``.

    Args:
        ip: Host placed in the ``host=`` field of the connection string.
        port: Port placed in the ``port=`` field of the connection string.
        database: When it compares equal to ``True``, a ``dbname=`` field
            holding ``database_name`` is added to the connection string.
        auto_commit: When truthy, the connection is switched to autocommit.
        database_name: Database named in the ``dbname=`` field (only used
            when ``database`` is ``True``).
        replication: When truthy, a ``replication=`` field is appended.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    # The comparison is deliberately `== True` (not plain truthiness) so that the
    # set of values selecting the dbname variant stays exactly as before.
    dsn = (
        f" host= { ip } port= { port } dbname= ' { database_name } ' user= ' postgres ' password= ' mysecretpassword ' "
        if database == True
        else f" host= { ip } port= { port } user= ' postgres ' password= ' mysecretpassword ' "
    )
    if replication:
        dsn = dsn + " replication= ' database ' "

    connection = psycopg2.connect(dsn)
    if not auto_commit:
        return connection

    connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    connection.autocommit = True
    return connection
def create_replication_slot(conn, slot_name=' user_slot '):
    """Issue CREATE_REPLICATION_SLOT for ``slot_name`` and return the third result column.

    The first three columns of the first returned row are printed, one per
    line; the inline comments of the original label them slot name, start
    LSN and snapshot.

    Args:
        conn: Connection object providing ``cursor()``.
        slot_name: Name interpolated into the CREATE_REPLICATION_SLOT command.

    Returns:
        Column 2 of the first result row (labelled "snapshot" in the original).
    """
    cur = conn.cursor()
    cur.execute(f' CREATE_REPLICATION_SLOT { slot_name } LOGICAL pgoutput EXPORT_SNAPSHOT ')
    first_row = cur.fetchall()[0]
    # Columns in order: 0 = slot name, 1 = start lsn, 2 = snapshot.
    for position in (0, 1, 2):
        print(first_row[position])
    return first_row[2]
def drop_replication_slot(conn, slot_name=' user_slot '):
    """Call ``pg_drop_replication_slot`` for ``slot_name`` on a fresh cursor of ``conn``."""
    conn.cursor().execute(
        f" select pg_drop_replication_slot( ' { slot_name } ' ) "
    )
def create_postgres_schema(cursor, schema_name):
    """Recreate the schema ``schema_name``: drop it if present, then create it."""
    drop_postgres_schema(cursor, schema_name)
    create_statement = f' CREATE SCHEMA { schema_name } '
    cursor.execute(create_statement)
def drop_postgres_schema(cursor, schema_name):
    """Execute ``DROP SCHEMA IF EXISTS ... CASCADE`` for ``schema_name`` on ``cursor``."""
    drop_statement = f' DROP SCHEMA IF EXISTS { schema_name } CASCADE '
    cursor.execute(drop_statement)
def create_postgres_table(cursor, table_name, replica_identity_full=False, template=postgres_table_template):
    """Drop ``table_name`` if it exists and recreate it from ``template``.

    Args:
        cursor: Cursor on which the statements are executed.
        table_name: Name substituted into ``template`` via ``str.format``.
        replica_identity_full: When truthy, an additional
            ``ALTER TABLE ... REPLICA IDENTITY FULL`` statement is executed.
        template: CREATE TABLE template with a single ``{}`` placeholder.
    """
    drop_postgres_table(cursor, table_name)
    ddl = template.format(table_name)
    cursor.execute(ddl)
    if not replica_identity_full:
        return
    cursor.execute(f' ALTER TABLE { table_name } REPLICA IDENTITY FULL; ')
def drop_postgres_table(cursor, table_name):
    """Execute ``DROP TABLE IF EXISTS`` for the double-quoted ``table_name`` on ``cursor``."""
    drop_statement = f' DROP TABLE IF EXISTS " { table_name } " '
    cursor.execute(drop_statement)
def create_postgres_table_with_schema(cursor, schema_name, table_name):
    """Drop and recreate ``schema_name``.``table_name`` using ``postgres_table_template_4``."""
    drop_postgres_table_with_schema(cursor, schema_name, table_name)
    ddl = postgres_table_template_4.format(schema_name, table_name)
    cursor.execute(ddl)
def drop_postgres_table_with_schema(cursor, schema_name, table_name):
    """Execute ``DROP TABLE IF EXISTS`` for the schema-qualified, double-quoted table."""
    drop_statement = f' DROP TABLE IF EXISTS " { schema_name } " . " { table_name } " '
    cursor.execute(drop_statement)
class PostgresManager:
    """Create and tear down the PostgreSQL / ClickHouse databases used by a test.

    The manager records the name of every database it creates in three sets
    (plain PostgreSQL databases, ClickHouse ``PostgreSQL``-engine databases
    and ClickHouse ``MaterializedPostgreSQL``-engine databases) so that
    ``clear()`` can drop them again.  ``init()`` must be called before any
    method that reads ``self.instance``, ``self.ip`` or ``self.port``.
    """

    def __init__(self):
        # Names of databases created through this manager; consumed by clear().
        self.created_postgres_db_list = set()
        self.created_materialized_postgres_db_list = set()
        self.created_ch_postgres_db_list = set()
        # Assigned by init() and get_db_cursor(); kept as None until then so
        # that clear() can be called on a manager that was never initialised.
        self.conn = None

    def init(self, instance, ip, port):
        """Remember the ClickHouse ``instance`` and PostgreSQL address, connect, and prepare()."""
        self.instance = instance
        self.ip = ip
        self.port = port
        self.conn = get_postgres_conn(ip=self.ip, port=self.port)
        self.prepare()

    def restart(self):
        """Run clear() followed by prepare().

        If either step raises, prepare() is attempted once more and the
        original exception is re-raised.
        """
        try:
            self.clear()
            self.prepare()
        except Exception:
            self.prepare()
            # Bare raise keeps the original traceback intact.
            raise

    def prepare(self):
        """Recreate the default PostgreSQL database and its ClickHouse PostgreSQL-engine database."""
        conn = get_postgres_conn(ip=self.ip, port=self.port)
        try:
            cursor = conn.cursor()
            self.create_postgres_db(cursor, ' postgres_database ')
        finally:
            # The connection is only needed for the statements above.
            conn.close()
        self.create_clickhouse_postgres_db(ip=self.ip, port=self.port)

    def clear(self):
        """Close the stored connection and drop every database recorded by this manager."""
        if self.conn is not None and self.conn.closed == 0:
            self.conn.close()

        # Iterate over copies: the drop_* methods remove names from the sets.
        for db in self.created_materialized_postgres_db_list.copy():
            self.drop_materialized_db(db)
        for db in self.created_ch_postgres_db_list.copy():
            self.drop_clickhouse_postgres_db(db)

        if self.created_postgres_db_list:
            conn = get_postgres_conn(ip=self.ip, port=self.port)
            try:
                cursor = conn.cursor()
                for db in self.created_postgres_db_list.copy():
                    self.drop_postgres_db(cursor, db)
            finally:
                conn.close()

    def get_db_cursor(self):
        """Open a new connection (with ``database=True``), store it as ``self.conn`` and return a cursor.

        The previously stored connection is replaced without being closed.
        """
        self.conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
        return self.conn.cursor()

    def create_postgres_db(self, cursor, name=' postgres_database '):
        """Drop and recreate the PostgreSQL database ``name`` via ``cursor`` and record it."""
        self.drop_postgres_db(cursor, name)
        self.created_postgres_db_list.add(name)
        cursor.execute(f" CREATE DATABASE { name } ")

    def drop_postgres_db(self, cursor, name=' postgres_database '):
        """Drop the PostgreSQL database ``name`` (if it exists) and forget it."""
        cursor.execute(f" DROP DATABASE IF EXISTS { name } ")
        self.created_postgres_db_list.discard(name)

    def create_clickhouse_postgres_db(self, ip, port, name=' postgres_database ', database_name=' postgres_database ', schema_name=' '):
        """Drop and recreate the ClickHouse database ``name`` with ``ENGINE = PostgreSQL``.

        Args:
            ip: PostgreSQL host used in the engine's address argument.
            port: PostgreSQL port used in the engine's address argument.
            name: Name of the ClickHouse database to create.
            database_name: PostgreSQL database passed to the engine.
            schema_name: Optional schema passed as an extra engine argument.
                A blank (empty or whitespace-only) value means "no schema".
        """
        self.drop_clickhouse_postgres_db(name)
        self.created_ch_postgres_db_list.add(name)

        engine_args = f"' {ip} : {port} ' , ' {database_name} ' , ' postgres ' , ' mysecretpassword '"
        # The default schema_name is a single space, so an emptiness check based
        # on len() never selects the schema-less form; test for blank instead.
        if schema_name.strip():
            engine_args += f" , ' {schema_name} '"
        self.instance.query(f"\nCREATE DATABASE { name }\nENGINE = PostgreSQL ( {engine_args} ) ")

    def drop_clickhouse_postgres_db(self, name=' postgres_database '):
        """Drop the ClickHouse database ``name`` (if it exists) and forget it."""
        self.instance.query(f' DROP DATABASE IF EXISTS { name } ')
        self.created_ch_postgres_db_list.discard(name)

    def create_materialized_db(self, ip, port,
                               materialized_database=' test_database ', postgres_database=' postgres_database ',
                               settings=None, table_overrides=' '):
        """Drop and recreate a ClickHouse ``MaterializedPostgreSQL`` database.

        Args:
            ip: PostgreSQL host used in the engine's address argument.
            port: PostgreSQL port used in the engine's address argument.
            materialized_database: Name of the ClickHouse database to create.
            postgres_database: PostgreSQL database passed to the engine.
            settings: Optional sequence of setting strings, joined with commas
                after a ``SETTINGS`` keyword.  ``None`` means no settings.
            table_overrides: Text appended verbatim to the CREATE query.

        Raises:
            AssertionError: If the database name is not found in the output of
                ``SHOW DATABASES`` after creation.
        """
        # None replaces the former shared mutable default list.
        settings = [] if settings is None else settings

        self.created_materialized_postgres_db_list.add(materialized_database)
        self.instance.query(f" DROP DATABASE IF EXISTS { materialized_database } ")

        create_query = f" CREATE DATABASE { materialized_database } ENGINE = MaterializedPostgreSQL( ' { ip } : { port } ' , ' { postgres_database } ' , ' postgres ' , ' mysecretpassword ' ) "
        if len(settings) > 0:
            create_query += " SETTINGS " + ' , '.join(settings)
        create_query += table_overrides

        self.instance.query(create_query)
        assert materialized_database in self.instance.query(' SHOW DATABASES ')

    def drop_materialized_db(self, materialized_database=' test_database '):
        """Drop the materialized database, forget it, and assert it is gone from ``SHOW DATABASES``."""
        self.instance.query(f' DROP DATABASE IF EXISTS { materialized_database } NO DELAY ')
        self.created_materialized_postgres_db_list.discard(materialized_database)
        assert materialized_database not in self.instance.query(' SHOW DATABASES ')

    def create_and_fill_postgres_table(self, table_name):
        """Open a connection, then create and fill ``table_name`` through its cursor."""
        conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
        try:
            self.create_and_fill_postgres_table_from_cursor(conn.cursor(), table_name)
        finally:
            conn.close()

    def create_and_fill_postgres_table_from_cursor(self, cursor, table_name):
        """Recreate ``table_name`` via ``cursor`` and insert 50 rows through the ClickHouse instance."""
        create_postgres_table(cursor, table_name)
        self.instance.query(f" INSERT INTO postgres_database. { table_name } SELECT number, number from numbers(50) ")

    def create_and_fill_postgres_tables(self, tables_num, numbers=50):
        """Open a connection, then create and fill ``tables_num`` replica tables through its cursor."""
        conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
        try:
            self.create_and_fill_postgres_tables_from_cursor(conn.cursor(), tables_num, numbers=numbers)
        finally:
            conn.close()

    def create_and_fill_postgres_tables_from_cursor(self, cursor, tables_num, numbers=50):
        """Recreate ``tables_num`` numbered replica tables via ``cursor``.

        Each table receives ``numbers`` rows through the ClickHouse instance;
        no insert is issued when ``numbers`` is not positive.
        """
        for i in range(tables_num):
            table_name = f' postgresql_replica_ { i } '
            create_postgres_table(cursor, table_name)
            if numbers > 0:
                self.instance.query(f" INSERT INTO postgres_database. { table_name } SELECT number, number from numbers( { numbers } ) ")
# SQL statement templates (INSERT / DELETE / UPDATE) against tables whose names
# start with `postgresql_replica_`. Each entry contains one `{}` placeholder —
# presumably filled with the table index by callers via str.format; the callers
# are not part of this file, so verify against them.
queries = [
' INSERT INTO postgresql_replica_ {} select i, i from generate_series(0, 10000) as t(i); ' ,
' DELETE FROM postgresql_replica_ {} WHERE (value*value) % 3 = 0; ' ,
' UPDATE postgresql_replica_ {} SET value = value - 125 WHERE key % 2 = 0; ' ,
" UPDATE postgresql_replica_ {} SET key=key+20000 WHERE key % 2=0 " ,
' INSERT INTO postgresql_replica_ {} select i, i from generate_series(40000, 50000) as t(i); ' ,
' DELETE FROM postgresql_replica_ {} WHERE key % 10 = 0; ' ,
' UPDATE postgresql_replica_ {} SET value = value + 101 WHERE key % 2 = 1; ' ,
" UPDATE postgresql_replica_ {} SET key=key+80000 WHERE key % 2=1 " ,
' DELETE FROM postgresql_replica_ {} WHERE value % 2 = 0; ' ,
' UPDATE postgresql_replica_ {} SET value = value + 2000 WHERE key % 5 = 0; ' ,
' INSERT INTO postgresql_replica_ {} select i, i from generate_series(200000, 250000) as t(i); ' ,
' DELETE FROM postgresql_replica_ {} WHERE value % 3 = 0; ' ,
' UPDATE postgresql_replica_ {} SET value = value * 2 WHERE key % 3 = 0; ' ,
" UPDATE postgresql_replica_ {} SET key=key+500000 WHERE key % 2=1 " ,
' INSERT INTO postgresql_replica_ {} select i, i from generate_series(1000000, 1050000) as t(i); ' ,
' DELETE FROM postgresql_replica_ {} WHERE value % 9 = 2; ' ,
" UPDATE postgresql_replica_ {} SET key=key+10000000 " ,
' UPDATE postgresql_replica_ {} SET value = value + 2 WHERE key % 3 = 1; ' ,
' DELETE FROM postgresql_replica_ {} WHERE value % 5 = 0; '
]
def assert_nested_table_is_created(instance, table_name, materialized_database=' test_database ', schema_name=' '):
    """Poll ``SHOW TABLES`` until the table appears in ``materialized_database``.

    The check is a substring test against the raw text returned by
    ``instance.query``.  There is no timeout: the function keeps polling
    every 0.2 seconds until the name is found.

    Args:
        instance: Object exposing ``query(sql)`` that returns the result text.
        table_name: Name of the table to wait for.
        materialized_database: Database whose tables are listed.
        schema_name: Optional schema; when not blank, the name searched for
            is the schema name and table name joined by a dot separator.
    """
    # The default schema_name is a single space, so a len()-based emptiness
    # check would always build a schema-qualified name that can never show up
    # in the listing (and the loop below would spin forever). Treat any blank
    # value as "no schema".
    if not schema_name.strip():
        table = table_name
    else:
        table = schema_name + " . " + table_name

    print(f' Checking table { table } exists in { materialized_database } ')
    show_tables = f' SHOW TABLES FROM { materialized_database } '
    database_tables = instance.query(show_tables)

    while table not in database_tables:
        time.sleep(0.2)
        database_tables = instance.query(show_tables)

    assert table in database_tables
def assert_number_of_columns(instance, expected, table_name, database_name=' test_database '):
    """Poll ``system.columns`` until the table has ``expected`` columns.

    Columns whose name matches the ``startsWith`` filter in the query are
    excluded from the count.  The query is repeated once per second, with no
    timeout, until the count converted with ``int`` equals ``expected``.
    """
    count_query = f" select count() from system.columns where table = ' { table_name } ' and database = ' { database_name } ' and not startsWith(name, ' _ ' ) "
    while True:
        if int(instance.query(count_query)) == expected:
            break
        time.sleep(1)
    print(' Number of columns ok ')
def check_tables_are_synchronized(instance, table_name, order_by=' key ', postgres_database=' postgres_database ', materialized_database=' test_database ', schema_name=' '):
    """Assert that a materialized table matches its PostgreSQL-side counterpart.

    Waits for the table to exist (``assert_nested_table_is_created``), then
    compares ``select * ... order by`` output of ``postgres_database`` and
    ``materialized_database``.  The materialized side is re-queried up to 30
    times, 0.5 seconds apart, before the final assertion.

    Args:
        instance: Object exposing ``query(sql)`` that returns the result text.
        table_name: Table to compare.
        order_by: Expression used in the ``order by`` clause of both queries.
        postgres_database: Database providing the expected rows.
        materialized_database: Database providing the rows under test.
        schema_name: Optional schema; when not blank, the materialized table
            is addressed by a backquoted schema-qualified name.

    Raises:
        AssertionError: If the two results still differ after the retries.
    """
    assert_nested_table_is_created(instance, table_name, materialized_database, schema_name)

    # The default schema_name is a single space, so a len()-based emptiness
    # check never selects the plain path; treat any blank value as "no schema".
    if not schema_name.strip():
        table_path = f' { materialized_database } . { table_name } '
    else:
        table_path = f' { materialized_database } .` { schema_name } . { table_name } ` '

    print(f" Checking table is synchronized: { table_path } ")
    result_query = f' select * from { table_path } order by { order_by } ; '

    expected = instance.query(f' select * from { postgres_database } . { table_name } order by { order_by } ; ')
    result = instance.query(result_query)

    for _ in range(30):
        if result == expected:
            break
        time.sleep(0.5)
        result = instance.query(result_query)

    assert result == expected
def check_several_tables_are_synchronized(instance, tables_num, order_by=' key ', postgres_database=' postgres_database ', materialized_database=' test_database ', schema_name=' '):
    """Run ``check_tables_are_synchronized`` for ``tables_num`` numbered replica tables.

    All comparison options are forwarded to the per-table check; previously
    they were accepted but ignored, so every table was checked with the
    defaults regardless of what the caller passed.
    """
    for i in range(tables_num):
        check_tables_are_synchronized(
            instance,
            f' postgresql_replica_ { i } ',
            order_by=order_by,
            postgres_database=postgres_database,
            materialized_database=materialized_database,
            schema_name=schema_name,
        )