This commit is contained in:
kssenii 2023-05-11 18:09:46 +02:00
parent bd59effd9a
commit c9e752fdc5
3 changed files with 99 additions and 129 deletions

View File

@ -87,7 +87,9 @@ def create_postgres_table(
else:
name = f"{database_name}.{table_name}"
drop_postgres_table(cursor, name)
cursor.execute(template.format(name))
query = template.format(name)
cursor.execute(query)
print(f"Query: {query}")
if replica_identity_full:
cursor.execute(f"ALTER TABLE {name} REPLICA IDENTITY FULL;")
@ -129,6 +131,9 @@ class PostgresManager:
self.prepare()
raise ex
def execute(self, query):
self.cursor.execute(query)
def prepare(self):
self.conn = get_postgres_conn(ip=self.ip, port=self.port)
self.cursor = self.conn.cursor()
@ -141,6 +146,7 @@ class PostgresManager:
database_name=self.default_database,
)
self.cursor = self.conn.cursor()
self.create_clickhouse_postgres_db()
def clear(self):
if self.conn.closed == 0:
@ -164,11 +170,11 @@ class PostgresManager:
return self.conn.cursor()
def database_or_default(self, database_name):
if database_name == "" and self.default_database == "":
raise Exception("Database name is empty")
if database_name == "":
database_name = self.default_database
return database_name
if database_name != "":
return database_name
if self.default_database != "":
return self.default_database
raise Exception("Database name is empty")
def create_postgres_db(self, database_name=""):
database_name = self.database_or_default(database_name)
@ -186,8 +192,11 @@ class PostgresManager:
self,
database_name="",
schema_name="",
postgres_database="",
):
database_name = self.database_or_default(database_name)
if postgres_database == "":
postgres_database = database_name
self.drop_clickhouse_postgres_db(database_name)
self.created_ch_postgres_db_list.add(database_name)
@ -195,13 +204,13 @@ class PostgresManager:
self.instance.query(
f"""
CREATE DATABASE {database_name}
ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword')"""
ENGINE = PostgreSQL('{self.ip}:{self.port}', '{postgres_database}', 'postgres', 'mysecretpassword')"""
)
else:
self.instance.query(
f"""
CREATE DATABASE {database_name}
ENGINE = PostgreSQL('{self.ip}:{self.port}', '{database_name}', 'postgres', 'mysecretpassword', '{schema_name}')"""
ENGINE = PostgreSQL('{self.ip}:{self.port}', '{postgres_database}', 'postgres', 'mysecretpassword', '{schema_name}')"""
)
def drop_clickhouse_postgres_db(self, database_name=""):
@ -239,6 +248,16 @@ class PostgresManager:
if materialized_database in self.created_materialized_postgres_db_list:
self.created_materialized_postgres_db_list.remove(materialized_database)
def create_postgres_schema(self, name):
create_postgres_schema(self.cursor, name)
def create_postgres_table(
self, table_name, database_name="", template=postgres_table_template
):
create_postgres_table(
self.cursor, table_name, database_name=database_name, template=template
)
def create_and_fill_postgres_table(self, table_name, database_name=""):
create_postgres_table(self.cursor, table_name, database_name)
database_name = self.database_or_default(database_name)
@ -246,22 +265,14 @@ class PostgresManager:
f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)"
)
def create_and_fill_postgres_tables(self, tables_num, numbers=50):
conn = get_postgres_conn(ip=self.ip, port=self.port, database=True)
cursor = conn.cursor()
self.create_and_fill_postgres_tables_from_cursor(
cursor, tables_num, numbers=numbers
)
def create_and_fill_postgres_tables_from_cursor(
self, cursor, tables_num, numbers=50
):
def create_and_fill_postgres_tables(self, tables_num, numbers=50, database_name=""):
for i in range(tables_num):
table_name = f"postgresql_replica_{i}"
create_postgres_table(cursor, table_name)
create_postgres_table(self.cursor, table_name, database_name)
if numbers > 0:
db = self.database_or_default(database_name)
self.instance.query(
f"INSERT INTO postgres_database.{table_name} SELECT number, number from numbers({numbers})"
f"INSERT INTO {db}.{table_name} SELECT number, number from numbers({numbers})"
)

View File

@ -46,7 +46,12 @@ pg_manager = PostgresManager()
def started_cluster():
try:
cluster.start()
pg_manager.init(instance, cluster.postgres_ip, cluster.postgres_port)
pg_manager.init(
instance,
cluster.postgres_ip,
cluster.postgres_port,
default_database="postgres_database",
)
yield cluster
finally:
@ -74,16 +79,10 @@ def test_load_and_sync_all_database_tables(started_cluster):
def test_replicating_dml(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, "postgresql_replica_{}".format(i))
pg_manager.create_postgres_table(f"postgresql_replica_{i}")
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(
i, i
@ -96,39 +95,29 @@ def test_replicating_dml(started_cluster):
for i in range(NUM_TABLES):
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT 50 + number, {} from numbers(1000)".format(
i, i
)
f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 50 + number, {i} from numbers(1000)"
)
check_several_tables_are_synchronized(instance, NUM_TABLES)
for i in range(NUM_TABLES):
cursor.execute(
"UPDATE postgresql_replica_{} SET value = {} * {} WHERE key < 50;".format(
i, i, i
)
pg_manager.execute(
f"UPDATE postgresql_replica_{i} SET value = {i} * {i} WHERE key < 50;"
)
cursor.execute(
"UPDATE postgresql_replica_{} SET value = {} * {} * {} WHERE key >= 50;".format(
i, i, i, i
)
pg_manager.execute(
f"UPDATE postgresql_replica_{i} SET value = {i} * {i} * {i} WHERE key >= 50;"
)
check_several_tables_are_synchronized(instance, NUM_TABLES)
for i in range(NUM_TABLES):
cursor.execute(
"DELETE FROM postgresql_replica_{} WHERE (value*value + {}) % 2 = 0;".format(
i, i
)
pg_manager.execute(
f"DELETE FROM postgresql_replica_{i} WHERE (value*value + {i}) % 2 = 0;"
)
cursor.execute(
"UPDATE postgresql_replica_{} SET value = value - (value % 7) WHERE key > 128 AND key < 512;".format(
i
)
)
cursor.execute(
"DELETE FROM postgresql_replica_{} WHERE key % 7 = 1;".format(i, i)
pg_manager.execute(
f"UPDATE postgresql_replica_{i} SET value = value - (value % 7) WHERE key > 128 AND key < 512;"
)
pg_manager.execute(f"DELETE FROM postgresql_replica_{i} WHERE key % 7 = 1;")
check_several_tables_are_synchronized(instance, NUM_TABLES)
@ -288,13 +277,7 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
def test_changing_replica_identity_value(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
create_postgres_table(cursor, "postgresql_replica")
pg_manager.create_postgres_table("postgresql_replica")
instance.query(
"INSERT INTO postgres_database.postgresql_replica SELECT 50 + number, number from numbers(50)"
)
@ -307,7 +290,7 @@ def test_changing_replica_identity_value(started_cluster):
"INSERT INTO postgres_database.postgresql_replica SELECT 100 + number, number from numbers(50)"
)
check_tables_are_synchronized(instance, "postgresql_replica")
cursor.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ")
pg_manager.execute("UPDATE postgresql_replica SET key=key-25 WHERE key<100 ")
check_tables_are_synchronized(instance, "postgresql_replica")
@ -331,18 +314,13 @@ def test_clickhouse_restart(started_cluster):
def test_replica_identity_index(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
pg_manager.create_postgres_table(
"postgresql_replica", template=postgres_table_template_3
)
cursor = conn.cursor()
create_postgres_table(
cursor, "postgresql_replica", template=postgres_table_template_3
pg_manager.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);")
pg_manager.execute(
"ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx"
)
cursor.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);")
cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
instance.query(
"INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)"
)
@ -355,35 +333,29 @@ def test_replica_identity_index(started_cluster):
)
check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1")
cursor.execute("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ")
cursor.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ")
cursor.execute("UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 ")
cursor.execute("UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 ")
pg_manager.execute("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ")
pg_manager.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ")
pg_manager.execute(
"UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 "
)
pg_manager.execute(
"UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 "
)
check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1")
cursor.execute("DELETE FROM postgresql_replica WHERE key2<75;")
pg_manager.execute("DELETE FROM postgresql_replica WHERE key2<75;")
check_tables_are_synchronized(instance, "postgresql_replica", order_by="key1")
def test_table_schema_changes(started_cluster):
conn = get_postgres_conn(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(
cursor,
"postgresql_replica_{}".format(i),
template=postgres_table_template_2,
pg_manager.create_postgres_table(
f"postgresql_replica_{i}", template=postgres_table_template_2
)
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(
i, i, i, i
)
f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT number, {i}, {i}, {i} from numbers(25)"
)
pg_manager.create_materialized_db(
@ -393,9 +365,7 @@ def test_table_schema_changes(started_cluster):
for i in range(NUM_TABLES):
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(
i, i, i, i
)
f"INSERT INTO postgres_database.postgresql_replica_{i} SELECT 25 + number, {i}, {i}, {i} from numbers(25)"
)
check_several_tables_are_synchronized(instance, NUM_TABLES)
@ -444,10 +414,7 @@ def test_many_concurrent_queries(started_cluster):
port=started_cluster.postgres_port,
database=True,
)
cursor = conn.cursor()
pg_manager.create_and_fill_postgres_tables_from_cursor(
cursor, NUM_TABLES, numbers=10000
)
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=10000)
def attack(thread_id):
print("thread {}".format(thread_id))

View File

@ -67,13 +67,11 @@ def started_cluster():
instance,
cluster.postgres_ip,
cluster.postgres_port,
default_database="test_database",
default_database="postgres_database",
)
pg_manager.create_clickhouse_postgres_db()
pg_manager2.init(
instance2, cluster.postgres_ip, cluster.postgres_port, "test_database2"
instance2, cluster.postgres_ip, cluster.postgres_port, "postgres_database2"
)
pg_manager2.create_clickhouse_postgres_db()
yield cluster
finally:
@ -88,11 +86,10 @@ def setup_teardown():
def test_add_new_table_to_replication(started_cluster):
cursor = pg_manager.get_db_cursor()
cursor.execute("DROP TABLE IF EXISTS test_table")
pg_manager.execute("DROP TABLE IF EXISTS test_table")
NUM_TABLES = 5
pg_manager.create_and_fill_postgres_tables_from_cursor(cursor, NUM_TABLES, 10000)
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, 10000)
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
)
@ -105,7 +102,7 @@ def test_add_new_table_to_replication(started_cluster):
)
table_name = "postgresql_replica_5"
pg_manager.create_and_fill_postgres_table_from_cursor(cursor, table_name)
pg_manager.create_and_fill_postgres_table(table_name)
result = instance.query("SHOW CREATE DATABASE test_database")
assert (
@ -158,7 +155,7 @@ def test_add_new_table_to_replication(started_cluster):
)
table_name = "postgresql_replica_6"
create_postgres_table(cursor, table_name)
pg_manager.create_postgres_table(table_name)
instance.query(
"INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
table_name
@ -169,7 +166,7 @@ def test_add_new_table_to_replication(started_cluster):
instance.restart_clickhouse()
table_name = "postgresql_replica_7"
create_postgres_table(cursor, table_name)
pg_manager.create_postgres_table(table_name)
instance.query(
"INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(
table_name
@ -271,8 +268,7 @@ def test_remove_table_from_replication(started_cluster):
== ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"
)
cursor = pg_manager.get_db_cursor()
cursor.execute(f"drop table if exists postgresql_replica_0;")
pg_manager.execute(f"drop table if exists postgresql_replica_0;")
# Removing from replication table which does not exist in PostgreSQL must be ok.
instance.query("DETACH TABLE test_database.postgresql_replica_0 PERMANENTLY")
@ -282,10 +278,11 @@ def test_remove_table_from_replication(started_cluster):
def test_predefined_connection_configuration(started_cluster):
cursor = pg_manager.get_db_cursor()
cursor.execute(f"DROP TABLE IF EXISTS test_table")
cursor.execute(f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)")
cursor.execute(f"INSERT INTO test_table SELECT 1, 2")
pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
pg_manager.execute(
f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
)
pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
)
@ -332,10 +329,9 @@ def test_database_with_single_non_default_schema(started_cluster):
create_postgres_schema(cursor, schema_name)
pg_manager.create_clickhouse_postgres_db(
ip=cluster.postgres_ip,
port=cluster.postgres_port,
name=clickhouse_postgres_db,
database_name=clickhouse_postgres_db,
schema_name=schema_name,
postgres_database="postgres_database",
)
for i in range(NUM_TABLES):
@ -367,7 +363,7 @@ def test_database_with_single_non_default_schema(started_cluster):
check_all_tables_are_synchronized()
altered_table = random.randint(0, NUM_TABLES - 1)
cursor.execute(
pg_manager.execute(
"ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
altered_table
)
@ -434,10 +430,9 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
create_postgres_schema(cursor, schema_name)
pg_manager.create_clickhouse_postgres_db(
ip=cluster.postgres_ip,
port=cluster.postgres_port,
name=clickhouse_postgres_db,
database_name=clickhouse_postgres_db,
schema_name=schema_name,
postgres_database="postgres_database",
)
for i in range(NUM_TABLES):
@ -472,7 +467,7 @@ def test_database_with_multiple_non_default_schemas_1(started_cluster):
check_all_tables_are_synchronized()
altered_table = random.randint(0, NUM_TABLES - 1)
cursor.execute(
pg_manager.execute(
"ALTER TABLE test_schema.postgresql_replica_{} ADD COLUMN value2 integer".format(
altered_table
)
@ -550,10 +545,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
create_postgres_schema(cursor, schema_name)
pg_manager.create_clickhouse_postgres_db(
ip=cluster.postgres_ip,
port=cluster.postgres_port,
name=clickhouse_postgres_db,
schema_name=schema_name,
database_name=clickhouse_postgres_db, schema_name=schema_name, postgres_database="postgres_database",
)
for ti in range(NUM_TABLES):
table_name = f"postgresql_replica_{ti}"
@ -586,7 +578,7 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
altered_schema = random.randint(0, schemas_num - 1)
altered_table = random.randint(0, NUM_TABLES - 1)
clickhouse_postgres_db = f"clickhouse_postgres_db{altered_schema}"
cursor.execute(
pg_manager.execute(
f"ALTER TABLE schema{altered_schema}.postgresql_replica_{altered_table} ADD COLUMN value2 integer"
)
@ -619,10 +611,9 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
def test_table_override(started_cluster):
cursor = pg_manager.get_db_cursor()
table_name = "table_override"
materialized_database = "test_database"
create_postgres_table(cursor, table_name, template=postgres_table_template_5)
pg_manager.create_postgres_table(table_name, template=postgres_table_template_5)
instance.query(
f"create table {table_name}(key Int32, value UUID) engine = PostgreSQL (postgres1, table={table_name})"
)
@ -649,10 +640,11 @@ def test_table_override(started_cluster):
def test_materialized_view(started_cluster):
cursor = pg_manager.get_db_cursor()
cursor.execute(f"DROP TABLE IF EXISTS test_table")
cursor.execute(f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)")
cursor.execute(f"INSERT INTO test_table SELECT 1, 2")
pg_manager.execute(f"DROP TABLE IF EXISTS test_table")
pg_manager.execute(
f"CREATE TABLE test_table (key integer PRIMARY KEY, value integer)"
)
pg_manager.execute(f"INSERT INTO test_table SELECT 1, 2")
instance.query("DROP DATABASE IF EXISTS test_database")
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL(postgres1) SETTINGS materialized_postgresql_tables_list='test_table'"
@ -663,7 +655,7 @@ def test_materialized_view(started_cluster):
"CREATE MATERIALIZED VIEW mv ENGINE=MergeTree ORDER BY tuple() POPULATE AS SELECT * FROM test_database.test_table"
)
assert "1\t2" == instance.query("SELECT * FROM mv").strip()
cursor.execute(f"INSERT INTO test_table SELECT 3, 4")
pg_manager.execute(f"INSERT INTO test_table SELECT 3, 4")
check_tables_are_synchronized(instance, "test_table")
assert "1\t2\n3\t4" == instance.query("SELECT * FROM mv ORDER BY 1, 2").strip()
pg_manager.drop_materialized_db()