From 806fd2739567562f62fae565fea980bdcaea051b Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 1 Jun 2020 20:48:24 +0000 Subject: [PATCH] Fix build & fix style & fix --- src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp | 2 +- src/Storages/RabbitMQ/RabbitMQHandler.cpp | 3 ++- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 2 +- .../RabbitMQ/WriteBufferToRabbitMQProducer.cpp | 13 +++++++------ 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp index 3f940891c23..0858e2101df 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp @@ -10,7 +10,7 @@ namespace DB namespace ErrorCodes { - extern int CANNOT_CREATE_IO_BUFFER; + extern const int CANNOT_CREATE_IO_BUFFER; } diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 547851f349a..6308e2e0089 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -15,7 +15,8 @@ RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) : void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message) { LOG_ERROR(log, "Library error report: {}", message); - if (!connection->ready()) + + if (!connection->usable() || !connection->ready()) { std::cerr << "Connection lost, no recovery is possible"; throw; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index ee5dede5261..147d3ba2115 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -72,7 +72,7 @@ StorageRabbitMQ::StorageRabbitMQ( , num_consumers(num_consumers_) , num_queues(num_queues_) , hash_exchange(hash_exchange_) - , log(&Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) + , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) , semaphore(0, num_consumers_) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , evbase(event_base_new()) diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 12d6c2b0fb8..73434bc0ea6 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -15,7 +16,7 @@ enum { Connection_setup_sleep = 200, Connection_setup_retries_max = 1000, - Buffer_limit_to_flush = 10000 /// It is important to keep it low in order not to kill consumers + Buffer_limit_to_flush = 5000 /// It is important to keep it low in order not to kill consumers }; WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( @@ -44,8 +45,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( , connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/")) { /* The reason behind making a separate connection for each concurrent producer is explained here: - * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - * - publishing from different threads (as outputStreams are asynchronous) leads to internal libary errors. + * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from + * different threads (as outputStreams are asynchronous) with the same connection leads to internal libary errors. */ size_t cnt_retries = 0; while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max) @@ -107,9 +108,9 @@ void WriteBufferToRabbitMQProducer::count_row() void WriteBufferToRabbitMQProducer::flush() { - /* Why accumulating payloads and not publishing each of them at once in count_row()? Because publishing needs to - * be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it each time - * we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to + /* The reason for accumulating payloads and not publishing each of them at once in count_row() is that publishing + * needs to be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it + * each time we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to * exchange becoming inactive at some point and part of messages is lost as a result. */ std::atomic exchange_declared = false, exchange_error = false;