diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index bb8da289490..e489c2a134f 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -253,6 +253,8 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): @pytest.mark.timeout(240) def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', @@ -260,6 +262,11 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): rabbitmq_format = 'TSV', rabbitmq_queue_base = 'tsv', rabbitmq_row_delimiter = '\\n'; + CREATE TABLE test.view (key UInt64, value UInt64) + ENGINE = MergeTree() + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.rabbitmq; ''') credentials = pika.PlainCredentials('root', 'clickhouse') @@ -275,11 +282,10 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): channel.basic_publish(exchange='tsv', routing_key='', body=message) connection.close() - time.sleep(1) result = '' while True: - result += instance.query('SELECT * FROM test.rabbitmq ORDER BY key', ignore_error=True) + result = instance.query('SELECT * FROM test.view ORDER BY key') if rabbitmq_check_result(result): break