mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Add a test
This commit is contained in:
parent
5a6fe87b7c
commit
5ffa2c9ca1
@ -82,24 +82,24 @@ def drop_postgres_schema(cursor, schema_name):
|
||||
def create_postgres_table(
|
||||
cursor,
|
||||
table_name,
|
||||
database_name="",
|
||||
replica_identity_full=False,
|
||||
template=postgres_table_template,
|
||||
):
|
||||
if database_name == "":
|
||||
name = table_name
|
||||
else:
|
||||
name = f"{database_name}.{table_name}"
|
||||
drop_postgres_table(cursor, name)
|
||||
query = template.format(name)
|
||||
cursor.execute(query)
|
||||
drop_postgres_table(cursor, table_name)
|
||||
query = template.format(table_name)
|
||||
|
||||
print(f"Query: {query}")
|
||||
cursor.execute(query)
|
||||
|
||||
if replica_identity_full:
|
||||
cursor.execute(f"ALTER TABLE {name} REPLICA IDENTITY FULL;")
|
||||
cursor.execute(f"""ALTER TABLE "{table_name}" REPLICA IDENTITY FULL;""")
|
||||
|
||||
|
||||
def drop_postgres_table(cursor, name):
|
||||
cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """)
|
||||
def drop_postgres_table(cursor, name, database_name=""):
|
||||
if database_name != "":
|
||||
cursor.execute(f"""DROP TABLE IF EXISTS "{database_name}"."{name}" """)
|
||||
else:
|
||||
cursor.execute(f"""DROP TABLE IF EXISTS "{name}" """)
|
||||
|
||||
|
||||
def create_postgres_table_with_schema(cursor, schema_name, table_name):
|
||||
@ -245,9 +245,9 @@ class PostgresManager:
|
||||
):
|
||||
postgres_database = self.database_or_default(postgres_database)
|
||||
self.created_materialized_postgres_db_list.add(materialized_database)
|
||||
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database}")
|
||||
self.instance.query(f"DROP DATABASE IF EXISTS `{materialized_database}`")
|
||||
|
||||
create_query = f"CREATE DATABASE {materialized_database} ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', '{user}', '{password}')"
|
||||
create_query = f"CREATE DATABASE `{materialized_database}` ENGINE = MaterializedPostgreSQL('{ip}:{port}', '{postgres_database}', '{user}', '{password}')"
|
||||
if len(settings) > 0:
|
||||
create_query += " SETTINGS "
|
||||
for i in range(len(settings)):
|
||||
@ -259,7 +259,7 @@ class PostgresManager:
|
||||
assert materialized_database in self.instance.query("SHOW DATABASES")
|
||||
|
||||
def drop_materialized_db(self, materialized_database="test_database"):
|
||||
self.instance.query(f"DROP DATABASE IF EXISTS {materialized_database} SYNC")
|
||||
self.instance.query(f"DROP DATABASE IF EXISTS `{materialized_database}` SYNC")
|
||||
if materialized_database in self.created_materialized_postgres_db_list:
|
||||
self.created_materialized_postgres_db_list.remove(materialized_database)
|
||||
|
||||
@ -269,15 +269,28 @@ class PostgresManager:
|
||||
def create_postgres_table(
|
||||
self, table_name, database_name="", template=postgres_table_template
|
||||
):
|
||||
create_postgres_table(
|
||||
self.cursor, table_name, database_name=database_name, template=template
|
||||
)
|
||||
database_name = self.database_or_default(database_name)
|
||||
cursor = self.cursor
|
||||
if database_name != self.get_default_database:
|
||||
try:
|
||||
self.create_postgres_db(database_name)
|
||||
except:
|
||||
# postgres does not support create database if not exists
|
||||
pass
|
||||
conn = get_postgres_conn(
|
||||
ip=self.ip,
|
||||
port=self.port,
|
||||
database=True,
|
||||
database_name=database_name,
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
create_postgres_table(cursor, table_name, template=template)
|
||||
|
||||
def create_and_fill_postgres_table(self, table_name, database_name=""):
|
||||
create_postgres_table(self.cursor, table_name, database_name)
|
||||
database_name = self.database_or_default(database_name)
|
||||
self.create_postgres_table(table_name, database_name)
|
||||
self.instance.query(
|
||||
f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)"
|
||||
f"INSERT INTO `{database_name}`.`{table_name}` SELECT number, number from numbers(50)"
|
||||
)
|
||||
|
||||
def create_and_fill_postgres_tables(
|
||||
@ -289,11 +302,11 @@ class PostgresManager:
|
||||
):
|
||||
for i in range(tables_num):
|
||||
table_name = f"{table_name_base}_{i}"
|
||||
create_postgres_table(self.cursor, table_name, database_name)
|
||||
self.create_postgres_table(table_name, database_name)
|
||||
if numbers > 0:
|
||||
db = self.database_or_default(database_name)
|
||||
self.instance.query(
|
||||
f"INSERT INTO {db}.{table_name} SELECT number, number from numbers({numbers})"
|
||||
f"INSERT INTO `{db}`.{table_name} SELECT number, number from numbers({numbers})"
|
||||
)
|
||||
|
||||
|
||||
@ -329,11 +342,11 @@ def assert_nested_table_is_created(
|
||||
table = schema_name + "." + table_name
|
||||
|
||||
print(f"Checking table {table} exists in {materialized_database}")
|
||||
database_tables = instance.query(f"SHOW TABLES FROM {materialized_database}")
|
||||
database_tables = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||
|
||||
while table not in database_tables:
|
||||
time.sleep(0.2)
|
||||
database_tables = instance.query(f"SHOW TABLES FROM {materialized_database}")
|
||||
database_tables = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||
|
||||
assert table in database_tables
|
||||
|
||||
@ -366,9 +379,9 @@ def check_tables_are_synchronized(
|
||||
|
||||
table_path = ""
|
||||
if len(schema_name) == 0:
|
||||
table_path = f"{materialized_database}.{table_name}"
|
||||
table_path = f"`{materialized_database}`.`{table_name}`"
|
||||
else:
|
||||
table_path = f"{materialized_database}.`{schema_name}.{table_name}`"
|
||||
table_path = f"`{materialized_database}`.`{schema_name}.{table_name}`"
|
||||
|
||||
print(f"Checking table is synchronized: {table_path}")
|
||||
result_query = f"select * from {table_path} order by {order_by};"
|
||||
@ -403,4 +416,9 @@ def check_several_tables_are_synchronized(
|
||||
schema_name="",
|
||||
):
|
||||
for i in range(tables_num):
|
||||
check_tables_are_synchronized(instance, f"postgresql_replica_{i}")
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
f"postgresql_replica_{i}",
|
||||
postgres_database=postgres_database,
|
||||
materialized_database=materialized_database,
|
||||
)
|
||||
|
@ -1097,6 +1097,103 @@ def test_dependent_loading(started_cluster):
|
||||
instance.query(f"DROP TABLE {table} SYNC")
|
||||
|
||||
|
||||
def test_quoting_publication(started_cluster):
|
||||
NUM_TABLES = 5
|
||||
postgres_database = "postgres-postgres"
|
||||
materialized_database = "test-database"
|
||||
|
||||
pg_manager3.create_and_fill_postgres_tables(NUM_TABLES, 10000)
|
||||
|
||||
check_table_name_1 = "postgresql-replica-5"
|
||||
pg_manager3.create_and_fill_postgres_table(check_table_name_1)
|
||||
|
||||
pg_manager3.create_materialized_db(
|
||||
ip=started_cluster.postgres_ip,
|
||||
port=started_cluster.postgres_port,
|
||||
materialized_database=materialized_database,
|
||||
)
|
||||
check_several_tables_are_synchronized(
|
||||
instance,
|
||||
NUM_TABLES,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
|
||||
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||
assert (
|
||||
result
|
||||
== "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||
)
|
||||
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_1,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
instance.query(
|
||||
f"INSERT INTO `{postgres_database}`.`{check_table_name_1}` SELECT number, number from numbers(10000, 10000)"
|
||||
)
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_1,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
|
||||
check_table_name_2 = "postgresql-replica-6"
|
||||
pg_manager3.create_and_fill_postgres_table(check_table_name_2)
|
||||
|
||||
instance.query(f"ATTACH TABLE `{materialized_database}`.`{check_table_name_2}`")
|
||||
|
||||
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||
assert (
|
||||
result
|
||||
== "postgresql-replica-5\npostgresql-replica-6\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||
)
|
||||
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_2,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
instance.query(
|
||||
f"INSERT INTO `{postgres_database}`.`{check_table_name_2}` SELECT number, number from numbers(10000, 10000)"
|
||||
)
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_2,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
|
||||
instance.restart_clickhouse()
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_1,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
check_tables_are_synchronized(
|
||||
instance,
|
||||
check_table_name_2,
|
||||
materialized_database=materialized_database,
|
||||
postgres_database=postgres_database,
|
||||
)
|
||||
|
||||
instance.query(
|
||||
f"DETACH TABLE `{materialized_database}`.`{check_table_name_2}` PERMANENTLY"
|
||||
)
|
||||
time.sleep(5)
|
||||
|
||||
result = instance.query(f"SHOW TABLES FROM `{materialized_database}`")
|
||||
assert (
|
||||
result
|
||||
== "postgresql-replica-5\npostgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user