2020-05-20 09:40:49 +00:00
|
|
|
#include <common/logger_useful.h>
|
2020-06-29 15:41:17 +00:00
|
|
|
#include <Common/Exception.h>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2020-06-29 15:41:17 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int CANNOT_CONNECT_RABBITMQ;
|
|
|
|
}
|
2020-06-04 06:22:53 +00:00
|
|
|
|
2020-06-27 17:26:00 +00:00
|
|
|
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
|
|
|
|
* event loop and handler).
|
|
|
|
*/
|
2020-06-24 21:14:49 +00:00
|
|
|
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
|
|
|
|
AMQP::LibUvHandler(loop_),
|
|
|
|
loop(loop_),
|
2020-05-20 09:40:49 +00:00
|
|
|
log(log_)
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-06-05 14:27:56 +00:00
|
|
|
void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * message)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-05-26 20:43:20 +00:00
|
|
|
LOG_ERROR(log, "Library error report: {}", message);
|
2020-06-01 20:48:24 +00:00
|
|
|
|
2020-06-04 06:22:53 +00:00
|
|
|
if (!connection->usable() || !connection->ready())
|
2020-05-31 09:34:57 +00:00
|
|
|
{
|
2020-06-29 15:41:17 +00:00
|
|
|
throw Exception("Connection error", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
|
2020-05-31 09:34:57 +00:00
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-06-29 15:41:17 +00:00
|
|
|
void RabbitMQHandler::startBackgroundLoop()
|
2020-06-27 17:26:00 +00:00
|
|
|
{
|
2020-06-29 15:41:17 +00:00
|
|
|
/// stop_loop variable is updated in a separate thread
|
2020-06-30 01:48:11 +00:00
|
|
|
while (!stop_loop.load())
|
2020-06-27 17:26:00 +00:00
|
|
|
{
|
|
|
|
uv_run(loop, UV_RUN_NOWAIT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-06-29 15:41:17 +00:00
|
|
|
void RabbitMQHandler::startLoop()
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-06-29 15:41:17 +00:00
|
|
|
if (starting_loop.try_lock())
|
|
|
|
{
|
|
|
|
uv_run(loop, UV_RUN_NOWAIT);
|
|
|
|
starting_loop.unlock();
|
|
|
|
}
|
2020-06-07 11:14:05 +00:00
|
|
|
}
|
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|