#pragma once #include #include #include #include #include #include #include namespace DB { namespace Loop { static const UInt8 RUN = 1; static const UInt8 STOP = 2; } class RabbitMQChannel : public AMQP::TcpChannel { public: RabbitMQChannel(AMQP::TcpConnection * connection) : TcpChannel(connection) {} ~RabbitMQChannel() override { close(); } }; class RabbitMQHandler : public AMQP::LibUvHandler { public: RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_); void onError(AMQP::TcpConnection * connection, const char * message) override; void onReady(AMQP::TcpConnection * connection) override; void startLoop(); void iterateLoop(); bool connectionRunning() { return connection_running.load(); } bool loopRunning() { return loop_running.load(); } void updateLoopState(UInt8 state) { loop_state.store(state); } UInt8 getLoopState() { return loop_state.load(); } private: uv_loop_t * loop; Poco::Logger * log; std::atomic connection_running, loop_running; std::atomic loop_state; std::mutex startup_mutex; }; }