diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 082d9704020..8e42a83459f 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -22,7 +22,6 @@ import kafka_pb2 # TODO: add test for run-time offset update in CH, if we manually update it on Kafka side. -# TODO: add test for mat. view is working. # TODO: add test for SELECT LIMIT is working. # TODO: modify tests to respect `skip_broken_messages` setting. @@ -148,13 +147,12 @@ def test_kafka_settings_new_syntax(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'new', - kafka_group_name = 'new', - kafka_format = 'JSONEachRow', - kafka_row_delimiter = '\\n', - kafka_skip_broken_messages = 1; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'new', + kafka_group_name = 'new', + kafka_format = 'JSONEachRow', + kafka_row_delimiter = '\\n', + kafka_skip_broken_messages = 1; ''') messages = [] @@ -172,7 +170,7 @@ def test_kafka_settings_new_syntax(kafka_cluster): kafka_produce('new', messages) result = '' - for i in range(50): + while True: result += instance.query('SELECT * FROM test.kafka') if kafka_check_result(result): break @@ -183,12 +181,11 @@ def test_kafka_csv_with_delimiter(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'csv', - kafka_group_name = 'csv', - kafka_format = 'CSV', - kafka_row_delimiter = '\\n'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'csv', + kafka_group_name = 'csv', + kafka_format = 'CSV', + kafka_row_delimiter = '\\n'; ''') messages = [] @@ -197,7 +194,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster): kafka_produce('csv', messages) result = '' - for i in range(50): + while True: result += instance.query('SELECT * FROM test.kafka') if kafka_check_result(result): break @@ -208,12 +205,11 @@ def test_kafka_tsv_with_delimiter(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'tsv', - kafka_group_name = 'tsv', - kafka_format = 'TSV', - kafka_row_delimiter = '\\n'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'tsv', + kafka_group_name = 'tsv', + kafka_format = 'TSV', + kafka_row_delimiter = '\\n'; ''') messages = [] @@ -222,7 +218,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster): kafka_produce('tsv', messages) result = '' - for i in range(50): + while True: result += instance.query('SELECT * FROM test.kafka') if kafka_check_result(result): break @@ -233,25 +229,24 @@ def test_kafka_json_without_delimiter(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'json1', - kafka_group_name = 'json1', - kafka_format = 'JSONEachRow'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'json', + kafka_group_name = 'json', + kafka_format = 'JSONEachRow'; ''') messages = '' for i in range(25): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('json1', [messages]) + kafka_produce('json', [messages]) messages = '' for i in range(25, 50): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('json1', [messages]) + kafka_produce('json', [messages]) result = '' - for i in range(50): + while True: result += instance.query('SELECT * FROM test.kafka') if kafka_check_result(result): break @@ -262,12 +257,11 @@ def test_kafka_protobuf(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value String) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'pb', - kafka_group_name = 'pb', - kafka_format = 'Protobuf', - kafka_schema = 'kafka.proto:KeyValuePair'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'pb', + kafka_group_name = 'pb', + kafka_format = 'Protobuf', + kafka_schema = 'kafka.proto:KeyValuePair'; ''') kafka_produce_protobuf_messages('pb', 0, 20) @@ -275,7 +269,7 @@ def test_kafka_protobuf(kafka_cluster): kafka_produce_protobuf_messages('pb', 21, 29) result = '' - for i in range(50): + while True: result += instance.query('SELECT * FROM test.kafka') if kafka_check_result(result): break @@ -288,12 +282,11 @@ def test_kafka_materialized_view(kafka_cluster): DROP TABLE IF EXISTS test.consumer; CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'json2', - kafka_group_name = 'json2', - kafka_format = 'JSONEachRow', - kafka_row_delimiter = '\\n'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'mv', + kafka_group_name = 'mv', + kafka_format = 'JSONEachRow', + kafka_row_delimiter = '\\n'; CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; @@ -304,9 +297,9 @@ def test_kafka_materialized_view(kafka_cluster): messages = [] for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) - kafka_produce('json2', messages) + kafka_produce('mv', messages) - for i in range(20): + while True: time.sleep(1) result = instance.query('SELECT * FROM test.view') if kafka_check_result(result): @@ -331,12 +324,11 @@ def test_kafka_flush_on_big_message(kafka_cluster): DROP TABLE IF EXISTS test.consumer; CREATE TABLE test.kafka (key UInt64, value String) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'flush', - kafka_group_name = 'flush', - kafka_format = 'JSONEachRow', - kafka_max_block_size = 10; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'flush', + kafka_group_name = 'flush', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 10; CREATE TABLE test.view (key UInt64, value String) ENGINE = MergeTree ORDER BY key; @@ -356,7 +348,7 @@ def test_kafka_flush_on_big_message(kafka_cluster): except kafka.errors.GroupCoordinatorNotAvailableError: continue - for _ in range(20): + while True: time.sleep(1) result = instance.query('SELECT count() FROM test.view') if int(result) == kafka_messages*batch_messages: @@ -369,30 +361,29 @@ def test_kafka_virtual_columns(kafka_cluster): instance.query(''' CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'json3', - kafka_group_name = 'json3', - kafka_format = 'JSONEachRow'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'virt1', + kafka_group_name = 'virt1', + kafka_format = 'JSONEachRow'; ''') messages = '' for i in range(25): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('json3', [messages]) + kafka_produce('virt1', [messages]) messages = '' for i in range(25, 50): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('json3', [messages]) + kafka_produce('virt1', [messages]) result = '' - for i in range(50): + while True: time.sleep(1) result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka') - if kafka_check_result(result, False, 'test_kafka_virtual.reference'): + if kafka_check_result(result, False, 'test_kafka_virtual1.reference'): break - kafka_check_result(result, True, 'test_kafka_virtual.reference') + kafka_check_result(result, True, 'test_kafka_virtual1.reference') def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): @@ -401,12 +392,11 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): DROP TABLE IF EXISTS test.consumer; CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka - SETTINGS - kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'json3', - kafka_group_name = 'json3', - kafka_format = 'JSONEachRow', - kafka_row_delimiter = '\\n'; + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'virt2', + kafka_group_name = 'virt2', + kafka_format = 'JSONEachRow', + kafka_row_delimiter = '\\n'; CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64) ENGINE = MergeTree() ORDER BY key; @@ -417,14 +407,14 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): messages = [] for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) - kafka_produce('json3', messages) + kafka_produce('virt2', messages) - for i in range(20): + while True: time.sleep(1) result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view') - if kafka_check_result(result, False, 'test_kafka_virtual.reference'): + if kafka_check_result(result, False, 'test_kafka_virtual2.reference'): break - kafka_check_result(result, True, 'test_kafka_virtual.reference') + kafka_check_result(result, True, 'test_kafka_virtual2.reference') instance.query(''' DROP TABLE test.consumer; diff --git a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual.reference b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual.reference deleted file mode 100644 index 6ee6017efd6..00000000000 --- a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual.reference +++ /dev/null @@ -1,50 +0,0 @@ - 0 json3 0 0 - 1 json3 1 0 - 2 json3 2 0 - 3 json3 3 0 - 4 json3 4 0 - 5 json3 5 0 - 6 json3 6 0 - 7 json3 7 0 - 8 json3 8 0 - 9 json3 9 0 - 10 json3 10 0 - 11 json3 11 0 - 12 json3 12 0 - 13 json3 13 0 - 14 json3 14 0 - 15 json3 15 0 - 16 json3 16 0 - 17 json3 17 0 - 18 json3 18 0 - 19 json3 19 0 - 20 json3 20 0 - 21 json3 21 0 - 22 json3 22 0 - 23 json3 23 0 - 24 json3 24 0 - 25 json3 25 1 - 26 json3 26 1 - 27 json3 27 1 - 28 json3 28 1 - 29 json3 29 1 - 30 json3 30 1 - 31 json3 31 1 - 32 json3 32 1 - 33 json3 33 1 - 34 json3 34 1 - 35 json3 35 1 - 36 json3 36 1 - 37 json3 37 1 - 38 json3 38 1 - 39 json3 39 1 - 40 json3 40 1 - 41 json3 41 1 - 42 json3 42 1 - 43 json3 43 1 - 44 json3 44 1 - 45 json3 45 1 - 46 json3 46 1 - 47 json3 47 1 - 48 json3 48 1 - 49 json3 49 1 diff --git a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference new file mode 100644 index 00000000000..5956210d25e --- /dev/null +++ b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference @@ -0,0 +1,50 @@ + 0 virt1 0 0 + 1 virt1 1 0 + 2 virt1 2 0 + 3 virt1 3 0 + 4 virt1 4 0 + 5 virt1 5 0 + 6 virt1 6 0 + 7 virt1 7 0 + 8 virt1 8 0 + 9 virt1 9 0 + 10 virt1 10 0 + 11 virt1 11 0 + 12 virt1 12 0 + 13 virt1 13 0 + 14 virt1 14 0 + 15 virt1 15 0 + 16 virt1 16 0 + 17 virt1 17 0 + 18 virt1 18 0 + 19 virt1 19 0 + 20 virt1 20 0 + 21 virt1 21 0 + 22 virt1 22 0 + 23 virt1 23 0 + 24 virt1 24 0 + 25 virt1 25 1 + 26 virt1 26 1 + 27 virt1 27 1 + 28 virt1 28 1 + 29 virt1 29 1 + 30 virt1 30 1 + 31 virt1 31 1 + 32 virt1 32 1 + 33 virt1 33 1 + 34 virt1 34 1 + 35 virt1 35 1 + 36 virt1 36 1 + 37 virt1 37 1 + 38 virt1 38 1 + 39 virt1 39 1 + 40 virt1 40 1 + 41 virt1 41 1 + 42 virt1 42 1 + 43 virt1 43 1 + 44 virt1 44 1 + 45 virt1 45 1 + 46 virt1 46 1 + 47 virt1 47 1 + 48 virt1 48 1 + 49 virt1 49 1 diff --git a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference new file mode 100644 index 00000000000..c20dc3513a0 --- /dev/null +++ b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference @@ -0,0 +1,50 @@ + 0 virt2 0 0 + 1 virt2 1 0 + 2 virt2 2 0 + 3 virt2 3 0 + 4 virt2 4 0 + 5 virt2 5 0 + 6 virt2 6 0 + 7 virt2 7 0 + 8 virt2 8 0 + 9 virt2 9 0 + 10 virt2 10 0 + 11 virt2 11 0 + 12 virt2 12 0 + 13 virt2 13 0 + 14 virt2 14 0 + 15 virt2 15 0 + 16 virt2 16 0 + 17 virt2 17 0 + 18 virt2 18 0 + 19 virt2 19 0 + 20 virt2 20 0 + 21 virt2 21 0 + 22 virt2 22 0 + 23 virt2 23 0 + 24 virt2 24 0 + 25 virt2 25 1 + 26 virt2 26 1 + 27 virt2 27 1 + 28 virt2 28 1 + 29 virt2 29 1 + 30 virt2 30 1 + 31 virt2 31 1 + 32 virt2 32 1 + 33 virt2 33 1 + 34 virt2 34 1 + 35 virt2 35 1 + 36 virt2 36 1 + 37 virt2 37 1 + 38 virt2 38 1 + 39 virt2 39 1 + 40 virt2 40 1 + 41 virt2 41 1 + 42 virt2 42 1 + 43 virt2 43 1 + 44 virt2 44 1 + 45 virt2 45 1 + 46 virt2 46 1 + 47 virt2 47 1 + 48 virt2 48 1 + 49 virt2 49 1