Merge pull request #44897 from kssenii/rabbitmq-leak

Try fix rabbitmq potential leak
This commit is contained in:
Ilya Yatsishin 2023-01-05 10:56:56 +01:00 committed by GitHub
commit e4d4a2d8ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 3 deletions

View File

@ -54,9 +54,11 @@ private:
Poco::Logger * log;
UVLoop loop;
/// Preserve order of destruction here:
/// destruct connection and handler before the loop above.
RabbitMQHandler event_handler;
std::unique_ptr<AMQP::TcpConnection> connection;
std::mutex mutex;
};

View File

@ -621,7 +621,6 @@ void StorageRabbitMQ::prepareChannelForConsumer(RabbitMQConsumerPtr consumer)
consumer->setupChannel();
}
void StorageRabbitMQ::unbindExchange()
{
/* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query
@ -812,6 +811,8 @@ void StorageRabbitMQ::shutdown()
{
shutdown_called = true;
LOG_TRACE(log, "Deactivating background tasks");
/// In case it has not yet been able to setup connection;
deactivateTask(connection_task, true, false);
@ -820,6 +821,8 @@ void StorageRabbitMQ::shutdown()
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
LOG_TRACE(log, "Cleaning up RabbitMQ after table usage");
/// Just a paranoid try catch, it is not actually needed.
try
{
@ -842,6 +845,8 @@ void StorageRabbitMQ::shutdown()
{
tryLogCurrentException(log);
}
LOG_TRACE(log, "Shutdown finished");
}

View File

@ -6,6 +6,7 @@
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -30,7 +31,30 @@ public:
~UVLoop()
{
if (loop_ptr)
uv_loop_close(loop_ptr.get());
{
auto res = uv_loop_close(loop_ptr.get());
if (res == UV_EBUSY)
{
LOG_DEBUG(log, "Closing pending handles");
uv_walk(loop_ptr.get(), onUVWalkClosingCallback, nullptr);
/// Run the loop until there are no pending callbacks.
while ((res = uv_run(loop_ptr.get(), UV_RUN_ONCE)) != 0)
{
LOG_DEBUG(log, "Waiting for pending callbacks to finish ({})", res);
}
res = uv_loop_close(loop_ptr.get());
if (res == UV_EBUSY)
{
LOG_ERROR(
log, "Failed to close libuv loop (active requests/handles in the loop: {})",
uv_loop_alive(loop_ptr.get()));
chassert(false);
}
}
}
}
inline uv_loop_t * getLoop() { return loop_ptr.get(); }
@ -39,6 +63,15 @@ public:
private:
std::unique_ptr<uv_loop_t> loop_ptr;
Poco::Logger * log = &Poco::Logger::get("UVLoop");
static void onUVWalkClosingCallback(uv_handle_t * handle, void *)
{
if (!uv_is_closing(handle))
uv_close(handle, onUVCloseCallback);
}
static void onUVCloseCallback(uv_handle_t *) {}
};
}