Merge pull request #46595 from kssenii/update-rabbit-producer

Fix rabbitmq test
This commit is contained in:
Kseniia Sumarokova 2023-02-21 16:18:55 +01:00 committed by GitHub
commit 655e7f29f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 15 deletions

View File

@ -56,10 +56,10 @@ int RabbitMQHandler::iterateLoop()
/// Do not need synchronization as in iterateLoop(), because this method is used only for /// Do not need synchronization as in iterateLoop(), because this method is used only for
/// initial RabbitMQ setup - at this point there is no background loop thread. /// initial RabbitMQ setup - at this point there is no background loop thread.
void RabbitMQHandler::startBlockingLoop() int RabbitMQHandler::startBlockingLoop()
{ {
LOG_DEBUG(log, "Started blocking loop."); LOG_DEBUG(log, "Started blocking loop.");
uv_run(loop, UV_RUN_DEFAULT); return uv_run(loop, UV_RUN_DEFAULT);
} }
void RabbitMQHandler::stopLoop() void RabbitMQHandler::stopLoop()

View File

@ -38,7 +38,7 @@ public:
/// Loop to wait for small tasks in a blocking mode. /// Loop to wait for small tasks in a blocking mode.
/// No synchronization is done with the main loop thread. /// No synchronization is done with the main loop thread.
void startBlockingLoop(); int startBlockingLoop();
void stopLoop(); void stopLoop();

View File

@ -262,7 +262,20 @@ void RabbitMQProducer::startProducingTaskLoop()
LOG_TEST(log, "Waiting for pending callbacks to finish (count: {}, try: {})", res, try_num); LOG_TEST(log, "Waiting for pending callbacks to finish (count: {}, try: {})", res, try_num);
} }
LOG_DEBUG(log, "Producer on channel {} completed", channel_id); producer_channel->close()
.onSuccess([&]()
{
LOG_TRACE(log, "Successfully closed producer channel");
connection.getHandler().stopLoop();
})
.onError([&](const char * message)
{
LOG_ERROR(log, "Failed to close producer channel: {}", message);
connection.getHandler().stopLoop();
});
int active = connection.getHandler().startBlockingLoop();
LOG_DEBUG(log, "Producer on channel completed (not finished events: {})", active);
} }

View File

@ -1034,8 +1034,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_type = 'direct', rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 2, rabbitmq_num_consumers = 2,
rabbitmq_flush_interval_ms=1000, rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 1000, rabbitmq_max_block_size = 100,
rabbitmq_num_queues = 2,
rabbitmq_routing_key_list = 'over', rabbitmq_routing_key_list = 'over',
rabbitmq_format = 'TSV', rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n'; rabbitmq_row_delimiter = '\\n';
@ -1045,8 +1044,6 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_exchange_name = 'over', rabbitmq_exchange_name = 'over',
rabbitmq_exchange_type = 'direct', rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'over', rabbitmq_routing_key_list = 'over',
rabbitmq_flush_interval_ms=1000,
rabbitmq_max_block_size = 1000,
rabbitmq_format = 'TSV', rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n'; rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view_overload (key UInt64, value UInt64) CREATE TABLE test.view_overload (key UInt64, value UInt64)
@ -1087,6 +1084,9 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
time.sleep(random.uniform(0, 1)) time.sleep(random.uniform(0, 1))
thread.start() thread.start()
for thread in threads:
thread.join()
while True: while True:
result = instance.query("SELECT count() FROM test.view_overload") result = instance.query("SELECT count() FROM test.view_overload")
expected = messages_num * threads_num expected = messages_num * threads_num
@ -1097,16 +1097,13 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
instance.query( instance.query(
""" """
DROP TABLE test.consumer_overload; DROP TABLE test.consumer_overload NO DELAY;
DROP TABLE test.view_overload; DROP TABLE test.view_overload NO DELAY;
DROP TABLE test.rabbitmq_consume; DROP TABLE test.rabbitmq_consume NO DELAY;
DROP TABLE test.rabbitmq_overload; DROP TABLE test.rabbitmq_overload NO DELAY;
""" """
) )
for thread in threads:
thread.join()
assert ( assert (
int(result) == messages_num * threads_num int(result) == messages_num * threads_num
), "ClickHouse lost some messages: {}".format(result) ), "ClickHouse lost some messages: {}".format(result)