Better closing of the loop

This commit is contained in:
kssenii 2023-01-04 12:54:11 +01:00
parent fa234878d6
commit cebfddc3ad
3 changed files with 55 additions and 1 deletions

View File

@ -122,4 +122,23 @@ void RabbitMQConnection::disconnectImpl(bool immediately)
event_handler.iterateLoop();
}
RabbitMQConnection::~RabbitMQConnection()
{
std::lock_guard lock(mutex);
if (!connection)
return;
try
{
/// Try to always close the connection gracefully (run the loop to see the closing callbacks)
/// to make sure that the associated callbacks and pending events are removed.
disconnectImpl();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -24,6 +24,8 @@ class RabbitMQConnection
public:
RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_);
~RabbitMQConnection();
bool isConnected();
bool connect();

View File

@ -6,6 +6,7 @@
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -30,7 +31,38 @@ public:
~UVLoop()
{
if (loop_ptr)
uv_loop_close(loop_ptr.get());
{
auto res = uv_loop_close(loop_ptr.get());
if (res == UV_EBUSY)
{
LOG_DEBUG(log, "Closing pending handles");
uv_walk(loop_ptr.get(), [](uv_handle_t * handle, void *)
{
if (!uv_is_closing(handle))
uv_close(handle, [](uv_handle_t *){});
}, nullptr);
/// Run the loop until there are no pending callbacks.
while (true)
{
res = uv_run(loop_ptr.get(), UV_RUN_ONCE);
if (res)
LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res);
else
break;
}
res = uv_loop_close(loop_ptr.get());
if (res == UV_EBUSY)
{
LOG_ERROR(
log, "Failed to close libuv loop (active requests/handles in the loop: {})",
uv_loop_alive(loop_ptr.get()));
chassert(false);
}
}
}
}
inline uv_loop_t * getLoop() { return loop_ptr.get(); }
@ -39,6 +71,7 @@ public:
private:
std::unique_ptr<uv_loop_t> loop_ptr;
Poco::Logger * log = &Poco::Logger::get("UVLoop");
};
}