diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 2e339a9b5c9..963089c3777 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -827,7 +827,7 @@ def test_max_set_age(started_cluster): dst_table_name = f"{table_name}_dst" keeper_path = f"/clickhouse/test_{table_name}" files_path = f"{table_name}_data" - max_age = 10 + max_age = 20 files_to_generate = 10 create_table( @@ -858,12 +858,17 @@ def test_max_set_age(started_cluster): def get_count(): return int(node.query(f"SELECT count() FROM {dst_table_name}")) - def wait_for_condition(check_function, max_wait_time=30): + import os + + def wait_for_condition(check_function, max_wait_time=1.5 * max_age): + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Waiting for condition") before = time.time() while time.time() - before < max_wait_time: if check_function(): + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Condition verified") return - time.sleep(0.1) + time.sleep(0.25) + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Oh oh... Condition not verified") assert False wait_for_condition(lambda: get_count() == expected_rows) @@ -871,8 +876,6 @@ def test_max_set_age(started_cluster): node.query(f"SELECT uniq(_path) from {dst_table_name}") ) - time.sleep(max_age + max_age / 2) - expected_rows *= 2 wait_for_condition(lambda: get_count() == expected_rows) assert files_to_generate == int( @@ -886,8 +889,12 @@ def test_max_set_age(started_cluster): ).splitlines() ] assert files_to_generate == len(paths_count) - for path_count in paths_count: - assert 2 == path_count + try: + for path_count in paths_count: + assert 2 == path_count + finally: + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Got path count {path_count} for paths_count {paths_count}") + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Got paths: {node.query('SELECT *, _path from ' + dst_table_name + ' FORMAT JSONEachRow')}") def get_object_storage_failures(): return int( @@ -897,6 +904,7 @@ def test_max_set_age(started_cluster): ) failed_count = get_object_storage_failures() + logging.debug(f"{os.getenv('PYTEST_XDIST_WORKER')} - PMO: Got failed count {failed_count}") values = [ ["failed", 1, 1], @@ -906,7 +914,7 @@ def test_max_set_age(started_cluster): ).encode() # use a different filename for each test to allow running a bunch of them sequentially with --count - file_with_error = f"fff_{uuid4().hex}.csv" + file_with_error = f"max_set_age_fail_{uuid4().hex[:8]}.csv" put_s3_file_content(started_cluster, f"{files_path}/{file_with_error}", values_csv) wait_for_condition(lambda: failed_count + 1 <= get_object_storage_failures()) @@ -922,9 +930,7 @@ def test_max_set_age(started_cluster): ) ) - time.sleep(max_age + max_age / 2) - - assert failed_count + 2 <= get_object_storage_failures() + wait_for_condition(lambda: failed_count + 2 <= get_object_storage_failures()) node.query("SYSTEM FLUSH LOGS") assert "Cannot parse input" in node.query(