mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-17 14:00:51 +00:00
Experiments
This commit is contained in:
parent
36eb2c3028
commit
3c22479961
@ -39,11 +39,15 @@ void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
|
|||||||
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
|
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
|
||||||
* event loop and handler). But the loop should not be attempted to start if it is already running.
|
* event loop and handler). But the loop should not be attempted to start if it is already running.
|
||||||
*/
|
*/
|
||||||
|
bool expected = false;
|
||||||
|
if (loop_started.compare_exchange_strong(expected, true))
|
||||||
|
{
|
||||||
std::lock_guard lock(mutex_before_event_loop);
|
std::lock_guard lock(mutex_before_event_loop);
|
||||||
loop_started.store(true);
|
|
||||||
stop_scheduled = false;
|
stop_scheduled = false;
|
||||||
|
|
||||||
uv_run(loop, UV_RUN_NOWAIT);
|
uv_run(loop, UV_RUN_NOWAIT);
|
||||||
|
loop_started.store(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -372,7 +372,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
|
|||||||
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
|
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
|
||||||
* loop_started == 1 if current consumer is started the loop and not another.
|
* loop_started == 1 if current consumer is started the loop and not another.
|
||||||
*/
|
*/
|
||||||
if (!loop_started.load() && !event_handler.checkStopIsScheduled())
|
if (!event_handler.checkStopIsScheduled())
|
||||||
{
|
{
|
||||||
stopEventLoopWithTimeout();
|
stopEventLoopWithTimeout();
|
||||||
}
|
}
|
||||||
@ -442,17 +442,15 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
|
|||||||
{
|
{
|
||||||
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread.
|
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread.
|
||||||
startEventLoop(loop_started);
|
startEventLoop(loop_started);
|
||||||
loop_started.store(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback.
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
if (received.empty())
|
if (received.empty())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
messages.clear();
|
messages.clear();
|
||||||
|
|
||||||
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback.
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
|
|
||||||
messages.swap(received);
|
messages.swap(received);
|
||||||
current = messages.begin();
|
current = messages.begin();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user