From 93fc604223d205d1fc6e9ad5d6f65e9310bf260a Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 3 Apr 2022 18:39:14 +0200 Subject: [PATCH 1/5] Fix --- .../test_storage_postgresql_replica/test.py | 76 +++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index e51a9335a65..fe8b92ea606 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -24,25 +24,25 @@ postgres_table_template = """ """ queries = [ - "INSERT INTO postgresql_replica select i, i from generate_series(0, 10000) as t(i);", - "DELETE FROM postgresql_replica WHERE (value*value) % 3 = 0;", - "UPDATE postgresql_replica SET value = value + 125 WHERE key % 2 = 0;", - "UPDATE postgresql_replica SET key=key+20000 WHERE key%2=0", - "INSERT INTO postgresql_replica select i, i from generate_series(40000, 50000) as t(i);", - "DELETE FROM postgresql_replica WHERE key % 10 = 0;", - "UPDATE postgresql_replica SET value = value + 101 WHERE key % 2 = 1;", - "UPDATE postgresql_replica SET key=key+80000 WHERE key%2=1", - "DELETE FROM postgresql_replica WHERE value % 2 = 0;", - "UPDATE postgresql_replica SET value = value + 2000 WHERE key % 5 = 0;", - "INSERT INTO postgresql_replica select i, i from generate_series(200000, 250000) as t(i);", - "DELETE FROM postgresql_replica WHERE value % 3 = 0;", - "UPDATE postgresql_replica SET value = value * 2 WHERE key % 3 = 0;", - "UPDATE postgresql_replica SET key=key+500000 WHERE key%2=1", - "INSERT INTO postgresql_replica select i, i from generate_series(1000000, 1050000) as t(i);", - "DELETE FROM postgresql_replica WHERE value % 9 = 2;", - "UPDATE postgresql_replica SET key=key+10000000", - "UPDATE postgresql_replica SET value = value + 2 WHERE key % 3 = 1;", - "DELETE FROM postgresql_replica WHERE value%5 = 0;", + "INSERT INTO {} select i, i from generate_series(0, 10000) as t(i);", + "DELETE FROM {} WHERE (value*value) % 3 = 0;", + "UPDATE {} SET value = value + 125 WHERE key % 2 = 0;", + "UPDATE {} SET key=key+20000 WHERE key%2=0", + "INSERT INTO {} select i, i from generate_series(40000, 50000) as t(i);", + "DELETE FROM {} WHERE key % 10 = 0;", + "UPDATE {} SET value = value + 101 WHERE key % 2 = 1;", + "UPDATE {} SET key=key+80000 WHERE key%2=1", + "DELETE FROM {} WHERE value % 2 = 0;", + "UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;", + "INSERT INTO {} select i, i from generate_series(200000, 250000) as t(i);", + "DELETE FROM {} WHERE value % 3 = 0;", + "UPDATE {} SET value = value * 2 WHERE key % 3 = 0;", + "UPDATE {} SET key=key+500000 WHERE key%2=1", + "INSERT INTO {} select i, i from generate_series(1000000, 1050000) as t(i);", + "DELETE FROM {} WHERE value % 9 = 2;", + "UPDATE {} SET key=key+10000000", + "UPDATE {} SET value = value + 2 WHERE key % 3 = 1;", + "DELETE FROM {} WHERE value%5 = 0;", ] @@ -103,15 +103,13 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"): ) -def create_materialized_table(ip, port): +def create_materialized_table(ip, port, table_name='postgresql_replica'): instance.query( - """ - CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) + f""" + CREATE TABLE test.{table_name} (key UInt64, value UInt64) ENGINE = MaterializedPostgreSQL( - '{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') - PRIMARY KEY key; """.format( - ip, port - ) + '{ip}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword') + PRIMARY KEY key; """ ) @@ -669,17 +667,18 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster): database=True, ) cursor = conn.cursor() - create_postgres_table(cursor, "postgresql_replica") + table_name = "postgresql_replica" + create_postgres_table(cursor, table_name) - instance.query("DROP TABLE IF EXISTS test.postgresql_replica") + instance.query(f"DROP TABLE IF EXISTS test.{table_name}") create_materialized_table( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) for i in range(len(queries)): - query = queries[i] + query = queries[i].format(table_name) cursor.execute(query) - print("query {}".format(query)) + print("query {}".format(query.format(table_name))) started_cluster.pause_container("postgres1") @@ -701,25 +700,26 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): database=True, ) cursor = conn.cursor() - create_postgres_table(cursor, "postgresql_replica") + table_name = "postgresql_replica_697" + create_postgres_table(cursor, table_name) - instance.query("DROP TABLE IF EXISTS test.postgresql_replica") + instance.query(f"DROP TABLE IF EXISTS test.{table_name}") create_materialized_table( - ip=started_cluster.postgres_ip, port=started_cluster.postgres_port + ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name ) for query in queries: - cursor.execute(query) - print("query {}".format(query)) + cursor.execute(query.format(table_name)) + print("query {}".format(query.format(table_name))) instance.restart_clickhouse() - result = instance.query("SELECT count() FROM test.postgresql_replica") + result = instance.query(f"SELECT count() FROM test.{table_name}") print(result) # Just debug - check_tables_are_synchronized("postgresql_replica") + check_tables_are_synchronized(table_name) - result = instance.query("SELECT count() FROM test.postgresql_replica") + result = instance.query(f"SELECT count() FROM test.{table_name}") print(result) # Just debug From f4bfa0fea3ab02401df49b676f2f1f92c0a60f82 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 5 Apr 2022 19:01:41 +0200 Subject: [PATCH 2/5] Update test.py --- .../test_storage_postgresql_replica/test.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index fe8b92ea606..8cc4b2049a2 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -106,7 +106,7 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"): def create_materialized_table(ip, port, table_name='postgresql_replica'): instance.query( f""" - CREATE TABLE test.{table_name} (key UInt64, value UInt64) + CREATE TABLE test.{table_name} (key Int64, value Int64) ENGINE = MaterializedPostgreSQL( '{ip}:{port}', 'postgres_database', '{table_name}', 'postgres', 'mysecretpassword') PRIMARY KEY key; """ @@ -703,11 +703,21 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): table_name = "postgresql_replica_697" create_postgres_table(cursor, table_name) + instance.query( + f"INSERT INTO postgres_database.{table_name} SELECT -1, 1" + ) instance.query(f"DROP TABLE IF EXISTS test.{table_name}") create_materialized_table( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name ) + n = 1 + while int(instance.query(f"select count() from test.{table_name}")) != 1: + sleep(1) + n += 1 + if n > 10: + break; + for query in queries: cursor.execute(query.format(table_name)) print("query {}".format(query.format(table_name))) From 06ff47f72f4e634b35a48122efdfea4122682531 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 6 Apr 2022 10:36:54 +0200 Subject: [PATCH 3/5] Add drops --- .../test_storage_postgresql_replica/test.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index 8cc4b2049a2..d2ac5d1a109 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -174,6 +174,7 @@ def test_initial_load_from_snapshot(started_cluster): cursor.execute("DROP TABLE postgresql_replica;") postgresql_replica_check_result(result, True) + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") @pytest.mark.timeout(320) @@ -210,6 +211,7 @@ def test_no_connection_at_startup(started_cluster): result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;") cursor.execute("DROP TABLE postgresql_replica;") postgresql_replica_check_result(result, True) + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") @pytest.mark.timeout(320) @@ -248,6 +250,7 @@ def test_detach_attach_is_ok(started_cluster): cursor.execute("DROP TABLE postgresql_replica;") postgresql_replica_check_result(result, True) + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") @pytest.mark.timeout(320) @@ -301,6 +304,7 @@ def test_replicating_insert_queries(started_cluster): result = instance.query("SELECT * FROM test.postgresql_replica ORDER BY key;") cursor.execute("DROP TABLE postgresql_replica;") postgresql_replica_check_result(result, True) + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") @pytest.mark.timeout(320) @@ -657,6 +661,7 @@ def test_virtual_columns(started_cluster): ) print(result) cursor.execute("DROP TABLE postgresql_replica;") + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") def test_abrupt_connection_loss_while_heavy_replication(started_cluster): @@ -691,6 +696,7 @@ def test_abrupt_connection_loss_while_heavy_replication(started_cluster): result = instance.query("SELECT count() FROM test.postgresql_replica") print(result) # Just debug + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") def test_abrupt_server_restart_while_heavy_replication(started_cluster): @@ -706,7 +712,7 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): instance.query( f"INSERT INTO postgres_database.{table_name} SELECT -1, 1" ) - instance.query(f"DROP TABLE IF EXISTS test.{table_name}") + instance.query(f"DROP TABLE IF EXISTS test.{table_name} NO DELAY") create_materialized_table( ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name ) @@ -731,6 +737,7 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): result = instance.query(f"SELECT count() FROM test.{table_name}") print(result) # Just debug + instance.query(f"DROP TABLE test.{table_name} NO DELAY") def test_drop_table_immediately(started_cluster): @@ -754,7 +761,7 @@ def test_drop_table_immediately(started_cluster): ip=started_cluster.postgres_ip, port=started_cluster.postgres_port ) check_tables_are_synchronized("postgresql_replica") - instance.query("DROP TABLE test.postgresql_replica") + instance.query(f"DROP TABLE test.postgresql_replica NO DELAY") if __name__ == "__main__": From 846faa51d808ab4bbd92176a6f857916d6ec044b Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 6 Apr 2022 14:35:19 +0200 Subject: [PATCH 4/5] Fix bug --- src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp index d12e91f62e4..a126478857b 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp @@ -246,8 +246,9 @@ void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPt return; replication_handler->shutdownFinal(); + replication_handler.reset(); - auto nested_table = getNested(); + auto nested_table = tryGetNested() != nullptr; if (nested_table) InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), no_delay); } From fc3e3251b97f24b451773f74dafb66486ca2285b Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 8 Apr 2022 13:50:24 +0200 Subject: [PATCH 5/5] Fix --- .../test_storage_postgresql_replica/test.py | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index d2ac5d1a109..aca33816d75 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -50,20 +50,17 @@ queries = [ def check_tables_are_synchronized( table_name, order_by="key", postgres_database="postgres_database" ): - expected = instance.query( - "select * from {}.{} order by {};".format( - postgres_database, table_name, order_by + while True: + expected = instance.query( + "select * from {}.{} order by {};".format( + postgres_database, table_name, order_by + ) ) - ) - result = instance.query( - "select * from test.{} order by {};".format(table_name, order_by) - ) - - while result != expected: - time.sleep(0.5) result = instance.query( "select * from test.{} order by {};".format(table_name, order_by) ) + if result == expected: + break assert result == expected @@ -103,7 +100,7 @@ def create_clickhouse_postgres_db(ip, port, name="postgres_database"): ) -def create_materialized_table(ip, port, table_name='postgresql_replica'): +def create_materialized_table(ip, port, table_name="postgresql_replica"): instance.query( f""" CREATE TABLE test.{table_name} (key Int64, value Int64) @@ -709,12 +706,12 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): table_name = "postgresql_replica_697" create_postgres_table(cursor, table_name) - instance.query( - f"INSERT INTO postgres_database.{table_name} SELECT -1, 1" - ) + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT -1, 1") instance.query(f"DROP TABLE IF EXISTS test.{table_name} NO DELAY") create_materialized_table( - ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, table_name=table_name + ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + table_name=table_name, ) n = 1 @@ -722,7 +719,7 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster): sleep(1) n += 1 if n > 10: - break; + break for query in queries: cursor.execute(query.format(table_name))