CLICKHOUSE-3894: Fix Kafka test

This commit is contained in:
alesapin 2018-08-27 19:15:39 +03:00
parent 716c8016d7
commit 1f0f1ecad3

View File

@ -27,11 +27,11 @@ def started_cluster():
cluster.shutdown()
def kafka_is_available(started_cluster):
def kafka_is_available(kafka_id):
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'PLAINTEXT://localhost:9092'),
@ -40,11 +40,11 @@ def kafka_is_available(started_cluster):
return p.returncode == 0
def kafka_produce(started_cluster, topic, messages):
def kafka_produce(kafka_id, topic, messages):
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
kafka_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'localhost:9092',
@ -58,7 +58,7 @@ def kafka_produce(started_cluster, topic, messages):
def kafka_check_json_numbers(instance):
retries = 0
while True:
if kafka_is_available(started_cluster):
if kafka_is_available(instance.cluster.kafka_docker_id):
break
else:
retries += 1
@ -67,13 +67,17 @@ def kafka_check_json_numbers(instance):
print("Waiting for kafka to be available...")
time.sleep(1)
messages = ''
for i in xrange(50):
for i in range(50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce(started_cluster, 'json', messages)
time.sleep(3)
result = instance.query('SELECT * FROM test.kafka;')
file = p.join(p.dirname(__file__), 'test_kafka_json.reference')
with open(file) as reference:
kafka_produce(instance.cluster.kafka_docker_id, 'json', messages)
for i in range(30):
result = instance.query('SELECT * FROM test.kafka;')
if result:
break
time.sleep(0.5)
fpath = p.join(p.dirname(__file__), 'test_kafka_json.reference')
with open(fpath) as reference:
assert TSV(result) == TSV(reference)
@ -95,9 +99,9 @@ def test_kafka_json_settings(started_cluster):
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_topic_list = 'json'
kafka_group_name = 'json'
kafka_format = 'JSONEachRow'
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
''')
kafka_check_json_numbers(instance)