mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
57 lines
1.4 KiB
C++
57 lines
1.4 KiB
C++
#include <common/logger_useful.h>
|
|
#include <Common/Exception.h>
|
|
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
|
|
* event loop and handler).
|
|
*/
|
|
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
|
|
AMQP::LibUvHandler(loop_),
|
|
loop(loop_),
|
|
log(log_),
|
|
connection_running(false),
|
|
loop_running(false),
|
|
loop_state(Loop::STOP)
|
|
{
|
|
}
|
|
|
|
///Method that is called when the connection ends up in an error state.
|
|
void RabbitMQHandler::onError(AMQP::TcpConnection * /* connection */, const char * message)
|
|
{
|
|
LOG_ERROR(log, "Library error report: {}", message);
|
|
connection_running.store(false);
|
|
}
|
|
|
|
/// Callback for a connection that became ready (presumably invoked by the AMQP library
/// once the connection is usable - confirm against the library documentation).
/// Marks the connection as running and switches the loop state to Loop::RUN,
/// which is the condition startLoop() polls on.
void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
{
    LOG_TRACE(log, "Connection is ready");

    /// Atomic assignments, same effect as the corresponding store() calls.
    connection_running = true;
    loop_state = Loop::RUN;
}
|
|
|
|
void RabbitMQHandler::startLoop()
|
|
{
|
|
std::lock_guard lock(startup_mutex);
|
|
|
|
LOG_DEBUG(log, "Background loop started");
|
|
loop_running.store(true);
|
|
|
|
while (loop_state.load() == Loop::RUN)
|
|
uv_run(loop, UV_RUN_NOWAIT);
|
|
|
|
LOG_DEBUG(log, "Background loop ended");
|
|
loop_running.store(false);
|
|
}
|
|
|
|
void RabbitMQHandler::iterateLoop()
|
|
{
|
|
std::unique_lock lock(startup_mutex, std::defer_lock);
|
|
if (lock.try_lock())
|
|
uv_run(loop, UV_RUN_NOWAIT);
|
|
}
|
|
|
|
}
|