Fix producer

This commit is contained in:
kssenii 2020-06-04 06:14:09 +00:00
parent 786874e867
commit 5624066195
4 changed files with 8 additions and 50 deletions

View File

@ -46,6 +46,8 @@ void RabbitMQBlockOutputStream::write(const Block & block)
if (buffer) if (buffer)
buffer->flush(); buffer->flush();
storage.pingConnection();
} }

View File

@ -54,6 +54,8 @@ public:
const String & getFormatName() const { return format_name; } const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override; NamesAndTypesList getVirtuals() const override;
const void pingConnection() { connection.heartbeat(); }
protected: protected:
StorageRabbitMQ( StorageRabbitMQ(
const StorageID & table_id_, const StorageID & table_id_,
@ -88,7 +90,7 @@ private:
event_base * evbase; event_base * evbase;
RabbitMQHandler eventHandler; RabbitMQHandler eventHandler;
AMQP::TcpConnection connection; AMQP::TcpConnection connection; /// Connection for all consumers
Poco::Semaphore semaphore; Poco::Semaphore semaphore;
std::mutex mutex; std::mutex mutex;

View File

@ -16,7 +16,7 @@ enum
{ {
Connection_setup_sleep = 200, Connection_setup_sleep = 200,
Connection_setup_retries_max = 1000, Connection_setup_retries_max = 1000,
Buffer_limit_to_flush = 5000 /// It is important to keep it low in order not to kill consumers Buffer_limit_to_flush = 10000 /// It is important to keep it low in order not to kill consumers
}; };
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
@ -113,38 +113,11 @@ void WriteBufferToRabbitMQProducer::flush()
/* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name /* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name
* and makes it visible from current producer_channel. * and makes it visible from current producer_channel.
*/ */
producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive) producer_channel->declareExchange(exchange_name + "_direct", AMQP::direct, AMQP::passive)
.onSuccess([&]() .onSuccess([&]()
{ {
exchange_declared = true; exchange_declared = true;
/// The case that should not normally happen: message was not delivered to queue (queue ttl exceeded) / not forwareded to consumer
if (flush_returned)
{
/// Needed to avoid data race because two different threads may access this vector
std::lock_guard lock(mutex);
LOG_TRACE(log, "Redelivering returned messages");
for (auto & payload : returned)
{
next_queue = next_queue % num_queues + 1;
if (bind_by_id || hash_exchange)
{
producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
}
else
{
producer_channel->publish(exchange_name, routing_key, payload);
}
--message_counter;
}
returned.clear();
}
/* The reason for accumulating payloads and not publishing each of them at once in count_row() is that publishing /* The reason for accumulating payloads and not publishing each of them at once in count_row() is that publishing
* needs to be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it * needs to be wrapped inside declareExchange() callback and it is too expensive in terms of time to declare it
* each time we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to * each time we publish. Declaring it once and then publishing without wrapping inside onSuccess callback leads to
@ -159,27 +132,11 @@ void WriteBufferToRabbitMQProducer::flush()
if (bind_by_id || hash_exchange) if (bind_by_id || hash_exchange)
{ {
producer_channel->publish(exchange_name, std::to_string(next_queue), payload, AMQP::mandatory || AMQP::immediate) producer_channel->publish(exchange_name, std::to_string(next_queue), payload);
.onReturned([&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */)
{
flush_returned = true;
/// Needed to avoid data race because two different threads may access this variable
std::lock_guard lock(mutex);
returned.emplace_back(std::string(message.body(), message.body() + message.bodySize()));
});
} }
else else
{ {
producer_channel->publish(exchange_name, routing_key, payload, AMQP::mandatory || AMQP::immediate) producer_channel->publish(exchange_name, routing_key, payload);
.onReturned([&](const AMQP::Message & message, int16_t /* code */, const std::string & /* description */)
{
flush_returned = true;
/// Needed to avoid data race because two different threads may access this vector
std::lock_guard lock(mutex);
returned.emplace_back(std::string(message.body(), message.body() + message.bodySize()));
});
} }
--message_counter; --message_counter;

View File

@ -54,11 +54,8 @@ private:
size_t next_queue = 0; size_t next_queue = 0;
UInt64 message_counter = 0; UInt64 message_counter = 0;
String channel_id; String channel_id;
std::atomic<bool> flush_returned = false;
std::mutex mutex;
Messages messages; Messages messages;
Messages returned;
Poco::Logger * log; Poco::Logger * log;
const std::optional<char> delim; const std::optional<char> delim;