mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 04:12:19 +00:00
Fix
This commit is contained in:
parent
b56beeca9d
commit
93fc604223
@ -24,25 +24,25 @@ postgres_table_template = """
|
||||
"""
|
||||
|
||||
queries = [
|
||||
"INSERT INTO postgresql_replica select i, i from generate_series(0, 10000) as t(i);",
|
||||
"DELETE FROM postgresql_replica WHERE (value*value) % 3 = 0;",
|
||||
"UPDATE postgresql_replica SET value = value + 125 WHERE key % 2 = 0;",
|
||||
"UPDATE postgresql_replica SET key=key+20000 WHERE key%2=0",
|
||||
"INSERT INTO postgresql_replica select i, i from generate_series(40000, 50000) as t(i);",
|
||||
"DELETE FROM postgresql_replica WHERE key % 10 = 0;",
|
||||
"UPDATE postgresql_replica SET value = value + 101 WHERE key % 2 = 1;",
|
||||
"UPDATE postgresql_replica SET key=key+80000 WHERE key%2=1",
|
||||
"DELETE FROM postgresql_replica WHERE value % 2 = 0;",
|
||||
"UPDATE postgresql_replica SET value = value + 2000 WHERE key % 5 = 0;",
|
||||
"INSERT INTO postgresql_replica select i, i from generate_series(200000, 250000) as t(i);",
|
||||
"DELETE FROM postgresql_replica WHERE value % 3 = 0;",
|
||||
"UPDATE postgresql_replica SET value = value * 2 WHERE key % 3 = 0;",
|
||||
"UPDATE postgresql_replica SET key=key+500000 WHERE key%2=1",
|
||||
"INSERT INTO postgresql_replica select i, i from generate_series(1000000, 1050000) as t(i);",
|
||||
"DELETE FROM postgresql_replica WHERE value % 9 = 2;",
|
||||
"UPDATE postgresql_replica SET key=key+10000000",
|
||||
"UPDATE postgresql_replica SET value = value + 2 WHERE key % 3 = 1;",
|
||||
"DELETE FROM postgresql_replica WHERE value%5 = 0;",
|
||||
"INSERT INTO {} select i, i from generate_series(0, 10000) as t(i);",
|
||||
"DELETE FROM {} WHERE (value*value) % 3 = 0;",
|
||||
"UPDATE {} SET value = value + 125 WHERE key % 2 = 0;",
|
||||
"UPDATE {} SET key=key+20000 WHERE key%2=0",
|
||||
"INSERT INTO {} select i, i from generate_series(40000, 50000) as t(i);",
|
||||
"DELETE FROM {} WHERE key % 10 = 0;",
|
||||
"UPDATE {} SET value = value + 101 WHERE key % 2 = 1;",
|
||||
"UPDATE {} SET key=key+80000 WHERE key%2=1",
|
||||
"DELETE FROM {} WHERE value % 2 = 0;",
|
||||
"UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;",
|
||||
"INSERT INTO {} select i, i from generate_series(200000, 250000) as t(i);",
|
||||
"DELETE FROM {} WHERE value % 3 = 0;",
|
||||
"UPDATE {} SET value = value * 2 WHERE key % 3 = 0;",
|
||||
"UPDATE {} SET key=key+500000 WHERE key%2=1",
|
||||
"INSERT INTO {} select i, i from generate_series(1000000, 1050000) as t(i);",
|
||||
"DELETE FROM {} WHERE value % 9 = 2;",
|
||||
"UPDATE {} SET key=key+10000000",
|
||||
"UPDATE {} SET value = value + 2 WHERE key % 3 = 1;",
|
||||
"DELETE FROM {} WHERE value%5 = 0;",
|
||||
]
|
||||
|
||||
|
||||
@ -103,15 +103,13 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"):
|
||||
)
|
||||
|
||||
|
||||
def create_materialized_table(ip, port):
|
||||
def create_materialized_table(ip, port, table_name='postgresql_replica'):
|
||||
instance.query(
|
||||
"""
|
||||
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
|
||||
f"""
|
||||
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
|
||||
ENGINE = MaterializedPostgreSQL(
|
||||
'{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
|
||||
PRIMARY KEY key; """.format(
|
||||
ip, port
|
||||
)
|
||||
'{ip}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword')
|
||||
PRIMARY KEY key; """
|
||||
)
|
||||
|
||||
|
||||
@ -669,17 +667,18 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
||||
database=True,
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
create_postgres_table(cursor, "postgresql_replica")
|
||||
table_name = "postgresql_replica"
|
||||
create_postgres_table(cursor, table_name)
|
||||
|
||||
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
||||
instance.query(f"DROP TABLE IF EXISTS test.{table_name}")
|
||||
create_materialized_table(
|
||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
||||
)
|
||||
|
||||
for i in range(len(queries)):
|
||||
query = queries[i]
|
||||
query = queries[i].format(table_name)
|
||||
cursor.execute(query)
|
||||
print("query {}".format(query))
|
||||
print("query {}".format(query.format(table_name)))
|
||||
|
||||
started_cluster.pause_container("postgres1")
|
||||
|
||||
@ -701,25 +700,26 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
||||
database=True,
|
||||
)
|
||||
cursor = conn.cursor()
|
||||
create_postgres_table(cursor, "postgresql_replica")
|
||||
table_name = "postgresql_replica_697"
|
||||
create_postgres_table(cursor, table_name)
|
||||
|
||||
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
||||
instance.query(f"DROP TABLE IF EXISTS test.{table_name}")
|
||||
create_materialized_table(
|
||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name
|
||||
)
|
||||
|
||||
for query in queries:
|
||||
cursor.execute(query)
|
||||
print("query {}".format(query))
|
||||
cursor.execute(query.format(table_name))
|
||||
print("query {}".format(query.format(table_name)))
|
||||
|
||||
instance.restart_clickhouse()
|
||||
|
||||
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
||||
print(result) # Just debug
|
||||
|
||||
check_tables_are_synchronized("postgresql_replica")
|
||||
check_tables_are_synchronized(table_name)
|
||||
|
||||
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
||||
print(result) # Just debug
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user