Merge pull request #52944 from kssenii/fix-s3-queue-flaky-test

Fix flaky test_storage_s3_queue::test_multiple_tables_streaming_sync_distributed
This commit is contained in:
Kseniia Sumarokova 2023-08-03 10:44:43 +02:00 committed by GitHub
commit 65352d64c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -761,7 +761,7 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
files_to_generate = 100
poll_size = 10
poll_size = 2
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
@ -785,7 +785,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
)
for inst in [instance, instance_2]:
inst.query(
f"""
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
@ -800,7 +805,7 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(100):
for _ in range(150):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
@ -816,11 +821,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
]
assert len(res1) + len(res2) == files_to_generate
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert len(res1) + len(res2) == files_to_generate
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once