diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 41076ac78c4..a591a343c5f 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -27,11 +27,11 @@ def started_cluster(): cluster.shutdown() -def kafka_is_available(started_cluster): +def kafka_is_available(kafka_id): p = subprocess.Popen(('docker', 'exec', '-i', - started_cluster.kafka_docker_id, + kafka_id, '/usr/bin/kafka-broker-api-versions', '--bootstrap-server', 'PLAINTEXT://localhost:9092'), @@ -40,11 +40,11 @@ def kafka_is_available(started_cluster): return p.returncode == 0 -def kafka_produce(started_cluster, topic, messages): +def kafka_produce(kafka_id, topic, messages): p = subprocess.Popen(('docker', 'exec', '-i', - started_cluster.kafka_docker_id, + kafka_id, '/usr/bin/kafka-console-producer', '--broker-list', 'localhost:9092', @@ -58,7 +58,7 @@ def kafka_produce(started_cluster, topic, messages): def kafka_check_json_numbers(instance): retries = 0 while True: - if kafka_is_available(started_cluster): + if kafka_is_available(instance.cluster.kafka_docker_id): break else: retries += 1 @@ -67,13 +67,17 @@ def kafka_check_json_numbers(instance): print("Waiting for kafka to be available...") time.sleep(1) messages = '' - for i in xrange(50): + for i in range(50): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce(started_cluster, 'json', messages) - time.sleep(3) - result = instance.query('SELECT * FROM test.kafka;') - file = p.join(p.dirname(__file__), 'test_kafka_json.reference') - with open(file) as reference: + kafka_produce(instance.cluster.kafka_docker_id, 'json', messages) + for i in range(30): + result = instance.query('SELECT * FROM test.kafka;') + if result: + break + time.sleep(0.5) + + fpath = p.join(p.dirname(__file__), 'test_kafka_json.reference') + with open(fpath) as reference: assert TSV(result) == TSV(reference) @@ -95,9 +99,9 @@ def test_kafka_json_settings(started_cluster): ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:9092', - kafka_topic_list = 'json' - kafka_group_name = 'json' - kafka_format = 'JSONEachRow' + kafka_topic_list = 'json', + kafka_group_name = 'json', + kafka_format = 'JSONEachRow', kafka_row_delimiter = '\\n'; ''') kafka_check_json_numbers(instance)