diff --git a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp index 41b4cb4a1c8..0213139a469 100644 --- a/src/Storages/RabbitMQ/RabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/RabbitMQConsumer.cpp @@ -61,12 +61,13 @@ void RabbitMQConsumer::subscribe() { String message_received = std::string(message.body(), message.body() + message.bodySize()); + std::unique_lock lock(mutex); + if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "", message.hasTimestamp() ? message.timestamp() : 0, redelivered, AckTracker(delivery_tag, channel_id)})) throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); - std::unique_lock lock(mutex); cv.notify_one(); } })