From cebfddc3ad2c9ce69d94a9937a1ba641a3715558 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 12:54:11 +0100 Subject: [PATCH] Better closing of the loop --- src/Storages/RabbitMQ/RabbitMQConnection.cpp | 19 +++++++++++ src/Storages/RabbitMQ/RabbitMQConnection.h | 2 ++ src/Storages/UVLoop.h | 35 +++++++++++++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.cpp b/src/Storages/RabbitMQ/RabbitMQConnection.cpp index 13d065774a2..a290d796e32 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConnection.cpp @@ -122,4 +122,23 @@ void RabbitMQConnection::disconnectImpl(bool immediately) event_handler.iterateLoop(); } +RabbitMQConnection::~RabbitMQConnection() +{ + std::lock_guard lock(mutex); + + if (!connection) + return; + + try + { + /// Try to always close the connection gracefully (run the loop to see the closing callbacks) + /// to make sure that the associated callbacks and pending events are removed. + disconnectImpl(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + } diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.h b/src/Storages/RabbitMQ/RabbitMQConnection.h index 7a355afea0e..288b8ccc465 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.h +++ b/src/Storages/RabbitMQ/RabbitMQConnection.h @@ -24,6 +24,8 @@ class RabbitMQConnection public: RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_); + ~RabbitMQConnection(); + bool isConnected(); bool connect(); diff --git a/src/Storages/UVLoop.h b/src/Storages/UVLoop.h index 66668739dd7..3a0e2df343e 100644 --- a/src/Storages/UVLoop.h +++ b/src/Storages/UVLoop.h @@ -6,6 +6,7 @@ #include #include +#include namespace DB { @@ -30,7 +31,38 @@ public: ~UVLoop() { if (loop_ptr) - uv_loop_close(loop_ptr.get()); + { + auto res = uv_loop_close(loop_ptr.get()); + + if (res == UV_EBUSY) + { + LOG_DEBUG(log, "Closing pending handles"); + uv_walk(loop_ptr.get(), [](uv_handle_t * handle, void *) + { + if (!uv_is_closing(handle)) + uv_close(handle, [](uv_handle_t *){}); + }, nullptr); + + /// Run the loop until there are no pending callbacks. + while (true) + { + res = uv_run(loop_ptr.get(), UV_RUN_ONCE); + if (res) + LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res); + else + break; + } + + res = uv_loop_close(loop_ptr.get()); + if (res == UV_EBUSY) + { + LOG_ERROR( + log, "Failed to close libuv loop (active requests/handles in the loop: {})", + uv_loop_alive(loop_ptr.get())); + chassert(false); + } + } + } } inline uv_loop_t * getLoop() { return loop_ptr.get(); } @@ -39,6 +71,7 @@ public: private: std::unique_ptr loop_ptr; + Poco::Logger * log = &Poco::Logger::get("UVLoop"); }; }