Use unique names for tables and files

Pablo Marcos 2024-07-24 11:42:28 +00:00
parent 02ee0d8dd4
commit b25cad23ed
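
Previously the test reused the fixed names max_set_age, /clickhouse/test_max_set_age, and fff.csv on every run, so a rerun or a parallel run could pick up leftover Keeper metadata and S3 objects from an earlier one. Suffixing each name with uuid4().hex makes every run's table, Keeper path, and files disjoint. A minimal sketch of the pattern, assuming a hypothetical unique_name helper (only uuid4().hex itself comes from this commit):

    from uuid import uuid4

    def unique_name(prefix: str) -> str:
        # uuid4().hex is 32 lowercase hex characters: valid in ClickHouse
        # table names, Keeper paths, and S3 object keys alike.
        return f"{prefix}_{uuid4().hex}"

    table_name = unique_name("max_set_age")        # e.g. max_set_age_3f2a9c...
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"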


@@ -7,6 +7,7 @@ import pytest
 from helpers.client import QueryRuntimeException
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance
 import json
+from uuid import uuid4


 AVAILABLE_MODES = ["unordered", "ordered"]
@@ -822,7 +823,7 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):

 def test_max_set_age(started_cluster):
     node = started_cluster.instances["instance"]
-    table_name = f"max_set_age"
+    table_name = f"max_set_age_{uuid4().hex}"
     dst_table_name = f"{table_name}_dst"
     keeper_path = f"/clickhouse/test_{table_name}"
     files_path = f"{table_name}_data"
@@ -847,11 +848,11 @@ def test_max_set_age(started_cluster):
     )
     create_mv(node, table_name, dst_table_name)

-    total_values = generate_random_files(
+    _ = generate_random_files(
         started_cluster, files_path, files_to_generate, row_num=1
     )

-    expected_rows = 10
+    expected_rows = files_to_generate

     node.wait_for_log_line("Checking node limits")
     node.wait_for_log_line("Node limits check finished")
@@ -865,11 +866,11 @@ def test_max_set_age(started_cluster):
         time.sleep(1)
     assert expected_rows == get_count()
-    assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
+    assert files_to_generate == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))

     time.sleep(max_age + 5)

-    expected_rows = 20
+    expected_rows *= 2

     for _ in range(20):
         if expected_rows == get_count():
@@ -877,7 +878,7 @@ def test_max_set_age(started_cluster):
         time.sleep(1)
     assert expected_rows == get_count()
-    assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
+    assert files_to_generate == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))

     paths_count = [
         int(x)
@@ -885,7 +886,7 @@ def test_max_set_age(started_cluster):
             f"SELECT count() from {dst_table_name} GROUP BY _path"
         ).splitlines()
     ]
-    assert 10 == len(paths_count)
+    assert files_to_generate == len(paths_count)

     for path_count in paths_count:
         assert 2 == path_count
@@ -901,7 +902,8 @@ def test_max_set_age(started_cluster):
     values_csv = (
         "\n".join((",".join(map(str, row)) for row in values)) + "\n"
     ).encode()
-    put_s3_file_content(started_cluster, f"{files_path}/fff.csv", values_csv)
+    file_with_error = f"fff_{uuid4().hex}.csv"
+    put_s3_file_content(started_cluster, f"{files_path}/{file_with_error}", values_csv)

     for _ in range(30):
         if failed_count + 1 == int(
@@ -920,16 +922,17 @@ def test_max_set_age(started_cluster):
     node.query("SYSTEM FLUSH LOGS")
     assert "Cannot parse input" in node.query(
-        "SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
+        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}'"
     )
     assert 1 == int(
         node.query(
-            "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv'"
+            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}'"
         )
     )
     assert 1 == int(
         node.query(
-            "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
+            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
         )
     )
@@ -943,11 +946,11 @@ def test_max_set_age(started_cluster):
     node.query("SYSTEM FLUSH LOGS")
     assert "Cannot parse input" in node.query(
-        "SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv' ORDER BY processing_end_time DESC LIMIT 1"
+        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}' ORDER BY processing_end_time DESC LIMIT 1"
     )
     assert 1 < int(
         node.query(
-            "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
+            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
         )
     )
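
The counts asserted above are eventually consistent: S3Queue ingests files in the background, so the test polls get_count() in a bounded loop before each hard assert rather than checking once. A sketch of that poll-and-sleep idiom, with a hypothetical wait_until helper (the test inlines the loop instead):

    import time

    def wait_until(predicate, attempts=20, delay=1.0):
        # Re-check an eventually-true condition, sleeping between tries,
        # and let the caller make the final hard assertion.
        for _ in range(attempts):
            if predicate():
                return True
            time.sleep(delay)
        return False

    # e.g. wait_until(lambda: get_count() == expected_rows)
    # assert expected_rows == get_count()

Deriving expected_rows from files_to_generate (and doubling it in place once max_set_age expires and the files are reprocessed) also keeps these assertions correct if the generated file count ever changes.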