From d87162f43cc0efaceed08da0051ad651c0ad81d0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 2 Aug 2023 17:09:47 +0200 Subject: [PATCH] Fix --- tests/integration/test_storage_s3_queue/test.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 77e32e2922c..484ab6d7e95 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -761,7 +761,7 @@ def test_multiple_tables_streaming_sync(started_cluster, mode): @pytest.mark.parametrize("mode", AVAILABLE_MODES) def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): files_to_generate = 100 - poll_size = 10 + poll_size = 2 prefix = f"test_multiple_{mode}" bucket = started_cluster.minio_restricted_bucket instance = started_cluster.instances["instance"] @@ -785,7 +785,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): CREATE TABLE test.s3_queue_persistent ({table_format}) ENGINE = MergeTree() ORDER BY column1; + """ + ) + for inst in [instance, instance_2]: + inst.query( + f""" CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS SELECT * @@ -800,7 +805,7 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): def get_count(node, table_name): return int(run_query(node, f"SELECT count() FROM {table_name}")) - for _ in range(100): + for _ in range(150): if ( get_count(instance, "test.s3_queue_persistent") + get_count(instance_2, "test.s3_queue_persistent") @@ -816,11 +821,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines() ] + assert len(res1) + len(res2) == files_to_generate + # Checking that all engines have made progress assert len(res1) > 0 assert len(res2) > 0 - assert len(res1) + len(res2) == files_to_generate assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values]) # Checking that all files were processed only once