From 93b3fcc19533962c47f22dbfd8535f3b87412ce5 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 12 Oct 2021 13:05:20 +0300 Subject: [PATCH] Fix rabbitmq --- src/Storages/RabbitMQ/RabbitMQSource.cpp | 5 +++-- tests/integration/test_storage_rabbitmq/test.py | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp index 2ea60bfb68b..34edd06d3e2 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -118,8 +119,8 @@ Chunk RabbitMQSource::generateImpl() is_finished = true; MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto input_format = context->getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, max_block_size); + auto input_format = FormatFactory::instance().getInputFormat( + storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size); StreamingFormatExecutor executor(non_virtual_header, input_format); diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 9e2752438f8..85dda2fe4d3 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -463,11 +463,13 @@ def test_rabbitmq_big_message(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange='big', routing_key='', body=message) - while True: + for _ in range(300): result = instance.query('SELECT count() FROM test.view') if int(result) == batch_messages * rabbitmq_messages: break + time.sleep(1) + connection.close() instance.query(''' DROP TABLE test.consumer;