mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #35885 from kssenii/fix-postgres-test-2
fix postgres test
This commit is contained in:
commit
1e54c5c57b
@ -246,8 +246,9 @@ void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPt
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
replication_handler->shutdownFinal();
|
replication_handler->shutdownFinal();
|
||||||
|
replication_handler.reset();
|
||||||
|
|
||||||
auto nested_table = getNested();
|
auto nested_table = tryGetNested() != nullptr;
|
||||||
if (nested_table)
|
if (nested_table)
|
||||||
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), no_delay);
|
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), no_delay);
|
||||||
}
|
}
|
||||||
|
@ -24,25 +24,25 @@ postgres_table_template = """
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
queries = [
|
queries = [
|
||||||
"INSERT INTO postgresql_replica select i, i from generate_series(0, 10000) as t(i);",
|
"INSERT INTO {} select i, i from generate_series(0, 10000) as t(i);",
|
||||||
"DELETE FROM postgresql_replica WHERE (value*value) % 3 = 0;",
|
"DELETE FROM {} WHERE (value*value) % 3 = 0;",
|
||||||
"UPDATE postgresql_replica SET value = value + 125 WHERE key % 2 = 0;",
|
"UPDATE {} SET value = value + 125 WHERE key % 2 = 0;",
|
||||||
"UPDATE postgresql_replica SET key=key+20000 WHERE key%2=0",
|
"UPDATE {} SET key=key+20000 WHERE key%2=0",
|
||||||
"INSERT INTO postgresql_replica select i, i from generate_series(40000, 50000) as t(i);",
|
"INSERT INTO {} select i, i from generate_series(40000, 50000) as t(i);",
|
||||||
"DELETE FROM postgresql_replica WHERE key % 10 = 0;",
|
"DELETE FROM {} WHERE key % 10 = 0;",
|
||||||
"UPDATE postgresql_replica SET value = value + 101 WHERE key % 2 = 1;",
|
"UPDATE {} SET value = value + 101 WHERE key % 2 = 1;",
|
||||||
"UPDATE postgresql_replica SET key=key+80000 WHERE key%2=1",
|
"UPDATE {} SET key=key+80000 WHERE key%2=1",
|
||||||
"DELETE FROM postgresql_replica WHERE value % 2 = 0;",
|
"DELETE FROM {} WHERE value % 2 = 0;",
|
||||||
"UPDATE postgresql_replica SET value = value + 2000 WHERE key % 5 = 0;",
|
"UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;",
|
||||||
"INSERT INTO postgresql_replica select i, i from generate_series(200000, 250000) as t(i);",
|
"INSERT INTO {} select i, i from generate_series(200000, 250000) as t(i);",
|
||||||
"DELETE FROM postgresql_replica WHERE value % 3 = 0;",
|
"DELETE FROM {} WHERE value % 3 = 0;",
|
||||||
"UPDATE postgresql_replica SET value = value * 2 WHERE key % 3 = 0;",
|
"UPDATE {} SET value = value * 2 WHERE key % 3 = 0;",
|
||||||
"UPDATE postgresql_replica SET key=key+500000 WHERE key%2=1",
|
"UPDATE {} SET key=key+500000 WHERE key%2=1",
|
||||||
"INSERT INTO postgresql_replica select i, i from generate_series(1000000, 1050000) as t(i);",
|
"INSERT INTO {} select i, i from generate_series(1000000, 1050000) as t(i);",
|
||||||
"DELETE FROM postgresql_replica WHERE value % 9 = 2;",
|
"DELETE FROM {} WHERE value % 9 = 2;",
|
||||||
"UPDATE postgresql_replica SET key=key+10000000",
|
"UPDATE {} SET key=key+10000000",
|
||||||
"UPDATE postgresql_replica SET value = value + 2 WHERE key % 3 = 1;",
|
"UPDATE {} SET value = value + 2 WHERE key % 3 = 1;",
|
||||||
"DELETE FROM postgresql_replica WHERE value%5 = 0;",
|
"DELETE FROM {} WHERE value%5 = 0;",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -50,6 +50,7 @@ queries = [
|
|||||||
def check_tables_are_synchronized(
|
def check_tables_are_synchronized(
|
||||||
table_name, order_by="key", postgres_database="postgres_database"
|
table_name, order_by="key", postgres_database="postgres_database"
|
||||||
):
|
):
|
||||||
|
while True:
|
||||||
expected = instance.query(
|
expected = instance.query(
|
||||||
"select * from {}.{} order by {};".format(
|
"select * from {}.{} order by {};".format(
|
||||||
postgres_database, table_name, order_by
|
postgres_database, table_name, order_by
|
||||||
@ -58,12 +59,8 @@ def check_tables_are_synchronized(
|
|||||||
result = instance.query(
|
result = instance.query(
|
||||||
"select * from test.{} order by {};".format(table_name, order_by)
|
"select * from test.{} order by {};".format(table_name, order_by)
|
||||||
)
|
)
|
||||||
|
if result == expected:
|
||||||
while result != expected:
|
break
|
||||||
time.sleep(0.5)
|
|
||||||
result = instance.query(
|
|
||||||
"select * from test.{} order by {};".format(table_name, order_by)
|
|
||||||
)
|
|
||||||
|
|
||||||
assert result == expected
|
assert result == expected
|
||||||
|
|
||||||
@ -103,15 +100,13 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_materialized_table(ip, port):
|
def create_materialized_table(ip, port, table_name="postgresql_replica"):
|
||||||
instance.query(
|
instance.query(
|
||||||
"""
|
f"""
|
||||||
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
|
CREATE TABLE test.{table_name} (key Int64, value Int64)
|
||||||
ENGINE = MaterializedPostgreSQL(
|
ENGINE = MaterializedPostgreSQL(
|
||||||
'{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
|
'{ip}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword')
|
||||||
PRIMARY KEY key; """.format(
|
PRIMARY KEY key; """
|
||||||
ip, port
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -176,6 +171,7 @@ def test_initial_load_from_snapshot(started_cluster):
|
|||||||
|
|
||||||
cursor.execute("DROP TABLE postgresql_replica;")
|
cursor.execute("DROP TABLE postgresql_replica;")
|
||||||
postgresql_replica_check_result(result, True)
|
postgresql_replica_check_result(result, True)
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(320)
|
@pytest.mark.timeout(320)
|
||||||
@ -212,6 +208,7 @@ def test_no_connection_at_startup(started_cluster):
|
|||||||
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
||||||
cursor.execute("DROP TABLE postgresql_replica;")
|
cursor.execute("DROP TABLE postgresql_replica;")
|
||||||
postgresql_replica_check_result(result, True)
|
postgresql_replica_check_result(result, True)
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(320)
|
@pytest.mark.timeout(320)
|
||||||
@ -250,6 +247,7 @@ def test_detach_attach_is_ok(started_cluster):
|
|||||||
|
|
||||||
cursor.execute("DROP TABLE postgresql_replica;")
|
cursor.execute("DROP TABLE postgresql_replica;")
|
||||||
postgresql_replica_check_result(result, True)
|
postgresql_replica_check_result(result, True)
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(320)
|
@pytest.mark.timeout(320)
|
||||||
@ -303,6 +301,7 @@ def test_replicating_insert_queries(started_cluster):
|
|||||||
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;")
|
||||||
cursor.execute("DROP TABLE postgresql_replica;")
|
cursor.execute("DROP TABLE postgresql_replica;")
|
||||||
postgresql_replica_check_result(result, True)
|
postgresql_replica_check_result(result, True)
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(320)
|
@pytest.mark.timeout(320)
|
||||||
@ -659,6 +658,7 @@ def test_virtual_columns(started_cluster):
|
|||||||
)
|
)
|
||||||
print(result)
|
print(result)
|
||||||
cursor.execute("DROP TABLE postgresql_replica;")
|
cursor.execute("DROP TABLE postgresql_replica;")
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
||||||
@ -669,17 +669,18 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
|||||||
database=True,
|
database=True,
|
||||||
)
|
)
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
create_postgres_table(cursor, "postgresql_replica")
|
table_name = "postgresql_replica"
|
||||||
|
create_postgres_table(cursor, table_name)
|
||||||
|
|
||||||
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
instance.query(f"DROP TABLE IF EXISTS test.{table_name}")
|
||||||
create_materialized_table(
|
create_materialized_table(
|
||||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
||||||
)
|
)
|
||||||
|
|
||||||
for i in range(len(queries)):
|
for i in range(len(queries)):
|
||||||
query = queries[i]
|
query = queries[i].format(table_name)
|
||||||
cursor.execute(query)
|
cursor.execute(query)
|
||||||
print("query {}".format(query))
|
print("query {}".format(query.format(table_name)))
|
||||||
|
|
||||||
started_cluster.pause_container("postgres1")
|
started_cluster.pause_container("postgres1")
|
||||||
|
|
||||||
@ -692,6 +693,7 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster):
|
|||||||
|
|
||||||
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
||||||
print(result) # Just debug
|
print(result) # Just debug
|
||||||
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
||||||
@ -701,26 +703,38 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
|
|||||||
database=True,
|
database=True,
|
||||||
)
|
)
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
create_postgres_table(cursor, "postgresql_replica")
|
table_name = "postgresql_replica_697"
|
||||||
|
create_postgres_table(cursor, table_name)
|
||||||
|
|
||||||
instance.query("DROP TABLE IF EXISTS test.postgresql_replica")
|
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT -1, 1")
|
||||||
|
instance.query(f"DROP TABLE IF EXISTS test.{table_name} NO DELAY")
|
||||||
create_materialized_table(
|
create_materialized_table(
|
||||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
ip=started_cluster.postgres_ip,
|
||||||
|
port=started_cluster.postgres_port,
|
||||||
|
table_name=table_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
n = 1
|
||||||
|
while int(instance.query(f"select count() from test.{table_name}")) != 1:
|
||||||
|
sleep(1)
|
||||||
|
n += 1
|
||||||
|
if n > 10:
|
||||||
|
break
|
||||||
|
|
||||||
for query in queries:
|
for query in queries:
|
||||||
cursor.execute(query)
|
cursor.execute(query.format(table_name))
|
||||||
print("query {}".format(query))
|
print("query {}".format(query.format(table_name)))
|
||||||
|
|
||||||
instance.restart_clickhouse()
|
instance.restart_clickhouse()
|
||||||
|
|
||||||
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
||||||
print(result) # Just debug
|
print(result) # Just debug
|
||||||
|
|
||||||
check_tables_are_synchronized("postgresql_replica")
|
check_tables_are_synchronized(table_name)
|
||||||
|
|
||||||
result = instance.query("SELECT count() FROM test.postgresql_replica")
|
result = instance.query(f"SELECT count() FROM test.{table_name}")
|
||||||
print(result) # Just debug
|
print(result) # Just debug
|
||||||
|
instance.query(f"DROP TABLE test.{table_name} NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
def test_drop_table_immediately(started_cluster):
|
def test_drop_table_immediately(started_cluster):
|
||||||
@ -744,7 +758,7 @@ def test_drop_table_immediately(started_cluster):
|
|||||||
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
ip=started_cluster.postgres_ip, port=started_cluster.postgres_port
|
||||||
)
|
)
|
||||||
check_tables_are_synchronized("postgresql_replica")
|
check_tables_are_synchronized("postgresql_replica")
|
||||||
instance.query("DROP TABLE test.postgresql_replica")
|
instance.query(f"DROP TABLE test.postgresql_replica NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user