mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 15:42:02 +00:00)

Rewrite tests one more time

parent 018df69d3d
commit c9bc09ab7f
@@ -18,21 +18,12 @@ cluster = ClickHouseCluster(__file__)
 instance = cluster.add_instance('instance',
                                 main_configs=['configs/kafka.xml'],
                                 with_kafka=True)
 kafka_id = instance.cluster.kafka_docker_id
 
 
-@pytest.fixture(scope="module")
-def started_cluster():
-    try:
-        cluster.start()
-        instance.query('CREATE DATABASE test')
+# Helpers
 
-        yield cluster
-
-    finally:
-        cluster.shutdown()
-
-def kafka_is_available(kafka_id):
+def check_kafka_is_available():
     p = subprocess.Popen(('docker',
                           'exec',
                           '-i',
@@ -41,11 +32,24 @@ def kafka_is_available(kafka_id):
                           '--bootstrap-server',
                           'PLAINTEXT://localhost:9092'),
                          stdout=subprocess.PIPE)
-    p.communicate()[0]
+    p.communicate()
     return p.returncode == 0
 
 
-def kafka_produce(kafka_id, topic, messages):
+def wait_kafka_is_available(max_retries=50):
+    retries = 0
+    while True:
+        if check_kafka_is_available():
+            break
+        else:
+            retries += 1
+            if retries > max_retries:
+                raise "Kafka is not available"
+            print("Waiting for Kafka to start up")
+            time.sleep(1)
+
+
+def kafka_produce(topic, messages):
     p = subprocess.Popen(('docker',
                           'exec',
                           '-i',
@@ -60,86 +64,132 @@ def kafka_produce(kafka_id, topic, messages):
     p.stdin.close()
 
 
-def kafka_check_json_numbers(instance, insert_malformed=False, table='test.kafka', select_count=3):
-    retries = 0
-    while True:
-        if kafka_is_available(instance.cluster.kafka_docker_id):
-            break
-        else:
-            retries += 1
-            if retries > 50:
-                raise 'Cannot connect to kafka.'
-            print("Waiting for kafka to be available...")
-            time.sleep(1)
-
-    messages = ''
-    for i in range(25):
-        messages += json.dumps({'key': i, 'value': i}) + '\n'
-    kafka_produce(instance.cluster.kafka_docker_id, 'json', messages)
-
-    if insert_malformed:
-        # Insert couple of malformed messages.
-        kafka_produce(instance.cluster.kafka_docker_id, 'json', '}{very_broken_message,\n')
-        kafka_produce(instance.cluster.kafka_docker_id, 'json', '}{very_broken_message,\n')
-
-        messages = ''
-        for i in range(25, 50):
-            messages += json.dumps({'key': i, 'value': i}) + '\n'
-        kafka_produce(instance.cluster.kafka_docker_id, 'json', messages)
-
-    # Since the broken message breaks the `select` reading
-    # we'll try to select multiple times.
-    result = ''
-    for i in range(select_count):
-        while True:
-            time.sleep(1)
-            new_result = instance.query('SELECT * FROM {};'.format(table))
-            if new_result:
-                result += new_result
-                break
-
 def kafka_check_result(result):
     fpath = p.join(p.dirname(__file__), 'test_kafka_json.reference')
     with open(fpath) as reference:
         assert TSV(result) == TSV(reference)
 
 
-def test_kafka_json(started_cluster):
+# Fixtures
+
+@pytest.fixture(scope="module")
+def kafka_cluster():
+    try:
+        cluster.start()
+        instance.query('CREATE DATABASE test')
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+@pytest.fixture(autouse=True)
+def kafka_setup_teardown():
+    instance.query('DROP TABLE IF EXISTS test.kafka')
+    wait_kafka_is_available()
+    yield  # run test
+    instance.query('DROP TABLE test.kafka')
+
+
+# Tests
+
+def test_kafka_settings_old_syntax(kafka_cluster):
     instance.query('''
-        DROP TABLE IF EXISTS test.kafka;
         CREATE TABLE test.kafka (key UInt64, value UInt64)
-            ENGINE = Kafka('kafka1:9092', 'json', 'json',
-                           'JSONEachRow', '\\n');
+            ENGINE = Kafka('kafka1:9092', 'old', 'old', 'JSONEachRow', '\\n');
         ''')
 
-    # Don't insert malformed messages since old settings syntax
-    # doesn't support skipping of broken messages.
-    kafka_check_json_numbers(instance)
+    messages = ''
+    for i in range(50):
+        messages += json.dumps({'key': i, 'value': i}) + '\n'
+    kafka_produce('old', messages)
 
-    instance.query('DROP TABLE test.kafka')
+    result = instance.query('SELECT * FROM test.kafka')
+    kafka_check_result(result)
 
 
-def test_kafka_json_settings(started_cluster):
+def test_kafka_settings_new_syntax(kafka_cluster):
     instance.query('''
-        DROP TABLE IF EXISTS test.kafka;
         CREATE TABLE test.kafka (key UInt64, value UInt64)
             ENGINE = Kafka
             SETTINGS
                 kafka_broker_list = 'kafka1:9092',
-                kafka_topic_list = 'json',
-                kafka_group_name = 'json',
+                kafka_topic_list = 'new',
+                kafka_group_name = 'new',
                 kafka_format = 'JSONEachRow',
                 kafka_row_delimiter = '\\n',
                 kafka_skip_broken_messages = 1;
         ''')
 
-    kafka_check_json_numbers(instance, True)
+    messages = ''
+    for i in range(25):
+        messages += json.dumps({'key': i, 'value': i}) + '\n'
+    kafka_produce('new', messages)
 
-    instance.query('DROP TABLE test.kafka')
+    # Insert couple of malformed messages.
+    kafka_produce('new', '}{very_broken_message,\n')
+    kafka_produce('new', '}another{very_broken_message,\n')
+
+    messages = ''
+    for i in range(25, 50):
+        messages += json.dumps({'key': i, 'value': i}) + '\n'
+    kafka_produce('new', messages)
+
+    # Since the broken message breaks the `select`,
+    # we'll try to select multiple times.
+    result = instance.query('SELECT * FROM test.kafka')
+    result += instance.query('SELECT * FROM test.kafka')
+    result += instance.query('SELECT * FROM test.kafka')
+    kafka_check_result(result)
 
 
-def test_kafka_json_materialized_view(started_cluster):
+def test_kafka_csv_with_delimiter(kafka_cluster):
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS
+                kafka_broker_list = 'kafka1:9092',
+                kafka_topic_list = 'csv',
+                kafka_group_name = 'csv',
+                kafka_format = 'CSV',
+                kafka_row_delimiter = '\\n';
+        ''')
+
+    messages = ''
+    for i in range(50):
+        messages += '{i}, {i}\n'.format(i=i)
+    kafka_produce('csv', messages)
+
+    result = instance.query('SELECT * FROM test.kafka')
+    kafka_check_result(result)
+
+
+def test_kafka_tsv_with_delimiter(kafka_cluster):
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS
+                kafka_broker_list = 'kafka1:9092',
+                kafka_topic_list = 'tsv',
+                kafka_group_name = 'tsv',
+                kafka_format = 'TSV',
+                kafka_row_delimiter = '\\n';
+        ''')
+
+    messages = ''
+    for i in range(50):
+        messages += '{i}\t{i}\n'.format(i=i)
+    kafka_produce('tsv', messages)
+
+    result = instance.query('SELECT * FROM test.kafka')
+    kafka_check_result(result)
+
+
+def test_kafka_materialized_view(kafka_cluster):
     instance.query('''
         DROP TABLE IF EXISTS test.kafka;
         DROP TABLE IF EXISTS test.view;
         DROP TABLE IF EXISTS test.consumer;
         CREATE TABLE test.kafka (key UInt64, value UInt64)
@@ -149,8 +199,7 @@ def test_kafka_json_materialized_view(started_cluster):
                 kafka_topic_list = 'json',
                 kafka_group_name = 'json',
                 kafka_format = 'JSONEachRow',
-                kafka_row_delimiter = '\\n',
-                kafka_skip_broken_messages = 2;
+                kafka_row_delimiter = '\\n';
         CREATE TABLE test.view (key UInt64, value UInt64)
             ENGINE = MergeTree()
             ORDER BY key;
@@ -158,12 +207,22 @@ def test_kafka_json_materialized_view(started_cluster):
             SELECT * FROM test.kafka;
         ''')
 
-    kafka_check_json_numbers(instance, True, 'test.view', 1)
+    messages = ''
+    for i in range(50):
+        messages += json.dumps({'key': i, 'value': i}) + '\n'
+    kafka_produce('json', messages)
+
+    # Try select multiple times, until we get results
+    for i in range(3):
+        time.sleep(1)
+        result = instance.query('SELECT * FROM test.view')
+        if result:
+            break
+    kafka_check_result(result)
 
     instance.query('''
-        DROP TABLE test.kafka;
-        DROP TABLE test.view;
         DROP TABLE test.consumer;
+        DROP TABLE test.view;
     ''')
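Note on the new wait_kafka_is_available() helper above: it polls the broker once per second, but its failure path raises a bare string, which Python rejects with "TypeError: exceptions must derive from BaseException". A minimal standalone sketch of the same retry loop with a real exception; wait_until, predicate, and delay are illustrative names, not part of the commit:

import time

def wait_until(predicate, max_retries=50, delay=1.0):
    # Poll `predicate` once per `delay` seconds, up to `max_retries` attempts.
    for attempt in range(max_retries):
        if predicate():
            return
        print("Waiting for Kafka to start up")
        time.sleep(delay)
    # The committed code does `raise "Kafka is not available"`; raising a
    # plain string is a TypeError, so raise an exception instance instead.
    raise RuntimeError("Kafka is not available")

Used as wait_until(check_kafka_is_available), this would slot into the kafka_setup_teardown fixture unchanged.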
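kafka_produce() now takes only a topic and the messages, since the module-level kafka_id already pins the container. The middle of the helper falls between the hunks shown above, so the producer command line below is an assumption based on the stock Kafka CLI (kafka-console-producer with --broker-list), not text from the commit; only the Popen / communicate / stdin pattern is taken from the diff:

import subprocess

def produce(container, topic, messages):
    # Pipe newline-delimited `messages` to the console producer running
    # inside the Kafka container; communicate() writes stdin and closes it.
    p = subprocess.Popen(('docker', 'exec', '-i', container,
                          'kafka-console-producer',
                          '--broker-list', 'localhost:9092',
                          '--topic', topic),
                         stdin=subprocess.PIPE)
    p.communicate(messages.encode())
    if p.returncode != 0:
        raise RuntimeError('producer exited with code {}'.format(p.returncode))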
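The structural point of the rewrite is the fixture split: a module-scoped kafka_cluster fixture owns the cluster lifecycle, while the autouse, function-scoped kafka_setup_teardown fixture gives every test a clean test.kafka table and a reachable broker. A self-contained sketch of that layering; FakeCluster is a stand-in for helpers.cluster.ClickHouseCluster so the example runs on its own:

import pytest

class FakeCluster(object):
    # Stand-in for ClickHouseCluster, only to make the sketch runnable.
    def start(self):
        print('cluster started')

    def shutdown(self):
        print('cluster stopped')

cluster = FakeCluster()

@pytest.fixture(scope="module")
def kafka_cluster():
    # Started once per module; shutdown() runs even if a test fails.
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()

@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    # Wraps every test: fresh state before, cleanup after.
    print('drop leftover tables, wait for the broker')
    yield  # run the test
    print('drop the test table')

def test_example(kafka_cluster):
    assert kafka_cluster is cluster

Because kafka_setup_teardown is autouse, each test only needs to request kafka_cluster to express the module-level dependency.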