ClickHouse/src/Storages/RabbitMQ/RabbitMQHandler.cpp

71 lines
1.8 KiB
C++
Raw Normal View History

#include <common/logger_useful.h>
2020-06-29 15:41:17 +00:00
#include <Common/Exception.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
namespace DB
{
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler).
*/
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
AMQP::LibUvHandler(loop_),
loop(loop_),
2020-08-08 16:45:52 +00:00
log(log_),
connection_running(false),
2020-08-28 08:52:02 +00:00
loop_running(false),
2020-08-08 16:45:52 +00:00
loop_state(Loop::STOP)
{
}
///Method that is called when the connection ends up in an error state.
2020-08-28 08:52:02 +00:00
void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message)
{
2020-05-26 20:43:20 +00:00
LOG_ERROR(log, "Library error report: {}", message);
2020-08-15 06:50:53 +00:00
connection_running.store(false);
}
void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
{
2020-08-08 16:45:52 +00:00
LOG_TRACE(log, "Connection is ready");
2020-08-15 06:50:53 +00:00
connection_running.store(true);
2020-08-08 16:45:52 +00:00
loop_state.store(Loop::RUN);
}
2020-07-02 16:44:04 +00:00
void RabbitMQHandler::startLoop()
{
2020-07-02 16:44:04 +00:00
std::lock_guard lock(startup_mutex);
2020-08-28 08:52:02 +00:00
2020-08-26 08:54:29 +00:00
LOG_DEBUG(log, "Background loop started");
2020-08-28 08:52:02 +00:00
loop_running.store(true);
2020-08-08 16:45:52 +00:00
while (loop_state.load() == Loop::RUN)
uv_run(loop, UV_RUN_NOWAIT);
2020-08-26 08:54:29 +00:00
LOG_DEBUG(log, "Background loop ended");
2020-08-28 08:52:02 +00:00
loop_running.store(false);
}
2020-07-02 16:44:04 +00:00
void RabbitMQHandler::iterateLoop()
{
2020-07-02 16:44:04 +00:00
std::unique_lock lock(startup_mutex, std::defer_lock);
if (lock.try_lock())
2020-06-29 15:41:17 +00:00
uv_run(loop, UV_RUN_NOWAIT);
2020-06-07 11:14:05 +00:00
}
2021-05-04 19:57:45 +00:00
/// Do not need synchronization as in iterateLoop(), because this method is used only for
/// initial RabbitMQ setup - at this point there is no background loop thread.
2021-05-05 07:52:21 +00:00
void RabbitMQHandler::startBlockingLoop()
2021-05-04 16:26:47 +00:00
{
LOG_DEBUG(log, "Started blocking loop.");
2021-05-05 07:52:21 +00:00
uv_run(loop, UV_RUN_DEFAULT);
2021-05-04 16:26:47 +00:00
}
void RabbitMQHandler::stopLoop()
{
LOG_DEBUG(log, "Implicit loop stop.");
2021-05-04 16:26:47 +00:00
uv_stop(loop);
}
}