diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 1a20699d23a..589f5b39d2e 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -52,7 +52,8 @@ void RabbitMQBlockInputStream::readPrefixImpl() if (!buffer || finished) return; - buffer->checkSubscription(); + if (!buffer->channelUsable() && (storage.connectionRunning() || storage.restoreConnection())) + buffer->restoreChannel(storage.getChannel()); } diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 5d17ff23b64..ecaa109c184 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -5,11 +5,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int CANNOT_CONNECT_RABBITMQ; -} - /* The object of this class is shared between concurrent consumers (who share the same connection == share the same * event loop and handler). */ @@ -20,19 +15,26 @@ RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) : { } +///Method that is called when the connection ends up in an error state. void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message) { + connection_running.store(false); LOG_ERROR(log, "Library error report: {}", message); - if (!connection->usable() || !connection->ready()) - throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ); + if (connection) + connection->close(); +} + +void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */) +{ + connection_running.store(true); } void RabbitMQHandler::startLoop() { std::lock_guard lock(startup_mutex); /// stop_loop variable is updated in a separate thread - while (!stop_loop.load()) + while (!stop_loop.load() && connection_running.load()) uv_run(loop, UV_RUN_NOWAIT); } diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 5893ace1d2f..2a992f68d27 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -17,16 +17,18 @@ class RabbitMQHandler : public AMQP::LibUvHandler public: RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_); void onError(AMQP::TcpConnection * connection, const char * message) override; + void onReady(AMQP::TcpConnection * connection) override; void stop() { stop_loop.store(true); } void startLoop(); void iterateLoop(); + bool connectionRunning() { return connection_running.load(); } private: uv_loop_t * loop; Poco::Logger * log; - std::atomic stop_loop = false; + std::atomic stop_loop = false, connection_running = false; std::mutex startup_mutex; }; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 9f036a8a9b6..2c9834ae077 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -47,7 +47,16 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( for (size_t queue_id = 0; queue_id < num_queues; ++queue_id) bindQueue(queue_id); - consumer_channel->onReady([&]() { subscribe(); }); + consumer_channel->onReady([&]() + { + consumer_channel->onError([&](const char * message) + { + LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message); + channel_error.store(true); + }); + + subscribe(); + }); } @@ -62,16 +71,16 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id) { std::atomic bindings_created = false, bindings_error = false; - auto success_callback = [&](const std::string & queue_name_, int msgcount, int /* consumercount */) + auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */) { - queues.emplace_back(queue_name_); - LOG_DEBUG(log, "Queue " + queue_name_ + " is declared"); + queues.emplace_back(queue_name); + LOG_DEBUG(log, "Queue {} is declared", queue_name); if (msgcount) - LOG_TRACE(log, "Queue " + queue_name_ + " is non-empty. Non-consumed messaged will also be delivered."); + LOG_TRACE(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name); /// Binding key must be a string integer in case of hash exchange (here it is either hash or fanout). - setup_channel->bindQueue(exchange_name, queue_name_, std::to_string(channel_id)) + setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id)) .onSuccess([&] { bindings_created = true; @@ -114,22 +123,13 @@ void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id) void ReadBufferFromRabbitMQConsumer::subscribe() { - count_subscribed = 0; for (const auto & queue_name : queues) { consumer_channel->consume(queue_name) .onSuccess([&](const std::string & consumer) { - ++count_subscribed; LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name); - - consumer_error = false; consumer_tag = consumer; - - consumer_channel->onError([&](const char * message) - { - LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message); - }); }) .onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered) { @@ -144,36 +144,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe() }) .onError([&](const char * message) { - consumer_error = true; LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message); }); } } -void ReadBufferFromRabbitMQConsumer::checkSubscription() -{ - if (count_subscribed == num_queues || !consumer_channel->usable()) - return; - - wait_subscribed = num_queues; - - /// These variables are updated in a separate thread. - while (count_subscribed != wait_subscribed && !consumer_error) - { - iterateEventLoop(); - } - - LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed); - - /// Updated in callbacks which are run by the loop. - if (count_subscribed == num_queues) - return; - - subscribe(); -} - - void ReadBufferFromRabbitMQConsumer::ackMessages() { UInt64 delivery_tag = last_inserted_delivery_tag; @@ -209,4 +185,26 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() return false; } + +void ReadBufferFromRabbitMQConsumer::restoreChannel(ChannelPtr new_channel) +{ + if (consumer_channel->usable()) + return; + + consumer_channel = std::move(new_channel); + consumer_channel->onReady([&]() + { + LOG_TRACE(log, "Channel {} is restored", channel_id); + channel_error.store(false); + consumer_channel->onError([&](const char * message) + { + LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message); + channel_error.store(true); + }); + + subscribe(); + }); +} + + } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 6448389aea5..d3f560fad3b 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -46,7 +46,8 @@ public: }; void allowNext() { allowed = true; } // Allow to read next message. - void checkSubscription(); + bool channelUsable() { return !channel_error.load(); } + void restoreChannel(ChannelPtr new_channel); void updateNextDeliveryTag(UInt64 delivery_tag) { last_inserted_delivery_tag = delivery_tag; } void ackMessages(); @@ -71,15 +72,13 @@ private: const std::atomic & stopped; const String deadletter_exchange; - std::atomic consumer_error = false; - std::atomic count_subscribed = 0, wait_subscribed; + std::atomic channel_error = false; String consumer_tag; ConcurrentBoundedQueue received; UInt64 last_inserted_delivery_tag = 0, prev_tag = 0; MessageData current; std::vector queues; - std::unordered_map subscribed_queue; bool nextImpl() override; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index f31cf3f4f72..67f3daa81ec 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -39,7 +39,7 @@ namespace DB { static const auto CONNECT_SLEEP = 200; -static const auto RETRIES_MAX = 1000; +static const auto RETRIES_MAX = 20; static const auto HEARTBEAT_RESCHEDULE_MS = 3000; namespace ErrorCodes @@ -98,7 +98,6 @@ StorageRabbitMQ::StorageRabbitMQ( { loop = std::make_unique(); uv_loop_init(loop.get()); - event_handler = std::make_shared(loop.get(), log); connection = std::make_shared(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")); @@ -138,16 +137,6 @@ StorageRabbitMQ::StorageRabbitMQ( exchange_type = AMQP::ExchangeType::fanout; } - if (exchange_type == AMQP::ExchangeType::headers) - { - for (const auto & header : routing_keys) - { - std::vector matching; - boost::split(matching, header, [](char c){ return c == '='; }); - bind_headers[matching[0]] = matching[1]; - } - } - auto table_id = getStorageID(); String table_name = table_id.table_name; @@ -163,7 +152,7 @@ StorageRabbitMQ::StorageRabbitMQ( void StorageRabbitMQ::heartbeatFunc() { - if (!stream_cancelled) + if (!stream_cancelled && event_handler->connectionRunning()) { LOG_TRACE(log, "Sending RabbitMQ heartbeat"); connection->heartbeat(); @@ -174,8 +163,11 @@ void StorageRabbitMQ::heartbeatFunc() void StorageRabbitMQ::loopingFunc() { - LOG_DEBUG(log, "Starting event looping iterations"); - event_handler->startLoop(); + if (event_handler->connectionRunning()) + { + LOG_DEBUG(log, "Starting event looping iterations"); + event_handler->startLoop(); + } } @@ -231,6 +223,14 @@ void StorageRabbitMQ::bindExchange() if (exchange_type == AMQP::ExchangeType::headers) { + AMQP::Table bind_headers; + for (const auto & header : routing_keys) + { + std::vector matching; + boost::split(matching, header, [](char c){ return c == '='; }); + bind_headers[matching[0]] = matching[1]; + } + setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers) .onSuccess([&]() { @@ -299,10 +299,66 @@ void StorageRabbitMQ::unbindExchange() event_handler->stop(); looping_task->deactivate(); + heartbeat_task->deactivate(); }); } +bool StorageRabbitMQ::restoreConnection() +{ + if (restore_connection.try_lock()) + { + /// This lock is to synchronize with getChannel(). + std::lock_guard lk(connection_mutex); + + if (!connection->usable() || !connection->ready()) + { + LOG_TRACE(log, "Trying to restore consumer connection"); + + if (!connection->closed()) + connection->close(); + + connection = std::make_shared(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")); + + size_t cnt_retries = 0; + while (!connection->ready() && ++cnt_retries != RETRIES_MAX) + { + event_handler->iterateLoop(); + std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); + } + } + + if (event_handler->connectionRunning()) + { + LOG_TRACE(log, "Connection restored"); + + heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); + looping_task->activateAndSchedule(); + } + else + { + LOG_TRACE(log, "Connection refused"); + } + + restore_connection.unlock(); + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP)); + } + + return event_handler->connectionRunning(); +} + + +ChannelPtr StorageRabbitMQ::getChannel() +{ + std::lock_guard lk(connection_mutex); + ChannelPtr new_channel = std::make_shared(connection.get()); + return new_channel; +} + + Pipes StorageRabbitMQ::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 9c7df1b1421..31e045ddb87 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -58,6 +58,10 @@ public: bool checkBridge() const { return !exchange_removed.load(); } void unbindExchange(); + bool connectionRunning() { return event_handler->connectionRunning(); } + bool restoreConnection(); + ChannelPtr getChannel(); + protected: StorageRabbitMQ( const StorageID & table_id_, @@ -109,11 +113,11 @@ private: String local_exchange, bridge_exchange, consumer_exchange; std::once_flag flag; - AMQP::Table bind_headers; size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0 bool update_channel_id = false; std::atomic loop_started = false, exchange_removed = false; ChannelPtr setup_channel; + std::mutex connection_mutex, restore_connection; BackgroundSchedulePool::TaskHolder streaming_task; BackgroundSchedulePool::TaskHolder heartbeat_task;