import os.path as p
import random
import threading
import time
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager

import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic
from google.protobuf.internal.encoder import _VarintBytes

"""
protoc --version
libprotoc 3.0.0

# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2


# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
                                config_dir='configs',
                                main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
                                with_kafka=True,
                                with_zookeeper=True,
                                clickhouse_path_dir='clickhouse_path')
# Filled in by the `kafka_cluster` fixture once the cluster has started.
kafka_id = ''


# Helpers

def check_kafka_is_available():
    """Return True if `kafka-broker-api-versions`, run inside the Kafka container, exits with 0."""
    # The local is called `proc` so that it does not shadow the module-level
    # `p` alias for os.path.
    proc = subprocess.Popen(('docker',
                             'exec',
                             '-i',
                             kafka_id,
                             '/usr/bin/kafka-broker-api-versions',
                             '--bootstrap-server',
                             'INSIDE://localhost:9092'),
                            stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def wait_kafka_is_available(max_retries=50):
    """Poll `check_kafka_is_available` once per second until it succeeds.

    Raises:
        Exception: when the broker is still unavailable after `max_retries` retries.
    """
    retries = 0
    while not check_kafka_is_available():
        retries += 1
        if retries > max_retries:
            # A bare string cannot be raised; wrap the message in an exception.
            raise Exception("Kafka is not available")
        print("Waiting for Kafka to start up")
        time.sleep(1)


def kafka_produce(topic, messages, timestamp=None):
    """Send every item of `messages` to `topic`, flushing after each one.

    `timestamp` is forwarded as `timestamp_ms` for every message.
    """
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    for message in messages:
        producer.send(topic=topic, value=message, timestamp_ms=timestamp)
        producer.flush()
    # print ("Produced {} messages for topic {}".format(len(messages), topic))


def kafka_consume(topic):
    """Yield the values returned by a single 5000 ms poll of `topic`, reading from the earliest offset."""
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
    try:
        # `(topic)` is just a parenthesised string; pass a real one-element list.
        consumer.subscribe(topics=[topic])
        for toppar, messages in consumer.poll(5000).items():
            if toppar.topic == topic:
                for message in messages:
                    yield message.value
        consumer.unsubscribe()
    finally:
        # Close the consumer even if the caller abandons the generator early
        # or polling raises.
        consumer.close()


def kafka_produce_protobuf_messages(topic, start_index, num_messages):
    """Send one Kafka message holding `num_messages` length-delimited KeyValuePair records.

    Keys run from `start_index` to `start_index + num_messages - 1`; each value
    is the string form of its key.
    """
    chunks = []
    for i in range(start_index, start_index + num_messages):
        msg = kafka_pb2.KeyValuePair()
        msg.key = i
        msg.value = str(i)
        serialized_msg = msg.SerializeToString()
        # Each record is prefixed with its varint-encoded length.
        chunks.append(_VarintBytes(len(serialized_msg)) + serialized_msg)
    # Join as bytes: the serialized records are binary, not text.
    data = b''.join(chunks)
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send(topic=topic, value=data)
    producer.flush()
    print("Produced {} messages for topic {}".format(num_messages, topic))


# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
    """Compare `result` with the reference file located next to this module.

    With `check=False` the comparison outcome is returned; with `check=True`
    the comparison is asserted and nothing is returned.
    """
    fpath = p.join(p.dirname(__file__), ref_file)
    with open(fpath) as reference:
        if check:
            assert TSV(result) == TSV(reference)
        else:
            return TSV(result) == TSV(reference)


# Fixtures

@pytest.fixture(scope="module")
def kafka_cluster():
    """Start the cluster, record the Kafka container id, create database `test`; shut down afterwards."""
    global kafka_id
    try:
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        instance.query('CREATE DATABASE test')

        yield cluster

    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    """Drop `test.kafka` before and after every test and wait for Kafka before the test runs."""
    instance.query('DROP TABLE IF EXISTS test.kafka')
    wait_kafka_is_available()
    print("kafka is available - running test")
    yield  # run test
    instance.query('DROP TABLE IF EXISTS test.kafka')


# Tests

@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
    """Read 50 JSON rows through a Kafka table declared with positional engine arguments."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('kafka1:19092', 'old', 'old', 'JSONEachRow', '\\n');
        ''')

    # Don't insert malformed messages since old settings syntax
    # doesn't support skipping of broken messages.
    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('old', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
    """Read 50 JSON rows via the SETTINGS syntax with two malformed messages interleaved."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'new',
                     kafka_group_name = 'new',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n',
                     kafka_skip_broken_messages = 1;
        ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(25)]
    kafka_produce('new', messages)

    # Insert couple of malformed messages.
    kafka_produce('new', ['}{very_broken_message,'])
    kafka_produce('new', ['}another{very_broken_message,'])

    messages = [json.dumps({'key': i, 'value': i}) for i in range(25, 50)]
    kafka_produce('new', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
    """Pause and unpause the broker, then check that dropping the tables leaves no hanging DROP query."""
    instance.query('''
        DROP TABLE IF EXISTS test.kafka;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang',
                     kafka_group_name = 'consumer_hang',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 8,
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
        ''')

    time.sleep(10)
    instance.query('SELECT * FROM test.view')

    # This should trigger heartbeat fail,
    # which will trigger REBALANCE_IN_PROGRESS,
    # and which can lead to consumer hang.
    kafka_cluster.pause_container('kafka1')
    time.sleep(0.5)
    kafka_cluster.unpause_container('kafka1')

    # print("Attempt to drop")
    instance.query('DROP TABLE test.kafka')

    #kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # original problem appearance was a sequence of the following messages in librdkafka logs:
    # BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
    # so it was waiting forever while the application will execute queued rebalance callback

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0


@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
    """Read 50 CSV rows, one message per row, with an explicit row delimiter."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'csv',
                     kafka_group_name = 'csv',
                     kafka_format = 'CSV',
                     kafka_row_delimiter = '\\n';
        ''')

    messages = ['{i}, {i}'.format(i=i) for i in range(50)]
    kafka_produce('csv', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_tsv_with_delimiter(kafka_cluster):
    """Read 50 TSV rows, one message per row, with an explicit row delimiter."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'tsv',
                     kafka_group_name = 'tsv',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    messages = ['{i}\t{i}'.format(i=i) for i in range(50)]
    kafka_produce('tsv', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_select_empty(kafka_cluster):
    """Selecting from a Kafka table when nothing was produced to its topic returns zero rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'empty',
                     kafka_group_name = 'empty',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    assert int(instance.query('SELECT count() FROM test.kafka')) == 0


@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
    """Read 50 JSON rows packed into two multi-row messages, with no row delimiter setting."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'json',
                     kafka_group_name = 'json',
                     kafka_format = 'JSONEachRow';
        ''')

    messages = ''
    for i in range(25):
        messages += json.dumps({'key': i, 'value': i}) + '\n'
    kafka_produce('json', [messages])

    messages = ''
    for i in range(25, 50):
        messages += json.dumps({'key': i, 'value': i}) + '\n'
    kafka_produce('json', [messages])

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster):
    """Read 50 Protobuf records produced as three messages of 20, 1 and 29 records."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'pb',
                     kafka_group_name = 'pb',
                     kafka_format = 'Protobuf',
                     kafka_schema = 'kafka.proto:KeyValuePair';
        ''')

    kafka_produce_protobuf_messages('pb', 0, 20)
    kafka_produce_protobuf_messages('pb', 20, 1)
    kafka_produce_protobuf_messages('pb', 21, 29)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster):
    """A materialized view moves 50 JSON rows from the Kafka table into a MergeTree table."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mv',
                     kafka_group_name = 'mv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mv', messages)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if kafka_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_materialized_view_with_subquery(kafka_cluster):
    """Same as the materialized view test, but the view selects from a subquery over the Kafka table."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mvsq',
                     kafka_group_name = 'mvsq',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM (SELECT * FROM test.kafka);
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mvsq', messages)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if kafka_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_many_materialized_views(kafka_cluster):
    """Two materialized views attached to one Kafka table must each receive all 50 rows."""
    instance.query('''
        DROP TABLE IF EXISTS test.view1;
        DROP TABLE IF EXISTS test.view2;
        DROP TABLE IF EXISTS test.consumer1;
        DROP TABLE IF EXISTS test.consumer2;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mmv',
                     kafka_group_name = 'mmv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view1 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
            SELECT * FROM test.kafka;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mmv', messages)

    while True:
        result1 = instance.query('SELECT * FROM test.view1')
        result2 = instance.query('SELECT * FROM test.view2')
        if kafka_check_result(result1) and kafka_check_result(result2):
            break

    instance.query('''
        DROP TABLE test.consumer1;
        DROP TABLE test.consumer2;
        DROP TABLE test.view1;
        DROP TABLE test.view2;
    ''')

    kafka_check_result(result1, True)
    kafka_check_result(result2, True)


@pytest.mark.timeout(300)
def test_kafka_flush_on_big_message(kafka_cluster):
    """Large multi-row messages must all reach the view even with a tiny kafka_max_block_size."""
    # Create batchs of messages of size ~100Kb
    kafka_messages = 1000
    batch_messages = 1000
    messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)]
    kafka_produce('flush', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush',
                     kafka_group_name = 'flush',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 10;
        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    # Wait until the consumer group has committed an offset equal to the
    # number of produced Kafka messages.
    client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    received = False
    while not received:
        try:
            offsets = client.list_consumer_group_offsets('flush')
            for topic, offset in offsets.items():
                if topic.topic == 'flush' and offset.offset == kafka_messages:
                    received = True
                    break
        except kafka.errors.GroupCoordinatorNotAvailableError:
            continue

    while True:
        result = instance.query('SELECT count() FROM test.view')
        if int(result) == kafka_messages*batch_messages:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(180)
def test_kafka_virtual_columns(kafka_cluster):
    """Select the Kafka virtual columns directly from the Kafka table and compare with a reference file."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt1',
                     kafka_group_name = 'virt1',
                     kafka_format = 'JSONEachRow';
        ''')

    messages = ''
    for i in range(25):
        messages += json.dumps({'key': i, 'value': i}) + '\n'
    kafka_produce('virt1', [messages], 0)

    messages = ''
    for i in range(25, 50):
        messages += json.dumps({'key': i, 'value': i}) + '\n'
    kafka_produce('virt1', [messages], 0)

    result = ''
    while True:
        result += instance.query('SELECT _key, key, _topic, value, _offset, _partition, _timestamp FROM test.kafka', ignore_error=True)
        if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
            break

    kafka_check_result(result, True, 'test_kafka_virtual1.reference')


@pytest.mark.timeout(180)
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
    """Copy the Kafka virtual columns into a MergeTree table through a materialized view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2',
                     kafka_group_name = 'virt2',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime))
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('virt2', messages, 0)

    while True:
        result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view')
        if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True, 'test_kafka_virtual2.reference')


@pytest.mark.timeout(180)
def test_kafka_insert(kafka_cluster):
    """INSERT 50 rows into the Kafka table and read them back with a plain Kafka consumer."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert1',
                     kafka_group_name = 'insert1',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
    ''')

    values = ','.join("({i}, {i})".format(i=i) for i in range(50))

    # Retry the insert for as long as it fails with a local timeout.
    while True:
        try:
            instance.query("INSERT INTO test.kafka VALUES {}".format(values))
            break
        except QueryRuntimeException as e:
            if 'Local: Timed out.' in str(e):
                continue
            else:
                raise

    messages = []
    while True:
        messages.extend(kafka_consume('insert1'))
        if len(messages) == 50:
            break

    result = '\n'.join(messages)
    kafka_check_result(result, True)


@pytest.mark.timeout(240)
def test_kafka_produce_consume(kafka_cluster):
    """16 threads each INSERT 10000 rows into the Kafka table; the view must receive all of them."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert2',
                     kafka_group_name = 'insert2',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages_num = 10000

    def insert():
        values = ','.join("({i}, {i})".format(i=i) for i in range(messages_num))

        # Retry the insert for as long as it fails with a local timeout.
        while True:
            try:
                instance.query("INSERT INTO test.kafka VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if 'Local: Timed out.' in str(e):
                    continue
                else:
                    raise

    threads = []
    threads_num = 16
    for _ in range(threads_num):
        threads.append(threading.Thread(target=insert))
    for thread in threads:
        # Stagger the thread starts by a random delay of up to one second.
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(300)
def test_kafka_commit_on_block_write(kafka_cluster):
    """Drop and re-create the Kafka table mid-stream; the view must end up without duplicate keys."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'block',
                     kafka_group_name = 'block',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    cancel = threading.Event()

    # One-element list so that the nested function can update the counter.
    i = [0]
    def produce():
        while not cancel.is_set():
            messages = []
            for _ in range(101):
                messages.append(json.dumps({'key': i[0], 'value': i[0]}))
                i[0] += 1
            kafka_produce('block', messages)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    while int(instance.query('SELECT count() FROM test.view')) == 0:
        time.sleep(1)

    cancel.set()

    instance.query('''
        DROP TABLE test.kafka;
    ''')

    while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka'")) == 1:
        time.sleep(1)

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'block',
                     kafka_group_name = 'block',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';
    ''')

    while int(instance.query('SELECT uniqExact(key) FROM test.view')) < i[0]:
        time.sleep(1)

    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view'))

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'


@pytest.mark.timeout(180)
def test_kafka_virtual_columns2(kafka_cluster):
    """Check key, topic, partition, offset and timestamp virtual columns across two 2-partition topics."""
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = []
    topic_list.append(NewTopic(name="virt2_0", num_partitions=2, replication_factor=1))
    topic_list.append(NewTopic(name="virt2_1", num_partitions=2, replication_factor=1))

    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        CREATE TABLE test.kafka (value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2_0,virt2_1',
                     kafka_group_name = 'virt2',
                     kafka_format = 'JSONEachRow';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
        SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
        ''')

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801000)
    producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802000)
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803000)
    producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804000)
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805000)
    producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806000)
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807000)
    producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808000)
    producer.flush()

    time.sleep(10)

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    # Columns are tab-separated (written as \t escapes) to match the TSV query output.
    expected = '''\
1\tk1\tvirt2_0\t0\t0\t1577836801
2\tk2\tvirt2_0\t0\t1\t1577836802
3\tk3\tvirt2_0\t1\t0\t1577836803
4\tk4\tvirt2_0\t1\t1\t1577836804
5\tk5\tvirt2_1\t0\t0\t1577836805
6\tk6\tvirt2_1\t0\t1\t1577836806
7\tk7\tvirt2_1\t1\t0\t1577836807
8\tk8\tvirt2_1\t1\t1\t1577836808
'''

    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(240)
def test_kafka_produce_key_timestamp(kafka_cluster):
    """Rows inserted with `_key`/`_timestamp` columns must come back with matching virtual columns."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka_writer (key UInt64, value UInt64, _key String, _timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE TABLE test.kafka (key UInt64, value UInt64, inserted_key String, inserted_timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT key, value, inserted_key, toUnixTimestamp(inserted_timestamp), _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
    ''')

    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(1,1,'k1',1577836801))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(2,2,'k2',1577836802))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3,3,'k3',1577836803,4,4,'k4',1577836804))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5,5,'k5',1577836805))

    time.sleep(10)

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    # print(result)

    # Columns are tab-separated (written as \t escapes) to match the TSV query output.
    expected = '''\
1\t1\tk1\t1577836801\tk1\tinsert3\t0\t0\t1577836801
2\t2\tk2\t1577836802\tk2\tinsert3\t0\t1\t1577836802
3\t3\tk3\t1577836803\tk3\tinsert3\t0\t2\t1577836803
4\t4\tk4\t1577836804\tk4\tinsert3\t0\t3\t1577836804
5\t5\tk5\t1577836805\tk5\tinsert3\t0\t4\t1577836805
'''

    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(600)
def test_kafka_flush_by_time(kafka_cluster):
    """Produce about one message per second for 18 seconds and expect more than 12 rows in the view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush_by_time',
                     kafka_group_name = 'flush_by_time',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    cancel = threading.Event()

    def produce():
        while not cancel.is_set():
            messages = []
            messages.append(json.dumps({'key': 0, 'value': 0}))
            kafka_produce('flush_by_time', messages)
            time.sleep(1)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    time.sleep(18)

    result = instance.query('SELECT count() FROM test.view')

    print(result)
    cancel.set()
    kafka_thread.join()

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # 40 = 2 flushes (7.5 sec), 15 polls each, about 1 msg per 1.5 sec
    # NOTE(review): the figure above does not match the `> 12` threshold
    # asserted below - confirm which one is intended.
    assert int(result) > 12, 'Messages from kafka should be flushed at least every stream_flush_interval_ms!'
@pytest.mark.timeout(600)
def test_kafka_flush_by_block_size(kafka_cluster):
    """Produce 101 messages with kafka_max_block_size = 100 and expect exactly 100 rows after one second."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush_by_block_size',
                     kafka_group_name = 'flush_by_block_size',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';

        SELECT * FROM test.kafka;

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    # One message more than the block size, so a full block can be formed.
    messages = [json.dumps({'key': 0, 'value': 0}) for _ in range(101)]
    kafka_produce('flush_by_block_size', messages)

    time.sleep(1)

    result = instance.query('SELECT count() FROM test.view')
    # print(result)

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # 100 = first poll should return 100 messages (and rows)
    # not waiting for stream_flush_interval_ms
    assert int(result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'
@pytest.mark.timeout(600)
def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
    """Consume 1000 multi-row messages from a 10-partition topic and expect every key exactly once."""
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")

    topic_list = []
    topic_list.append(NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'topic_with_multiple_partitions2',
                     kafka_group_name = 'topic_with_multiple_partitions2',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 211;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    # Every Kafka message carries between 3 and 9 newline-separated rows;
    # `count` ends up as both the total number of rows and the largest key.
    messages = []
    count = 0
    for dummy_msg in range(1000):
        rows = []
        for dummy_row in range(random.randrange(3,10)):
            count = count + 1
            rows.append(json.dumps({'key': count, 'value': count}))
        messages.append("\n".join(rows))
    kafka_produce('topic_with_multiple_partitions2', messages)

    time.sleep(30)

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    # Clean up before asserting so that a failed assertion does not leave the
    # consumer view and its target table behind.
    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(count) )


@pytest.mark.timeout(1200)
def test_kafka_rebalance(kafka_cluster):
    """Add 11 consumers of one group one by one, drop all but the last, and expect no duplicated keys."""
    NUMBER_OF_CONCURRENT_CONSUMERS=11

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination (
            key UInt64,
            value UInt64,
            _topic String,
            _key String,
            _offset UInt64,
            _partition UInt64,
            _timestamp Nullable(DateTime),
            _consumed_by LowCardinality(String)
        )
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    # kafka_cluster.open_bash_shell('instance')

    #time.sleep(2)

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = []
    topic_list.append(NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    cancel = threading.Event()

    # One-element list so that the nested function can update the counter.
    msg_index = [0]
    def produce():
        while not cancel.is_set():
            messages = []
            for _ in range(59):
                messages.append(json.dumps({'key': msg_index[0], 'value': msg_index[0]}))
                msg_index[0] += 1
            kafka_produce('topic_with_multiple_partitions', messages)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS):
        table_name = 'kafka_consumer{}'.format(consumer_index)
        print("Setting up {}".format(table_name))

        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
            CREATE TABLE test.{0} (key UInt64, value UInt64)
                ENGINE = Kafka
                SETTINGS kafka_broker_list = 'kafka1:19092',
                         kafka_topic_list = 'topic_with_multiple_partitions',
                         kafka_group_name = 'rebalance_test_group',
                         kafka_format = 'JSONEachRow',
                         kafka_max_block_size = 33;
            CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
                SELECT
                    key,
                    value,
                    _topic,
                    _key,
                    _offset,
                    _partition,
                    _timestamp,
                    '{0}' as _consumed_by
                FROM test.{0};
        '''.format(table_name))
        # kafka_cluster.open_bash_shell('instance')
        # Do not add the next consumer until this one has written at least one row.
        while int(instance.query("SELECT count() FROM test.destination WHERE _consumed_by='{}'".format(table_name))) == 0:
            print("Waiting for test.kafka_consumer{} to start consume".format(consumer_index))
            time.sleep(1)

    cancel.set()

    # I leave last one working by intent (to finish consuming after all rebalances)
    for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS-1):
        print("Dropping test.kafka_consumer{}".format(consumer_index))
        instance.query('DROP TABLE IF EXISTS test.kafka_consumer{}'.format(consumer_index))
        while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka_consumer{}'".format(consumer_index))) == 1:
            time.sleep(1)

    # print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
    # kafka_cluster.open_bash_shell('instance')

    while True:
        messages_consumed = int(instance.query('SELECT uniqExact(key) FROM test.destination'))
        if messages_consumed >= msg_index[0]:
            break
        time.sleep(1)
        print("Waiting for finishing consuming (have {}, should be {})".format(messages_consumed,msg_index[0]))

    print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))

    # SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
    # select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
    # SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
    # select _partition from test.destination group by _partition having count() <> max(_offset) + 1;
    # select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
    # SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;

    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))

    for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS):
        print("kafka_consumer{}".format(consumer_index))
        table_name = 'kafka_consumer{}'.format(consumer_index)
        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
        '''.format(table_name))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'
@pytest.mark.timeout(1200)
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
    """Check that no rows are lost when the ZooKeeper session expires mid-write.

    22 messages are produced and consumed through a materialized view into a
    ReplicatedMergeTree table. The instance's ZooKeeper connections are
    dropped while a block is being written, then restored. The test passes
    when the target table ends up with exactly 22 rows, 22 distinct keys and
    a maximum key of 22.
    """
    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('no_holes_when_write_suffix_failed', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'no_holes_when_write_suffix_failed',
                     kafka_group_name = 'no_holes_when_write_suffix_failed',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 20;

        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka
            WHERE NOT sleepEachRow(1);
    ''')

    # The tricky part is that the disconnect has to happen after "write prefix"
    # but before "write suffix"; sleepEachRow stretches that window so the
    # fixed sleeps below can hit it.
    with PartitionManager() as pm:
        time.sleep(12)
        pm.drop_instance_zk_connections(instance)
        time.sleep(20)
        # The original code referenced `pm.heal_all` without calling it, so
        # the partition was only lifted implicitly on leaving the `with` block.
        pm.heal_all()

    # The connection is restored, but it takes a while until the next block
    # is flushed (and this is very slow on CI).
    time.sleep(90)

    # It is hard to hit the right moment, so confirm via the server log that
    # the session really expired where we wanted it to.
    assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    # Debugging aid: kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert TSV(result) == TSV('22\t22\t22')


@pytest.mark.timeout(120)
def test_exception_from_destructor(kafka_cluster):
    """Check that the server stays responsive after a failing Kafka SELECT.

    A Kafka table with an empty consumer group name is created, a direct
    SELECT from it is expected to return an error, and the table is dropped.
    The same table is then created and dropped once more without reading
    from it. Finally a trivial query must still succeed.

    NOTE(review): judging by the test name, this presumably guards against an
    exception escaping a destructor on the server side - confirm.
    """
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'xyz',
                     kafka_group_name = '',
                     kafka_format = 'JSONEachRow';
    ''')
    instance.query_and_get_error('''
        SELECT * FROM test.kafka;
    ''')
    instance.query('''
        DROP TABLE test.kafka;
    ''')

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'xyz',
                     kafka_group_name = '',
                     kafka_format = 'JSONEachRow';
    ''')
    instance.query('''
        DROP TABLE test.kafka;
    ''')

    # Debugging aid: kafka_cluster.open_bash_shell('instance')

    # The server must still answer queries after the create/drop cycles above.
    assert TSV(instance.query('SELECT 1')) == TSV('1')


@pytest.mark.timeout(1200)
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
    """Check that a failed offset commit does not produce duplicate rows.

    22 messages are produced and consumed through a materialized view into a
    MergeTree table. The Kafka container is paused around the time the first
    commit is expected and resumed 40 seconds later. The test passes when the
    target table ends up with exactly 22 rows, 22 distinct keys and a maximum
    key of 22.
    """
    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('duplicates_when_commit_failed', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'duplicates_when_commit_failed',
                     kafka_group_name = 'duplicates_when_commit_failed',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 20;

        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka
            WHERE NOT sleepEachRow(0.5);
    ''')

    # 5-6 sec to connect to Kafka, subscribe and fetch 20 rows, another 10 sec
    # for the MV; after that the commit should happen.
    time.sleep(12)

    kafka_cluster.pause_container('kafka1')
    try:
        # This timeout is VERY important and was picked after a lot of
        # experiments:
        #  - too low (<30 sec): librdkafka will not report any timeout (the
        #    alternative is to decrease librdkafka's default session timeouts);
        #  - too high (>50 sec): the broker decides to remove us from the
        #    consumer group and starts answering "Broker: Unknown member".
        time.sleep(40)
    finally:
        # Always resume the broker, even if the wait is interrupted, so a
        # frozen container cannot leak into the tests that run afterwards.
        kafka_cluster.unpause_container('kafka1')

    # Debugging aid: kafka_cluster.open_bash_shell('instance')

    # The connection is restored, but it takes a while until the next block
    # is flushed (and this is very slow on CI).
    time.sleep(30)

    # It is hard to hit the right moment, so confirm via the server log that
    # the consumer really lost its coordinator.
    assert instance.contains_in_log("Local: Waiting for coordinator")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert TSV(result) == TSV('22\t22\t22')


if __name__ == '__main__':
    # Manual mode: bring the cluster up and keep it alive until the user
    # presses a key, then tear it down.
    cluster.start()
    # NOTE(review): raw_input is a Python 2 builtin; use input() if this file
    # is ever ported to Python 3.
    raw_input("Cluster created, press any key to destroy...")
    cluster.shutdown()