import os.path as p
import random
import threading
import time
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager

import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
from google.protobuf.internal.encoder import _VarintBytes

"""
protoc --version
libprotoc 3.0.0

# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2


# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
                                config_dir='configs',
                                main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
                                with_kafka=True,
                                with_zookeeper=True,
                                clickhouse_path_dir='clickhouse_path')
# Docker id of the Kafka container; filled in by the `kafka_cluster` fixture.
kafka_id = ''


# Helpers

def check_kafka_is_available():
    """Return True if `kafka-broker-api-versions` run inside the Kafka container exits with 0."""
    # The local is deliberately not called `p`: that name is the module-level os.path alias.
    proc = subprocess.Popen(('docker',
                             'exec',
                             '-i',
                             kafka_id,
                             '/usr/bin/kafka-broker-api-versions',
                             '--bootstrap-server',
                             'INSIDE://localhost:9092'),
                            stdout=subprocess.PIPE)
    proc.communicate()
    return proc.returncode == 0


def wait_kafka_is_available(max_retries=50):
    """Poll `check_kafka_is_available` once per second until it succeeds.

    Raises:
        Exception: if Kafka is still unavailable after more than `max_retries` failed checks.
    """
    retries = 0
    while not check_kafka_is_available():
        retries += 1
        if retries > max_retries:
            # A plain string cannot be raised (that is a TypeError by itself),
            # so wrap the message into a real exception.
            raise Exception("Kafka is not available")
        print("Waiting for Kafka to start up")
        time.sleep(1)


def kafka_produce(topic, messages, timestamp=None):
    """Send every item of `messages` to `topic` (optionally with `timestamp` in ms) and flush."""
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    for message in messages:
        producer.send(topic=topic, value=message, timestamp_ms=timestamp)
        producer.flush()
#    print ("Produced {} messages for topic {}".format(len(messages), topic))


def kafka_consume(topic):
    """Yield the values of the messages returned by a single 5 second poll of `topic`.

    This is a generator; the consumer is unsubscribed and closed when the
    generator finishes or is discarded.
    """
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
    try:
        # `(topic)` is just a parenthesised string, not a tuple - pass a real list.
        consumer.subscribe(topics=[topic])
        for toppar, messages in consumer.poll(5000).items():
            if toppar.topic == topic:
                for message in messages:
                    yield message.value
    finally:
        consumer.unsubscribe()
        consumer.close()


def kafka_produce_protobuf_messages(topic, start_index, num_messages):
    """Send one Kafka message made of `num_messages` length-delimited KeyValuePair records.

    Keys are start_index .. start_index + num_messages - 1, values are str(key).
    """
    # Serialized protobuf is binary data, so accumulate into a bytes object
    # (an ordinary '' literal breaks on Python 3 and is the same thing on Python 2).
    data = b''
    for i in range(start_index, start_index + num_messages):
        msg = kafka_pb2.KeyValuePair()
        msg.key = i
        msg.value = str(i)
        serialized_msg = msg.SerializeToString()
        data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send(topic=topic, value=data)
    producer.flush()
    print("Produced {} messages for topic {}".format(num_messages, topic))


# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'):
    """Compare `result` with the reference file located next to this test file.

    With check=False the comparison result is returned; with check=True the
    equality is asserted instead (and None is returned).
    """
    fpath = p.join(p.dirname(__file__), ref_file)
    with open(fpath) as reference:
        if check:
            assert TSV(result) == TSV(reference)
        else:
            return TSV(result) == TSV(reference)


# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    """Return a list of dicts describing the members of consumer group `name`.

    Every dict has the keys 'member_id', 'client_id', 'client_host' and
    'assignment' (a list of {'topic': ..., 'partitions': ...}).
    """
    client = BrokerConnection('localhost', 9092, socket.AF_INET)
    client.connect_blocking()

    list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
    future = client.send(list_members_in_groups)
    # Pump the connection until the response for our request has arrived.
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)

    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]

    res = []
    for member in members:
        (member_id, client_id, client_host, member_metadata, member_assignment) = member
        member_info = {}
        member_info['member_id'] = member_id
        member_info['client_id'] = client_id
        member_info['client_host'] = client_host
        member_topics_assignment = []
        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
            member_topics_assignment.append({'topic': topic, 'partitions': partitions})
        member_info['assignment'] = member_topics_assignment
        res.append(member_info)
    return res


# Fixtures

@pytest.fixture(scope="module")
def kafka_cluster():
    """Start the cluster once per module, create database `test`, shut the cluster down afterwards."""
    try:
        global kafka_id
        cluster.start()
        kafka_id = instance.cluster.kafka_docker_id
        print("kafka_id is {}".format(kafka_id))
        instance.query('CREATE DATABASE test')

        yield cluster

    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    """Drop test.kafka before and after every test and wait for Kafka before running it."""
    instance.query('DROP TABLE IF EXISTS test.kafka')
    wait_kafka_is_available()
    print("kafka is available - running test")
    yield  # run test
    instance.query('DROP TABLE IF EXISTS test.kafka')


# Tests

@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster):
    """Kafka table declared with positional engine arguments reads 50 JSON rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('kafka1:19092', 'old', 'old', 'JSONEachRow', '\\n');
        ''')

    # Don't insert malformed messages since old settings syntax
    # doesn't support skipping of broken messages.
    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('old', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('old')
    assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka'
    # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose"))


@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
    """Kafka table declared via SETTINGS skips broken messages and uses the configured client id."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'new',
                     kafka_group_name = 'new',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n',
                     kafka_client_id = '{instance} test 1234',
                     kafka_skip_broken_messages = 1;
        ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(25)]
    kafka_produce('new', messages)

    # Insert couple of malformed messages.
    kafka_produce('new', ['}{very_broken_message,'])
    kafka_produce('new', ['}another{very_broken_message,'])

    messages = [json.dumps({'key': i, 'value': i}) for i in range(25, 50)]
    kafka_produce('new', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)

    members = describe_consumer_group('new')
    assert members[0]['client_id'] == u'instance test 1234'


@pytest.mark.timeout(180)
def test_kafka_issue11308(kafka_cluster):
    # Check that matview does respect Kafka SETTINGS
    kafka_produce('issue11308', ['{"t": 123, "e": {"x": "woof"} }',
                                 '{"t": 123, "e": {"x": "woof"} }',
                                 '{"t": 124, "e": {"x": "test"} }'])

    instance.query('''
        CREATE TABLE test.persistent_kafka (
            time UInt64,
            some_string String
        )
        ENGINE = MergeTree()
        ORDER BY time;

        CREATE TABLE test.kafka (t UInt64, `e.x` String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'issue11308',
                     kafka_group_name = 'issue11308',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n',
                     kafka_flush_interval_ms=1000,
                     input_format_import_nested_json = 1;

        CREATE MATERIALIZED VIEW test.persistent_kafka_mv TO test.persistent_kafka AS
        SELECT
            `t` AS `time`,
            `e.x` AS `some_string`
        FROM test.kafka;
        ''')

    time.sleep(9)

    result = instance.query('SELECT * FROM test.persistent_kafka ORDER BY time;')

    instance.query('''
        DROP TABLE test.persistent_kafka;
        DROP TABLE test.persistent_kafka_mv;
    ''')

    expected = '''\
123\twoof
123\twoof
124\ttest
'''
    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(180)
def test_kafka_issue4116(kafka_cluster):
    # Check that format_csv_delimiter parameter works now - as part of all available format settings.
    kafka_produce('issue4116', ['1|foo', '2|bar', '42|answer', '100|multi\n101|row\n103|message'])

    instance.query('''
        CREATE TABLE test.kafka (a UInt64, b String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'issue4116',
                     kafka_group_name = 'issue4116',
                     kafka_format = 'CSV',
                     kafka_row_delimiter = '\\n',
                     format_csv_delimiter = '|';
        ''')

    result = instance.query('SELECT * FROM test.kafka ORDER BY a;')

    expected = '''\
1\tfoo
2\tbar
42\tanswer
100\tmulti
101\trow
103\tmessage
'''
    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
    """DROP of a Kafka table must not hang after the broker was briefly paused."""
    instance.query('''
        DROP TABLE IF EXISTS test.kafka;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang',
                     kafka_group_name = 'consumer_hang',
                     kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 8,
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
        ''')

    time.sleep(10)
    instance.query('SELECT * FROM test.view')

    # This should trigger heartbeat fail,
    # which will trigger REBALANCE_IN_PROGRESS,
    # and which can lead to consumer hang.
    kafka_cluster.pause_container('kafka1')
    time.sleep(0.5)
    kafka_cluster.unpause_container('kafka1')

    # print("Attempt to drop")
    instance.query('DROP TABLE test.kafka')

    #kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # original problem appearance was a sequence of the following messages in librdkafka logs:
    # BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
    # so it was waiting forever while the application will execute queued rebalance callback

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0


@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster):
    """DROP must not hang when two tables share a consumer group and a rebalance is pending."""
    instance.query('''
        DROP TABLE IF EXISTS test.kafka;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';

        CREATE TABLE test.kafka2 (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';
        ''')

    # first consumer subscribe the topic, try to poll some data, and go to rest
    instance.query('SELECT * FROM test.kafka')

    # second consumer do the same leading to rebalance in the first
    # consumer, try to poll some data
    instance.query('SELECT * FROM test.kafka2')

#echo 'SELECT * FROM test.kafka; SELECT * FROM test.kafka2; DROP TABLE test.kafka;' | clickhouse client -mn &
#    kafka_cluster.open_bash_shell('instance')

    # first consumer has pending rebalance callback unprocessed (no poll after select)
    # one of those queries was failing because of
    # https://github.com/edenhill/librdkafka/issues/2077
    # https://github.com/edenhill/librdkafka/issues/2898
    instance.query('DROP TABLE test.kafka')
    instance.query('DROP TABLE test.kafka2')

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0


@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
    """Kafka table with CSV format and an explicit row delimiter reads 50 rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'csv',
                     kafka_group_name = 'csv',
                     kafka_format = 'CSV',
                     kafka_row_delimiter = '\\n';
        ''')

    messages = ['{i}, {i}'.format(i=i) for i in range(50)]
    kafka_produce('csv', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_tsv_with_delimiter(kafka_cluster):
    """Kafka table with TSV format and an explicit row delimiter reads 50 rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'tsv',
                     kafka_group_name = 'tsv',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    messages = ['{i}\t{i}'.format(i=i) for i in range(50)]
    kafka_produce('tsv', messages)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_select_empty(kafka_cluster):
    """SELECT from a Kafka table whose topic received nothing returns zero rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'empty',
                     kafka_group_name = 'empty',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        ''')

    assert int(instance.query('SELECT count() FROM test.kafka')) == 0


@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
    """JSONEachRow without kafka_row_delimiter: each Kafka message carries 25 newline-terminated rows."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'json',
                     kafka_group_name = 'json',
                     kafka_format = 'JSONEachRow';
        ''')

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25))
    kafka_produce('json', [messages])

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25, 50))
    kafka_produce('json', [messages])

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster):
    """Protobuf format: three Kafka messages carrying 20, 1 and 29 records."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'pb',
                     kafka_group_name = 'pb',
                     kafka_format = 'Protobuf',
                     kafka_schema = 'kafka.proto:KeyValuePair';
        ''')

    kafka_produce_protobuf_messages('pb', 0, 20)
    kafka_produce_protobuf_messages('pb', 20, 1)
    kafka_produce_protobuf_messages('pb', 21, 29)

    result = ''
    while True:
        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
        if kafka_check_result(result):
            break

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster):
    """Rows produced to Kafka arrive in a MergeTree table through a materialized view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mv',
                     kafka_group_name = 'mv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mv', messages)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if kafka_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_materialized_view_with_subquery(kafka_cluster):
    """Same as test_kafka_materialized_view, but the view selects from a subquery."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mvsq',
                     kafka_group_name = 'mvsq',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM (SELECT * FROM test.kafka);
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mvsq', messages)

    while True:
        result = instance.query('SELECT * FROM test.view')
        if kafka_check_result(result):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True)


@pytest.mark.timeout(180)
def test_kafka_many_materialized_views(kafka_cluster):
    """Two materialized views attached to one Kafka table both receive all 50 rows."""
    instance.query('''
        DROP TABLE IF EXISTS test.view1;
        DROP TABLE IF EXISTS test.view2;
        DROP TABLE IF EXISTS test.consumer1;
        DROP TABLE IF EXISTS test.consumer2;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'mmv',
                     kafka_group_name = 'mmv',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view1 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE TABLE test.view2 (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
            SELECT * FROM test.kafka;
        CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
            SELECT * FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('mmv', messages)

    while True:
        result1 = instance.query('SELECT * FROM test.view1')
        result2 = instance.query('SELECT * FROM test.view2')
        if kafka_check_result(result1) and kafka_check_result(result2):
            break

    instance.query('''
        DROP TABLE test.consumer1;
        DROP TABLE test.consumer2;
        DROP TABLE test.view1;
        DROP TABLE test.view2;
    ''')

    kafka_check_result(result1, True)
    kafka_check_result(result2, True)


@pytest.mark.timeout(300)
def test_kafka_flush_on_big_message(kafka_cluster):
    # Create batchs of messages of size ~100Kb
    kafka_messages = 1000
    batch_messages = 1000
    messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)]
    kafka_produce('flush', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush',
                     kafka_group_name = 'flush',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 10;
        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    # Wait until the committed offset of group 'flush' reaches the number of produced messages.
    client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    received = False
    while not received:
        try:
            offsets = client.list_consumer_group_offsets('flush')
            for topic, offset in offsets.items():
                if topic.topic == 'flush' and offset.offset == kafka_messages:
                    received = True
                    break
        except kafka.errors.GroupCoordinatorNotAvailableError:
            continue

    while True:
        result = instance.query('SELECT count() FROM test.view')
        if int(result) == kafka_messages*batch_messages:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(180)
def test_kafka_virtual_columns(kafka_cluster):
    """Virtual columns (_key, _topic, _offset, _partition, _timestamp) in a direct SELECT."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt1',
                     kafka_group_name = 'virt1',
                     kafka_format = 'JSONEachRow';
        ''')

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25))
    kafka_produce('virt1', [messages], 0)

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25, 50))
    kafka_produce('virt1', [messages], 0)

    result = ''
    while True:
        result += instance.query('SELECT _key, key, _topic, value, _offset, _partition, _timestamp FROM test.kafka', ignore_error=True)
        if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
            break

    kafka_check_result(result, True, 'test_kafka_virtual1.reference')


@pytest.mark.timeout(180)
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
    """Virtual columns are stored into a MergeTree table through a materialized view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2',
                     kafka_group_name = 'virt2',
                     kafka_format = 'JSONEachRow',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime))
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka;
    ''')

    messages = [json.dumps({'key': i, 'value': i}) for i in range(50)]
    kafka_produce('virt2', messages, 0)

    while True:
        result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view')
        if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_check_result(result, True, 'test_kafka_virtual2.reference')


@pytest.mark.timeout(180)
def test_kafka_insert(kafka_cluster):
    """INSERT into a Kafka table produces messages that an external consumer can read back."""
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert1',
                     kafka_group_name = 'insert1',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
    ''')

    values = ','.join("({i}, {i})".format(i=i) for i in range(50))

    # Retry the insert for as long as it fails with a local timeout.
    while True:
        try:
            instance.query("INSERT INTO test.kafka VALUES {}".format(values))
            break
        except QueryRuntimeException as e:
            if 'Local: Timed out.' in str(e):
                continue
            else:
                raise

    messages = []
    while True:
        messages.extend(kafka_consume('insert1'))
        if len(messages) == 50:
            break

    result = '\n'.join(messages)
    kafka_check_result(result, True)


@pytest.mark.timeout(240)
def test_kafka_produce_consume(kafka_cluster):
    """16 threads insert 10000 rows each into a Kafka table; all of them must reach test.view."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert2',
                     kafka_group_name = 'insert2',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages_num = 10000

    def insert():
        values = ','.join("({i}, {i})".format(i=i) for i in range(messages_num))

        while True:
            try:
                instance.query("INSERT INTO test.kafka VALUES {}".format(values))
                break
            except QueryRuntimeException as e:
                if 'Local: Timed out.' in str(e):
                    continue
                else:
                    raise

    threads_num = 16
    threads = [threading.Thread(target=insert) for _ in range(threads_num)]
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        if int(result) == messages_num * threads_num:
            break

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    for thread in threads:
        thread.join()

    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)


@pytest.mark.timeout(300)
def test_kafka_commit_on_block_write(kafka_cluster):
    """Re-creating the Kafka table in the middle of consumption must not duplicate rows."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'block',
                     kafka_group_name = 'block',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    cancel = threading.Event()

    # One-element list so that the nested function can update the counter.
    i = [0]

    def produce():
        while not cancel.is_set():
            messages = []
            for _ in range(101):
                messages.append(json.dumps({'key': i[0], 'value': i[0]}))
                i[0] += 1
            kafka_produce('block', messages)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    while int(instance.query('SELECT count() FROM test.view')) == 0:
        time.sleep(1)

    cancel.set()

    instance.query('''
        DROP TABLE test.kafka;
    ''')

    while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka'")) == 1:
        time.sleep(1)

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'block',
                     kafka_group_name = 'block',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';
    ''')

    while int(instance.query('SELECT uniqExact(key) FROM test.view')) < i[0]:
        time.sleep(1)

    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.view'))

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'


@pytest.mark.timeout(180)
def test_kafka_virtual_columns2(kafka_cluster):
    """Virtual columns, including headers, for two topics with two partitions each."""
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = []
    topic_list.append(NewTopic(name="virt2_0", num_partitions=2, replication_factor=1))
    topic_list.append(NewTopic(name="virt2_1", num_partitions=2, replication_factor=1))

    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        CREATE TABLE test.kafka (value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'virt2_0,virt2_1',
                     kafka_group_name = 'virt2',
                     kafka_num_consumers = 2,
                     kafka_format = 'JSONEachRow';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
        SELECT value, _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp), toUnixTimestamp64Milli(_timestamp_ms), _headers.name, _headers.value FROM test.kafka;
        ''')

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    producer.send(topic='virt2_0', value=json.dumps({'value': 1}), partition=0, key='k1', timestamp_ms=1577836801001, headers=[('content-encoding', b'base64')])
    producer.send(topic='virt2_0', value=json.dumps({'value': 2}), partition=0, key='k2', timestamp_ms=1577836802002, headers=[('empty_value', ''),('', 'empty name'), ('',''), ('repetition', '1'), ('repetition', '2')])
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_0', value=json.dumps({'value': 3}), partition=1, key='k3', timestamp_ms=1577836803003, headers=[('b', 'b'),('a', 'a')])
    producer.send(topic='virt2_0', value=json.dumps({'value': 4}), partition=1, key='k4', timestamp_ms=1577836804004, headers=[('a', 'a'),('b', 'b')])
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 5}), partition=0, key='k5', timestamp_ms=1577836805005)
    producer.send(topic='virt2_1', value=json.dumps({'value': 6}), partition=0, key='k6', timestamp_ms=1577836806006)
    producer.flush()
    time.sleep(1)

    producer.send(topic='virt2_1', value=json.dumps({'value': 7}), partition=1, key='k7', timestamp_ms=1577836807007)
    producer.send(topic='virt2_1', value=json.dumps({'value': 8}), partition=1, key='k8', timestamp_ms=1577836808008)
    producer.flush()

    time.sleep(10)

    members = describe_consumer_group('virt2')
    #pprint.pprint(members)
    # NOTE(review): the two lines below ASSIGN the client ids instead of asserting
    # them, so nothing is verified here. Presumably `assert ... ==` was intended;
    # confirm the real ids and the member order before turning this into a check.
    members[0]['client_id'] = u'ClickHouse-instance-test-kafka-0'
    members[1]['client_id'] = u'ClickHouse-instance-test-kafka-1'

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    expected = '''\
1\tk1\tvirt2_0\t0\t0\t1577836801\t1577836801001\t['content-encoding']\t['base64']
2\tk2\tvirt2_0\t0\t1\t1577836802\t1577836802002\t['empty_value','','','repetition','repetition']\t['','empty name','','1','2']
3\tk3\tvirt2_0\t1\t0\t1577836803\t1577836803003\t['b','a']\t['b','a']
4\tk4\tvirt2_0\t1\t1\t1577836804\t1577836804004\t['a','b']\t['a','b']
5\tk5\tvirt2_1\t0\t0\t1577836805\t1577836805005\t[]\t[]
6\tk6\tvirt2_1\t0\t1\t1577836806\t1577836806006\t[]\t[]
7\tk7\tvirt2_1\t1\t0\t1577836807\t1577836807007\t[]\t[]
8\tk8\tvirt2_1\t1\t1\t1577836808\t1577836808008\t[]\t[]
'''

    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(240)
def test_kafka_produce_key_timestamp(kafka_cluster):
    """_key and _timestamp columns written through a Kafka table are readable as virtual columns."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka_writer (key UInt64, value UInt64, _key String, _timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE TABLE test.kafka (key UInt64, value UInt64, inserted_key String, inserted_timestamp DateTime)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'insert3',
                     kafka_group_name = 'insert3',
                     kafka_format = 'TSV',
                     kafka_row_delimiter = '\\n';

        CREATE MATERIALIZED VIEW test.view Engine=Log AS
            SELECT key, value, inserted_key, toUnixTimestamp(inserted_timestamp), _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
    ''')

    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(1,1,'k1',1577836801))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(2,2,'k2',1577836802))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3,3,'k3',1577836803,4,4,'k4',1577836804))
    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5,5,'k5',1577836805))

    time.sleep(10)

    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

    # print(result)

    expected = '''\
1\t1\tk1\t1577836801\tk1\tinsert3\t0\t0\t1577836801
2\t2\tk2\t1577836802\tk2\tinsert3\t0\t1\t1577836802
3\t3\tk3\t1577836803\tk3\tinsert3\t0\t2\t1577836803
4\t4\tk4\t1577836804\tk4\tinsert3\t0\t3\t1577836804
5\t5\tk5\t1577836805\tk5\tinsert3\t0\t4\t1577836805
'''

    assert TSV(result) == TSV(expected)


@pytest.mark.timeout(600)
def test_kafka_flush_by_time(kafka_cluster):
    """With roughly one message per second, rows must reach test.view well before a block fills up."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush_by_time',
                     kafka_group_name = 'flush_by_time',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    cancel = threading.Event()

    def produce():
        while not cancel.is_set():
            messages = []
            messages.append(json.dumps({'key': 0, 'value': 0}))
            kafka_produce('flush_by_time', messages)
            time.sleep(1)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    time.sleep(18)

    result = instance.query('SELECT count() FROM test.view')

    print(result)
    cancel.set()
    kafka_thread.join()

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # 40 = 2 flushes (7.5 sec), 15 polls each, about 1 mgs per 1.5 sec
    assert int(result) > 12, 'Messages from kafka should be flushed at least every stream_flush_interval_ms!'


@pytest.mark.timeout(600)
def test_kafka_flush_by_block_size(kafka_cluster):
    """101 messages with kafka_max_block_size = 100: the first part must hold exactly 100 rows."""
    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'flush_by_block_size',
                     kafka_group_name = 'flush_by_block_size',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 100,
                     kafka_row_delimiter = '\\n';

        SELECT * FROM test.kafka;

        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages = [json.dumps({'key': 0, 'value': 0}) for _ in range(101)]
    kafka_produce('flush_by_block_size', messages)

    time.sleep(1)

    # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
    # second flush happens earlier than expected, so we have 2 parts here instead of one
    # flush by block size works correctly, so the feature checked by the test is working correctly
    result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
    # print(result)

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # 100 = first poll should return 100 messages (and rows)
    # not waiting for stream_flush_interval_ms
    assert int(result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'
@pytest.mark.timeout(600)
def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
    """Consume multi-row messages from a 10-partition topic and check nothing is lost or duplicated.

    1000 Kafka messages are produced, each holding between 3 and 9
    newline-separated JSON rows with consecutive keys. After 30 seconds
    count(), uniqExact(key) and max(key) in test.view must all equal the
    number of rows produced.
    """
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")

    topic_list = [NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'topic_with_multiple_partitions2',
                     kafka_group_name = 'topic_with_multiple_partitions2',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 211;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
    ''')

    messages = []
    count = 0
    for dummy_msg in range(1000):
        rows = []
        for dummy_row in range(random.randrange(3, 10)):
            count = count + 1
            rows.append(json.dumps({'key': count, 'value': count}))
        # Several rows travel inside a single Kafka message.
        messages.append("\n".join(rows))
    kafka_produce('topic_with_multiple_partitions2', messages)

    time.sleep(30)

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    # Clean up before asserting, so a failure does not leave the tables behind.
    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(count))


@pytest.mark.timeout(1200)
def test_kafka_rebalance(kafka_cluster):
    """Check that adding and removing consumers of one group does not duplicate rows.

    While a background thread keeps producing into an 11-partition topic,
    11 Kafka tables sharing the group 'rebalance_test_group' are created one
    after another, each feeding test.destination through its own
    materialized view. Then production stops, all consumers but the last are
    dropped, and the test waits until every produced key is present.
    Finally count() must equal uniqExact(key) in test.destination.
    """
    NUMBER_OF_CONCURRENT_CONSUMERS = 11

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination (
            key UInt64,
            value UInt64,
            _topic String,
            _key String,
            _offset UInt64,
            _partition UInt64,
            _timestamp Nullable(DateTime),
            _consumed_by LowCardinality(String)
        )
        ENGINE = MergeTree()
        ORDER BY key;
    ''')

    # kafka_cluster.open_bash_shell('instance')

    # time.sleep(2)

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic_list = [NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

    cancel = threading.Event()

    # One-element list so the nested producer can update the counter.
    msg_index = [0]

    def produce():
        # Send batches of 59 messages with consecutive keys until cancelled.
        while not cancel.is_set():
            messages = []
            for _ in range(59):
                messages.append(json.dumps({'key': msg_index[0], 'value': msg_index[0]}))
                msg_index[0] += 1
            kafka_produce('topic_with_multiple_partitions', messages)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    try:
        for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS):
            table_name = 'kafka_consumer{}'.format(consumer_index)
            print("Setting up {}".format(table_name))

            instance.query('''
                DROP TABLE IF EXISTS test.{0};
                DROP TABLE IF EXISTS test.{0}_mv;
                CREATE TABLE test.{0} (key UInt64, value UInt64)
                    ENGINE = Kafka
                    SETTINGS kafka_broker_list = 'kafka1:19092',
                             kafka_topic_list = 'topic_with_multiple_partitions',
                             kafka_group_name = 'rebalance_test_group',
                             kafka_format = 'JSONEachRow',
                             kafka_max_block_size = 33;
                CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS
                    SELECT
                    key,
                    value,
                    _topic,
                    _key,
                    _offset,
                    _partition,
                    _timestamp,
                    '{0}' as _consumed_by
                FROM test.{0};
            '''.format(table_name))
            # kafka_cluster.open_bash_shell('instance')

            # Do not add the next consumer until this one has delivered rows.
            while int(instance.query("SELECT count() FROM test.destination WHERE _consumed_by='{}'".format(table_name))) == 0:
                print("Waiting for test.kafka_consumer{} to start consume".format(consumer_index))
                time.sleep(1)
    finally:
        # Stop producing both on success and when the setup above fails,
        # so the producer thread never outlives the test.
        cancel.set()

    # I leave last one working by intent (to finish consuming after all rebalances)
    for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS - 1):
        print("Dropping test.kafka_consumer{}".format(consumer_index))
        instance.query('DROP TABLE IF EXISTS test.kafka_consumer{}'.format(consumer_index))
        # Wait until the table has really disappeared from system.tables.
        while int(instance.query("SELECT count() FROM system.tables WHERE database='test' AND name='kafka_consumer{}'".format(consumer_index))) == 1:
            time.sleep(1)

    # print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
    # kafka_cluster.open_bash_shell('instance')

    while True:
        messages_consumed = int(instance.query('SELECT uniqExact(key) FROM test.destination'))
        if messages_consumed >= msg_index[0]:
            break
        time.sleep(1)
        print("Waiting for finishing consuming (have {}, should be {})".format(messages_consumed, msg_index[0]))

    print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))

    # Some queries to debug...
    # SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
    # select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
    # SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
    # select _partition from test.destination group by _partition having count() <> max(_offset) + 1;
    # select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
    # SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;

    # CREATE TABLE test.reference (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092',
    #             kafka_topic_list = 'topic_with_multiple_partitions',
    #             kafka_group_name = 'rebalance_test_group_reference',
    #             kafka_format = 'JSONEachRow',
    #             kafka_max_block_size = 100000;
    #
    # CREATE MATERIALIZED VIEW test.reference_mv Engine=Log AS
    #     SELECT key, value, _topic,_key,_offset, _partition, _timestamp, 'reference' as _consumed_by
    # FROM test.reference;
    #
    # select * from test.reference_mv left join test.destination using (key,_topic,_offset,_partition) where test.destination._consumed_by = '';

    # 1 when every key appears exactly once, 0 when there are duplicates.
    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))

    for consumer_index in range(NUMBER_OF_CONCURRENT_CONSUMERS):
        print("kafka_consumer{}".format(consumer_index))
        table_name = 'kafka_consumer{}'.format(consumer_index)
        instance.query('''
            DROP TABLE IF EXISTS test.{0};
            DROP TABLE IF EXISTS test.{0}_mv;
        '''.format(table_name))

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
    ''')

    kafka_thread.join()

    assert result == 1, 'Messages from kafka get duplicated!'


@pytest.mark.timeout(1200)
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
    """Check that no rows are lost when ZooKeeper is cut off while the view is being written.

    22 messages are produced, then consumed through a materialized view that
    is slowed down with sleepEachRow(1) and writes into a ReplicatedMergeTree
    table. The instance's ZooKeeper connections are dropped for 20 seconds in
    the middle of the write. The server log must contain the expected
    session-expired message, and afterwards all 22 keys must be present.
    """
    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('no_holes_when_write_suffix_failed', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'no_holes_when_write_suffix_failed',
                     kafka_group_name = 'no_holes_when_write_suffix_failed',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 20;

        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka
            WHERE NOT sleepEachRow(1);
    ''')

    # the tricky part here is that disconnect should happen after write prefix, but before write suffix
    # so sleepEachRow is used to stretch the write
    with PartitionManager() as pm:
        time.sleep(12)
        pm.drop_instance_zk_connections(instance)
        time.sleep(20)
        # The original referenced pm.heal_all without calling it, which is a no-op.
        pm.heal_all()

    # connection restored and it will take a while until next block will be flushed
    # it takes years on CI :\
    time.sleep(90)

    # as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
    assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    # kafka_cluster.open_bash_shell('instance')

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    assert TSV(result) == TSV('22\t22\t22')


@pytest.mark.timeout(120)
def test_exception_from_destructor(kafka_cluster):
    """Check that the server still answers after Kafka tables with an empty group name are dropped.

    A Kafka table with kafka_group_name = '' is created, a SELECT from it is
    expected to return an error, and the table is dropped. The table is then
    created and dropped once more without being read. Finally 'SELECT 1'
    must still succeed.
    """
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'xyz',
                     kafka_group_name = '',
                     kafka_format = 'JSONEachRow';
    ''')
    # The SELECT is expected to fail; only the error path is of interest here.
    instance.query_and_get_error('''
        SELECT * FROM test.kafka;
    ''')
    instance.query('''
        DROP TABLE test.kafka;
    ''')

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'xyz',
                     kafka_group_name = '',
                     kafka_format = 'JSONEachRow';
    ''')
    instance.query('''
        DROP TABLE test.kafka;
    ''')

    # kafka_cluster.open_bash_shell('instance')
    assert TSV(instance.query('SELECT 1')) == TSV('1')


@pytest.mark.timeout(120)
def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
    """Check that dropping and re-creating a Kafka table does not lose messages.

    One message is produced up front and the test waits until it reaches
    test.destination. A background thread then produces batches of 113
    messages once per second. After 12 seconds test.kafka is dropped and
    re-created with the same consumer group, production is stopped, and
    15 seconds later count(), uniqExact(key) and max(key) must all equal the
    number of keys produced.
    """
    messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(1)]
    kafka_produce('commits_of_unprocessed_messages_on_drop', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.destination;
        CREATE TABLE test.destination (
            key UInt64,
            value UInt64,
            _topic String,
            _key String,
            _offset UInt64,
            _partition UInt64,
            _timestamp Nullable(DateTime),
            _consumed_by LowCardinality(String)
        )
        ENGINE = MergeTree()
        ORDER BY key;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
                     kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 1000;

        CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
            SELECT
            key,
            value,
            _topic,
            _key,
            _offset,
            _partition,
            _timestamp
        FROM test.kafka;
    ''')

    while int(instance.query("SELECT count() FROM test.destination")) == 0:
        print("Waiting for test.kafka_consumer to start consume")
        time.sleep(1)

    cancel = threading.Event()

    # Key 1 was produced above; the background producer continues from 2.
    # One-element list so the nested function can update the counter.
    next_key = [2]

    def produce():
        # Send a batch of 113 consecutive keys once per second until cancelled.
        while not cancel.is_set():
            messages = []
            for _ in range(113):
                messages.append(json.dumps({'key': next_key[0], 'value': next_key[0]}))
                next_key[0] += 1
            kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
            time.sleep(1)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()

    try:
        time.sleep(12)

        instance.query('''
            DROP TABLE test.kafka;
        ''')

        instance.query('''
            CREATE TABLE test.kafka (key UInt64, value UInt64)
                ENGINE = Kafka
                SETTINGS kafka_broker_list = 'kafka1:19092',
                         kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
                         kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
                         kafka_format = 'JSONEachRow',
                         kafka_max_block_size = 10000;
        ''')
    finally:
        # Stop the producer on success and on failure alike.
        cancel.set()

    time.sleep(15)

    # kafka_cluster.open_bash_shell('instance')
    # SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.destination')
    print(result)

    instance.query('''
        DROP TABLE test.kafka_consumer;
        DROP TABLE test.destination;
    ''')

    kafka_thread.join()
    # next_key[0] is the next unused key, so the last produced key is one less.
    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(next_key[0] - 1)), 'Missing data!'
@pytest.mark.timeout(120)
def test_bad_reschedule(kafka_cluster):
    """Check that a backlog of 20000 messages is consumed without long pauses.

    The messages are produced before the Kafka table exists. Each consumed
    row is stamped with now() as consume_ts; once all 20000 rows have
    arrived, the span between the earliest and latest consume_ts must be
    below 8 seconds.
    """
    messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)]
    kafka_produce('test_bad_reschedule', messages)

    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'test_bad_reschedule',
                     kafka_group_name = 'test_bad_reschedule',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 1000;

        CREATE MATERIALIZED VIEW test.destination Engine=Log AS
        SELECT
            key,
            now() as consume_ts,
            value,
            _topic,
            _key,
            _offset,
            _partition,
            _timestamp
        FROM test.kafka;
    ''')

    while int(instance.query("SELECT count() FROM test.destination")) < 20000:
        print("Waiting for consume")
        time.sleep(1)

    assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8


@pytest.mark.timeout(1200)
def test_kafka_duplicates_when_commit_failed(kafka_cluster):
    """Check the behaviour when the broker is unreachable at commit time.

    22 messages are consumed through a view slowed down by
    sleepEachRow(0.5). The 'kafka1' container is paused for 40 seconds
    around the moment the first commit should happen, then resumed. The
    server log must show the failed commit, and the view is expected to hold
    42 rows with 22 distinct keys, i.e. the first block of 20 is duplicated.
    """
    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
    kafka_produce('duplicates_when_commit_failed', messages)

    instance.query('''
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;

        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'duplicates_when_commit_failed',
                     kafka_group_name = 'duplicates_when_commit_failed',
                     kafka_format = 'JSONEachRow',
                     kafka_max_block_size = 20;

        CREATE TABLE test.view (key UInt64, value String)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka
            WHERE NOT sleepEachRow(0.5);
    ''')

    # print time.strftime("%m/%d/%Y %H:%M:%S")
    # 5-6 sec to connect to kafka, do subscription, and fetch 20 rows,
    # another 10 sec for MV, after that commit should happen
    time.sleep(12)

    # print time.strftime("%m/%d/%Y %H:%M:%S")
    kafka_cluster.pause_container('kafka1')
    try:
        # that timeout is VERY important, and picked after lot of experiments
        # when too low (<30sec) librdkafka will not report any timeout (alternative is to decrease the default session timeouts for librdkafka)
        # when too high (>50sec) broker will decide to remove us from the consumer group, and will start answering "Broker: Unknown member"
        time.sleep(40)
    finally:
        # Resume the broker even if the wait is interrupted (e.g. by the
        # pytest timeout), so a paused container cannot break later tests.
        # print time.strftime("%m/%d/%Y %H:%M:%S")
        kafka_cluster.unpause_container('kafka1')

    # kafka_cluster.open_bash_shell('instance')

    # connection restored and it will take a while until next block will be flushed
    # it takes years on CI :\
    time.sleep(30)

    # as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
    assert instance.contains_in_log("Local: Waiting for coordinator")
    assert instance.contains_in_log("All commit attempts failed")

    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
    print(result)

    instance.query('''
        DROP TABLE test.consumer;
        DROP TABLE test.view;
    ''')

    # After https://github.com/edenhill/librdkafka/issues/2631
    # timeout triggers rebalance, making further commits to the topic after getting back online
    # impossible. So we have a duplicate in that scenario, but we report that situation properly.
    assert TSV(result) == TSV('42\t22\t22')


if __name__ == '__main__':
    # Manual mode: bring the cluster up and keep it until the user presses Enter.
    cluster.start()
    try:
        # raw_input is the Python 2 builtin.
        # NOTE(review): switch to input() if this suite is moved to Python 3.
        raw_input("Cluster created, press any key to destroy...")
    finally:
        # Tear the cluster down even when the prompt is interrupted (Ctrl-C / EOF).
        cluster.shutdown()