diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 1f14886e50f..b5b60b2c8cc 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -808,22 +808,15 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): rabbitmq_routing_key_list = 'insert2', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; - CREATE TABLE test.view_many (key UInt64, value UInt64) - ENGINE = MergeTree - ORDER BY key - SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3; - CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS - SELECT * FROM test.rabbitmq_consume; ''') - messages_num = 1000 + messages_num = 10000 + values = [] + for i in range(messages_num): + values.append("({i}, {i})".format(i=i)) + values = ','.join(values) def insert(): - values = [] - for i in range(messages_num): - values.append("({i}, {i})".format(i=i)) - values = ','.join(values) - while True: try: instance.query("INSERT INTO test.rabbitmq_many VALUES {}".format(values)) @@ -835,18 +828,29 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): raise threads = [] - threads_num = 20 + threads_num = 10 for _ in range(threads_num): threads.append(threading.Thread(target=insert)) for thread in threads: time.sleep(random.uniform(0, 1)) thread.start() + instance.query(''' + CREATE TABLE test.view_many (key UInt64, value UInt64) + ENGINE = MergeTree + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS + SELECT * FROM test.rabbitmq_consume; + ''') + + for thread in threads: + thread.join() + while True: result = instance.query('SELECT count() FROM test.view_many') - time.sleep(1) if int(result) == messages_num * threads_num: break + time.sleep(1) instance.query(''' DROP TABLE test.rabbitmq_consume; @@ -855,9 +859,6 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): DROP TABLE test.view_many; ''') - for thread in threads: - thread.join() - assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)