mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
174 lines
5.4 KiB
Python
174 lines
5.4 KiB
Python
import os.path as p
|
|
import time
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
|
|
import json
|
|
import subprocess
|
|
|
|
|
|
# TODO: add a test for run-time offset updates in CH when the offset is changed manually on the Kafka side.
# TODO: add a test that a materialized view works.
# TODO: add a test that SELECT ... LIMIT works.
# TODO: modify the tests to respect the `skip_broken_messages` setting.
|
|
|
|
# Module-level test cluster with a single ClickHouse node. `with_kafka=True`
# presumably makes the harness start a Kafka broker too (the helpers below
# rely on `instance.cluster.kafka_docker_id`) -- confirm in helpers.cluster.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    main_configs=['configs/kafka.xml'],
    with_kafka=True
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and create the `test` database.

    Yields the running cluster object. The cluster is shut down when the
    module's tests are done, and also if start-up or the database creation
    raises.
    """
    try:
        cluster.start()
        instance.query('CREATE DATABASE test')

        yield cluster

    finally:
        # Reached on normal teardown as well as on any failure above.
        cluster.shutdown()
|
|
|
|
|
|
def kafka_is_available(kafka_id):
    """Return True if the Kafka broker in container `kafka_id` responds.

    Runs `kafka-broker-api-versions` inside the container through
    `docker exec` and reports whether the command exited with status 0.
    """
    command = (
        'docker',
        'exec',
        '-i',
        kafka_id,
        '/usr/bin/kafka-broker-api-versions',
        '--bootstrap-server',
        'PLAINTEXT://localhost:9092',
    )
    probe = subprocess.Popen(command, stdout=subprocess.PIPE)
    # Drain stdout and wait for the process to exit; the output itself is
    # not needed, only the exit status.
    probe.communicate()
    return probe.returncode == 0
|
|
|
|
|
|
def kafka_produce(kafka_id, topic, messages):
    """Publish `messages` to `topic` using the console producer in `kafka_id`.

    The payload is fed to the standard input of `kafka-console-producer`,
    which is run inside the container through `docker exec`. The call blocks
    until the producer process exits; its exit status is not checked.

    Args:
        kafka_id: Docker container id (or name) of the Kafka broker.
        topic: Name of the Kafka topic to write to.
        messages: Payload to send; callers in this file pass
            newline-terminated messages in a single string. Text is encoded
            as UTF-8 before being written to the pipe.
    """
    # The pipe is a byte stream: encode text so the call also works where
    # `str` is not `bytes`. For a byte string this is a no-op.
    if not isinstance(messages, bytes):
        messages = messages.encode('utf-8')

    command = (
        'docker',
        'exec',
        '-i',
        kafka_id,
        '/usr/bin/kafka-console-producer',
        '--broker-list',
        'localhost:9092',
        '--topic',
        topic,
    )
    producer = subprocess.Popen(command, stdin=subprocess.PIPE)
    # communicate() writes the payload, closes stdin and waits for the
    # process, so no explicit stdin.close() is needed afterwards.
    producer.communicate(messages)
|
|
|
|
|
|
def kafka_check_json_numbers(instance, insert_malformed=False, table='test.kafka', select_count=3):
    """Produce 50 JSON rows to the `json` topic and verify what `table` returns.

    Steps:
      1. Wait until the Kafka broker answers (polled once per second).
      2. Produce rows `{"key": i, "value": i}` for i in 0..24, optionally two
         malformed messages, then rows for i in 25..49.
      3. Collect `select_count` non-empty results of `SELECT * FROM table`
         and compare their concatenation with `test_kafka_json.reference`.

    Args:
        instance: ClickHouse instance to query; its cluster supplies
            `kafka_docker_id`.
        insert_malformed: When true, two broken messages are produced between
            the two batches of valid rows.
        table: Table to select from.
        select_count: Number of non-empty SELECT results to accumulate.

    Raises:
        Exception: If Kafka is still unavailable after more than 50 retries.
        AssertionError: If the collected rows differ from the reference file.
    """
    kafka_id = instance.cluster.kafka_docker_id

    retries = 0
    while not kafka_is_available(kafka_id):
        retries += 1
        if retries > 50:
            # Must be a real exception object: raising a plain string is
            # itself a TypeError and would hide this message.
            raise Exception('Cannot connect to kafka.')
        print("Waiting for kafka to be available...")
        time.sleep(1)

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25))
    kafka_produce(kafka_id, 'json', messages)

    if insert_malformed:
        # Insert couple of malformed messages.
        kafka_produce(kafka_id, 'json', '}{very_broken_message,\n')
        kafka_produce(kafka_id, 'json', '}{very_broken_message,\n')

    messages = ''.join(json.dumps({'key': i, 'value': i}) + '\n' for i in range(25, 50))
    kafka_produce(kafka_id, 'json', messages)

    # Since the broken message breaks the `select` reading
    # we'll try to select multiple times.
    result = ''
    for _ in range(select_count):
        # Poll once per second until this round returns some rows.
        while True:
            time.sleep(1)
            new_result = instance.query('SELECT * FROM {};'.format(table))
            if new_result:
                result += new_result
                break

    fpath = p.join(p.dirname(__file__), 'test_kafka_json.reference')
    with open(fpath) as reference:
        assert TSV(result) == TSV(reference)
|
|
|
|
|
|
def test_kafka_json(started_cluster):
    """Check reading JSON rows from a Kafka table declared with positional
    engine arguments (the old settings syntax)."""
    create_table = '''
        DROP TABLE IF EXISTS test.kafka;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka('kafka1:9092', 'json', 'json',
                           'JSONEachRow', '\\n');
        '''
    instance.query(create_table)

    # Don't insert malformed messages since old settings syntax
    # doesn't support skipping of broken messages.
    kafka_check_json_numbers(instance)

    instance.query('DROP TABLE test.kafka')
|
|
|
|
|
|
def test_kafka_json_settings(started_cluster):
    """Check reading JSON rows from a Kafka table declared with a SETTINGS
    clause, with two malformed messages produced between the valid ones."""
    create_table = '''
        DROP TABLE IF EXISTS test.kafka;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS
                kafka_broker_list = 'kafka1:9092',
                kafka_topic_list = 'json',
                kafka_group_name = 'json',
                kafka_format = 'JSONEachRow',
                kafka_row_delimiter = '\\n',
                kafka_skip_broken_messages = 1;
        '''
    instance.query(create_table)

    # Second positional argument is `insert_malformed`.
    kafka_check_json_numbers(instance, True)

    instance.query('DROP TABLE test.kafka')
|
|
|
|
|
|
def test_kafka_json_materialized_view(started_cluster):
    """Check that a materialized view copies rows from a Kafka table into a
    MergeTree table, with two malformed messages produced in between."""
    setup_query = '''
        DROP TABLE IF EXISTS test.kafka;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.consumer;
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS
                kafka_broker_list = 'kafka1:9092',
                kafka_topic_list = 'json',
                kafka_group_name = 'json',
                kafka_format = 'JSONEachRow',
                kafka_row_delimiter = '\\n',
                kafka_skip_broken_messages = 2;
        CREATE TABLE test.view (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;
        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
            SELECT * FROM test.kafka;
        '''
    instance.query(setup_query)

    # Rows are read from the MergeTree target table, and a single non-empty
    # SELECT result is collected (select_count=1).
    kafka_check_json_numbers(instance, True, 'test.view', 1)

    teardown_query = '''
        DROP TABLE test.kafka;
        DROP TABLE test.view;
        DROP TABLE test.consumer;
        '''
    instance.query(teardown_query)
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual mode: bring the cluster up and keep it running until the user
    # answers the prompt. The try/finally mirrors the `started_cluster`
    # fixture so the cluster is also shut down when start-up fails or the
    # prompt is interrupted (Ctrl-C / EOF).
    try:
        cluster.start()
        raw_input("Cluster created, press any key to destroy...")
    finally:
        cluster.shutdown()
|