Allow separate vhost for each table

This commit is contained in:
kssenii 2021-04-21 15:51:05 +00:00
parent 3604aa92f8
commit 2e6c753add
3 changed files with 27 additions and 1 deletions

View File

@ -24,6 +24,7 @@ namespace DB
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \ M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \ M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \ M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
#define LIST_OF_RABBITMQ_SETTINGS(M) \ #define LIST_OF_RABBITMQ_SETTINGS(M) \
RABBITMQ_RELATED_SETTINGS(M) \ RABBITMQ_RELATED_SETTINGS(M) \

View File

@ -94,7 +94,7 @@ StorageRabbitMQ::StorageRabbitMQ(
, login_password(std::make_pair( , login_password(std::make_pair(
getContext()->getConfigRef().getString("rabbitmq.username"), getContext()->getConfigRef().getString("rabbitmq.username"),
getContext()->getConfigRef().getString("rabbitmq.password"))) getContext()->getConfigRef().getString("rabbitmq.password")))
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", "/")) , vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
, semaphore(0, num_consumers) , semaphore(0, num_consumers)
, unique_strbase(getRandomName()) , unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize()))) , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
@ -988,6 +988,8 @@ void registerStorageRabbitMQ(StorageFactory & factory)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size) CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms) CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_vhost)
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT #undef CHECK_RABBITMQ_STORAGE_ARGUMENT
return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings)); return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings));

View File

@ -1971,6 +1971,29 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
assert(result == expected) assert(result == expected)
@pytest.mark.timeout(120)
def test_rabbitmq_vhost(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_vhost (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'vhost',
rabbitmq_format = 'JSONEachRow',
rabbitmq_vhost = '/'
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_publish(exchange='vhost', routing_key='', body=json.dumps({'key': 1, 'value': 2}))
connection.close()
while True:
result = instance.query('SELECT * FROM test.rabbitmq_vhost ORDER BY key', ignore_error=True)
if result == "1\t2\n":
break
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")