mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #21138 from kssenii/rabbit-fix
rabbitmq: fix bug when late onReady callback causes duplicates
This commit is contained in:
commit
02b12cd347
@ -482,7 +482,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
|
||||
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
|
||||
* an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed
|
||||
*/
|
||||
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
event_handler->iterateLoop();
|
||||
|
||||
/// This will force immediate closure if not yet closed
|
||||
|
@ -97,7 +97,7 @@ WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
|
||||
connection->close();
|
||||
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
@ -189,11 +189,12 @@ void WriteBufferToRabbitMQProducer::setupChannel()
|
||||
/// Delivery tags are scoped per channel.
|
||||
delivery_record.clear();
|
||||
delivery_tag = 0;
|
||||
producer_ready = false;
|
||||
});
|
||||
|
||||
producer_channel->onReady([&]()
|
||||
{
|
||||
channel_id = channel_id_base + std::to_string(channel_id_counter++);
|
||||
channel_id = channel_id_base + "_" + std::to_string(channel_id_counter++);
|
||||
LOG_DEBUG(log, "Producer's channel {} is ready", channel_id);
|
||||
|
||||
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails,
|
||||
@ -211,6 +212,7 @@ void WriteBufferToRabbitMQProducer::setupChannel()
|
||||
{
|
||||
removeRecord(nacked_delivery_tag, multiple, true);
|
||||
});
|
||||
producer_ready = true;
|
||||
});
|
||||
}
|
||||
|
||||
@ -218,30 +220,27 @@ void WriteBufferToRabbitMQProducer::setupChannel()
|
||||
void WriteBufferToRabbitMQProducer::removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish)
|
||||
{
|
||||
auto record_iter = delivery_record.find(received_delivery_tag);
|
||||
assert(record_iter != delivery_record.end());
|
||||
|
||||
if (record_iter != delivery_record.end())
|
||||
if (multiple)
|
||||
{
|
||||
if (multiple)
|
||||
{
|
||||
/// If multiple is true, then all delivery tags up to and including current are confirmed (with ack or nack).
|
||||
++record_iter;
|
||||
/// If multiple is true, then all delivery tags up to and including current are confirmed (with ack or nack).
|
||||
++record_iter;
|
||||
|
||||
if (republish)
|
||||
for (auto record = delivery_record.begin(); record != record_iter; ++record)
|
||||
returned.tryPush(record->second);
|
||||
if (republish)
|
||||
for (auto record = delivery_record.begin(); record != record_iter; ++record)
|
||||
returned.tryPush(record->second);
|
||||
|
||||
/// Delete the records even in case when republished because new delivery tags will be assigned by the server.
|
||||
delivery_record.erase(delivery_record.begin(), record_iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (republish)
|
||||
returned.tryPush(record_iter->second);
|
||||
|
||||
delivery_record.erase(record_iter);
|
||||
}
|
||||
/// Delete the records even in case when republished because new delivery tags will be assigned by the server.
|
||||
delivery_record.erase(delivery_record.begin(), record_iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (republish)
|
||||
returned.tryPush(record_iter->second);
|
||||
|
||||
delivery_record.erase(record_iter);
|
||||
}
|
||||
/// else is theoretically not possible
|
||||
}
|
||||
|
||||
|
||||
@ -308,13 +307,18 @@ void WriteBufferToRabbitMQProducer::writingFunc()
|
||||
{
|
||||
while ((!payloads.empty() || wait_all) && wait_confirm.load())
|
||||
{
|
||||
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
|
||||
* as fast as possible and no new publishes are made before returned messages are handled
|
||||
*/
|
||||
if (!returned.empty() && producer_channel->usable())
|
||||
publish(returned, true);
|
||||
else if (!payloads.empty() && producer_channel->usable())
|
||||
publish(payloads, false);
|
||||
/// If onReady callback is not received, producer->usable() will anyway return true,
|
||||
/// but must publish only after onReady callback.
|
||||
if (producer_ready)
|
||||
{
|
||||
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned messages are republished
|
||||
* as fast as possible and no new publishes are made before returned messages are handled
|
||||
*/
|
||||
if (!returned.empty() && producer_channel->usable())
|
||||
publish(returned, true);
|
||||
else if (!payloads.empty() && producer_channel->usable())
|
||||
publish(payloads, false);
|
||||
}
|
||||
|
||||
iterateEventLoop();
|
||||
|
||||
|
@ -73,6 +73,7 @@ private:
|
||||
std::unique_ptr<RabbitMQHandler> event_handler;
|
||||
std::unique_ptr<AMQP::TcpConnection> connection;
|
||||
std::unique_ptr<AMQP::TcpChannel> producer_channel;
|
||||
bool producer_ready = false;
|
||||
|
||||
/// Channel errors lead to channel closure, need to count number of recreated channels to update channel id
|
||||
UInt64 channel_id_counter = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user