Refactor tests

This commit is contained in:
Ivan Lezhankin 2019-06-23 17:48:58 +03:00
parent f34e4b53ce
commit d6d10120c8
4 changed files with 163 additions and 123 deletions

View File

@@ -22,7 +22,6 @@ import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for mat. view is working.
# TODO: add test for SELECT LIMIT is working.
# TODO: modify tests to respect `skip_broken_messages` setting.
@@ -148,13 +147,12 @@ def test_kafka_settings_new_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n',
kafka_skip_broken_messages = 1;
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n',
kafka_skip_broken_messages = 1;
''')
messages = []
@@ -172,7 +170,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_produce('new', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@@ -183,12 +181,11 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n';
''')
messages = []
@@ -197,7 +194,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_produce('csv', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@@ -208,12 +205,11 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
''')
messages = []
@@ -222,7 +218,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_produce('tsv', messages)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@@ -233,25 +229,24 @@ def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json1',
kafka_group_name = 'json1',
kafka_format = 'JSONEachRow';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
''')
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json1', [messages])
kafka_produce('json', [messages])
messages = ''
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json1', [messages])
kafka_produce('json', [messages])
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@@ -262,12 +257,11 @@ def test_kafka_protobuf(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_schema = 'kafka.proto:KeyValuePair';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages('pb', 0, 20)
@@ -275,7 +269,7 @@ def test_kafka_protobuf(kafka_cluster):
kafka_produce_protobuf_messages('pb', 21, 29)
result = ''
for i in range(50):
while True:
result += instance.query('SELECT * FROM test.kafka')
if kafka_check_result(result):
break
@@ -288,12 +282,11 @@ def test_kafka_materialized_view(kafka_cluster):
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json2',
kafka_group_name = 'json2',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'mv',
kafka_group_name = 'mv',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
@@ -304,9 +297,9 @@ def test_kafka_materialized_view(kafka_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('json2', messages)
kafka_produce('mv', messages)
for i in range(20):
while True:
time.sleep(1)
result = instance.query('SELECT * FROM test.view')
if kafka_check_result(result):
@@ -331,12 +324,11 @@ def test_kafka_flush_on_big_message(kafka_cluster):
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush',
kafka_group_name = 'flush',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10;
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'flush',
kafka_group_name = 'flush',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10;
CREATE TABLE test.view (key UInt64, value String)
ENGINE = MergeTree
ORDER BY key;
@@ -356,7 +348,7 @@ def test_kafka_flush_on_big_message(kafka_cluster):
except kafka.errors.GroupCoordinatorNotAvailableError:
continue
for _ in range(20):
while True:
time.sleep(1)
result = instance.query('SELECT count() FROM test.view')
if int(result) == kafka_messages*batch_messages:
@@ -369,30 +361,29 @@ def test_kafka_virtual_columns(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json3',
kafka_group_name = 'json3',
kafka_format = 'JSONEachRow';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt1',
kafka_group_name = 'virt1',
kafka_format = 'JSONEachRow';
''')
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json3', [messages])
kafka_produce('virt1', [messages])
messages = ''
for i in range(25, 50):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json3', [messages])
kafka_produce('virt1', [messages])
result = ''
for i in range(50):
while True:
time.sleep(1)
result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka')
if kafka_check_result(result, False, 'test_kafka_virtual.reference'):
if kafka_check_result(result, False, 'test_kafka_virtual1.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual.reference')
kafka_check_result(result, True, 'test_kafka_virtual1.reference')
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
@@ -401,12 +392,11 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json3',
kafka_group_name = 'json3',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt2',
kafka_group_name = 'virt2',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64)
ENGINE = MergeTree()
ORDER BY key;
@@ -417,14 +407,14 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({'key': i, 'value': i}))
kafka_produce('json3', messages)
kafka_produce('virt2', messages)
for i in range(20):
while True:
time.sleep(1)
result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view')
if kafka_check_result(result, False, 'test_kafka_virtual.reference'):
if kafka_check_result(result, False, 'test_kafka_virtual2.reference'):
break
kafka_check_result(result, True, 'test_kafka_virtual.reference')
kafka_check_result(result, True, 'test_kafka_virtual2.reference')
instance.query('''
DROP TABLE test.consumer;

View File

@@ -1,50 +0,0 @@
0 json3 0 0
1 json3 1 0
2 json3 2 0
3 json3 3 0
4 json3 4 0
5 json3 5 0
6 json3 6 0
7 json3 7 0
8 json3 8 0
9 json3 9 0
10 json3 10 0
11 json3 11 0
12 json3 12 0
13 json3 13 0
14 json3 14 0
15 json3 15 0
16 json3 16 0
17 json3 17 0
18 json3 18 0
19 json3 19 0
20 json3 20 0
21 json3 21 0
22 json3 22 0
23 json3 23 0
24 json3 24 0
25 json3 25 1
26 json3 26 1
27 json3 27 1
28 json3 28 1
29 json3 29 1
30 json3 30 1
31 json3 31 1
32 json3 32 1
33 json3 33 1
34 json3 34 1
35 json3 35 1
36 json3 36 1
37 json3 37 1
38 json3 38 1
39 json3 39 1
40 json3 40 1
41 json3 41 1
42 json3 42 1
43 json3 43 1
44 json3 44 1
45 json3 45 1
46 json3 46 1
47 json3 47 1
48 json3 48 1
49 json3 49 1

View File

@@ -0,0 +1,50 @@
0 virt1 0 0
1 virt1 1 0
2 virt1 2 0
3 virt1 3 0
4 virt1 4 0
5 virt1 5 0
6 virt1 6 0
7 virt1 7 0
8 virt1 8 0
9 virt1 9 0
10 virt1 10 0
11 virt1 11 0
12 virt1 12 0
13 virt1 13 0
14 virt1 14 0
15 virt1 15 0
16 virt1 16 0
17 virt1 17 0
18 virt1 18 0
19 virt1 19 0
20 virt1 20 0
21 virt1 21 0
22 virt1 22 0
23 virt1 23 0
24 virt1 24 0
25 virt1 25 1
26 virt1 26 1
27 virt1 27 1
28 virt1 28 1
29 virt1 29 1
30 virt1 30 1
31 virt1 31 1
32 virt1 32 1
33 virt1 33 1
34 virt1 34 1
35 virt1 35 1
36 virt1 36 1
37 virt1 37 1
38 virt1 38 1
39 virt1 39 1
40 virt1 40 1
41 virt1 41 1
42 virt1 42 1
43 virt1 43 1
44 virt1 44 1
45 virt1 45 1
46 virt1 46 1
47 virt1 47 1
48 virt1 48 1
49 virt1 49 1

View File

@@ -0,0 +1,50 @@
0 virt2 0 0
1 virt2 1 0
2 virt2 2 0
3 virt2 3 0
4 virt2 4 0
5 virt2 5 0
6 virt2 6 0
7 virt2 7 0
8 virt2 8 0
9 virt2 9 0
10 virt2 10 0
11 virt2 11 0
12 virt2 12 0
13 virt2 13 0
14 virt2 14 0
15 virt2 15 0
16 virt2 16 0
17 virt2 17 0
18 virt2 18 0
19 virt2 19 0
20 virt2 20 0
21 virt2 21 0
22 virt2 22 0
23 virt2 23 0
24 virt2 24 0
25 virt2 25 1
26 virt2 26 1
27 virt2 27 1
28 virt2 28 1
29 virt2 29 1
30 virt2 30 1
31 virt2 31 1
32 virt2 32 1
33 virt2 33 1
34 virt2 34 1
35 virt2 35 1
36 virt2 36 1
37 virt2 37 1
38 virt2 38 1
39 virt2 39 1
40 virt2 40 1
41 virt2 41 1
42 virt2 42 1
43 virt2 43 1
44 virt2 44 1
45 virt2 45 1
46 virt2 46 1
47 virt2 47 1
48 virt2 48 1
49 virt2 49 1