#include #include #include #include #include #include #include #include #include #include #include "Poco/Timer.h" #include namespace DB { ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, HandlerPtr event_handler_, const String & exchange_name_, std::vector & queues_, size_t channel_id_base_, const String & channel_base_, const String & queue_base_, Poco::Logger * log_, char row_delimiter_, size_t num_queues_, const String & deadletter_exchange_, uint32_t queue_size_, const std::atomic & stopped_) : ReadBuffer(nullptr, 0) , consumer_channel(std::move(consumer_channel_)) , event_handler(event_handler_) , exchange_name(exchange_name_) , queues(queues_) , channel_base(channel_base_) , channel_id_base(channel_id_base_) , queue_base(queue_base_) , num_queues(num_queues_) , deadletter_exchange(deadletter_exchange_) , log(log_) , row_delimiter(row_delimiter_) , queue_size(queue_size_) , stopped(stopped_) , received(queue_size * num_queues) { setupChannel(); } ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() { BufferBase::set(nullptr, 0, 0); } void ReadBufferFromRabbitMQConsumer::subscribe() { for (const auto & queue_name : queues) { consumer_channel->consume(queue_name) .onSuccess([&](const std::string & /* consumer_tag */) { LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name); if (++subscribed == queues.size()) wait_subscription.store(false); }) .onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered) { if (message.bodySize()) { String message_received = std::string(message.body(), message.body() + message.bodySize()); if (row_delimiter != '\0') message_received += row_delimiter; if (message.hasMessageID()) received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)}); else received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)}); } }) .onError([&](const char * message) { /* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which * arises from queue settings mismatch or queue level error, which should not happen as noone else is supposed to touch them */ LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message); wait_subscription.store(false); }); } } bool ReadBufferFromRabbitMQConsumer::ackMessages() { AckTracker record_info = last_inserted_record_info; /* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per * channel, so if channel fails, all previous delivery tags become invalid */ if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag) { /// Commit all received messages with delivery tags from last commited to last inserted if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple)) { LOG_ERROR(log, "Failed to commit messages with delivery tags from last commited to {} on channel {}", record_info.delivery_tag, channel_id); return false; } prev_tag = record_info.delivery_tag; LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id); } return true; } void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record_info) { if (record_info.delivery_tag && channel_error.load()) return; if (!record_info.delivery_tag) prev_tag = 0; last_inserted_record_info = record_info; } void ReadBufferFromRabbitMQConsumer::setupChannel() { wait_subscription.store(true); consumer_channel->onReady([&]() { /* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer, * i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that * channel_id is unique for each table */ channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base; LOG_TRACE(log, "Channel {} is created", channel_id); subscribed = 0; subscribe(); channel_error.store(false); }); consumer_channel->onError([&](const char * message) { LOG_ERROR(log, "Channel {} error: {}", channel_id, message); channel_error.store(true); wait_subscription.store(false); }); } void ReadBufferFromRabbitMQConsumer::iterateEventLoop() { event_handler->iterateLoop(); } bool ReadBufferFromRabbitMQConsumer::nextImpl() { if (stopped || !allowed) return false; if (received.tryPop(current)) { auto * new_position = const_cast(current.message.data()); BufferBase::set(new_position, current.message.size(), 0); allowed = false; return true; } return false; } }