import os.path as p
import random
import threading
import time
import pytest

from random import randrange
import pika
from sys import getdefaultencoding
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager

import json
import subprocess

from google.protobuf.internal.encoder import _VarintBytes

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
                                config_dir='configs',
                                main_configs=['configs/rabbitmq.xml', 'configs/log_conf.xml'],
                                with_rabbitmq=True)
rabbitmq_id = ''


# Helpers

def check_rabbitmq_is_available():
    # named 'proc' so the 'os.path as p' alias is not shadowed
    proc = subprocess.Popen(('docker', 'exec', '-i', rabbitmq_id, 'rabbitmqctl', 'await_startup'),
                            stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def enable_consistent_hash_plugin():
    proc = subprocess.Popen(('docker', 'exec', '-i', rabbitmq_id,
                             'rabbitmq-plugins', 'enable', 'rabbitmq_consistent_hash_exchange'),
                            stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def wait_rabbitmq_is_available(max_retries=50):
    retries = 0
    while True:
        if check_rabbitmq_is_available():
            break
        else:
            retries += 1
            if retries > max_retries:
                # raising a bare string is invalid Python; raise a real exception
                raise Exception("RabbitMQ is not available")
            print("Waiting for RabbitMQ to start up")
            time.sleep(1)


def wait_rabbitmq_plugin_enabled(max_retries=50):
    retries = 0
    while True:
        if enable_consistent_hash_plugin():
            break
        else:
            retries += 1
            if retries > max_retries:
                raise Exception("RabbitMQ plugin is not available")
            print("Waiting for plugin")
            time.sleep(1)


def rabbitmq_check_result(result, check=False, ref_file='test_rabbitmq_json.reference'):
    fpath = p.join(p.dirname(__file__), ref_file)
    with open(fpath) as reference:
        if check:
            assert TSV(result) == TSV(reference)
        else:
            return TSV(result) == TSV(reference)


# Fixtures

@pytest.fixture(scope="module")
def rabbitmq_cluster():
    try:
        global rabbitmq_id
        cluster.start()
        rabbitmq_id = instance.cluster.rabbitmq_docker_id
        print("rabbitmq_id is {}".format(rabbitmq_id))
        instance.query('CREATE DATABASE test')

        yield cluster

    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def rabbitmq_setup_teardown():
    wait_rabbitmq_is_available()
    wait_rabbitmq_plugin_enabled()
    print("RabbitMQ is available - running test")
    yield  # run test
    instance.query('DROP TABLE IF EXISTS test.rabbitmq')


# Tests
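
# Check that rows published to RabbitMQ can be read back with a plain SELECT
# from the engine table, defined both via SETTINGS and via positional engine
# arguments (the old syntax).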
@pytest.mark.timeout(180)
def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'new',
                     rabbitmq_exchange_name = 'clickhouse-exchange',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

    messages = []
    for i in range(25):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='new', body=message)

    messages = []
    for i in range(25, 50):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='new', body=message)

    connection.close()

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq', ignore_error=True)
        if rabbitmq_check_result(result):
            break

    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_select_from_old_syntax_table(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ('rabbitmq1:5672', 'old', 'clickhouse-exchange', 'JSONEachRow', '\\n');
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='old', body=message)

    connection.close()

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq', ignore_error=True)
        if rabbitmq_check_result(result):
            break

    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_select_empty(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'empty',
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
        ''')

    assert int(instance.query('SELECT count() FROM test.rabbitmq')) == 0
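
# Input-format handling: JSONEachRow rows packed into a single message without
# a row delimiter, then CSV and TSV messages split on the '\n' row delimiter.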
@pytest.mark.timeout(180)
def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'json',
                     rabbitmq_exchange_name = 'clickhouse-exchange',
                     rabbitmq_format = 'JSONEachRow'
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

    messages = ''
    for i in range(25):
        messages += json.dumps({'key': i, 'value': i}) + '\n'

    all_messages = [messages]
    for message in all_messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='json', body=message)

    messages = ''
    for i in range(25, 50):
        messages += json.dumps({'key': i, 'value': i}) + '\n'

    all_messages = [messages]
    for message in all_messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='json', body=message)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq', ignore_error=True)
        if rabbitmq_check_result(result):
            break

    connection.close()
    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'csv',
                     rabbitmq_exchange_name = 'clickhouse-exchange',
                     rabbitmq_format = 'CSV',
                     rabbitmq_row_delimiter = '\\n';
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

    messages = []
    for i in range(50):
        messages.append('{i}, {i}'.format(i=i))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='csv', body=message)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq', ignore_error=True)
        if rabbitmq_check_result(result):
            break

    connection.close()
    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'tsv',
                     rabbitmq_exchange_name = 'clickhouse-exchange',
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
        ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

    messages = []
    for i in range(50):
        messages.append('{i}\t{i}'.format(i=i))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='tsv', body=message)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.rabbitmq', ignore_error=True)
        if rabbitmq_check_result(result):
            break

    connection.close()
    rabbitmq_check_result(result, True)
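
# Materialized-view delivery: RabbitMQ rows are pushed into MergeTree tables
# through a plain MV, an MV with a subquery, and several MVs reading the same
# engine table.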
@pytest.mark.timeout(180)
def test_rabbitmq_materialized_view(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'mv',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
    ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='mv', body=message)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if rabbitmq_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    connection.close()
    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'mvsq',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM (SELECT * FROM test.rabbitmq);
    ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='mvsq', body=message)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if rabbitmq_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    connection.close()
    rabbitmq_check_result(result, True)


@pytest.mark.timeout(180)
def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.view1;
        DROP TABLE IF EXISTS test.view2;
        DROP TABLE IF EXISTS test.consumer1;
        DROP TABLE IF EXISTS test.consumer2;
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'mmv',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view1 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
            SELECT * FROM test.rabbitmq;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.rabbitmq;
    ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    messages = []
    for i in range(50):
        messages.append(json.dumps({'key': i, 'value': i}))
    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='mmv', body=message)

    while True:
        result1 = instance.query('SELECT * FROM test.view1')
        result2 = instance.query('SELECT * FROM test.view2')
        if rabbitmq_check_result(result1) and rabbitmq_check_result(result2):
            break

    instance.query('''
        DROP TABLE test.consumer1;
        DROP TABLE test.consumer2;
        DROP TABLE test.view1;
        DROP TABLE test.view2;
    ''')

    # close the publisher connection, like the other tests do
    connection.close()
    rabbitmq_check_result(result1, True)
    rabbitmq_check_result(result2, True)
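
# Large messages: each published message packs a batch of JSON rows (~100Kb),
# and the total row count must survive intact on the ClickHouse side.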
@pytest.mark.timeout(240)
def test_rabbitmq_big_message(rabbitmq_cluster):
    # Create batches of messages of size ~100Kb
    rabbitmq_messages = 1000
    batch_messages = 1000
    messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(rabbitmq_messages)]

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.rabbitmq (key UInt64, value String)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'big',
                     rabbitmq_format = 'JSONEachRow';
        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
    ''')

    for message in messages:
        channel.basic_publish(exchange='clickhouse-exchange', routing_key='big', body=message)

    while True:
        result = instance.query('SELECT count() FROM test.view')
        print("Result", result, "Expected", batch_messages * rabbitmq_messages)
        if int(result) == batch_messages * rabbitmq_messages:
            break

    connection.close()
    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert int(result) == rabbitmq_messages * batch_messages, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
    NUM_CHANNELS = 5

    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_consumers = 5,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
    ''')

    time.sleep(1)

    i = [0]
    messages_num = 10000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        key = str(randrange(1, NUM_CHANNELS))
        for message in messages:
            channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
        connection.close()

    threads = []
    threads_num = 20

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        print("Result", result, "Expected", messages_num * threads_num)
        if int(result) == messages_num * threads_num:
            break

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
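
# Sharded consumption of the same stream: between several queues
# (rabbitmq_num_queues) and between channels and queues combined.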
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
    NUM_QUEUES = 4

    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_queues = 4,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
    ''')

    time.sleep(1)

    i = [0]
    messages_num = 10000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        key = str(randrange(1, NUM_QUEUES))
        for message in messages:
            channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
        connection.close()

    threads = []
    threads_num = 20

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster):
    NUM_CONSUMERS = 10
    NUM_QUEUES = 2

    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_queues = 2,
                     rabbitmq_num_consumers = 10,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key
            SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.rabbitmq;
    ''')

    time.sleep(1)

    i = [0]
    messages_num = 10000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        key = str(randrange(1, NUM_QUEUES * NUM_CONSUMERS))
        for message in messages:
            channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
        connection.close()

    threads = []
    threads_num = 20

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
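
# Combined read path: several materialized views and several consumer channels
# all reading from one engine table at the same time.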
@pytest.mark.timeout(420)
def test_rabbitmq_read_only_combo(rabbitmq_cluster):
    NUM_MV = 5
    NUM_CONSUMERS = 4

    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_consumers = 4,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        ''')

    for mv_id in range(NUM_MV):
        table_name = 'view{}'.format(mv_id)
        print("Setting up {}".format(table_name))

        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
            CREATE TABLE test.{0} (key UInt64, value UInt64)
                ENGINE = MergeTree()
                ORDER BY key;
            CREATE MATERIALIZED VIEW test.{0}_mv TO test.{0} AS
                SELECT * FROM test.rabbitmq;
        '''.format(table_name))

    time.sleep(2)

    i = [0]
    messages_num = 10000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        key = str(randrange(1, NUM_CONSUMERS))
        for message in messages:
            channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message)
        connection.close()

    threads = []
    threads_num = 20

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = 0
        for view in range(NUM_MV):
            result += int(instance.query('SELECT count() FROM test.view{0}'.format(view)))
        if int(result) == messages_num * threads_num * NUM_MV:
            break
        time.sleep(1)

    for thread in threads:
        thread.join()

    for mv_id in range(NUM_MV):
        table_name = 'view{}'.format(mv_id)
        # drop the materialized view too, so it does not leak into later tests
        instance.query('''
            DROP TABLE IF EXISTS test.{0}_mv;
            DROP TABLE IF EXISTS test.{0};
        '''.format(table_name))

    assert int(result) == messages_num * threads_num * NUM_MV, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(240)
def test_rabbitmq_insert(rabbitmq_cluster):
    instance.query('''
        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'insert',
                     rabbitmq_routing_key_list = 'insert1',
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
    ''')

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    consumer_connection = pika.BlockingConnection(parameters)

    consumer = consumer_connection.channel()
    consumer.exchange_declare(exchange='insert_rabbitmq_direct', exchange_type='direct')
    result = consumer.queue_declare(queue='')
    queue_name = result.method.queue
    consumer.queue_bind(exchange='insert_rabbitmq_direct', queue=queue_name, routing_key='insert1')

    values = []
    for i in range(50):
        values.append("({i}, {i})".format(i=i))
    values = ','.join(values)

    while True:
        try:
            instance.query("INSERT INTO test.rabbitmq VALUES {}".format(values))
            break
        except QueryRuntimeException as e:
            if 'Local: Timed out.' in str(e):
                continue
            else:
                raise

    insert_messages = []

    def onReceived(channel, method, properties, body):
        insert_messages.append(body.decode())
        if len(insert_messages) == 50:
            channel.stop_consuming()

    consumer.basic_qos(prefetch_count=50)
    consumer.basic_consume(onReceived, queue_name)
    consumer.start_consuming()
    consumer_connection.close()

    result = '\n'.join(insert_messages)
    rabbitmq_check_result(result, True)
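
# INSERT support under concurrency: many threads writing into the engine table
# while materialized views consume the published rows back.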
@pytest.mark.timeout(240)
def test_rabbitmq_many_inserts(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.rabbitmq_many;
        DROP TABLE IF EXISTS test.view_many;
        DROP TABLE IF EXISTS test.consumer_many;
        CREATE TABLE test.rabbitmq_many (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_routing_key_list = 'insert2',
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view_many (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key
            SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
        CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
            SELECT * FROM test.rabbitmq_many;
    ''')

    messages_num = 1000

    def insert():
        values = []
        for i in range(messages_num):
            values.append("({i}, {i})".format(i=i))
        values = ','.join(values)

        while True:
            try:
                instance.query("INSERT INTO test.rabbitmq_many VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if 'Local: Timed out.' in str(e):
                    continue
                else:
                    raise

    threads = []
    threads_num = 20
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view_many')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    instance.query('''
        DROP TABLE IF EXISTS test.rabbitmq_many;
        DROP TABLE IF EXISTS test.consumer_many;
        DROP TABLE IF EXISTS test.view_many;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(240)
def test_rabbitmq_sharding_between_channels_and_queues_insert(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.view_sharding;
        DROP TABLE IF EXISTS test.consumer_sharding;
        CREATE TABLE test.rabbitmq_sharding (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_consumers = 5,
                     rabbitmq_num_queues = 2,
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view_sharding (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key
            SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
        CREATE MATERIALIZED VIEW test.consumer_sharding TO test.view_sharding AS
            SELECT * FROM test.rabbitmq_sharding;
    ''')

    messages_num = 10000

    def insert():
        values = []
        for i in range(messages_num):
            values.append("({i}, {i})".format(i=i))
        values = ','.join(values)

        while True:
            try:
                instance.query("INSERT INTO test.rabbitmq_sharding VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if 'Local: Timed out.' in str(e):
                    continue
                else:
                    raise

    threads = []
    threads_num = 20
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view_sharding')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    instance.query('''
        DROP TABLE IF EXISTS test.rabbitmq_sharding;
        DROP TABLE IF EXISTS test.consumer_sharding;
        DROP TABLE IF EXISTS test.view_sharding;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
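
# Deliberate overload: fewer inserting threads but 100k rows per INSERT, to
# check that nothing is dropped under back-pressure.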
@pytest.mark.timeout(420)
def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.view_overload;
        DROP TABLE IF EXISTS test.consumer_overload;
        CREATE TABLE test.rabbitmq_overload (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_consumers = 10,
                     rabbitmq_format = 'TSV',
                     rabbitmq_row_delimiter = '\\n';
        CREATE TABLE test.view_overload (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key
            SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
        CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS
            SELECT * FROM test.rabbitmq_overload;
    ''')

    messages_num = 100000

    def insert():
        values = []
        for i in range(messages_num):
            values.append("({i}, {i})".format(i=i))
        values = ','.join(values)

        while True:
            try:
                instance.query("INSERT INTO test.rabbitmq_overload VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if 'Local: Timed out.' in str(e):
                    continue
                else:
                    raise

    threads = []
    threads_num = 5
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view_overload')
        time.sleep(1)
        print("Result", int(result), "Expected", messages_num * threads_num)
        if int(result) == messages_num * threads_num:
            break

    instance.query('''
        DROP TABLE IF EXISTS test.rabbitmq_overload;
        DROP TABLE IF EXISTS test.consumer_overload;
        DROP TABLE IF EXISTS test.view_overload;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(420)
def test_rabbitmq_direct_exchange(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key
        SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
    ''')

    num_tables = 5
    for consumer_id in range(num_tables):
        print("Setting up table {}".format(consumer_id))
        instance.query('''
            DROP TABLE IF EXISTS test.direct_exchange_{0};
            DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
            CREATE TABLE test.direct_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 5,
                         rabbitmq_exchange_name = 'direct_exchange_testing',
                         rabbitmq_exchange_type = 'direct',
                         rabbitmq_routing_key_list = 'direct_{0}',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.direct_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.direct_exchange_{0};
        '''.format(consumer_id))

    i = [0]
    messages_num = 1000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_exchange_testing', exchange_type='direct')

    messages = []
    for _ in range(messages_num):
        messages.append(json.dumps({'key': i[0], 'value': i[0]}))
        i[0] += 1

    key_num = 0
    for num in range(num_tables):
        key = "direct_" + str(key_num)
        key_num += 1
        for message in messages:
            mes_id = str(randrange(10))
            channel.basic_publish(
                exchange='direct_exchange_testing', routing_key=key,
                properties=pika.BasicProperties(message_id=mes_id), body=message)

    connection.close()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * num_tables:
            break

    for consumer_id in range(num_tables):
        instance.query('''
            DROP TABLE IF EXISTS test.direct_exchange_{0};
            DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
        '''.format(consumer_id))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    assert int(result) == messages_num * num_tables, 'ClickHouse lost some messages: {}'.format(result)
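
# More exchange types: a fanout exchange must deliver every message to all
# bound tables; a topic exchange routes on '*.N' and '*.logs' patterns.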
@pytest.mark.timeout(420)
def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    num_tables = 5
    for consumer_id in range(num_tables):
        print("Setting up table {}".format(consumer_id))
        instance.query('''
            DROP TABLE IF EXISTS test.fanout_exchange_{0};
            DROP TABLE IF EXISTS test.fanout_exchange_{0}_mv;
            CREATE TABLE test.fanout_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 5,
                         rabbitmq_routing_key_list = 'key_{0}',
                         rabbitmq_exchange_name = 'fanout_exchange_testing',
                         rabbitmq_exchange_type = 'fanout',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.fanout_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.fanout_exchange_{0};
        '''.format(consumer_id))

    i = [0]
    messages_num = 1000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='fanout_exchange_testing', exchange_type='fanout')

    messages = []
    for _ in range(messages_num):
        messages.append(json.dumps({'key': i[0], 'value': i[0]}))
        i[0] += 1

    for message in messages:
        mes_id = str(randrange(10))
        channel.basic_publish(
            exchange='fanout_exchange_testing', routing_key='',
            properties=pika.BasicProperties(message_id=mes_id), body=message)

    connection.close()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * num_tables:
            break

    for consumer_id in range(num_tables):
        instance.query('''
            DROP TABLE IF EXISTS test.fanout_exchange_{0};
            DROP TABLE IF EXISTS test.fanout_exchange_{0}_mv;
        '''.format(consumer_id))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    assert int(result) == messages_num * num_tables, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(420)
def test_rabbitmq_topic_exchange(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    num_tables = 5
    for consumer_id in range(num_tables):
        print("Setting up table {}".format(consumer_id))
        instance.query('''
            DROP TABLE IF EXISTS test.topic_exchange_{0};
            DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
            CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 5,
                         rabbitmq_exchange_name = 'topic_exchange_testing',
                         rabbitmq_exchange_type = 'topic',
                         rabbitmq_routing_key_list = '*.{0}',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.topic_exchange_{0};
        '''.format(consumer_id))

    for consumer_id in range(num_tables):
        print("Setting up table {}".format(num_tables + consumer_id))
        instance.query('''
            DROP TABLE IF EXISTS test.topic_exchange_{0};
            DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
            CREATE TABLE test.topic_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 4,
                         rabbitmq_exchange_name = 'topic_exchange_testing',
                         rabbitmq_exchange_type = 'topic',
                         rabbitmq_routing_key_list = '*.logs',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.topic_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.topic_exchange_{0};
        '''.format(num_tables + consumer_id))

    i = [0]
    messages_num = 1000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_exchange_testing', exchange_type='topic')

    messages = []
    for _ in range(messages_num):
        messages.append(json.dumps({'key': i[0], 'value': i[0]}))
        i[0] += 1

    key_num = 0
    for num in range(num_tables):
        key = "topic." + str(key_num)
        key_num += 1
        for message in messages:
            channel.basic_publish(exchange='topic_exchange_testing', routing_key=key, body=message)

    key = "random.logs"
    for message in messages:
        mes_id = str(randrange(10))
        channel.basic_publish(
            exchange='topic_exchange_testing', routing_key=key,
            properties=pika.BasicProperties(message_id=mes_id), body=message)

    connection.close()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * num_tables + messages_num * num_tables:
            break

    for consumer_id in range(num_tables * 2):
        instance.query('''
            DROP TABLE IF EXISTS test.topic_exchange_{0};
            DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
        '''.format(consumer_id))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    assert int(result) == messages_num * num_tables + messages_num * num_tables, 'ClickHouse lost some messages: {}'.format(result)
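
# Consistent-hash exchange (the rabbitmq_consistent_hash_exchange plugin is
# enabled in the setup fixture): messages are spread across consumer tables
# according to their routing key.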
@pytest.mark.timeout(420)
def test_rabbitmq_hash_exchange(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    num_tables = 4
    for consumer_id in range(num_tables):
        table_name = 'rabbitmq_consumer{}'.format(consumer_id)
        print("Setting up {}".format(table_name))
        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
            CREATE TABLE test.{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 10,
                         rabbitmq_exchange_type = 'consistent_hash',
                         rabbitmq_exchange_name = 'hash_exchange_testing',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.{0};
        '''.format(table_name))

    i = [0]
    messages_num = 500

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        # init connection here because otherwise python rabbitmq client might fail
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='hash_exchange_testing', exchange_type='x-consistent-hash')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        for message in messages:
            key = str(randrange(10))
            channel.basic_publish(exchange='hash_exchange_testing', routing_key=key, body=message)
        connection.close()

    threads = []
    threads_num = 10

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    for consumer_id in range(num_tables):
        table_name = 'rabbitmq_consumer{}'.format(consumer_id)
        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
        '''.format(table_name))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
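
# One direct exchange bound with five routing keys per table, tested both with
# and without explicit num_consumers/num_queues settings.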
@pytest.mark.timeout(420)
def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    instance.query('''
        DROP TABLE IF EXISTS test.bindings_1;
        DROP TABLE IF EXISTS test.bindings_1_mv;
        CREATE TABLE test.bindings_1 (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_num_consumers = 5,
                     rabbitmq_num_queues = 2,
                     rabbitmq_exchange_name = 'multiple_bindings_testing',
                     rabbitmq_exchange_type = 'direct',
                     rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        CREATE MATERIALIZED VIEW test.bindings_1_mv TO test.destination AS
            SELECT * FROM test.bindings_1;
    ''')

    # in case num_consumers and num_queues are not set - multiple bindings are implemented differently, so test them too
    instance.query('''
        DROP TABLE IF EXISTS test.bindings_2;
        DROP TABLE IF EXISTS test.bindings_2_mv;
        CREATE TABLE test.bindings_2 (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                     rabbitmq_exchange_name = 'multiple_bindings_testing',
                     rabbitmq_exchange_type = 'direct',
                     rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5',
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n';
        CREATE MATERIALIZED VIEW test.bindings_2_mv TO test.destination AS
            SELECT * FROM test.bindings_2;
    ''')

    i = [0]
    messages_num = 500

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        # init connection here because otherwise python rabbitmq client might fail
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.exchange_declare(exchange='multiple_bindings_testing', exchange_type='direct')

        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1

        keys = ['key1', 'key2', 'key3', 'key4', 'key5']
        for key in keys:
            for message in messages:
                mes_id = str(randrange(10))
                channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key,
                                      properties=pika.BasicProperties(message_id=mes_id), body=message)

        connection.close()

    threads = []
    threads_num = 10

    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * threads_num * 5 * 2:
            break

    for thread in threads:
        thread.join()

    # drop the materialized views as well, so they do not leak into later tests
    instance.query('''
        DROP TABLE IF EXISTS test.bindings_1_mv;
        DROP TABLE IF EXISTS test.bindings_2_mv;
        DROP TABLE IF EXISTS test.bindings_1;
        DROP TABLE IF EXISTS test.bindings_2;
        DROP TABLE IF EXISTS test.destination;
    ''')

    assert int(result) == messages_num * threads_num * 5 * 2, 'ClickHouse lost some messages: {}'.format(result)
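
# Headers exchange: only tables whose 'x-match=all' header set matches the
# published message headers (format/type/year) should receive rows.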
@pytest.mark.timeout(420)
def test_rabbitmq_headers_exchange(rabbitmq_cluster):
    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination(key UInt64, value UInt64, _consumed_by LowCardinality(String))
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    num_tables_to_receive = 3
    for consumer_id in range(num_tables_to_receive):
        print("Setting up table {}".format(consumer_id))
        instance.query('''
            DROP TABLE IF EXISTS test.headers_exchange_{0};
            DROP TABLE IF EXISTS test.headers_exchange_{0}_mv;
            CREATE TABLE test.headers_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_num_consumers = 4,
                         rabbitmq_exchange_name = 'headers_exchange_testing',
                         rabbitmq_exchange_type = 'headers',
                         rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.headers_exchange_{0};
        '''.format(consumer_id))

    num_tables_to_ignore = 2
    for consumer_id in range(num_tables_to_ignore):
        print("Setting up table {}".format(consumer_id + num_tables_to_receive))
        instance.query('''
            DROP TABLE IF EXISTS test.headers_exchange_{0};
            DROP TABLE IF EXISTS test.headers_exchange_{0}_mv;
            CREATE TABLE test.headers_exchange_{0} (key UInt64, value UInt64)
                ENGINE = RabbitMQ
                SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                         rabbitmq_exchange_name = 'headers_exchange_testing',
                         rabbitmq_exchange_type = 'headers',
                         rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2019',
                         rabbitmq_format = 'JSONEachRow',
                         rabbitmq_row_delimiter = '\\n';
            CREATE MATERIALIZED VIEW test.headers_exchange_{0}_mv TO test.destination AS
                SELECT key, value, '{0}' as _consumed_by FROM test.headers_exchange_{0};
        '''.format(consumer_id + num_tables_to_receive))

    i = [0]
    messages_num = 1000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='headers_exchange_testing', exchange_type='headers')

    messages = []
    for _ in range(messages_num):
        messages.append(json.dumps({'key': i[0], 'value': i[0]}))
        i[0] += 1

    fields = {'format': 'logs', 'type': 'report', 'year': '2020'}

    for message in messages:
        mes_id = str(randrange(10))
        channel.basic_publish(exchange='headers_exchange_testing', routing_key='',
                              properties=pika.BasicProperties(headers=fields, message_id=mes_id),
                              body=message)

    connection.close()

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        if int(result) == messages_num * num_tables_to_receive:
            break

    # the tables created above are headers_exchange_*, not direct_exchange_*
    for consumer_id in range(num_tables_to_receive + num_tables_to_ignore):
        instance.query('''
            DROP TABLE IF EXISTS test.headers_exchange_{0};
            DROP TABLE IF EXISTS test.headers_exchange_{0}_mv;
        '''.format(consumer_id))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    assert int(result) == messages_num * num_tables_to_receive, 'ClickHouse lost some messages: {}'.format(result)


if __name__ == '__main__':
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()