This commit is contained in:
kssenii 2020-06-01 16:56:16 +00:00
parent 5939422b85
commit 386dc4d95e
3 changed files with 3 additions and 8 deletions

View File

@ -44,7 +44,6 @@ void RabbitMQHandler::start(std::atomic<bool> & check_param)
void RabbitMQHandler::stop()
{
std::lock_guard lock(mutex);
event_base_loopbreak(evbase);
}

View File

@ -207,9 +207,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
if (row_delimiter != '\0')
message_received += row_delimiter;
/* Needed because this vector can be used at the same time by another thread in nextImpl() (below).
* So we lock mutex here and there so that they do not use it asynchronosly.
*/
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl() (below).
std::lock_guard lock(mutex);
received.push_back(message_received);
}
@ -255,9 +253,7 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
messages.clear();
/* Needed because this vector can be used at the same time by another thread in onReceived callback (above).
* So we lock mutex here and there so that they do not use it asynchronosly.
*/
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback (above).
std::lock_guard lock(mutex);
messages.swap(received);

View File

@ -15,7 +15,7 @@ enum
{
Connection_setup_sleep = 200,
Connection_setup_retries_max = 1000,
Buffer_limit_to_flush = 50000
Buffer_limit_to_flush = 10000 /// It is important to keep it low in order not to kill consumers
};
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(