From d479836362579fea2dffa6119d621fb8447f510d Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 18 Apr 2019 18:52:18 +0300 Subject: [PATCH] Add test on lost messages --- .../helpers/docker_compose_kafka.yml | 2 +- .../integration/test_storage_kafka/test.py | 49 ++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/dbms/tests/integration/helpers/docker_compose_kafka.yml b/dbms/tests/integration/helpers/docker_compose_kafka.yml index bed537a9760..8fea4faa272 100644 --- a/dbms/tests/integration/helpers/docker_compose_kafka.yml +++ b/dbms/tests/integration/helpers/docker_compose_kafka.yml @@ -12,7 +12,7 @@ services: - label:disable kafka1: - image: confluentinc/cp-kafka:4.1.0 + image: confluentinc/cp-kafka:5.2.0 hostname: kafka1 ports: - "9092:9092" diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 0258d38b8ab..c67b95c1e83 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -7,7 +7,8 @@ from helpers.test_tools import TSV import json import subprocess -from kafka import KafkaProducer +import kafka.errors +from kafka import KafkaAdminClient, KafkaProducer from google.protobuf.internal.encoder import _VarintBytes """ @@ -318,6 +319,52 @@ def test_kafka_materialized_view(kafka_cluster): ''') +def test_kafka_flush_on_big_message(kafka_cluster): + # Create batchs of messages of size ~100Kb + kafka_messages = 10000 + batch_messages = 1000 + messages = [json.dumps({'key': i, 'value': 'x' * 100}) * batch_messages for i in range(kafka_messages)] + kafka_produce('flush', messages) + + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; + CREATE TABLE test.kafka (key UInt64, value String) + ENGINE = Kafka + SETTINGS + kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'flush', + kafka_group_name = 'flush', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 10; + CREATE TABLE test.view (key UInt64, value String) + ENGINE = MergeTree + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.kafka; + ''') + + client = KafkaAdminClient(bootstrap_servers="localhost:9092") + received = False + while not received: + try: + offsets = client.list_consumer_group_offsets('flush') + for topic, offset in offsets.items(): + if topic.topic == 'flush' and offset.offset == kafka_messages: + received = True + break + except kafka.errors.GroupCoordinatorNotAvailableError: + continue + + for _ in range(20): + time.sleep(1) + result = instance.query('SELECT count() FROM test.view') + if int(result) == kafka_messages*batch_messages: + break + + assert int(result) == kafka_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result) + + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...")