From b072bc7d2bfe390c835ff69184e00bf041146470 Mon Sep 17 00:00:00 2001
From: Mikhail Filimonov
Date: Fri, 6 Nov 2020 17:54:44 +0100
Subject: [PATCH] Test for the issue #12615

---
 .../format_schemas/social.proto               |  6 ++
 .../test_storage_kafka/social_pb2.py          | 75 +++++++++++++++
 tests/integration/test_storage_kafka/test.py  | 96 ++++++++++++++++++-
 3 files changed, 176 insertions(+), 1 deletion(-)
 create mode 100644 tests/integration/test_storage_kafka/clickhouse_path/format_schemas/social.proto
 create mode 100644 tests/integration/test_storage_kafka/social_pb2.py

diff --git a/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/social.proto b/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/social.proto
new file mode 100644
index 00000000000..3bf82737fa5
--- /dev/null
+++ b/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/social.proto
@@ -0,0 +1,6 @@
+syntax = "proto3";
+
+message User {
+    string username = 1;
+    int32 timestamp = 2;
+}
\ No newline at end of file
diff --git a/tests/integration/test_storage_kafka/social_pb2.py b/tests/integration/test_storage_kafka/social_pb2.py
new file mode 100644
index 00000000000..eeba5efc8b1
--- /dev/null
+++ b/tests/integration/test_storage_kafka/social_pb2.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: social.proto
+
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='social.proto',
+  package='',
+  syntax='proto3',
+  serialized_options=None,
+  serialized_pb=b'\n\x0csocial.proto\"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3'
+)
+
+
+
+
+_USER = _descriptor.Descriptor(
+  name='User',
+  full_name='User',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='username', full_name='User.username', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='timestamp', full_name='User.timestamp', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=16,
+  serialized_end=59,
+)
+
+DESCRIPTOR.message_types_by_name['User'] = _USER
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+User = _reflection.GeneratedProtocolMessageType('User', (_message.Message,), {
+  'DESCRIPTOR' : _USER,
+  '__module__' : 'social_pb2'
+  # @@protoc_insertion_point(class_scope:User)
+  })
+_sym_db.RegisterMessage(User)
+
+
+# @@protoc_insertion_point(module_scope)
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 6ef37c1e231..5d943361414 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -30,6 +30,8 @@ libprotoc 3.0.0
 protoc --python_out=. kafka.proto
 """
 from . import kafka_pb2
+from . import social_pb2
+
 
 # TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
 # TODO: add test for SELECT LIMIT is working.
@@ -115,6 +117,20 @@ def kafka_produce_protobuf_messages_no_delimeters(topic, start_index, num_messag
     producer.flush()
     print("Produced {} messages for topic {}".format(num_messages, topic))
 
+def kafka_produce_protobuf_social(topic, start_index, num_messages):
+    data = b''
+    for i in range(start_index, start_index + num_messages):
+        msg = social_pb2.User()
+        msg.username = 'John Doe {}'.format(i)
+        msg.timestamp = 1000000 + i
+        serialized_msg = msg.SerializeToString()
+        data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
+    producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=producer_serializer)
+    producer.send(topic=topic, value=data)
+    producer.flush()
+    print("Produced {} messages for topic {}".format(num_messages, topic))
+
+
 def avro_confluent_message(schema_registry_client, value):
     # type: (CachedSchemaRegistryClient, dict) -> str
@@ -982,6 +998,84 @@ def test_kafka_protobuf(kafka_cluster):
     kafka_check_result(result, True)
 
 
+@pytest.mark.timeout(180)
+def test_kafka_string_field_on_first_position_in_protobuf(kafka_cluster):
+    # https://github.com/ClickHouse/ClickHouse/issues/12615
+
+    instance.query('''
+CREATE TABLE test.kafka (
+    username String,
+    timestamp Int32
+) ENGINE = Kafka()
+SETTINGS
+    kafka_broker_list = 'kafka1:19092',
+    kafka_topic_list = 'string_field_on_first_position_in_protobuf',
+    kafka_group_name = 'string_field_on_first_position_in_protobuf',
+    kafka_format = 'Protobuf',
+    kafka_schema = 'social:User';
+
+SELECT * FROM test.kafka;
+        ''')
+
+    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 0, 20)
+    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 20, 1)
+    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 21, 29)
+
+    result = instance.query('SELECT * FROM test.kafka', ignore_error=True)
+    expected = '''\
+John Doe 0	1000000
+John Doe 1	1000001
+John Doe 2	1000002
+John Doe 3	1000003
+John Doe 4	1000004
+John Doe 5	1000005
+John Doe 6	1000006
+John Doe 7	1000007
+John Doe 8	1000008
+John Doe 9	1000009
+John Doe 10	1000010
+John Doe 11	1000011
+John Doe 12	1000012
+John Doe 13	1000013
+John Doe 14	1000014
+John Doe 15	1000015
+John Doe 16	1000016
+John Doe 17	1000017
+John Doe 18	1000018
+John Doe 19	1000019
+John Doe 20	1000020
+John Doe 21	1000021
+John Doe 22	1000022
+John Doe 23	1000023
+John Doe 24	1000024
+John Doe 25	1000025
+John Doe 26	1000026
+John Doe 27	1000027
+John Doe 28	1000028
+John Doe 29	1000029
+John Doe 30	1000030
+John Doe 31	1000031
+John Doe 32	1000032
+John Doe 33	1000033
+John Doe 34	1000034
+John Doe 35	1000035
+John Doe 36	1000036
+John Doe 37	1000037
+John Doe 38	1000038
+John Doe 39	1000039
+John Doe 40	1000040
+John Doe 41	1000041
+John Doe 42	1000042
+John Doe 43	1000043
+John Doe 44	1000044
+John Doe 45	1000045
+John Doe 46	1000046
+John Doe 47	1000047
+John Doe 48	1000048
+John Doe 49	1000049
+'''
+    assert TSV(result) == TSV(expected)
+
 @pytest.mark.timeout(30)
 def test_kafka_protobuf_no_delimiter(kafka_cluster):
     instance.query('''
@@ -2117,7 +2211,7 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
                   kafka_format = 'JSONEachRow',
                   kafka_max_block_size = 20,
                   kafka_flush_interval_ms = 1000;
-
+
             SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take different time, test rely on timing, so can flap otherwise) */
         ''')
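
Note (not part of the patch): the helper kafka_produce_protobuf_social above packs
several Protobuf messages into one Kafka record by prefixing each serialized
message with its varint-encoded length, which is the delimited framing that
ClickHouse's 'Protobuf' input format reads. The sketch below is a minimal,
self-contained illustration of that round trip; the pack_users/unpack_users
names are illustrative only, and it assumes social_pb2.py was generated with
`protoc --python_out=. social.proto` (as the test file's docstring does for
kafka.proto) and that the `protobuf` package is installed.

    # Varint length-delimited framing: the same layout the test produces.
    from google.protobuf.internal.encoder import _VarintBytes
    from google.protobuf.internal.decoder import _DecodeVarint32

    import social_pb2


    def pack_users(start_index, num_messages):
        # Serialize several User messages into one payload: for each message,
        # write its length as a varint, then the message bytes themselves.
        data = b''
        for i in range(start_index, start_index + num_messages):
            msg = social_pb2.User()
            msg.username = 'John Doe {}'.format(i)
            msg.timestamp = 1000000 + i
            serialized = msg.SerializeToString()
            data += _VarintBytes(len(serialized)) + serialized
        return data


    def unpack_users(data):
        # Inverse operation: read a varint length, then that many bytes,
        # and repeat until the buffer is exhausted.
        pos = 0
        while pos < len(data):
            size, pos = _DecodeVarint32(data, pos)
            msg = social_pb2.User()
            msg.ParseFromString(data[pos:pos + size])
            pos += size
            yield msg


    # Round-trip check: prints 'John Doe 0' .. 'John Doe 2' with timestamps.
    for user in unpack_users(pack_users(0, 3)):
        print(user.username, user.timestamp)

Without the length prefixes the consumer could not tell where one message ends
and the next begins, since Protobuf messages are not self-delimiting; that is
why the test concatenates _VarintBytes(len(...)) before each message.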