diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp index 17e4db3fb89..8e867db6de9 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp @@ -46,6 +46,8 @@ void RabbitMQBlockOutputStream::write(const Block & block) if (buffer) buffer->flush(); + + storage.pingConnection(); } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 5aa77a9a732..635d53e6cf0 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -54,6 +54,8 @@ public: const String & getFormatName() const { return format_name; } NamesAndTypesList getVirtuals() const override; + const void pingConnection() { connection.heartbeat(); } + protected: StorageRabbitMQ( const StorageID & table_id_, @@ -88,7 +90,7 @@ private: event_base * evbase; RabbitMQHandler eventHandler; - AMQP::TcpConnection connection; + AMQP::TcpConnection connection; /// Connection for all consumers Poco::Semaphore semaphore; std::mutex mutex; diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 86d3b32925a..e61a8e1ccd8 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -16,7 +16,7 @@ enum { Connection_setup_sleep = 200, Connection_setup_retries_max = 1000, - Buffer_limit_to_flush = 5000 /// It is important to keep it low in order not to kill consumers + Buffer_limit_to_flush = 10000 /// It is important to keep it low in order not to kill consumers }; WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( @@ -113,38 +113,11 @@ void WriteBufferToRabbitMQProducer::flush() /* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name * and makes it visible from current producer_channel. */ - producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive) .onSuccess([&]() { exchange_declared = true; - /// The case that should not normally happen: message was not delivered to queue (queue ttl exceeded) / not forwareded to consumer - if (flush_returned) - { - /// Needed to avoid data race because two different threads may access this vector - std::lock_guard lock(mutex); - - LOG_TRACE(log, "Redelivering returned messages"); - for (auto & payload : returned) - { - next_queue = next_queue % num_queues + 1; - - if (bind_by_id || hash_exchange) - { - producer_channel->publish(exchange_name, std::to_string(next_queue), payload); - } - else - { - producer_channel->publish(exchange_name, routing_key, payload); - } - - --message_counter; - } - - returned.clear(); - } - /* The reason for accumulating payloads and not publishing each of them at once in count_row() is that publishing * needs to be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it * each time we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to @@ -159,27 +132,11 @@ void WriteBufferToRabbitMQProducer::flush() if (bind_by_id || hash_exchange) { - producer_channel->publish(exchange_name, std::to_string(next_queue), payload, AMQP::mandatory || AMQP::immediate) - .onReturned([&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */) - { - flush_returned = true; - - /// Needed to avoid data race because two different threads may access this variable - std::lock_guard lock(mutex); - returned.emplace_back(std::string(message.body(), message.body() + message.bodySize())); - }); + producer_channel->publish(exchange_name, std::to_string(next_queue), payload); } else { - producer_channel->publish(exchange_name, routing_key, payload, AMQP::mandatory || AMQP::immediate) - .onReturned([&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */) - { - flush_returned = true; - - /// Needed to avoid data race because two different threads may access this vector - std::lock_guard lock(mutex); - returned.emplace_back(std::string(message.body(), message.body() + message.bodySize())); - }); + producer_channel->publish(exchange_name, routing_key, payload); } --message_counter; diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 146be0c5796..c61a76a3e74 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -54,11 +54,8 @@ private: size_t next_queue = 0; UInt64 message_counter = 0; String channel_id; - std::atomic flush_returned = false; - std::mutex mutex; Messages messages; - Messages returned; Poco::Logger * log; const std::optional delim;