Fix flaky test

This commit is contained in:
kssenii 2023-07-21 17:03:30 +02:00
parent d2195cff11
commit f82364d2c9

View File

@ -410,7 +410,7 @@ def test_streaming_to_view_(started_cluster, mode):
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
@ -461,15 +461,15 @@ def test_streaming_to_many_views(started_cluster, mode):
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_3 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
@ -484,12 +484,12 @@ def test_streaming_to_many_views(started_cluster, mode):
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
SELECT
*
@ -530,7 +530,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
@ -615,7 +615,7 @@ def test_max_set_age(started_cluster):
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
@ -672,14 +672,14 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy_2 ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
@ -690,20 +690,20 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
SELECT
*
@ -718,7 +718,18 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
time.sleep((files_to_generate // poll_size) * 2)
def get_count(table_name):
    """Return the number of rows in ``table_name`` as an ``int``.

    Issues ``SELECT count()`` through ``run_query`` on ``instance`` and
    converts the returned text to an integer.
    """
    raw_count = run_query(instance, f"SELECT count() FROM {table_name}")
    return int(raw_count)
for _ in range(100):
if (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
@ -734,18 +745,18 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
list(map(int, l.split()))
for l in run_query(instance, get_query_copy_2).splitlines()
]
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert len(res3) > 0
# Checking that all files were processed only once
assert len(res1) + len(res2) + len(res3) == files_to_generate
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
# Checking that all files were processed only once: wait, then verify the total row count still equals the number of generated files
time.sleep(10)
assert (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
@ -774,7 +785,7 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
@ -814,7 +825,7 @@ def test_max_set_size(started_cluster):
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS