From 2e6c753addb8b4eebc95cc9d006060a9859e5ded Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 21 Apr 2021 15:51:05 +0000 Subject: [PATCH] Allow separate vhost for each table --- src/Storages/RabbitMQ/RabbitMQSettings.h | 1 + src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 4 +++- .../integration/test_storage_rabbitmq/test.py | 23 +++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index 66348d61424..c6725903898 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -24,6 +24,7 @@ namespace DB M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \ M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \ M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \ + M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \ #define LIST_OF_RABBITMQ_SETTINGS(M) \ RABBITMQ_RELATED_SETTINGS(M) \ diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 55629f2a205..525a08784be 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -94,7 +94,7 @@ StorageRabbitMQ::StorageRabbitMQ( , login_password(std::make_pair( getContext()->getConfigRef().getString("rabbitmq.username"), getContext()->getConfigRef().getString("rabbitmq.password"))) - , vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", "/")) + , vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value)) , semaphore(0, num_consumers) , unique_strbase(getRandomName()) , queue_size(std::max(QUEUE_SIZE, static_cast(getMaxBlockSize()))) @@ -988,6 +988,8 @@ void registerStorageRabbitMQ(StorageFactory & factory) CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size) CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms) + CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_vhost) + #undef CHECK_RABBITMQ_STORAGE_ARGUMENT return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings)); diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 50fcdd8d77e..cab7685d96c 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1971,6 +1971,29 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster): assert(result == expected) +@pytest.mark.timeout(120) +def test_rabbitmq_vhost(rabbitmq_cluster): + instance.query(''' + CREATE TABLE test.rabbitmq_vhost (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'vhost', + rabbitmq_format = 'JSONEachRow', + rabbitmq_vhost = '/' + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.basic_publish(exchange='vhost', routing_key='', body=json.dumps({'key': 1, 'value': 2})) + connection.close() + while True: + result = instance.query('SELECT * FROM test.rabbitmq_vhost ORDER BY key', ignore_error=True) + if result == "1\t2\n": + break + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")