mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Better name
This commit is contained in:
parent
4ce975c512
commit
f6237dc334
@ -166,38 +166,38 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
|
||||
|
||||
bool ReadBufferFromRabbitMQConsumer::ackMessages()
|
||||
{
|
||||
AckTracker record = last_inserted_record;
|
||||
AckTracker record_info = last_inserted_record_info;
|
||||
|
||||
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
|
||||
* channel, so if channel fails, all previous delivery tags become invalid
|
||||
*/
|
||||
if (record.channel_id == channel_id && record.delivery_tag && record.delivery_tag > prev_tag)
|
||||
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
|
||||
{
|
||||
/// Commit all received messages with delivery tags from last commited to last inserted
|
||||
if (!consumer_channel->ack(record.delivery_tag, AMQP::multiple))
|
||||
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
|
||||
{
|
||||
LOG_ERROR(log, "Failed to commit messages with delivery tags from last commited to {} on channel {}",
|
||||
record.delivery_tag, channel_id);
|
||||
record_info.delivery_tag, channel_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
prev_tag = record.delivery_tag;
|
||||
LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record.delivery_tag, channel_id);
|
||||
prev_tag = record_info.delivery_tag;
|
||||
LOG_TRACE(log, "Consumer commited messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record)
|
||||
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record_info)
|
||||
{
|
||||
if (record.delivery_tag && channel_error.load())
|
||||
if (record_info.delivery_tag && channel_error.load())
|
||||
return;
|
||||
|
||||
if (!record.delivery_tag)
|
||||
if (!record_info.delivery_tag)
|
||||
prev_tag = 0;
|
||||
|
||||
last_inserted_record = record;
|
||||
last_inserted_record_info = record_info;
|
||||
}
|
||||
|
||||
|
||||
|
@ -107,7 +107,7 @@ private:
|
||||
MessageData current;
|
||||
size_t subscribed = 0;
|
||||
|
||||
AckTracker last_inserted_record;
|
||||
AckTracker last_inserted_record_info;
|
||||
UInt64 prev_tag = 0, channel_id_counter = 0;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user