From 053f31cb77235e4da3d3401f64b24cb3b4cfc413 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Tue, 4 Aug 2020 15:13:09 +0000
Subject: [PATCH] Better confirmListener

---
 .../WriteBufferToRabbitMQProducer.cpp         | 145 ++++++++----------
 .../RabbitMQ/WriteBufferToRabbitMQProducer.h  |   8 +-
 2 files changed, 69 insertions(+), 84 deletions(-)

diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
index 883ee70f5d5..c2ab8e3e843 100644
--- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
+++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp
@@ -13,11 +13,6 @@
 namespace DB
 {
 
-namespace ErrorCodes
-{
-    extern const int CANNOT_CONNECT_RABBITMQ;
-}
-
 static const auto CONNECT_SLEEP = 200;
 static const auto RETRIES_MAX = 20;
 static const auto BATCH = 512;
@@ -133,23 +128,29 @@ bool WriteBufferToRabbitMQProducer::setupConnection()
 void WriteBufferToRabbitMQProducer::setupChannel()
 {
     producer_channel = std::make_unique<AMQP::TcpChannel>(connection.get());
+
     producer_channel->onError([&](const char * message)
     {
-        LOG_DEBUG(log, "Producer error: {}. Currently {} messages have not been confirmed yet, {} messages are waiting to be published",
-                message, delivery_tags_record.size(), payloads.size());
+        LOG_ERROR(log, "Producer error: {}", message);
 
         /// Means channel ends up in an error state and is not usable anymore.
         producer_channel->close();
+
+        for (auto record = delivery_record.begin(); record != delivery_record.end(); record++)
+            returned.tryPush(record->second);
+
+        LOG_DEBUG(log, "Currently {} messages have not been confirmed yet, {} waiting to be published, {} will be republished",
+                delivery_record.size(), payloads.size(), returned.size());
+
+        /// Delivery tags are scoped per channel.
+        delivery_record.clear();
+        delivery_tag = 0;
     });
 
     producer_channel->onReady([&]()
     {
         LOG_DEBUG(log, "Producer channel is ready");
 
-        /// Delivery tags are scoped per channel.
-        delivery_tags_record.clear();
-        delivery_tag = 0;
-
         if (use_transactional_channel)
         {
             producer_channel->startTransaction();
@@ -157,56 +158,76 @@ void WriteBufferToRabbitMQProducer::setupChannel()
         else
        {
             /* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
-             * it will be requed in returned_callback. If persistent == false, message is confirmed the moment it is enqueued. If fails, it
-             * is not requeued. First option is two times slower than the second, so default is second and the first is turned on in table
-             * setting. Persistent message is not requeued if it is unroutable, i.e. no queues are bound to given exchange with the given
-             * routing key - this is a responsibility of a client. It can be requeued in this case if AMQP::mandatory is set, but pointless.
+             * onNack() is received. If persistent == false, a message is confirmed the moment it is enqueued. The first option is two
+             * times slower than the second, so the second is the default and the first is enabled by a table setting.
+             *
+             * "Publisher confirms" are implemented similarly to strategy #3 here: https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
             */
             producer_channel->confirmSelect()
             .onAck([&](uint64_t acked_delivery_tag, bool multiple)
             {
-                removeConfirmed(acked_delivery_tag, multiple);
+                removeConfirmed(acked_delivery_tag, multiple, false);
             })
             .onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
             {
-                if (!persistent)
-                    removeConfirmed(nacked_delivery_tag, multiple);
+                removeConfirmed(nacked_delivery_tag, multiple, true);
             });
         }
     });
 }
 
 
-void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple)
+void WriteBufferToRabbitMQProducer::removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish)
 {
-    /// Same as here https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
-    std::lock_guard lock(mutex);
-    auto found_tag_pos = delivery_tags_record.find(received_delivery_tag);
-    if (found_tag_pos != delivery_tags_record.end())
+    auto record_iter = delivery_record.find(received_delivery_tag);
+
+    if (record_iter != delivery_record.end())
     {
-        /// If multiple is true, then all delivery tags up to and including current are confirmed.
         if (multiple)
         {
-            ++found_tag_pos;
-            delivery_tags_record.erase(delivery_tags_record.begin(), found_tag_pos);
+            /// If multiple is true, then all delivery tags up to and including the current one are confirmed (with ack or nack).
+            ++record_iter;
+
+            if (republish)
+                for (auto record = delivery_record.begin(); record != record_iter; ++record)
+                    returned.tryPush(record->second);
+
+            /// Delete the records even when republishing, because new delivery tags will be assigned by the server.
+            delivery_record.erase(delivery_record.begin(), record_iter);
+            //LOG_DEBUG(log, "Confirmed all delivery tags up to {}", received_delivery_tag);
        }
         else
         {
-            delivery_tags_record.erase(found_tag_pos);
+            if (republish)
+                returned.tryPush(record_iter->second);
+
+            delivery_record.erase(record_iter);
+            //LOG_DEBUG(log, "Confirmed delivery tag {}", received_delivery_tag);
         }
     }
+    /// else is theoretically not possible
 }
 
 
-void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & messages)
+void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & messages, bool republishing)
 {
     String payload;
 
-    while (!messages.empty())
+    while (!messages.empty() && producer_channel->usable())
     {
         messages.pop(payload);
         AMQP::Envelope envelope(payload.data(), payload.size());
+        AMQP::Table message_settings = key_arguments;
+
+        /* There is a case when the connection is lost after some messages were published but before ack/nack was sent by the server.
+         * Then the publisher will never know whether those messages were delivered or not, so the records that received no ack/nack
+         * before the connection loss are republished and duplicates are possible. To let the consumer know that a received message
+         * might be such a duplicate, a "republished" field is added to the message metadata.
+         */
+        message_settings["republished"] = std::to_string(republishing);
+
+        envelope.setHeaders(message_settings);
 
         /// Delivery mode is 1 or 2. 1 is default. 2 makes a message durable, but makes performance 1.5-2 times worse.
         if (persistent)
@@ -214,79 +235,45 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<String> & mes
 
         if (exchange_type == AMQP::ExchangeType::consistent_hash)
         {
-            producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope).onReturned(returned_callback);
+            producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope);
         }
         else if (exchange_type == AMQP::ExchangeType::headers)
         {
-            envelope.setHeaders(key_arguments);
-            producer_channel->publish(exchange_name, "", envelope).onReturned(returned_callback);
+            producer_channel->publish(exchange_name, "", envelope);
         }
         else
         {
-            producer_channel->publish(exchange_name, routing_keys[0], envelope).onReturned(returned_callback);
+            producer_channel->publish(exchange_name, routing_keys[0], envelope);
         }
 
-        if (producer_channel->usable())
-        {
-            ++delivery_tag;
-            delivery_tags_record.insert(delivery_tags_record.end(), delivery_tag);
+        ++delivery_tag;
+        delivery_record.insert(delivery_record.end(), {delivery_tag, payload});
 
-            if (delivery_tag % BATCH == 0)
-                break;
-        }
-        else
-        {
+        /// Need to break to let the event loop run, because no publishing actually happens before the loop is run.
+        if (delivery_tag % BATCH == 0)
             break;
-        }
     }
 
     iterateEventLoop();
 }
 
-/* Currently implemented “asynchronous publisher confirms” - does not stop after each publish to wait for each individual confirm. An
- * asynchronous publisher may have any number of messages in-flight (unconfirmed) at a time.
- * Synchronous publishing is where after each publish need to wait for the acknowledgement (ack/nack - see confirmSelect() in channel
- * declaration), which is very slow because takes starting event loop and waiting for corresponding callback - can really take a while.
- *
- * Async publishing works well in all failure cases except for connection failure, because if connection fails - not all Ack/Nack might be
- * receieved from the server (and even if all messages were successfully delivered, publisher will not be able to know it). Also in this
- * case onReturned callback will not be received, so loss is possible for messages that were published but have not received confirm from
- * server before connection loss, because then publisher won't know if message was delivered or not.
- *
- * To make it a delivery with no loss and minimal possible amount of duplicates - need to use synchronous publishing (which is too slow).
- * With async publishing at-least-once delivery is achieved with (batch) publishing and manual republishing in case when not all delivery
- * tags were confirmed (ack/nack) before connection loss. Here the maximum number of possible duplicates is no more than batch size.
- * (Manual last batch republishing is only for case of connection loss, in all other failure cases - onReturned callback will be received.)
- *
- * So currently implemented async batch publishing, but for now without manual republishing (because still in doubt how to do it nicely,
- * but current idea is to store in delivery_tags_record not just delivery tags, but pair: (delivery_tag, message). As currently once the
- * publisher receives acknowledgement from the server that the message was sucessfully delivered - a "confirmListener" will delete its
- * delivery tag from the set of pending acknowledgemens, then we can as well delete the payload. If connection fails, undeleted delivery
- * tags indicate messages, whose fate is unknown, so corresponding payloads should be republished.)
-*/
+
 void WriteBufferToRabbitMQProducer::writingFunc()
 {
-    returned_callback = [&](const AMQP::Message & message, int16_t code, const std::string & description)
-    {
-        returned.tryPush(std::string(message.body(), message.size()));
-        LOG_DEBUG(log, "Message returned with code: {}, description: {}. Republishing", code, description);
-
-        /* Here can be added a value to AMQP::Table field of AMQP::Envelope (and then it should be queue instead of
-         * queue) - to indicate that message was republished. Later a consumer will be able to extract this field and understand
-         * that this message was republished and can probably be a duplicate (as RabbitMQ does not guarantee exactly-once delivery).
-         */
-    };
-
     while (!payloads.empty() || wait_all)
     {
+        /* Publish the main payloads only when there are no returned messages. This ensures that the returned queue never grows too
+         * big and that returned messages are republished as fast as possible. Also, the payloads queue has a fixed size, so a push
+         * attempt in countRow() blocks the thread once there is no space - that is intended.
+         */
         if (!returned.empty() && producer_channel->usable())
-            publish(returned);
-        else if (!payloads.empty() && delivery_tags_record.empty() && producer_channel->usable())
-            publish(payloads);
+            publish(returned, true);
+        else if (!payloads.empty() && producer_channel->usable())
+            publish(payloads, false);
 
         iterateEventLoop();
 
-        if (wait_num.load() && delivery_tags_record.empty() && payloads.empty())
+        if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
             wait_all.store(false);
         else if ((!producer_channel->usable() && connection->usable()) || (!connection->usable() && setupConnection()))
             setupChannel();
diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
index d8e3db37043..b9378695d8d 100644
--- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
+++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h
@@ -45,8 +45,8 @@ private:
     void writingFunc();
     bool setupConnection();
     void setupChannel();
-    void removeConfirmed(UInt64 received_delivery_tag, bool multiple);
-    void publish(ConcurrentBoundedQueue<String> & message);
+    void removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish);
+    void publish(ConcurrentBoundedQueue<String> & message, bool republishing);
 
     std::pair<String, UInt16> parsed_address;
     const std::pair<String, String> login_password;
@@ -68,10 +68,8 @@ private:
     UInt64 delivery_tag = 0;
     std::atomic<bool> wait_all = true;
     std::atomic<UInt64> wait_num = 0;
-    std::set<UInt64> delivery_tags_record;
-    std::mutex mutex;
     UInt64 payload_counter = 0;
-    std::function<void(const AMQP::Message &, int16_t, const std::string &)> returned_callback;
+    std::map<UInt64, String> delivery_record;
 
     Poco::Logger * log;
     const std::optional<char> delim;
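
Illustrative note (not part of the patch): the scheme above keeps every published payload in delivery_record, keyed by its delivery tag, until the broker acks it, and moves whatever is nacked or left unconfirmed into the returned queue so it can be republished. Below is a minimal standalone sketch of that bookkeeping. It only uses AMQP-CPP calls that already appear in the patch (confirmSelect/onAck/onNack, Channel::publish, AMQP::Envelope); the ConfirmTracker class, its method names, and the std::vector used in place of ClickHouse's ConcurrentBoundedQueue are invented for the example, and it assumes all callbacks run on the single event-loop thread, as in the patch.

// Sketch only: "track by delivery tag, republish on nack or channel error".
// Not ClickHouse code; names below are made up for illustration.
#include <amqpcpp.h>
#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

class ConfirmTracker
{
public:
    /// Must be created right after the channel is opened, before anything is published
    /// on it, so that the local delivery_tag counter matches the broker's numbering.
    explicit ConfirmTracker(AMQP::TcpChannel & channel_) : channel(channel_)
    {
        channel.confirmSelect()
            .onAck([this](uint64_t tag, bool multiple) { remove(tag, multiple, /* republish = */ false); })
            .onNack([this](uint64_t tag, bool multiple, bool /* requeue */) { remove(tag, multiple, /* republish = */ true); });
    }

    void publish(const std::string & exchange, const std::string & routing_key, const std::string & payload)
    {
        AMQP::Envelope envelope(payload.data(), payload.size());
        channel.publish(exchange, routing_key, envelope);

        /// Remember the payload until the broker confirms it.
        delivery_record.emplace(++delivery_tag, payload);
    }

    /// To be called from the channel's onError handler: everything still unconfirmed
    /// has an unknown fate and must be republished on a fresh channel.
    void onChannelError()
    {
        for (const auto & record : delivery_record)
            returned.push_back(record.second);
        delivery_record.clear();
        delivery_tag = 0;   /// Delivery tags are scoped per channel.
    }

    /// Payloads to publish again (possible duplicates from the consumer's point of view).
    std::vector<std::string> takeReturned() { return std::exchange(returned, {}); }

private:
    void remove(uint64_t tag, bool multiple, bool republish)
    {
        auto it = delivery_record.find(tag);
        if (it == delivery_record.end())
            return;

        /// With multiple == true the broker confirms every tag up to and including `tag`.
        auto begin = multiple ? delivery_record.begin() : it;
        auto end = std::next(it);

        if (republish)
            for (auto record = begin; record != end; ++record)
                returned.push_back(record->second);

        delivery_record.erase(begin, end);
    }

    AMQP::TcpChannel & channel;
    uint64_t delivery_tag = 0;
    std::map<uint64_t, std::string> delivery_record;
    std::vector<std::string> returned;
};

In the patch the same bookkeeping lives directly in WriteBufferToRabbitMQProducer (delivery_record, returned, onError/setupChannel), with the extra "republished" header added to message metadata so that consumers can recognize possible duplicates among republished messages.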