From 3c22479961cef377f8a51397d68095ccdc72251a Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 25 Jun 2020 18:36:49 +0300 Subject: [PATCH] Experiments --- src/Storages/RabbitMQ/RabbitMQHandler.cpp | 12 ++++++++---- .../RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp | 10 ++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index cf2614d4de0..4cd7d914125 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -39,11 +39,15 @@ void RabbitMQHandler::startConsumerLoop(std::atomic & loop_started) /* The object of this class is shared between concurrent consumers (who share the same connection == share the same * event loop and handler). But the loop should not be attempted to start if it is already running. */ - std::lock_guard lock(mutex_before_event_loop); - loop_started.store(true); - stop_scheduled = false; + bool expected = false; + if (loop_started.compare_exchange_strong(expected, true)) + { + std::lock_guard lock(mutex_before_event_loop); + stop_scheduled = false; - uv_run(loop, UV_RUN_NOWAIT); + uv_run(loop, UV_RUN_NOWAIT); + loop_started.store(false); + } } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index da4bfc24a2d..f5d0651401c 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -372,7 +372,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) * executing all callbacks on the connection (not only its own), then there should be some point to unblock. * loop_started == 1 if current consumer is started the loop and not another. */ - if (!loop_started.load() && !event_handler.checkStopIsScheduled()) + if (!event_handler.checkStopIsScheduled()) { stopEventLoopWithTimeout(); } @@ -442,17 +442,15 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() { /// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread. startEventLoop(loop_started); - loop_started.store(false); } + /// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback. + std::lock_guard lock(mutex); + if (received.empty()) return false; messages.clear(); - - /// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback. - std::lock_guard lock(mutex); - messages.swap(received); current = messages.begin(); }