diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.cpp b/src/Storages/RabbitMQ/RabbitMQConnection.cpp new file mode 100644 index 00000000000..520fb0d651d --- /dev/null +++ b/src/Storages/RabbitMQ/RabbitMQConnection.cpp @@ -0,0 +1,88 @@ +#include "RabbitMQConnection.h" + +#include +#include + + +namespace DB +{ + +static const auto CONNECT_SLEEP = 200; +static const auto RETRIES_MAX = 20; + + +RabbitMQConnection::RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_) + : configuration(configuration_) + , log(log_) + , event_handler(loop.getLoop(), log) +{ +} + +String RabbitMQConnection::connectionInfoForLog() const +{ + return configuration.host + ':' + toString(configuration.port); +} + +bool RabbitMQConnection::isConnected() +{ + std::lock_guard lock(mutex); + return event_handler.connectionRunning() && connection->usable(); +} + +bool RabbitMQConnection::connect() +{ + std::lock_guard lock(mutex); + if (configuration.connection_string.empty()) + { + AMQP::Login login(configuration.username, configuration.password); + AMQP::Address address(configuration.host, configuration.port, login, configuration.vhost, configuration.secure); + connection = std::make_unique(&event_handler, address); + } + else + { + AMQP::Address address(configuration.connection_string); + connection = std::make_unique(&event_handler, address); + } + + auto cnt_retries = 0; + while (!connection->ready() && cnt_retries++ != RETRIES_MAX) + { + event_handler.iterateLoop(); + std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); + } + return event_handler.connectionRunning(); +} + +bool RabbitMQConnection::reconnect() +{ + disconnect(); + { + /// This will force immediate closure if not yet closed + std::lock_guard lock(mutex); + if (!connection->closed()) + connection->close(true); + } + LOG_TRACE(log, "Trying to restore connection to {}", connectionInfoForLog()); + return connect(); +} + +ChannelPtr RabbitMQConnection::createChannel() +{ + std::lock_guard lock(mutex); + return std::make_unique(connection.get()); +} + +void RabbitMQConnection::disconnect(bool immediately) +{ + std::lock_guard lock(mutex); + connection->close(immediately); + + /** Connection is not closed immediately (firstly, all pending operations are completed, and then + * an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed + */ + size_t cnt_retries = 0; + while (!connection->closed() && cnt_retries++ != RETRIES_MAX) + event_handler.iterateLoop(); +} + +} diff --git a/src/Storages/RabbitMQ/RabbitMQConnection.h b/src/Storages/RabbitMQ/RabbitMQConnection.h new file mode 100644 index 00000000000..38007d448af --- /dev/null +++ b/src/Storages/RabbitMQ/RabbitMQConnection.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +struct RabbitMQConfiguration +{ + String host; + UInt16 port; + String username; + String password; + String vhost; + + bool secure; + String connection_string; +}; + +class RabbitMQConnection +{ +public: + RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_); + + bool isConnected(); + + bool connect(); + + bool reconnect(); + + void disconnect(bool immediately = false); + + void heartbeat() { connection->heartbeat(); } + + bool closed() { return connection->closed(); } + + ChannelPtr createChannel(); + + /// RabbitMQHandler is thread safe. Any public methods can be called concurrently. + RabbitMQHandler & getHandler() { return event_handler; } + + String connectionInfoForLog() const; + +private: + RabbitMQConfiguration configuration; + Poco::Logger * log; + + UVLoop loop; + RabbitMQHandler event_handler; + + std::unique_ptr connection; + std::mutex mutex; +}; + +using RabbitMQConnectionPtr = std::unique_ptr; + +} diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 29abfde0d6a..85d8063a73f 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -32,13 +32,6 @@ void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */) loop_state.store(Loop::RUN); } -bool RabbitMQHandler::connectionRunning(const AMQP::TcpConnection * connection) -{ - if (connection_running.load() && !connection->usable()) - LOG_ERROR(log, "Logical error: mismatch in connection flags"); - return connection_running.load() && connection->usable(); -} - void RabbitMQHandler::startLoop() { std::lock_guard lock(startup_mutex); diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 7abdacd2bbb..8e419b9fca8 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -17,6 +17,7 @@ namespace Loop static const UInt8 STOP = 2; } +using ChannelPtr = std::unique_ptr; class RabbitMQHandler : public AMQP::LibUvHandler { @@ -40,7 +41,7 @@ public: void stopLoop(); - bool connectionRunning(const AMQP::TcpConnection * connection); + bool connectionRunning() { return connection_running.load(); } bool loopRunning() { return loop_running.load(); } void updateLoopState(UInt8 state) { loop_state.store(state); } @@ -55,4 +56,6 @@ private: std::mutex startup_mutex; }; +using RabbitMQHandlerPtr = std::shared_ptr; + } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 195f7b46c37..f5eef0cc960 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -16,7 +16,7 @@ namespace DB ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, - HandlerPtr event_handler_, + RabbitMQHandler & event_handler_, std::vector & queues_, size_t channel_id_base_, const String & channel_base_, @@ -159,7 +159,7 @@ bool ReadBufferFromRabbitMQConsumer::needChannelUpdate() void ReadBufferFromRabbitMQConsumer::iterateEventLoop() { - event_handler->iterateLoop(); + event_handler.iterateLoop(); } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index ccc8e56db5e..22892c5dd32 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -15,16 +15,13 @@ namespace Poco namespace DB { -using ChannelPtr = std::shared_ptr; -using HandlerPtr = std::shared_ptr; - class ReadBufferFromRabbitMQConsumer : public ReadBuffer { public: ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, - HandlerPtr event_handler_, + RabbitMQHandler & event_handler_, std::vector & queues_, size_t channel_id_base_, const String & channel_base_, @@ -85,7 +82,7 @@ private: void iterateEventLoop(); ChannelPtr consumer_channel; - HandlerPtr event_handler; + RabbitMQHandler & event_handler; /// Used concurrently, but is thread safe. std::vector queues; const String channel_base; const size_t channel_id_base; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 8620d58137d..3dbfe20929d 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -37,8 +37,6 @@ namespace DB { -static const auto CONNECT_SLEEP = 200; -static const auto RETRIES_MAX = 20; static const uint32_t QUEUE_SIZE = 100000; static const auto MAX_FAILED_READ_ATTEMPTS = 10; static const auto RESCHEDULE_MS = 500; @@ -91,23 +89,28 @@ StorageRabbitMQ::StorageRabbitMQ( , use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value) , hash_exchange(num_consumers > 1 || num_queues > 1) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) - , address(rabbitmq_settings->rabbitmq_host_port.value) - , parsed_address(parseAddress(address, 5672)) - , login_password(std::make_pair( - getContext()->getConfigRef().getString("rabbitmq.username"), - getContext()->getConfigRef().getString("rabbitmq.password"))) - , vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value)) - , connection_string(rabbitmq_settings->rabbitmq_address) - , secure(rabbitmq_settings->rabbitmq_secure.value) , semaphore(0, num_consumers) , unique_strbase(getRandomName()) , queue_size(std::max(QUEUE_SIZE, static_cast(getMaxBlockSize()))) , milliseconds_to_wait(RESCHEDULE_MS) { - event_handler = std::make_shared(loop.getLoop(), log); - if (secure) + auto parsed_address = parseAddress(rabbitmq_settings->rabbitmq_host_port.value, 5672); + configuration = + { + .host = parsed_address.first, + .port = parsed_address.second, + .username = getContext()->getConfigRef().getString("rabbitmq.username"), + .password = getContext()->getConfigRef().getString("rabbitmq.password"), + .vhost = getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value), + .secure = rabbitmq_settings->rabbitmq_secure.value, + .connection_string = rabbitmq_settings->rabbitmq_address + }; + + if (configuration.secure) SSL_library_init(); - restoreConnection(false); + + connection = std::make_unique(configuration, log); + connection->connect(); StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -117,7 +120,6 @@ StorageRabbitMQ::StorageRabbitMQ( rabbitmq_context->makeQueryContext(); /// One looping task for all consumers as they share the same connection == the same handler == the same event loop - event_handler->updateLoopState(Loop::STOP); looping_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); }); looping_task->deactivate(); @@ -222,14 +224,14 @@ std::shared_ptr StorageRabbitMQ::addSettings(ContextPtr local_context) void StorageRabbitMQ::loopingFunc() { - if (event_handler->connectionRunning(connection.get())) - event_handler->startLoop(); + if (connection->isConnected()) + connection->getHandler().startLoop(); } void StorageRabbitMQ::connectionFunc() { - if (restoreConnection(true)) + if (connection->reconnect()) initRabbitMQ(); else connection_task->scheduleAfter(RESCHEDULE_MS); @@ -242,7 +244,9 @@ void StorageRabbitMQ::connectionFunc() void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop) { if (stop_loop) - event_handler->updateLoopState(Loop::STOP); + { + connection->getHandler().updateLoopState(Loop::STOP); + } std::unique_lock lock(task_mutex, std::defer_lock); if (lock.try_lock()) @@ -278,19 +282,19 @@ void StorageRabbitMQ::initRabbitMQ() return; } - AMQP::TcpChannel rabbit_channel(connection.get()); + auto rabbit_channel = connection->createChannel(); /// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers - initExchange(rabbit_channel); - bindExchange(rabbit_channel); + initExchange(*rabbit_channel); + bindExchange(*rabbit_channel); for (const auto i : collections::range(0, num_queues)) - bindQueue(i + 1, rabbit_channel); + bindQueue(i + 1, *rabbit_channel); LOG_TRACE(log, "RabbitMQ setup completed"); rabbit_is_ready = true; - rabbit_channel.close(); + rabbit_channel->close(); } @@ -380,7 +384,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel) } rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers) - .onSuccess([&]() { event_handler->stopLoop(); }) + .onSuccess([&]() { connection->getHandler().stopLoop(); }) .onError([&](const char * message) { throw Exception( @@ -392,7 +396,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel) else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash) { rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0]) - .onSuccess([&]() { event_handler->stopLoop(); }) + .onSuccess([&]() { connection->getHandler().stopLoop(); }) .onError([&](const char * message) { throw Exception( @@ -410,7 +414,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel) { ++bound_keys; if (bound_keys == routing_keys.size()) - event_handler->stopLoop(); + connection->getHandler().stopLoop(); }) .onError([&](const char * message) { @@ -422,7 +426,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel) } } - event_handler->startBlockingLoop(); + connection->getHandler().startBlockingLoop(); } @@ -441,7 +445,7 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann * fanout exchange it can be arbitrary */ rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id)) - .onSuccess([&] { event_handler->stopLoop(); }) + .onSuccess([&] { connection->getHandler().stopLoop(); }) .onError([&](const char * message) { throw Exception( @@ -507,52 +511,15 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann /// AMQP::autodelete setting is not allowed, because in case of server restart there will be no consumers /// and deleting queues should not take place. rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback); - event_handler->startBlockingLoop(); -} - - -bool StorageRabbitMQ::restoreConnection(bool reconnecting) -{ - size_t cnt_retries = 0; - - if (reconnecting) - { - connection->close(); /// Connection might be unusable, but not closed - - /* Connection is not closed immediately (firstly, all pending operations are completed, and then - * an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed - */ - while (!connection->closed() && cnt_retries++ != RETRIES_MAX) - event_handler->iterateLoop(); - - /// This will force immediate closure if not yet closed - if (!connection->closed()) - connection->close(true); - - LOG_TRACE(log, "Trying to restore connection to " + address); - } - - auto amqp_address = connection_string.empty() ? AMQP::Address(parsed_address.first, parsed_address.second, - AMQP::Login(login_password.first, login_password.second), vhost, secure) - : AMQP::Address(connection_string); - connection = std::make_unique(event_handler.get(), amqp_address); - - cnt_retries = 0; - while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX) - { - event_handler->iterateLoop(); - std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); - } - - return event_handler->connectionRunning(connection.get()); + connection->getHandler().startBlockingLoop(); } bool StorageRabbitMQ::updateChannel(ChannelPtr & channel) { - if (event_handler->connectionRunning(connection.get())) + if (connection->isConnected()) { - channel = std::make_shared(connection.get()); + channel = connection->createChannel(); return true; } @@ -576,11 +543,11 @@ void StorageRabbitMQ::unbindExchange() std::call_once(flag, [&]() { streaming_task->deactivate(); - event_handler->updateLoopState(Loop::STOP); + connection->getHandler().updateLoopState(Loop::STOP); looping_task->deactivate(); - AMQP::TcpChannel rabbit_channel(connection.get()); - rabbit_channel.removeExchange(bridge_exchange) + auto rabbit_channel = connection->createChannel(); + rabbit_channel->removeExchange(bridge_exchange) .onSuccess([&]() { exchange_removed.store(true); @@ -592,19 +559,13 @@ void StorageRabbitMQ::unbindExchange() while (!exchange_removed.load()) { - event_handler->iterateLoop(); + connection->getHandler().iterateLoop(); } - rabbit_channel.close(); + rabbit_channel->close(); }); } -String StorageRabbitMQ::formatConnectionInfoForLogs() const -{ - return parsed_address.first + ':' + toString(parsed_address.second); -} - - Pipe StorageRabbitMQ::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -624,12 +585,12 @@ Pipe StorageRabbitMQ::read( auto modified_context = addSettings(local_context); auto block_size = getMaxBlockSize(); - if (!event_handler->connectionRunning(connection.get())) + if (!connection->isConnected()) { - if (event_handler->loopRunning()) + if (connection->getHandler().loopRunning()) deactivateTask(looping_task, false, true); - if (!restoreConnection(true)) - throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", formatConnectionInfoForLogs()); + if (!connection->reconnect()) + throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", connection->connectionInfoForLog()); } Pipes pipes; @@ -645,7 +606,7 @@ Pipe StorageRabbitMQ::read( pipes.emplace_back(std::make_shared(converting_stream)); } - if (!event_handler->loopRunning() && event_handler->connectionRunning(connection.get())) + if (!connection->getHandler().loopRunning() && connection->isConnected()) looping_task->activateAndSchedule(); LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); @@ -663,7 +624,7 @@ SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr void StorageRabbitMQ::startup() { - if (event_handler->connectionRunning(connection.get())) + if (connection->isConnected()) initRabbitMQ(); else connection_task->activateAndSchedule(); @@ -682,7 +643,7 @@ void StorageRabbitMQ::startup() } } - event_handler->updateLoopState(Loop::RUN); + connection->getHandler().updateLoopState(Loop::RUN); streaming_task->activateAndSchedule(); } @@ -713,13 +674,7 @@ void StorageRabbitMQ::shutdown() /// It is important to close connection here - before removing consumer buffers, because /// it will finish and clean callbacks, which might use those buffers data. - connection->close(); - - /// Connection is not closed immediately - it requires the loop to shutdown it properly and to - /// finish all callbacks. - size_t cnt_retries = 0; - while (!connection->closed() && cnt_retries++ != RETRIES_MAX) - event_handler->iterateLoop(); + connection->disconnect(); for (size_t i = 0; i < num_created_consumers; ++i) popReadBuffer(); @@ -739,7 +694,7 @@ void StorageRabbitMQ::cleanupRabbitMQ() const return; connection->heartbeat(); - if (!event_handler->connectionRunning(connection.get())) + if (!connection->isConnected()) { String queue_names; for (const auto & queue : queues) @@ -755,27 +710,27 @@ void StorageRabbitMQ::cleanupRabbitMQ() const return; } - AMQP::TcpChannel rabbit_channel(connection.get()); + auto rabbit_channel = connection->createChannel(); for (const auto & queue : queues) { /// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping /// on of them should not affect others. /// AMQP::ifempty is not used on purpose. - rabbit_channel.removeQueue(queue, AMQP::ifunused) + rabbit_channel->removeQueue(queue, AMQP::ifunused) .onSuccess([&](uint32_t num_messages) { LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages); - event_handler->stopLoop(); + connection->getHandler().stopLoop(); }) .onError([&](const char * message) { LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message); - event_handler->stopLoop(); + connection->getHandler().stopLoop(); }); } - event_handler->startBlockingLoop(); - rabbit_channel.close(); + connection->getHandler().startBlockingLoop(); + rabbit_channel->close(); /// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues /// are removed, exchanges will also be cleaned. @@ -819,11 +774,11 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() { ChannelPtr consumer_channel; - if (event_handler->connectionRunning(connection.get())) - consumer_channel = std::make_shared(connection.get()); + if (connection->isConnected()) + consumer_channel = connection->createChannel(); return std::make_shared( - consumer_channel, event_handler, queues, ++consumer_id, + std::move(consumer_channel), connection->getHandler(), queues, ++consumer_id, unique_strbase, log, row_delimiter, queue_size, stream_cancelled); } @@ -831,7 +786,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() ProducerBufferPtr StorageRabbitMQ::createWriteBuffer() { return std::make_shared( - parsed_address, getContext(), login_password, vhost, routing_keys, exchange_name, exchange_type, + configuration, getContext(), routing_keys, exchange_name, exchange_type, producer_id.fetch_add(1), persistent, wait_confirm, log, row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024); } @@ -867,7 +822,7 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id) void StorageRabbitMQ::streamingToViewsFunc() { - if (rabbit_is_ready && (event_handler->connectionRunning(connection.get()) || restoreConnection(true))) + if ((rabbit_is_ready && connection->isConnected()) || connection->reconnect()) { try { @@ -893,7 +848,7 @@ void StorageRabbitMQ::streamingToViewsFunc() /// Reschedule with backoff. if (milliseconds_to_wait < BACKOFF_TRESHOLD) milliseconds_to_wait *= 2; - event_handler->updateLoopState(Loop::STOP); + connection->getHandler().updateLoopState(Loop::STOP); break; } else @@ -905,7 +860,7 @@ void StorageRabbitMQ::streamingToViewsFunc() auto duration = std::chrono::duration_cast(end_time - start_time); if (duration.count() > MAX_THREAD_WORK_DURATION_MS) { - event_handler->updateLoopState(Loop::STOP); + connection->getHandler().updateLoopState(Loop::STOP); LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded."); break; } @@ -975,9 +930,9 @@ bool StorageRabbitMQ::streamToViews() std::atomic stub = {false}; - if (!event_handler->loopRunning()) + if (!connection->getHandler().loopRunning()) { - event_handler->updateLoopState(Loop::RUN); + connection->getHandler().updateLoopState(Loop::RUN); looping_task->activateAndSchedule(); } @@ -989,12 +944,12 @@ bool StorageRabbitMQ::streamToViews() deactivateTask(looping_task, false, true); size_t queue_empty = 0; - if (!event_handler->connectionRunning(connection.get())) + if (!connection->isConnected()) { if (stream_cancelled) return true; - if (restoreConnection(true)) + if (connection->reconnect()) { for (auto & stream : streams) stream->as()->updateChannel(); @@ -1043,12 +998,12 @@ bool StorageRabbitMQ::streamToViews() if (!stream->as()->sendAck()) { /// Iterate loop to activate error callbacks if they happened - event_handler->iterateLoop(); - if (!event_handler->connectionRunning(connection.get())) + connection->getHandler().iterateLoop(); + if (!connection->isConnected()) break; } - event_handler->iterateLoop(); + connection->getHandler().iterateLoop(); } } @@ -1061,7 +1016,7 @@ bool StorageRabbitMQ::streamToViews() } else { - event_handler->updateLoopState(Loop::RUN); + connection->getHandler().updateLoopState(Loop::RUN); looping_task->activateAndSchedule(); } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index e4adaa1987d..62ae5847f80 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -7,9 +7,8 @@ #include #include #include -#include #include -#include +#include #include #include #include @@ -19,8 +18,6 @@ namespace DB { -using ChannelPtr = std::shared_ptr; - class StorageRabbitMQ final: public shared_ptr_helper, public IStorage, WithContext { friend struct shared_ptr_helper; @@ -103,16 +100,9 @@ private: bool hash_exchange; Poco::Logger * log; - String address; - std::pair parsed_address; - std::pair login_password; - String vhost; - String connection_string; - bool secure; - UVLoop loop; - std::shared_ptr event_handler; - std::unique_ptr connection; /// Connection for all consumers + RabbitMQConnectionPtr connection; /// Connection for all consumers + RabbitMQConfiguration configuration; size_t num_created_consumers = 0; Poco::Semaphore semaphore; @@ -154,7 +144,6 @@ private: static Names parseSettings(String settings_list); static AMQP::ExchangeType defineExchangeType(String exchange_type_); static String getTableBasedName(String name, const StorageID & table_id); - String formatConnectionInfoForLogs() const; std::shared_ptr addSettings(ContextPtr context) const; size_t getMaxBlockSize() const; @@ -167,7 +156,6 @@ private: void bindExchange(AMQP::TcpChannel & rabbit_channel); void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel); - bool restoreConnection(bool reconnecting); bool streamToViews(); bool checkDependencies(const StorageID & table_id); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 8d3383c085a..c859a997cd7 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -26,10 +26,8 @@ namespace ErrorCodes } WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( - std::pair & parsed_address_, + const RabbitMQConfiguration & configuration_, ContextPtr global_context, - const std::pair & login_password_, - const String & vhost_, const Names & routing_keys_, const String & exchange_name_, const AMQP::ExchangeType exchange_type_, @@ -41,9 +39,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( size_t rows_per_message, size_t chunk_size_) : WriteBuffer(nullptr, 0) - , parsed_address(parsed_address_) - , login_password(login_password_) - , vhost(vhost_) + , connection(configuration_, log_) , routing_keys(routing_keys_) , exchange_name(exchange_name_) , exchange_type(exchange_type_) @@ -57,20 +53,10 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , max_rows(rows_per_message) , chunk_size(chunk_size_) { - event_handler = std::make_unique(loop.getLoop(), log); - - if (setupConnection(false)) - { + if (connection.connect()) setupChannel(); - } else - { - if (!connection->closed()) - connection->close(true); - - throw Exception("Cannot connect to RabbitMQ host: " + parsed_address.first + ", port: " + std::to_string(parsed_address.second), - ErrorCodes::CANNOT_CONNECT_RABBITMQ); - } + throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to RabbitMQ {}", connection.connectionInfoForLog()); writing_task = global_context->getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); }); writing_task->deactivate(); @@ -92,12 +78,12 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer() { writing_task->deactivate(); - connection->close(); + connection.disconnect(); size_t cnt_retries = 0; - while (!connection->closed() && cnt_retries++ != RETRIES_MAX) + while (!connection.closed() && cnt_retries++ != RETRIES_MAX) { - event_handler->iterateLoop(); + connection.getHandler().iterateLoop(); std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); } @@ -131,42 +117,9 @@ void WriteBufferToRabbitMQProducer::countRow() } -bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting) -{ - size_t cnt_retries = 0; - - if (reconnecting) - { - connection->close(); - - while (!connection->closed() && ++cnt_retries != RETRIES_MAX) - event_handler->iterateLoop(); - - if (!connection->closed()) - connection->close(true); - - LOG_TRACE(log, "Trying to set up connection"); - } - - connection = std::make_unique(event_handler.get(), - AMQP::Address( - parsed_address.first, parsed_address.second, - AMQP::Login(login_password.first, login_password.second), vhost)); - - cnt_retries = 0; - while (!connection->ready() && ++cnt_retries != RETRIES_MAX) - { - event_handler->iterateLoop(); - std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); - } - - return event_handler->connectionRunning(connection.get()); -} - - void WriteBufferToRabbitMQProducer::setupChannel() { - producer_channel = std::make_unique(connection.get()); + producer_channel = connection.createChannel(); producer_channel->onError([&](const char * message) { @@ -322,7 +275,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty()) wait_all = false; - else if ((!producer_channel->usable() && event_handler->connectionRunning(connection.get())) || (!event_handler->connectionRunning(connection.get()) && setupConnection(true))) + else if ((!producer_channel->usable() && connection.isConnected()) || (!connection.isConnected() && connection.reconnect())) setupChannel(); } @@ -355,7 +308,7 @@ void WriteBufferToRabbitMQProducer::reinitializeChunks() void WriteBufferToRabbitMQProducer::iterateEventLoop() { - event_handler->iterateLoop(); + connection.getHandler().iterateLoop(); } } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 452cc38d751..bb16a50d068 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -6,8 +6,7 @@ #include #include #include -#include -#include +#include #include #include #include @@ -19,14 +18,12 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer { public: WriteBufferToRabbitMQProducer( - std::pair & parsed_address_, + const RabbitMQConfiguration & configuratin_, ContextPtr global_context, - const std::pair & login_password_, - const String & vhost_, const Names & routing_keys_, const String & exchange_name_, const AMQP::ExchangeType exchange_type_, - const size_t channel_id_, + const size_t channel_id_base_, const bool persistent_, std::atomic & wait_confirm_, Poco::Logger * log_, @@ -48,14 +45,12 @@ private: void iterateEventLoop(); void writingFunc(); - bool setupConnection(bool reconnecting); void setupChannel(); void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish); void publish(ConcurrentBoundedQueue> & message, bool republishing); - std::pair parsed_address; - const std::pair login_password; - const String vhost; + RabbitMQConnection connection; + const Names routing_keys; const String exchange_name; AMQP::ExchangeType exchange_type; @@ -70,9 +65,6 @@ private: AMQP::Table key_arguments; BackgroundSchedulePool::TaskHolder writing_task; - UVLoop loop; - std::unique_ptr event_handler; - std::unique_ptr connection; std::unique_ptr producer_channel; bool producer_ready = false;