From 5a934c079e691d4231b08a1a96204a6ebd5d85d2 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 31 Jul 2020 04:59:56 +0000 Subject: [PATCH] Add connection restore in insert, better confirms --- .../WriteBufferToRabbitMQProducer.cpp | 127 +++++++++++++----- .../RabbitMQ/WriteBufferToRabbitMQProducer.h | 14 +- 2 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 82cb3f2311d..d74e94d74d2 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -25,7 +25,7 @@ static const auto LOOP_WAIT = 10; static const auto BATCH = 10000; WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( - std::pair & parsed_address, + std::pair & parsed_address_, Context & global_context, const std::pair & login_password_, const Names & routing_keys_, @@ -39,6 +39,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( size_t rows_per_message, size_t chunk_size_) : WriteBuffer(nullptr, 0) + , parsed_address(parsed_address_) , login_password(login_password_) , routing_keys(routing_keys_) , exchange_name(exchange_name_) @@ -55,11 +56,45 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( loop = std::make_unique(); uv_loop_init(loop.get()); - event_handler = std::make_unique(loop.get(), log); + + /// New coonection for each publisher because cannot publish from different threads with the same connection.(https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086) + setupConnection(0); + setupChannel(0); + + writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); }); + writing_task->deactivate(); + + if (exchange_type == AMQP::ExchangeType::headers) + { + for (const auto & header : routing_keys) + { + std::vector matching; + boost::split(matching, header, [](char c){ return c == '='; }); + key_arguments[matching[0]] = matching[1]; + } + } +} + + +WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer() +{ + writing_task->deactivate(); + connection->close(); + assert(rows == 0 && chunks.empty()); +} + + +void WriteBufferToRabbitMQProducer::setupConnection(bool remove_prev_connection) +{ + if (remove_prev_connection && connection) + { + connection->close(); + connection.release(); + } + connection = std::make_unique(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")); - /// New coonection for each publisher because cannot publish from different threads.(https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086) size_t cnt_retries = 0; while (!connection->ready() && ++cnt_retries != RETRIES_MAX) { @@ -71,8 +106,18 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( { throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ); } +} - producer_channel = std::make_shared(connection.get()); + +void WriteBufferToRabbitMQProducer::setupChannel(bool remove_prev_channel) +{ + if (remove_prev_channel && producer_channel) + { + producer_channel->close(); + producer_channel.release(); + } + + producer_channel = std::make_unique(connection.get()); producer_channel->onError([&](const char * message) { LOG_ERROR(log, "Prodcuer error: {}", message); @@ -84,38 +129,38 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( } else { + /// Same as here https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/ + remove_confirmed_tag = [&](uint64_t received_delivery_tag, bool multiple) + { + std::lock_guard lock(mutex); + auto found_tag_pos = delivery_tags_record.find(received_delivery_tag); + if (found_tag_pos != delivery_tags_record.end()) + { + if (multiple) + { + ++found_tag_pos; + delivery_tags_record.erase(delivery_tags_record.begin(), found_tag_pos); + } + else + delivery_tags_record.erase(found_tag_pos); + } + }; + + /* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails, it + * will be requed in returned_callback. If persistent == false, message is confirmed the moment it is enqueued. If fails, it is + * not requeued. First option is two times slower than the second, so default is second and the first is turned on in table setting. + */ producer_channel->confirmSelect() - .onAck([&](uint64_t deliveryTag, bool /* multiple */) + .onAck([&](uint64_t acked_delivery_tag, bool multiple) { - if (deliveryTag > last_processed) - last_processed = deliveryTag; + remove_confirmed_tag(acked_delivery_tag, multiple); }) - .onNack([&](uint64_t /* deliveryTag */, bool /* multiple */, bool /* requeue */) + .onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */) { + if (!persistent) + remove_confirmed_tag(nacked_delivery_tag, multiple); }); } - - writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); }); - writing_task->deactivate(); - - if (exchange_type == AMQP::ExchangeType::headers) - { - std::vector matching; - for (const auto & header : routing_keys) - { - boost::split(matching, header, [](char c){ return c == '='; }); - key_arguments[matching[0]] = matching[1]; - matching.clear(); - } - } -} - - -WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer() -{ - writing_task->deactivate(); - connection->close(); - assert(rows == 0 && chunks.empty()); } @@ -143,6 +188,9 @@ void WriteBufferToRabbitMQProducer::countRow() ++delivery_tag; payloads.push(payload); + + std::lock_guard lock(mutex); + delivery_tags_record.insert(delivery_tags_record.end(), delivery_tag); } } @@ -180,7 +228,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() else if (exchange_type == AMQP::ExchangeType::headers) { envelope.setHeaders(key_arguments); - producer_channel->publish(exchange_name, "", envelope, key_arguments).onReturned(returned_callback); + producer_channel->publish(exchange_name, "", envelope).onReturned(returned_callback); } else { @@ -191,7 +239,7 @@ void WriteBufferToRabbitMQProducer::writingFunc() iterateEventLoop(); } - if (wait_num.load() && last_processed.load() >= wait_num.load()) + if (wait_num.load() && delivery_tags_record.empty()) { wait_all.store(false); LOG_DEBUG(log, "All messages are successfully published"); @@ -200,7 +248,22 @@ void WriteBufferToRabbitMQProducer::writingFunc() { iterateEventLoop(); } + + /// Most channel based errors result in channel closure, which is very likely to trigger connection closure. + if (connection->usable() && connection->ready() && !producer_channel->usable()) + { + LOG_DEBUG(log, "Channel is not usable. Creating a new one"); + setupChannel(1); + } + else if (!connection->usable() || !connection->ready()) + { + LOG_DEBUG(log, "Connection is not usable. Creating a new one"); + setupConnection(1); + setupChannel(1); + } } + + LOG_DEBUG(log, "Delivered messages"); } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 30e647af471..188bd5676f4 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -14,13 +14,11 @@ namespace DB { -using ChannelPtr = std::shared_ptr; - class WriteBufferToRabbitMQProducer : public WriteBuffer { public: WriteBufferToRabbitMQProducer( - std::pair & parsed_address, + std::pair & parsed_address_, Context & global_context, const std::pair & login_password_, const Names & routing_keys_, @@ -46,7 +44,10 @@ private: void nextImpl() override; void iterateEventLoop(); void writingFunc(); + void setupConnection(bool remove_prev_connection); + void setupChannel(bool remove_prev_channel); + std::pair parsed_address; const std::pair login_password; const Names routing_keys; const String exchange_name; @@ -61,12 +62,15 @@ private: std::unique_ptr loop; std::unique_ptr event_handler; std::unique_ptr connection; - ChannelPtr producer_channel; + std::unique_ptr producer_channel; ConcurrentBoundedQueue payloads; UInt64 delivery_tag = 0; std::atomic wait_all = true; - std::atomic wait_num = 0, last_processed = 0; + std::atomic wait_num = 0; + std::set delivery_tags_record; + std::mutex mutex; + std::function remove_confirmed_tag; Poco::Logger * log; const std::optional delim;