mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Fix build & fix style & fix
This commit is contained in:
parent
d3b069e37b
commit
806fd27395
@ -10,7 +10,7 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern int CANNOT_CREATE_IO_BUFFER;
|
||||
extern const int CANNOT_CREATE_IO_BUFFER;
|
||||
}
|
||||
|
||||
|
||||
|
@ -15,7 +15,8 @@ RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) :
|
||||
void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message)
|
||||
{
|
||||
LOG_ERROR(log, "Library error report: {}", message);
|
||||
if (!connection->ready())
|
||||
|
||||
if (!connection->usable() || !connection->ready())
|
||||
{
|
||||
std::cerr << "Connection lost, no recovery is possible";
|
||||
throw;
|
||||
|
@ -72,7 +72,7 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
, num_consumers(num_consumers_)
|
||||
, num_queues(num_queues_)
|
||||
, hash_exchange(hash_exchange_)
|
||||
, log(&Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
||||
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
||||
, semaphore(0, num_consumers_)
|
||||
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
|
||||
, evbase(event_base_new())
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <amqpcpp.h>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -15,7 +16,7 @@ enum
|
||||
{
|
||||
Connection_setup_sleep = 200,
|
||||
Connection_setup_retries_max = 1000,
|
||||
Buffer_limit_to_flush = 10000 /// It is important to keep it low in order not to kill consumers
|
||||
Buffer_limit_to_flush = 5000 /// It is important to keep it low in order not to kill consumers
|
||||
};
|
||||
|
||||
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
@ -44,8 +45,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login("root", "clickhouse"), "/"))
|
||||
{
|
||||
/* The reason behind making a separate connection for each concurrent producer is explained here:
|
||||
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086
|
||||
* - publishing from different threads (as outputStreams are asynchronous) leads to internal libary errors.
|
||||
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
|
||||
* different threads (as outputStreams are asynchronous) with the same connection leads to internal libary errors.
|
||||
*/
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max)
|
||||
@ -107,9 +108,9 @@ void WriteBufferToRabbitMQProducer::count_row()
|
||||
|
||||
void WriteBufferToRabbitMQProducer::flush()
|
||||
{
|
||||
/* Why accumulating payloads and not publishing each of them at once in count_row()? Because publishing needs to
|
||||
* be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it each time
|
||||
* we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to
|
||||
/* The reason for accumulating payloads and not publishing each of them at once in count_row() is that publishing
|
||||
* needs to be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it
|
||||
* each time we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to
|
||||
* exchange becoming inactive at some point and part of messages is lost as a result.
|
||||
*/
|
||||
std::atomic<bool> exchange_declared = false, exchange_error = false;
|
||||
|
Loading…
Reference in New Issue
Block a user