Merge pull request #21193 from kitaisreal/storage-rabbit-mq-added-uvloop

StorageRabbitMQ added UVLoop
This commit is contained in:
Maksim Kita 2021-02-26 11:04:02 +03:00 committed by GitHub
commit 749dd8d555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 10 deletions

View File

@ -99,9 +99,7 @@ StorageRabbitMQ::StorageRabbitMQ(
, unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
event_handler = std::make_shared<RabbitMQHandler>(loop.getLoop(), log);
restoreConnection(false);
StorageInMemoryMetadata storage_metadata;
@ -498,7 +496,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
AMQP::Login(login_password.first, login_password.second), vhost));
cnt_retries = 0;
while (!connection->ready() && !stream_cancelled && ++cnt_retries != RETRIES_MAX)
while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
@ -653,7 +651,7 @@ void StorageRabbitMQ::shutdown()
connection->close();
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
event_handler->iterateLoop();
/// Should actually force closure, if not yet closed, but it generates distracting error logs

View File

@ -9,6 +9,7 @@
#include <Storages/RabbitMQ/Buffer_fwd.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/RabbitMQSettings.h>
#include <Storages/RabbitMQ/UVLoop.h>
#include <Common/thread_local_rng.h>
#include <amqpcpp/libuv.h>
#include <uv.h>
@ -96,7 +97,7 @@ private:
std::pair<String, String> login_password;
String vhost;
std::unique_ptr<uv_loop_t> loop;
UVLoop loop;
std::shared_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers

View File

@ -0,0 +1,44 @@
#pragma once
#include <memory>
#include <boost/noncopyable.hpp>
#include <uv.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
}
/// RAII wrapper around uv event loop
class UVLoop : public boost::noncopyable
{
public:
UVLoop(): loop_ptr(new uv_loop_t())
{
int res = uv_loop_init(loop_ptr.get());
if (res != 0)
throw Exception("UVLoop could not initialize", ErrorCodes::SYSTEM_ERROR);
}
~UVLoop()
{
if (loop_ptr)
uv_loop_close(loop_ptr.get());
}
inline uv_loop_t * getLoop() { return loop_ptr.get(); }
inline const uv_loop_t * getLoop() const { return loop_ptr.get(); }
private:
std::unique_ptr<uv_loop_t> loop_ptr;
};
}

View File

@ -57,9 +57,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
event_handler = std::make_unique<RabbitMQHandler>(loop.getLoop(), log);
if (setupConnection(false))
{

View File

@ -7,6 +7,7 @@
#include <atomic>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Storages/RabbitMQ/UVLoop.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/BackgroundSchedulePool.h>
#include <Core/Names.h>
@ -69,7 +70,7 @@ private:
AMQP::Table key_arguments;
BackgroundSchedulePool::TaskHolder writing_task;
std::unique_ptr<uv_loop_t> loop;
UVLoop loop;
std::unique_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection;
std::unique_ptr<AMQP::TcpChannel> producer_channel;