Merge pull request #28934 from vitlibar/allow-macros-in-rabbitmq-settings-test

Add test for expanding macros in RabbitMQ settings.
This commit is contained in:
Vitaly Baranov 2021-10-11 08:32:24 +03:00 committed by GitHub
commit e74a4a8b6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 54 deletions

View File

@ -0,0 +1,8 @@
<yandex>
<macros>
<rabbitmq_host>rabbitmq1</rabbitmq_host>
<rabbitmq_port>5672</rabbitmq_port>
<rabbitmq_exchange_name>macro</rabbitmq_exchange_name>
<rabbitmq_format>JSONEachRow</rabbitmq_format>
</macros>
</yandex>

View File

@ -18,7 +18,7 @@ from . import rabbitmq_pb2
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/rabbitmq.xml'],
main_configs=['configs/rabbitmq.xml', 'configs/macros.xml'],
with_rabbitmq=True)
@ -65,7 +65,8 @@ def rabbitmq_cluster():
def rabbitmq_setup_teardown():
print("RabbitMQ is available - running test")
yield # run test
instance.query('DROP TABLE IF EXISTS test.rabbitmq')
for table_name in ['view', 'consumer', 'rabbitmq']:
instance.query(f'DROP TABLE IF EXISTS test.{table_name}')
# Tests
@ -195,8 +196,6 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -233,10 +232,39 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
def test_rabbitmq_macros(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_host}:{rabbitmq_port}',
rabbitmq_exchange_name = '{rabbitmq_exchange_name}',
rabbitmq_format = '{rabbitmq_format}'
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
message = ''
for i in range(50):
message += json.dumps({'key': i, 'value': i}) + '\n'
channel.basic_publish(exchange='macro', routing_key='', body=message)
connection.close()
time.sleep(1)
result = ''
while True:
result += instance.query('SELECT * FROM test.rabbitmq ORDER BY key', ignore_error=True)
if rabbitmq_check_result(result):
break
rabbitmq_check_result(result, True)
def test_rabbitmq_materialized_view(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -266,19 +294,12 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
if (rabbitmq_check_result(result)):
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
connection.close()
rabbitmq_check_result(result, True)
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -308,11 +329,6 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
if rabbitmq_check_result(result):
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
connection.close()
rabbitmq_check_result(result, True)
@ -373,8 +389,6 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
@pytest.mark.skip(reason="clichouse_path with rabbitmq.proto fails to be exported")
def test_rabbitmq_protobuf(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -426,11 +440,6 @@ def test_rabbitmq_protobuf(rabbitmq_cluster):
if rabbitmq_check_result(result):
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
rabbitmq_check_result(result, True)
@ -446,8 +455,6 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
channel = connection.channel()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq (key UInt64, value String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -469,10 +476,6 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
break
connection.close()
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
assert int(result) == rabbitmq_messages * batch_messages, 'ClickHouse lost some messages: {}'.format(result)
@ -490,8 +493,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
rabbitmq_num_consumers = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
ENGINE = MergeTree
ORDER BY key
@ -1367,7 +1368,6 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
def test_rabbitmq_virtual_columns(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
CREATE TABLE test.rabbitmq_virtuals (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -1428,8 +1428,6 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster):
def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.rabbitmq_virtuals_mv (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -1575,8 +1573,6 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.consume;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
@ -1634,8 +1630,6 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.consume;
DROP TABLE test.producer_reconnect;
''')
@ -1672,8 +1666,6 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
properties=pika.BasicProperties(delivery_mode=2, message_id=str(msg_id)))
connection.close()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
@ -1712,9 +1704,6 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.rabbitmq;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
@ -1803,8 +1792,6 @@ def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster):
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = '5',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
@ -1868,8 +1855,6 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster):
break;
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (
id String, date DateTime
) ENGINE = MergeTree ORDER BY id;
@ -1971,8 +1956,6 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster):
connection.close()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
@ -2025,9 +2008,6 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'rabbit_queue',
rabbitmq_queue_consume = 1;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS