From 4827b8bb1c7a77e50912ab40d5c009c43d20f6ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 14 Aug 2024 14:56:02 +0000 Subject: [PATCH] Make S3Queue tests repeatable --- .../integration/test_storage_s3_queue/test.py | 97 ++++++++++++++----- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index ff723d0792a..08a8a7cac81 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -1,6 +1,7 @@ import io import logging import random +import string import time import pytest @@ -267,6 +268,10 @@ def create_mv( ) +def generate_random_string(length=6): + return "".join(random.choice(string.ascii_lowercase) for i in range(length)) + + @pytest.mark.parametrize("mode", ["unordered", "ordered"]) @pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"]) def test_delete_after_processing(started_cluster, mode, engine_name): @@ -276,6 +281,8 @@ def test_delete_after_processing(started_cluster, mode, engine_name): files_path = f"{table_name}_data" files_num = 5 row_num = 10 + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" if engine_name == "S3Queue": storage = "s3" else: @@ -290,7 +297,7 @@ def test_delete_after_processing(started_cluster, mode, engine_name): table_name, mode, files_path, - additional_settings={"after_processing": "delete"}, + additional_settings={"after_processing": "delete", "keeper_path": keeper_path}, engine_name=engine_name, ) create_mv(node, table_name, dst_table_name) @@ -333,7 +340,8 @@ def test_failed_retry(started_cluster, mode, engine_name): dst_table_name = f"{table_name}_dst" files_path = f"{table_name}_data" file_path = f"{files_path}/trash_test.csv" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" retries_num = 3 values = [ @@ -391,7 +399,8 @@ def test_failed_retry(started_cluster, mode, engine_name): def test_direct_select_file(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"test.direct_select_file_{mode}" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" file_path = f"{files_path}/test.csv" @@ -496,8 +505,17 @@ def test_direct_select_multiple_files(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"direct_select_multiple_files_{mode}" files_path = f"{table_name}_data" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" - create_table(started_cluster, node, table_name, mode, files_path) + create_table( + started_cluster, + node, + table_name, + mode, + files_path, + additional_settings={"keeper_path": keeper_path}, + ) for i in range(5): rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)] values_csv = ( @@ -520,14 +538,23 @@ def test_direct_select_multiple_files(started_cluster, mode): @pytest.mark.parametrize("mode", AVAILABLE_MODES) -def test_streaming_to_view_(started_cluster, mode): +def test_streaming_to_view(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"streaming_to_view_{mode}" dst_table_name = f"{table_name}_dst" files_path = f"{table_name}_data" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" total_values = generate_random_files(started_cluster, files_path, 10) - create_table(started_cluster, node, table_name, mode, files_path) + create_table( + started_cluster, + node, + table_name, + mode, + files_path, + additional_settings={"keeper_path": keeper_path}, + ) create_mv(node, table_name, dst_table_name) expected_values = set([tuple(i) for i in total_values]) @@ -549,7 +576,8 @@ def test_streaming_to_many_views(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"streaming_to_many_views_{mode}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" for i in range(3): @@ -587,7 +615,8 @@ def test_streaming_to_many_views(started_cluster, mode): def test_multiple_tables_meta_mismatch(started_cluster): node = started_cluster.instances["instance"] table_name = f"multiple_tables_meta_mismatch" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" create_table( @@ -680,7 +709,8 @@ def test_multiple_tables_streaming_sync(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"multiple_tables_streaming_sync_{mode}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 @@ -763,7 +793,8 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): node_2 = started_cluster.instances["instance2"] table_name = f"multiple_tables_streaming_sync_distributed_{mode}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 row_num = 50 @@ -838,7 +869,8 @@ def test_max_set_age(started_cluster): node = started_cluster.instances["instance"] table_name = "max_set_age" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" max_age = 20 files_to_generate = 10 @@ -949,10 +981,9 @@ def test_max_set_age(started_cluster): def test_max_set_size(started_cluster): node = started_cluster.instances["instance"] table_name = f"max_set_size" - dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" - max_age = 10 files_to_generate = 10 create_table( @@ -996,7 +1027,8 @@ def test_drop_table(started_cluster): node = started_cluster.instances["instance"] table_name = f"test_drop" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 @@ -1029,6 +1061,8 @@ def test_s3_client_reused(started_cluster): table_name = f"test.test_s3_client_reused" dst_table_name = f"{table_name}_dst" files_path = f"{table_name}_data" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" row_num = 10 def get_created_s3_clients_count(): @@ -1062,6 +1096,7 @@ def test_s3_client_reused(started_cluster): additional_settings={ "after_processing": "delete", "s3queue_processing_threads_num": 1, + "keeper_path": keeper_path, }, auth=NO_AUTH, bucket=started_cluster.minio_public_bucket, @@ -1119,7 +1154,8 @@ def test_processing_threads(started_cluster, mode): node = started_cluster.instances["instance"] table_name = f"processing_threads_{mode}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 processing_threads = 32 @@ -1186,7 +1222,8 @@ def test_shards(started_cluster, mode, processing_threads): node = started_cluster.instances["instance"] table_name = f"test_shards_{mode}_{processing_threads}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 shards_num = 3 @@ -1313,7 +1350,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads): node_2 = started_cluster.instances["instance2"] table_name = f"test_shards_distributed_{mode}_{processing_threads}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 300 row_num = 300 @@ -1466,8 +1504,8 @@ def test_settings_check(started_cluster): node = started_cluster.instances["instance"] node_2 = started_cluster.instances["instance2"] table_name = f"test_settings_check" - dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" mode = "ordered" @@ -1509,7 +1547,10 @@ def test_processed_file_setting(started_cluster, processing_threads): node = started_cluster.instances["instance"] table_name = f"test_processed_file_setting_{processing_threads}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}" + # A unique path is necessary for repeatable tests + keeper_path = ( + f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}" + ) files_path = f"{table_name}_data" files_to_generate = 10 @@ -1560,7 +1601,10 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads) node_2 = started_cluster.instances["instance2"] table_name = f"test_processed_file_setting_distributed_{processing_threads}" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = ( + f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}" + ) files_path = f"{table_name}_data" files_to_generate = 10 @@ -1614,7 +1658,8 @@ def test_upgrade(started_cluster): table_name = f"test_upgrade" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 10 @@ -1655,7 +1700,8 @@ def test_exception_during_insert(started_cluster): table_name = f"test_exception_during_insert" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 10 @@ -1708,7 +1754,8 @@ def test_commit_on_limit(started_cluster): table_name = f"test_commit_on_limit" dst_table_name = f"{table_name}_dst" - keeper_path = f"/clickhouse/test_{table_name}" + # A unique path is necessary for repeatable tests + keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}" files_path = f"{table_name}_data" files_to_generate = 10