Merge pull request #46107 from kssenii/rabbit-test-overloaded-insert-fix

Fix flaky rabbitmq test
This commit is contained in:
Alexey Milovidov 2023-02-08 04:14:03 +03:00 committed by GitHub
commit da22a770a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1031,9 +1031,9 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_name = 'over', rabbitmq_exchange_name = 'over',
rabbitmq_queue_base = 'over', rabbitmq_queue_base = 'over',
rabbitmq_exchange_type = 'direct', rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 3, rabbitmq_num_consumers = 2,
rabbitmq_flush_interval_ms=1000, rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100, rabbitmq_max_block_size = 1000,
rabbitmq_num_queues = 2, rabbitmq_num_queues = 2,
rabbitmq_routing_key_list = 'over', rabbitmq_routing_key_list = 'over',
rabbitmq_format = 'TSV', rabbitmq_format = 'TSV',
@ -1045,13 +1045,12 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_type = 'direct', rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'over', rabbitmq_routing_key_list = 'over',
rabbitmq_flush_interval_ms=1000, rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 100, rabbitmq_max_block_size = 1000,
rabbitmq_format = 'TSV', rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n'; rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_overload (key UInt64, value UInt64) CREATE TABLE test.view_overload (key UInt64, value UInt64)
ENGINE = MergeTree ENGINE = MergeTree
ORDER BY key ORDER BY key;
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
SELECT * FROM test.rabbitmq_consume; SELECT * FROM test.rabbitmq_consume;
""" """
@ -1080,7 +1079,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
raise raise
threads = [] threads = []
threads_num = 3 threads_num = 2
for _ in range(threads_num): for _ in range(threads_num):
threads.append(threading.Thread(target=insert)) threads.append(threading.Thread(target=insert))
for thread in threads: for thread in threads:
@ -1089,11 +1088,11 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
while True: while True:
result = instance.query("SELECT count() FROM test.view_overload") result = instance.query("SELECT count() FROM test.view_overload")
time.sleep(1)
expected = messages_num * threads_num expected = messages_num * threads_num
if int(result) == expected: if int(result) == expected:
break break
print(f"Result: {result} / {expected}") print(f"Result: {result} / {expected}")
time.sleep(1)
instance.query( instance.query(
""" """