Tiny changes

This commit is contained in:
kssenii 2021-05-13 09:39:57 +00:00
parent 0cb0bd48f7
commit 7115045317
4 changed files with 16 additions and 21 deletions

View File

@ -18,14 +18,6 @@ namespace Loop
}
class RabbitMQChannel : public AMQP::TcpChannel
{
public:
RabbitMQChannel(AMQP::TcpConnection * connection) : TcpChannel(connection) {}
~RabbitMQChannel() override { close(); }
};
class RabbitMQHandler : public AMQP::LibUvHandler
{

View File

@ -271,7 +271,7 @@ void StorageRabbitMQ::initRabbitMQ()
return;
}
RabbitMQChannel rabbit_channel(connection.get());
AMQP::TcpChannel rabbit_channel(connection.get());
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
@ -283,10 +283,11 @@ void StorageRabbitMQ::initRabbitMQ()
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_is_ready = true;
rabbit_channel.close();
}
void StorageRabbitMQ::initExchange(RabbitMQChannel & rabbit_channel)
void StorageRabbitMQ::initExchange(AMQP::TcpChannel & rabbit_channel)
{
/// Exchange hierarchy:
/// 1. Main exchange (defined with table settings - rabbitmq_exchange_name, rabbitmq_exchange_type).
@ -357,7 +358,7 @@ void StorageRabbitMQ::initExchange(RabbitMQChannel & rabbit_channel)
}
void StorageRabbitMQ::bindExchange(RabbitMQChannel & rabbit_channel)
void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
{
size_t bound_keys = 0;
@ -418,7 +419,7 @@ void StorageRabbitMQ::bindExchange(RabbitMQChannel & rabbit_channel)
}
void StorageRabbitMQ::bindQueue(size_t queue_id, RabbitMQChannel & rabbit_channel)
void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel)
{
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
@ -574,7 +575,7 @@ void StorageRabbitMQ::unbindExchange()
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
RabbitMQChannel rabbit_channel(connection.get());
AMQP::TcpChannel rabbit_channel(connection.get());
rabbit_channel.removeExchange(bridge_exchange)
.onSuccess([&]()
{
@ -589,6 +590,7 @@ void StorageRabbitMQ::unbindExchange()
{
event_handler->iterateLoop();
}
rabbit_channel.close();
});
}
@ -720,7 +722,7 @@ void StorageRabbitMQ::cleanupRabbitMQ() const
if (use_user_setup)
return;
RabbitMQChannel rabbit_channel(connection.get());
AMQP::TcpChannel rabbit_channel(connection.get());
for (const auto & queue : queues)
{
/// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping
@ -740,6 +742,7 @@ void StorageRabbitMQ::cleanupRabbitMQ() const
});
}
event_handler->startBlockingLoop();
rabbit_channel.close();
/// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues
/// are removed, exchanges will also be cleaned.

View File

@ -159,9 +159,9 @@ private:
void initRabbitMQ();
void cleanupRabbitMQ() const;
void initExchange(RabbitMQChannel & rabbit_channel);
void bindExchange(RabbitMQChannel & rabbit_channel);
void bindQueue(size_t queue_id, RabbitMQChannel & rabbit_channel);
void initExchange(AMQP::TcpChannel & rabbit_channel);
void bindExchange(AMQP::TcpChannel & rabbit_channel);
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
bool restoreConnection(bool reconnecting);
bool streamToViews();

View File

@ -1980,7 +1980,7 @@ def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'drop',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue'
rabbitmq_queue_base = 'rabbit_queue_drop'
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
@ -1994,14 +1994,14 @@ def test_rabbitmq_drop_table_properly(rabbitmq_cluster):
if result == "1\t2\n":
break
exists = channel.queue_declare(queue='rabbit_queue', passive=True)
exists = channel.queue_declare(queue='rabbit_queue_drop', passive=True)
assert(exists)
instance.query("DROP TABLE test.rabbitmq_drop")
time.sleep(30)
try:
exists = channel.queue_declare(callback, queue='rabbit_queue', passive=True)
exists = channel.queue_declare(callback, queue='rabbit_queue_drop', passive=True)
except Exception as e:
exists = False
@ -2016,7 +2016,7 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'rabbit_exchange',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue',
rabbitmq_queue_base = 'rabbit_queue_settings',
rabbitmq_queue_settings_list = 'x-max-length=10,x-overflow=reject-publish'
''')