mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
Merge pull request #23452 from kssenii/rabbitmq-allow-multiple-hosts
Allow rabbitmq vhost in table settings
This commit is contained in:
commit
566db34cca
@ -24,6 +24,7 @@ namespace DB
|
||||
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
|
||||
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
|
||||
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \
|
||||
M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
|
||||
|
||||
#define LIST_OF_RABBITMQ_SETTINGS(M) \
|
||||
RABBITMQ_RELATED_SETTINGS(M) \
|
||||
|
@ -94,7 +94,7 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
, login_password(std::make_pair(
|
||||
getContext()->getConfigRef().getString("rabbitmq.username"),
|
||||
getContext()->getConfigRef().getString("rabbitmq.password")))
|
||||
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", "/"))
|
||||
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
|
||||
, semaphore(0, num_consumers)
|
||||
, unique_strbase(getRandomName())
|
||||
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
|
||||
@ -988,6 +988,8 @@ void registerStorageRabbitMQ(StorageFactory & factory)
|
||||
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_max_block_size)
|
||||
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_flush_interval_ms)
|
||||
|
||||
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_vhost)
|
||||
|
||||
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
|
||||
|
||||
return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings));
|
||||
|
@ -1971,6 +1971,29 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
|
||||
assert(result == expected)
|
||||
|
||||
|
||||
@pytest.mark.timeout(120)
|
||||
def test_rabbitmq_vhost(rabbitmq_cluster):
|
||||
instance.query('''
|
||||
CREATE TABLE test.rabbitmq_vhost (key UInt64, value UInt64)
|
||||
ENGINE = RabbitMQ
|
||||
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
|
||||
rabbitmq_exchange_name = 'vhost',
|
||||
rabbitmq_format = 'JSONEachRow',
|
||||
rabbitmq_vhost = '/'
|
||||
''')
|
||||
|
||||
credentials = pika.PlainCredentials('root', 'clickhouse')
|
||||
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
channel.basic_publish(exchange='vhost', routing_key='', body=json.dumps({'key': 1, 'value': 2}))
|
||||
connection.close()
|
||||
while True:
|
||||
result = instance.query('SELECT * FROM test.rabbitmq_vhost ORDER BY key', ignore_error=True)
|
||||
if result == "1\t2\n":
|
||||
break
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user