diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp index 65063e004a5..f6facc04212 100644 --- a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include "Poco/Timer.h" @@ -34,7 +35,7 @@ RabbitMQConsumer::RabbitMQConsumer( { } -void RabbitMQConsumer::shutdown() +void RabbitMQConsumer::stop() { stopped = true; cv.notify_one(); @@ -53,10 +54,9 @@ void RabbitMQConsumer::subscribe() consumer_channel->consume(queue_name) .onSuccess([&](const std::string & /* consumer_tag */) { - LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name); - - if (++subscribed == queues.size()) - wait_subscription.store(false); + LOG_TRACE( + log, "Consumer on channel {} ({}/{}) is subscribed to queue {}", + channel_id, subscriptions_num, queues.size(), queue_name); }) .onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered) { @@ -64,9 +64,15 @@ void RabbitMQConsumer::subscribe() { String message_received = std::string(message.body(), message.body() + message.bodySize()); - if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "", - message.hasTimestamp() ? message.timestamp() : 0, - redelivered, AckTracker(delivery_tag, channel_id)})) + MessageData result{ + .message = message_received, + .message_id = message.hasMessageID() ? message.messageID() : "", + .timestamp = message.hasTimestamp() ? 
message.timestamp() : 0, + .redelivered = redelivered, + .delivery_tag = delivery_tag, + .channel_id = channel_id}; + + if (!received.push(std::move(result))) throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); cv.notify_one(); @@ -74,98 +80,100 @@ void RabbitMQConsumer::subscribe() }) .onError([&](const char * message) { - /* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which - * arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them + /* End up here either if channel ends up in an error state (then there will be resubscription) + * or consume call error, which arises from queue settings mismatch or queue level error, + * which should not happen as no one else is supposed to touch them */ LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message); - wait_subscription.store(false); + state = State::ERROR; }); } } -bool RabbitMQConsumer::ackMessages() +bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info) { - AckTracker record_info = last_inserted_record_info; + if (state != State::OK) + return false; - /* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per - * channel, so if channel fails, all previous delivery tags become invalid - */ - if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag) + /// Nothing to ack. + if (!commit_info.delivery_tag) + return false; + + /// Do not send ack to server if message's channel is not the same as + /// current running channel because delivery tags are scoped per channel, + /// so if channel fails, all previous delivery tags become invalid. + if (commit_info.channel_id != channel_id) + return false; + + /// Duplicate ack? 
+ if (commit_info.delivery_tag > last_commited_delivery_tag + && consumer_channel->ack(commit_info.delivery_tag, AMQP::multiple)) { - /// Commit all received messages with delivery tags from last committed to last inserted - if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple)) - { - LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}", - record_info.delivery_tag, channel_id); - return false; - } + last_commited_delivery_tag = commit_info.delivery_tag; - prev_tag = record_info.delivery_tag; - LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id); + LOG_TRACE( + log, "Consumer committed messages with deliveryTags up to {} on channel {}", + last_commited_delivery_tag, channel_id); + + return true; } - return true; + LOG_ERROR( + log, + "Did not commit messages for {}:{}, (current commit point {}:{})", + commit_info.channel_id, commit_info.delivery_tag, + channel_id, last_commited_delivery_tag); + + return false; } -void RabbitMQConsumer::updateAckTracker(AckTracker record_info) +void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection) { - if (record_info.delivery_tag && channel_error.load()) - return; - - if (!record_info.delivery_tag) - prev_tag = 0; - - last_inserted_record_info = record_info; -} - - -void RabbitMQConsumer::setupChannel() -{ - if (!consumer_channel) - return; - - wait_subscription.store(true); + state = State::INITIALIZING; + last_commited_delivery_tag = 0; + consumer_channel = connection.createChannel(); consumer_channel->onReady([&]() { - /* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer, - * i.e. 
if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that - * channel_id is unique for each table - */ - channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base; - LOG_TRACE(log, "Channel {} is created", channel_id); + try + { + /// 1. channel_id_base - indicates current consumer buffer. + /// 2. channel_id_counter - indicates serial number of created channel for current buffer + /// (incremented on each channel update). + /// 3. channel_base is to guarantee that channel_id is unique for each table. + channel_id = fmt::format("{}_{}_{}", channel_id_base, channel_id_counter++, channel_base); - subscribed = 0; - subscribe(); - channel_error.store(false); + LOG_TRACE(log, "Channel {} is successfully created", channel_id); + + subscriptions_num = 0; + subscribe(); + + state = State::OK; + } + catch (...) + { + state = State::ERROR; + tryLogCurrentException(__PRETTY_FUNCTION__); + } }); consumer_channel->onError([&](const char * message) { - LOG_ERROR(log, "Channel {} error: {}", channel_id, message); - - channel_error.store(true); - wait_subscription.store(false); + LOG_ERROR(log, "Channel {} in an error state: {}", channel_id, message); + state = State::ERROR; }); } bool RabbitMQConsumer::needChannelUpdate() { - if (wait_subscription) - return false; - - return channel_error || !consumer_channel || !consumer_channel->usable(); + chassert(consumer_channel); + return state == State::ERROR; } -void RabbitMQConsumer::iterateEventLoop() -{ - event_handler.iterateLoop(); -} - ReadBufferPtr RabbitMQConsumer::consume() { if (stopped || !received.tryPop(current)) diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.h b/src/Storages/RabbitMQ/RabbitMQConsumer.h index c7adb856212..89dfa060eec 100644 --- a/src/Storages/RabbitMQ/RabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/RabbitMQConsumer.h @@ -19,6 +19,7 @@ namespace DB { class RabbitMQHandler; +class RabbitMQConnection; 
using ChannelPtr = std::unique_ptr; static constexpr auto SANITY_TIMEOUT = 1000 * 60 * 10; /// 10min. @@ -27,55 +28,44 @@ class RabbitMQConsumer public: RabbitMQConsumer( - RabbitMQHandler & event_handler_, - std::vector & queues_, - size_t channel_id_base_, - const String & channel_base_, - Poco::Logger * log_, - uint32_t queue_size_); + RabbitMQHandler & event_handler_, + std::vector & queues_, + size_t channel_id_base_, + const String & channel_base_, + Poco::Logger * log_, + uint32_t queue_size_); - struct AckTracker + struct CommitInfo { - UInt64 delivery_tag; + UInt64 delivery_tag = 0; String channel_id; - - AckTracker() = default; - AckTracker(UInt64 tag, String id) : delivery_tag(tag), channel_id(id) {} }; struct MessageData { String message; String message_id; - uint64_t timestamp = 0; + UInt64 timestamp = 0; bool redelivered = false; - AckTracker track{}; + UInt64 delivery_tag = 0; + String channel_id; }; + const MessageData & currentMessage() { return current; } /// Return read buffer containing next available message /// or nullptr if there are no messages to process. 
ReadBufferPtr consume(); - ChannelPtr & getChannel() { return consumer_channel; } - void setupChannel(); bool needChannelUpdate(); - void shutdown(); - - void updateQueues(std::vector & queues_) { queues = queues_; } - size_t queuesCount() { return queues.size(); } + void updateChannel(RabbitMQConnection & connection); + void stop(); bool isConsumerStopped() const { return stopped.load(); } - bool ackMessages(); - void updateAckTracker(AckTracker record = AckTracker()); + + bool ackMessages(const CommitInfo & commit_info); bool hasPendingMessages() { return !received.empty(); } - auto getChannelID() const { return current.track.channel_id; } - auto getDeliveryTag() const { return current.track.delivery_tag; } - auto getRedelivered() const { return current.redelivered; } - auto getMessageID() const { return current.message_id; } - auto getTimestamp() const { return current.timestamp; } - void waitForMessages(std::optional timeout_ms = std::nullopt) { std::unique_lock lock(mutex); @@ -88,24 +78,36 @@ public: private: void subscribe(); - void iterateEventLoop(); + bool isChannelUsable(); + void updateCommitInfo(CommitInfo record); ChannelPtr consumer_channel; RabbitMQHandler & event_handler; /// Used concurrently, but is thread safe. 
- std::vector queues; + + const std::vector queues; const String channel_base; const size_t channel_id_base; + Poco::Logger * log; std::atomic stopped; String channel_id; - std::atomic channel_error = true, wait_subscription = false; + UInt64 channel_id_counter = 0; + + enum class State + { + NONE, + INITIALIZING, + OK, + ERROR, + }; + std::atomic state = State::NONE; + size_t subscriptions_num = 0; + ConcurrentBoundedQueue received; MessageData current; - size_t subscribed = 0; - AckTracker last_inserted_record_info; - UInt64 prev_tag = 0, channel_id_counter = 0; + UInt64 last_commited_delivery_tag; std::condition_variable cv; std::mutex mutex; diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp index d755dff3202..7194683f3fc 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp @@ -99,10 +99,7 @@ void RabbitMQSource::updateChannel() if (!consumer) return; - consumer->updateAckTracker(); - - if (storage.updateChannel(consumer->getChannel())) - consumer->setupChannel(); + consumer->updateChannel(storage.getConnection()); } Chunk RabbitMQSource::generate() @@ -137,6 +134,7 @@ Chunk RabbitMQSource::generateImpl() StreamingFormatExecutor executor(non_virtual_header, input_format); size_t total_rows = 0; + RabbitMQConsumer::CommitInfo current_commit_info; while (true) { size_t new_rows = 0; @@ -149,26 +147,21 @@ Chunk RabbitMQSource::generateImpl() if (new_rows) { - auto exchange_name = storage.getExchange(); - auto channel_id = consumer->getChannelID(); - auto delivery_tag = consumer->getDeliveryTag(); - auto redelivered = consumer->getRedelivered(); - auto message_id = consumer->getMessageID(); - auto timestamp = consumer->getTimestamp(); - - consumer->updateAckTracker({delivery_tag, channel_id}); + const auto exchange_name = storage.getExchange(); + const auto & message = consumer->currentMessage(); for (size_t i = 0; i < new_rows; ++i) { 
virtual_columns[0]->insert(exchange_name); - virtual_columns[1]->insert(channel_id); - virtual_columns[2]->insert(delivery_tag); - virtual_columns[3]->insert(redelivered); - virtual_columns[4]->insert(message_id); - virtual_columns[5]->insert(timestamp); + virtual_columns[1]->insert(message.channel_id); + virtual_columns[2]->insert(message.delivery_tag); + virtual_columns[3]->insert(message.redelivered); + virtual_columns[4]->insert(message.message_id); + virtual_columns[5]->insert(message.timestamp); } total_rows += new_rows; + current_commit_info = {message.delivery_tag, message.channel_id}; } else if (total_rows == 0) { @@ -210,6 +203,7 @@ Chunk RabbitMQSource::generateImpl() for (auto & column : virtual_columns) result_columns.push_back(std::move(column)); + commit_info = current_commit_info; return Chunk(std::move(result_columns), total_rows); } @@ -219,7 +213,7 @@ bool RabbitMQSource::sendAck() if (!consumer) return false; - if (!consumer->ackMessages()) + if (!consumer->ackMessages(commit_info)) return false; return true; diff --git a/src/Storages/RabbitMQ/RabbitMQSource.h b/src/Storages/RabbitMQ/RabbitMQSource.h index 5d2d8b25630..fd78e326038 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.h +++ b/src/Storages/RabbitMQ/RabbitMQSource.h @@ -51,6 +51,8 @@ private: uint64_t max_execution_time_ms = 0; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; + RabbitMQConsumer::CommitInfo commit_info; + RabbitMQSource( StorageRabbitMQ & storage_, const StorageSnapshotPtr & storage_snapshot_, diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 7999d4af71a..7d3a205c722 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -371,6 +371,14 @@ void StorageRabbitMQ::initRabbitMQ() for (const auto i : collections::range(0, num_queues)) bindQueue(i + 1, *rabbit_channel); + if (queues.size() != num_queues) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected 
all queues to be initialized (but having {}/{})", + queues.size(), num_queues); + } + LOG_TRACE(log, "RabbitMQ setup completed"); rabbit_is_ready = true; rabbit_channel->close(); @@ -593,35 +601,6 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann } -bool StorageRabbitMQ::updateChannel(ChannelPtr & channel) -{ - try - { - channel = connection->createChannel(); - return true; - } - catch (...) - { - tryLogCurrentException(log); - return false; - } -} - - -void StorageRabbitMQ::prepareChannelForConsumer(RabbitMQConsumerPtr consumer) -{ - if (!consumer) - return; - - if (consumer->queuesCount() != queues.size()) - consumer->updateQueues(queues); - - consumer->updateAckTracker(); - - if (updateChannel(consumer->getChannel())) - consumer->setupChannel(); -} - void StorageRabbitMQ::unbindExchange() { /* This is needed because with RabbitMQ (without special adjustments) can't, for example, properly make mv if there was insert query @@ -820,7 +799,7 @@ void StorageRabbitMQ::shutdown() shutdown_called = true; for (auto & consumer : consumers_ref) - consumer.lock()->shutdown(); + consumer.lock()->stop(); LOG_TRACE(log, "Deactivating background tasks"); @@ -982,7 +961,7 @@ void StorageRabbitMQ::initializeBuffers() if (!initialized) { for (const auto & consumer : consumers) - prepareChannelForConsumer(consumer); + consumer->updateChannel(*connection); initialized = true; } } @@ -1144,10 +1123,7 @@ bool StorageRabbitMQ::tryStreamToViews() ++queue_empty; if (source->needChannelUpdate()) - { - auto consumer = source->getBuffer(); - prepareChannelForConsumer(consumer); - } + source->getBuffer()->updateChannel(*connection); /* false is returned by the sendAck function in only two cases: * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. 
Also ack is made based on diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index c6cb340619c..a30919f2b25 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -72,9 +72,7 @@ public: String getExchange() const { return exchange_name; } void unbindExchange(); - bool updateChannel(ChannelPtr & channel); - void updateQueues(std::vector & queues_) { queues_ = queues; } - void prepareChannelForConsumer(RabbitMQConsumerPtr consumer); + RabbitMQConnection & getConnection() { return *connection; } void incrementReader(); void decrementReader();