#include #include #include #include #include #include #include "Poco/Timer.h" #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int CANNOT_CONNECT_NATS; } NATSConsumer::NATSConsumer( std::shared_ptr connection_, StorageNATS & storage_, std::vector & subjects_, const String & subscribe_queue_name, LoggerPtr log_, uint32_t queue_size_, const std::atomic & stopped_) : connection(connection_) , storage(storage_) , subjects(subjects_) , log(log_) , stopped(stopped_) , queue_name(subscribe_queue_name) , received(queue_size_) { } void NATSConsumer::subscribe() { if (subscribed) return; for (const auto & subject : subjects) { natsSubscription * ns; auto status = natsConnection_QueueSubscribe( &ns, connection->getConnection(), subject.c_str(), queue_name.c_str(), onMsg, static_cast(this)); if (status == NATS_OK) { LOG_DEBUG(log, "Subscribed to subject {}", subject); natsSubscription_SetPendingLimits(ns, -1, -1); subscriptions.emplace_back(ns, &natsSubscription_Destroy); } else { throw Exception(ErrorCodes::CANNOT_CONNECT_NATS, "Failed to subscribe to subject {}", subject); } } subscribed = true; } void NATSConsumer::unsubscribe() { for (const auto & subscription : subscriptions) natsSubscription_Unsubscribe(subscription.get()); } ReadBufferPtr NATSConsumer::consume() { if (stopped || !received.tryPop(current)) return nullptr; return std::make_shared(current.message.data(), current.message.size()); } void NATSConsumer::onMsg(natsConnection *, natsSubscription *, natsMsg * msg, void * consumer) { auto * nats_consumer = static_cast(consumer); const int msg_length = natsMsg_GetDataLength(msg); if (msg_length) { String message_received = std::string(natsMsg_GetData(msg), msg_length); String subject = natsMsg_GetSubject(msg); MessageData data = { .message = message_received, .subject = subject, }; if (!nats_consumer->received.push(std::move(data))) throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); nats_consumer->storage.startStreaming(); } natsMsg_Destroy(msg); } }