From cebfddc3ad2c9ce69d94a9937a1ba641a3715558 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 12:54:11 +0100 Subject: [PATCH 1/5] Better closing of the loop --- src/Storages/RabbitMQ/RabbitMQConnection.cpp | 19 +++++++++++ src/Storages/RabbitMQ/RabbitMQConnection.h | 2 ++ src/Storages/UVLoop.h | 35 +++++++++++++++++++- 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.cpp b/src/Storages/RabbitMQ/RabbitMQConnection.cpp index 13d065774a2..a290d796e32 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConnection.cpp @@ -122,4 +122,23 @@ void RabbitMQConnection::disconnectImpl(bool immediately) event_handler.iterateLoop(); } +RabbitMQConnection::~RabbitMQConnection() +{ + std::lock_guard lock(mutex); + + if (!connection) + return; + + try + { + /// Try to always close the connection gracefully (run the loop to see the closing callbacks) + /// to make sure that the associated callbacks and pending events are removed. + disconnectImpl(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + } diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.h b/src/Storages/RabbitMQ/RabbitMQConnection.h index 7a355afea0e..288b8ccc465 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.h +++ b/src/Storages/RabbitMQ/RabbitMQConnection.h @@ -24,6 +24,8 @@ class RabbitMQConnection public: RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_); + ~RabbitMQConnection(); + bool isConnected(); bool connect(); diff --git a/src/Storages/UVLoop.h b/src/Storages/UVLoop.h index 66668739dd7..3a0e2df343e 100644 --- a/src/Storages/UVLoop.h +++ b/src/Storages/UVLoop.h @@ -6,6 +6,7 @@ #include #include +#include namespace DB { @@ -30,7 +31,38 @@ public: ~UVLoop() { if (loop_ptr) - uv_loop_close(loop_ptr.get()); + { + auto res = uv_loop_close(loop_ptr.get()); + + if (res == UV_EBUSY) + { + LOG_DEBUG(log, "Closing pending handles"); + uv_walk(loop_ptr.get(), [](uv_handle_t * handle, void *) + { + if (!uv_is_closing(handle)) + uv_close(handle, [](uv_handle_t *){}); + }, nullptr); + + /// Run the loop until there are no pending callbacks. + while (true) + { + res = uv_run(loop_ptr.get(), UV_RUN_ONCE); + if (res) + LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res); + else + break; + } + + res = uv_loop_close(loop_ptr.get()); + if (res == UV_EBUSY) + { + LOG_ERROR( + log, "Failed to close libuv loop (active requests/handles in the loop: {})", + uv_loop_alive(loop_ptr.get())); + chassert(false); + } + } + } } inline uv_loop_t * getLoop() { return loop_ptr.get(); } @@ -39,6 +71,7 @@ public: private: std::unique_ptr loop_ptr; + Poco::Logger * log = &Poco::Logger::get("UVLoop"); }; } From ae7a5a65545908aad8e26bd9ada3a9dcf91b1714 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 13:07:56 +0100 Subject: [PATCH 2/5] Better comment --- src/Storages/RabbitMQ/RabbitMQConnection.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.cpp b/src/Storages/RabbitMQ/RabbitMQConnection.cpp index a290d796e32..39aa2b87caf 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConnection.cpp @@ -132,7 +132,8 @@ RabbitMQConnection::~RabbitMQConnection() try { /// Try to always close the connection gracefully (run the loop to see the closing callbacks) - /// to make sure that the associated callbacks and pending events are removed. + /// to make sure that the associated callbacks and pending events are removed + /// before handler and loop are destructed. disconnectImpl(); } catch (...) From 0fe4d0732d43f273f2119099049155c6f2967400 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 13:17:40 +0100 Subject: [PATCH 3/5] Better --- src/Storages/UVLoop.h | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Storages/UVLoop.h b/src/Storages/UVLoop.h index 3a0e2df343e..f5f8d715170 100644 --- a/src/Storages/UVLoop.h +++ b/src/Storages/UVLoop.h @@ -37,11 +37,7 @@ public: if (res == UV_EBUSY) { LOG_DEBUG(log, "Closing pending handles"); - uv_walk(loop_ptr.get(), [](uv_handle_t * handle, void *) - { - if (!uv_is_closing(handle)) - uv_close(handle, [](uv_handle_t *){}); - }, nullptr); + uv_walk(loop_ptr.get(), onUVWalkClosingCallback, nullptr); /// Run the loop until there are no pending callbacks. while (true) @@ -72,6 +68,14 @@ public: private: std::unique_ptr loop_ptr; Poco::Logger * log = &Poco::Logger::get("UVLoop"); + + static void onUVWalkClosingCallback(uv_handle_t * handle, void *) + { + if (!uv_is_closing(handle)) + uv_close(handle, onUVCloseCallback); + } + + static void onUVCloseCallback(uv_handle_t *) {} }; } From 283388cfa6c155120c865094da62b427ce484077 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 13:43:33 +0100 Subject: [PATCH 4/5] Simplify loop, add comment --- src/Storages/RabbitMQ/RabbitMQConnection.h | 4 +++- src/Storages/UVLoop.h | 8 ++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.h b/src/Storages/RabbitMQ/RabbitMQConnection.h index 288b8ccc465..ac9380d25db 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.h +++ b/src/Storages/RabbitMQ/RabbitMQConnection.h @@ -56,9 +56,11 @@ private: Poco::Logger * log; UVLoop loop; + /// Preserve order of destruction here: + /// destruct connection and handler before the loop above. RabbitMQHandler event_handler; - std::unique_ptr connection; + std::mutex mutex; }; diff --git a/src/Storages/UVLoop.h b/src/Storages/UVLoop.h index f5f8d715170..6b24252077e 100644 --- a/src/Storages/UVLoop.h +++ b/src/Storages/UVLoop.h @@ -40,13 +40,9 @@ public: uv_walk(loop_ptr.get(), onUVWalkClosingCallback, nullptr); /// Run the loop until there are no pending callbacks. - while (true) + while ((res = uv_run(loop_ptr.get(), UV_RUN_ONCE)) != 0) { - res = uv_run(loop_ptr.get(), UV_RUN_ONCE); - if (res) - LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res); - else - break; + LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res); } res = uv_loop_close(loop_ptr.get()); From 2e758f983fb86732e0467f6a9bc2deb266d51a40 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Jan 2023 18:08:55 +0100 Subject: [PATCH 5/5] Add logging, remove closing connection in destructor --- src/Storages/RabbitMQ/RabbitMQConnection.cpp | 20 -------------------- src/Storages/RabbitMQ/RabbitMQConnection.h | 2 -- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 7 ++++++- 3 files changed, 6 insertions(+), 23 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.cpp b/src/Storages/RabbitMQ/RabbitMQConnection.cpp index 39aa2b87caf..13d065774a2 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConnection.cpp @@ -122,24 +122,4 @@ void RabbitMQConnection::disconnectImpl(bool immediately) event_handler.iterateLoop(); } -RabbitMQConnection::~RabbitMQConnection() -{ - std::lock_guard lock(mutex); - - if (!connection) - return; - - try - { - /// Try to always close the connection gracefully (run the loop to see the closing callbacks) - /// to make sure that the associated callbacks and pending events are removed - /// before handler and loop are destructed. - disconnectImpl(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -} - } diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.h b/src/Storages/RabbitMQ/RabbitMQConnection.h index ac9380d25db..698230b16f4 100644 --- a/src/Storages/RabbitMQ/RabbitMQConnection.h +++ b/src/Storages/RabbitMQ/RabbitMQConnection.h @@ -24,8 +24,6 @@ class RabbitMQConnection public: RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_); - ~RabbitMQConnection(); - bool isConnected(); bool connect(); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 2a4549e79ca..04decb91f7d 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -621,7 +621,6 @@ void StorageRabbitMQ::prepareChannelForConsumer(RabbitMQConsumerPtr consumer) consumer->setupChannel(); } - void StorageRabbitMQ::unbindExchange() { /* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query @@ -812,6 +811,8 @@ void StorageRabbitMQ::shutdown() { shutdown_called = true; + LOG_TRACE(log, "Deactivating background tasks"); + /// In case it has not yet been able to setup connection; deactivateTask(connection_task, true, false); @@ -820,6 +821,8 @@ void StorageRabbitMQ::shutdown() deactivateTask(streaming_task, true, false); deactivateTask(looping_task, true, true); + LOG_TRACE(log, "Cleaning up RabbitMQ after table usage"); + /// Just a paranoid try catch, it is not actually needed. try { @@ -842,6 +845,8 @@ void StorageRabbitMQ::shutdown() { tryLogCurrentException(log); } + + LOG_TRACE(log, "Shutdown finished"); }