WriteBufferToRabbitMQProduced fixed queue usage

This commit is contained in:
Maksim Kita 2021-10-13 20:10:41 +03:00
parent 35f2d914dc
commit 659a46a972

View File

@ -197,8 +197,13 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
/* It is important to make sure that delivery_record.size() is never bigger than returned.size(), i.e. number if unacknowledged
* messages cannot exceed returned.size(), because they all might end up there
*/
while (messages.pop(payload) && producer_channel->usable() && delivery_record.size() < RETURNED_LIMIT)
while (!messages.empty() && producer_channel->usable() && delivery_record.size() < RETURNED_LIMIT)
{
bool pop_result = messages.pop(payload);
if (!pop_result)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not pop payload");
AMQP::Envelope envelope(payload.second.data(), payload.second.size());
/// if headers exchange is used, routing keys are added here via headers, if not - it is just empty