Make S3Queue tests repeatable

János Benjamin Antal 2024-08-14 14:56:02 +00:00
parent fc9929dc3d
commit 4827b8bb1c

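Why this makes the tests repeatable: the S3Queue/AzureQueue engines record which files they have already processed under the table's `keeper_path` in (Zoo)Keeper. The old tests derived that path from the table name alone, so a rerun of the same test found the metadata left behind by the previous run and skipped files it was supposed to ingest. Appending a random suffix gives every run a fresh path. A minimal sketch of the pattern applied throughout the diff (the `example_test` name is a placeholder, not taken from the diff):

    import random
    import string

    def generate_random_string(length=6):
        # Six lowercase letters give 26**6 (about 309 million) combinations,
        # so two runs colliding on the same suffix is vanishingly unlikely.
        return "".join(random.choice(string.ascii_lowercase) for i in range(length))

    # A unique path per run: a rerun must not see the processed-file
    # metadata an earlier run stored under the old, deterministic path.
    table_name = "example_test"  # placeholder; each test uses its own name
    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"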

@@ -1,6 +1,7 @@
 import io
 import logging
 import random
+import string
 import time
 
 import pytest
@@ -267,6 +268,10 @@ def create_mv(
     )
 
 
+def generate_random_string(length=6):
+    return "".join(random.choice(string.ascii_lowercase) for i in range(length))
+
+
 @pytest.mark.parametrize("mode", ["unordered", "ordered"])
 @pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
 def test_delete_after_processing(started_cluster, mode, engine_name):
@@ -276,6 +281,8 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
     files_path = f"{table_name}_data"
     files_num = 5
     row_num = 10
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     if engine_name == "S3Queue":
         storage = "s3"
     else:
@@ -290,7 +297,7 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
         table_name,
         mode,
         files_path,
-        additional_settings={"after_processing": "delete"},
+        additional_settings={"after_processing": "delete", "keeper_path": keeper_path},
         engine_name=engine_name,
     )
     create_mv(node, table_name, dst_table_name)
@@ -333,7 +340,8 @@ def test_failed_retry(started_cluster, mode, engine_name):
     dst_table_name = f"{table_name}_dst"
     files_path = f"{table_name}_data"
     file_path = f"{files_path}/trash_test.csv"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     retries_num = 3
 
     values = [
@@ -391,7 +399,8 @@
 def test_direct_select_file(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"test.direct_select_file_{mode}"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     file_path = f"{files_path}/test.csv"
 
@@ -496,8 +505,17 @@ def test_direct_select_multiple_files(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"direct_select_multiple_files_{mode}"
     files_path = f"{table_name}_data"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
 
-    create_table(started_cluster, node, table_name, mode, files_path)
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        mode,
+        files_path,
+        additional_settings={"keeper_path": keeper_path},
+    )
     for i in range(5):
         rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
         values_csv = (
@@ -520,14 +538,23 @@
 
 
 @pytest.mark.parametrize("mode", AVAILABLE_MODES)
-def test_streaming_to_view_(started_cluster, mode):
+def test_streaming_to_view(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"streaming_to_view_{mode}"
     dst_table_name = f"{table_name}_dst"
     files_path = f"{table_name}_data"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
 
     total_values = generate_random_files(started_cluster, files_path, 10)
-    create_table(started_cluster, node, table_name, mode, files_path)
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        mode,
+        files_path,
+        additional_settings={"keeper_path": keeper_path},
+    )
     create_mv(node, table_name, dst_table_name)
 
     expected_values = set([tuple(i) for i in total_values])
@@ -549,7 +576,8 @@ def test_streaming_to_many_views(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"streaming_to_many_views_{mode}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
 
     for i in range(3):
@@ -587,7 +615,8 @@
 def test_multiple_tables_meta_mismatch(started_cluster):
     node = started_cluster.instances["instance"]
     table_name = f"multiple_tables_meta_mismatch"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
 
     create_table(
@@ -680,7 +709,8 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"multiple_tables_streaming_sync_{mode}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
 
@@ -763,7 +793,8 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
     node_2 = started_cluster.instances["instance2"]
     table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
     row_num = 50
@@ -838,7 +869,8 @@ def test_max_set_age(started_cluster):
     node = started_cluster.instances["instance"]
     table_name = "max_set_age"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     max_age = 20
     files_to_generate = 10
@@ -949,10 +981,9 @@
 def test_max_set_size(started_cluster):
     node = started_cluster.instances["instance"]
     table_name = f"max_set_size"
-    dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
-    max_age = 10
     files_to_generate = 10
 
     create_table(
@@ -996,7 +1027,8 @@ def test_drop_table(started_cluster):
     node = started_cluster.instances["instance"]
     table_name = f"test_drop"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
 
@@ -1029,6 +1061,8 @@ def test_s3_client_reused(started_cluster):
     table_name = f"test.test_s3_client_reused"
     dst_table_name = f"{table_name}_dst"
     files_path = f"{table_name}_data"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     row_num = 10
 
     def get_created_s3_clients_count():
@@ -1062,6 +1096,7 @@ def test_s3_client_reused(started_cluster):
         additional_settings={
             "after_processing": "delete",
             "s3queue_processing_threads_num": 1,
+            "keeper_path": keeper_path,
         },
         auth=NO_AUTH,
         bucket=started_cluster.minio_public_bucket,
@@ -1119,7 +1154,8 @@ def test_processing_threads(started_cluster, mode):
     node = started_cluster.instances["instance"]
     table_name = f"processing_threads_{mode}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
     processing_threads = 32
@@ -1186,7 +1222,8 @@ def test_shards(started_cluster, mode, processing_threads):
     node = started_cluster.instances["instance"]
     table_name = f"test_shards_{mode}_{processing_threads}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
     shards_num = 3
@@ -1313,7 +1350,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
     node_2 = started_cluster.instances["instance2"]
     table_name = f"test_shards_distributed_{mode}_{processing_threads}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 300
     row_num = 300
@@ -1466,8 +1504,8 @@ def test_settings_check(started_cluster):
     node = started_cluster.instances["instance"]
     node_2 = started_cluster.instances["instance2"]
     table_name = f"test_settings_check"
-    dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
 
     mode = "ordered"
@@ -1509,7 +1547,10 @@ def test_processed_file_setting(started_cluster, processing_threads):
     node = started_cluster.instances["instance"]
     table_name = f"test_processed_file_setting_{processing_threads}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = (
+        f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}"
+    )
     files_path = f"{table_name}_data"
     files_to_generate = 10
 
@@ -1560,7 +1601,10 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads)
     node_2 = started_cluster.instances["instance2"]
     table_name = f"test_processed_file_setting_distributed_{processing_threads}"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = (
+        f"/clickhouse/test_{table_name}_{processing_threads}_{generate_random_string()}"
+    )
     files_path = f"{table_name}_data"
     files_to_generate = 10
 
@@ -1614,7 +1658,8 @@ def test_upgrade(started_cluster):
     table_name = f"test_upgrade"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 10
 
@@ -1655,7 +1700,8 @@ def test_exception_during_insert(started_cluster):
     table_name = f"test_exception_during_insert"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 10
 
@@ -1708,7 +1754,8 @@ def test_commit_on_limit(started_cluster):
     table_name = f"test_commit_on_limit"
     dst_table_name = f"{table_name}_dst"
-    keeper_path = f"/clickhouse/test_{table_name}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
     files_path = f"{table_name}_data"
     files_to_generate = 10
 