From 361678ad7327d1c41b78c6eb5b744c178410812a Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 20 Feb 2023 13:55:27 +0100 Subject: [PATCH 1/2] rabbitmq-test-fix --- src/Storages/RabbitMQ/RabbitMQHandler.cpp | 4 ++-- src/Storages/RabbitMQ/RabbitMQHandler.h | 2 +- src/Storages/RabbitMQ/RabbitMQProducer.cpp | 15 ++++++++++++++- tests/integration/test_storage_rabbitmq/test.py | 14 +++++++------- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 934753257b4..745af0d20e3 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -56,10 +56,10 @@ int RabbitMQHandler::iterateLoop() /// Do not need synchronization as in iterateLoop(), because this method is used only for /// initial RabbitMQ setup - at this point there is no background loop thread. -void RabbitMQHandler::startBlockingLoop() +int RabbitMQHandler::startBlockingLoop() { LOG_DEBUG(log, "Started blocking loop."); - uv_run(loop, UV_RUN_DEFAULT); + return uv_run(loop, UV_RUN_DEFAULT); } void RabbitMQHandler::stopLoop() diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 948d56416fd..4223732a4a0 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -38,7 +38,7 @@ public: /// Loop to wait for small tasks in a blocking mode. /// No synchronization is done with the main loop thread. - void startBlockingLoop(); + int startBlockingLoop(); void stopLoop(); diff --git a/src/Storages/RabbitMQ/RabbitMQProducer.cpp b/src/Storages/RabbitMQ/RabbitMQProducer.cpp index 5d639b77f53..246569060d0 100644 --- a/src/Storages/RabbitMQ/RabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/RabbitMQProducer.cpp @@ -262,7 +262,20 @@ void RabbitMQProducer::startProducingTaskLoop() LOG_TEST(log, "Waiting for pending callbacks to finish (count: {}, try: {})", res, try_num); } - LOG_DEBUG(log, "Producer on channel {} completed", channel_id); + producer_channel->close() + .onSuccess([&]() + { + LOG_TRACE(log, "Successfully closed producer channel"); + connection.getHandler().stopLoop(); + }) + .onError([&](const char * message) + { + LOG_ERROR(log, "Failed to close producer channel: {}", message); + connection.getHandler().stopLoop(); + }); + + int active = connection.getHandler().startBlockingLoop(); + LOG_DEBUG(log, "Producer on channel completed (not finished events: {})", active); } diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 030d9507d4f..5ca6f2acedf 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1086,6 +1086,9 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): time.sleep(random.uniform(0, 1)) thread.start() + for thread in threads: + thread.join() + while True: result = instance.query("SELECT count() FROM test.view_overload") expected = messages_num * threads_num @@ -1096,16 +1099,13 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): instance.query( """ - DROP TABLE test.consumer_overload; - DROP TABLE test.view_overload; - DROP TABLE test.rabbitmq_consume; - DROP TABLE test.rabbitmq_overload; + DROP TABLE test.consumer_overload NO DELAY; + DROP TABLE test.view_overload NO DELAY; + DROP TABLE test.rabbitmq_consume NO DELAY; + DROP TABLE test.rabbitmq_overload NO DELAY; """ ) - for thread in threads: - thread.join() - assert ( int(result) == messages_num * threads_num ), "ClickHouse lost some messages: {}".format(result) From 0a3639884746a6bb1c057bb5480369d026a96854 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 20 Feb 2023 16:53:06 +0100 Subject: [PATCH 2/2] Update test --- tests/integration/test_storage_rabbitmq/test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 5ca6f2acedf..c3e1843a417 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1033,8 +1033,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): rabbitmq_exchange_type = 'direct', rabbitmq_num_consumers = 2, rabbitmq_flush_interval_ms=1000, - rabbitmq_max_block_size = 1000, - rabbitmq_num_queues = 2, + rabbitmq_max_block_size = 100, rabbitmq_routing_key_list = 'over', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; @@ -1044,8 +1043,6 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): rabbitmq_exchange_name = 'over', rabbitmq_exchange_type = 'direct', rabbitmq_routing_key_list = 'over', - rabbitmq_flush_interval_ms=1000, - rabbitmq_max_block_size = 1000, rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.view_overload (key UInt64, value UInt64)