diff --git a/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/message_with_repeated.proto b/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/message_with_repeated.proto
new file mode 100644
index 00000000000..791a5086866
--- /dev/null
+++ b/tests/integration/test_storage_kafka/clickhouse_path/format_schemas/message_with_repeated.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+option optimize_for = SPEED;
+message Message {
+    uint32 tnow = 1;
+    string server = 2;
+    string clien = 3;
+    uint32 sPort = 4;
+    uint32 cPort = 5;
+    repeated dd r = 6;
+    string method = 7;
+}
+
+message dd {
+    string name = 1;
+    uint32 class = 2;
+    uint32 type = 3;
+    uint64 ttl = 4;
+    bytes data = 5;
+}
\ No newline at end of file
diff --git a/tests/integration/test_storage_kafka/message_with_repeated_pb2.py b/tests/integration/test_storage_kafka/message_with_repeated_pb2.py
new file mode 100644
index 00000000000..69702307e7f
--- /dev/null
+++ b/tests/integration/test_storage_kafka/message_with_repeated_pb2.py
@@ -0,0 +1,180 @@
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: clickhouse_path/format_schemas/message_with_repeated.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='clickhouse_path/format_schemas/message_with_repeated.proto',
+  package='',
+  syntax='proto3',
+  serialized_options=_b('H\001'),
+  serialized_pb=_b('\n:clickhouse_path/format_schemas/message_with_repeated.proto\"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t\"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3')
+)
+
+
+
+
+_MESSAGE = _descriptor.Descriptor(
+  name='Message',
+  full_name='Message',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='tnow', full_name='Message.tnow', index=0,
+      number=1, type=13, cpp_type=3, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='server', full_name='Message.server', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='clien', full_name='Message.clien', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='sPort', full_name='Message.sPort', index=3,
+      number=4, type=13, cpp_type=3, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cPort', full_name='Message.cPort', index=4,
+      number=5, type=13, cpp_type=3, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='r', full_name='Message.r', index=5,
+      number=6, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='method', full_name='Message.method', index=6,
+      number=7, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=62,
+  serialized_end=178,
+)
+
+
+_DD = _descriptor.Descriptor(
+  name='dd',
+  full_name='dd',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='name', full_name='dd.name', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='class', full_name='dd.class', index=1,
+      number=2, type=13, cpp_type=3, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='type', full_name='dd.type', index=2,
+      number=3, type=13, cpp_type=3, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='ttl', full_name='dd.ttl', index=3,
+      number=4, type=4, cpp_type=4, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='data', full_name='dd.data', index=4,
+      number=5, type=12, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b(""),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=180,
+  serialized_end=254,
+)
+
+_MESSAGE.fields_by_name['r'].message_type = _DD
+DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE
+DESCRIPTOR.message_types_by_name['dd'] = _DD
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict(
+  DESCRIPTOR = _MESSAGE,
+  __module__ = 'clickhouse_path.format_schemas.message_with_repeated_pb2'
+  # @@protoc_insertion_point(class_scope:Message)
+  ))
+_sym_db.RegisterMessage(Message)
+
+dd = _reflection.GeneratedProtocolMessageType('dd', (_message.Message,), dict(
+  DESCRIPTOR = _DD,
+  __module__ = 'clickhouse_path.format_schemas.message_with_repeated_pb2'
+  # @@protoc_insertion_point(class_scope:dd)
+  ))
+_sym_db.RegisterMessage(dd)
+
+
+DESCRIPTOR._options = None
+# @@protoc_insertion_point(module_scope)
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 8883684730f..cc4cff94f1e 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -35,6 +35,7 @@ from kafka.admin import NewTopic
 
 from . import kafka_pb2
 from . import social_pb2
+from . import message_with_repeated_pb2
 
 # TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
 
@@ -3077,6 +3078,123 @@ def test_kafka_consumer_failover(kafka_cluster):
     kafka_delete_topic(admin_client, topic_name)
 
 
+# https://github.com/ClickHouse/ClickHouse/issues/26643
+def test_issue26643(kafka_cluster):
+
+    # for backporting:
+    # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
+    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
+    producer = KafkaProducer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port), value_serializer=producer_serializer)
+
+    topic_list = []
+    topic_list.append(NewTopic(name="test_issue26643", num_partitions=4, replication_factor=1))
+    admin_client.create_topics(new_topics=topic_list, validate_only=False)
+
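+    # ClickHouse's 'Protobuf' Kafka format (unlike 'ProtobufSingle') expects a
+    # stream of length-delimited messages: each serialized message is prefixed
+    # with its byte length encoded as a varint, which is what _VarintBytes
+    # produces (assumed to be imported from google.protobuf.internal.encoder
+    # elsewhere in this file).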
+    msg = message_with_repeated_pb2.Message(
+        tnow=1629000000,
+        server='server1',
+        clien='host1',
+        sPort=443,
+        cPort=50000,
+        r=[
+            message_with_repeated_pb2.dd(name='1', type=444, ttl=123123, data=b'adsfasd'),
+            message_with_repeated_pb2.dd(name='2')
+        ],
+        method='GET'
+    )
+
+    data = b''
+    serialized_msg = msg.SerializeToString()
+    data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
+
+    msg = message_with_repeated_pb2.Message(
+        tnow=1629000002
+    )
+
+    serialized_msg = msg.SerializeToString()
+    data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
+
+    producer.send(topic="test_issue26643", value=data)
+
+    data = _VarintBytes(len(serialized_msg)) + serialized_msg
+    producer.send(topic="test_issue26643", value=data)
+    producer.flush()
+
+    instance.query('''
+        CREATE TABLE IF NOT EXISTS test.test_queue
+        (
+            `tnow` UInt32,
+            `server` String,
+            `client` String,
+            `sPort` UInt16,
+            `cPort` UInt16,
+            `r.name` Array(String),
+            `r.class` Array(UInt16),
+            `r.type` Array(UInt16),
+            `r.ttl` Array(UInt32),
+            `r.data` Array(String),
+            `method` String
+        )
+        ENGINE = Kafka
+        SETTINGS
+            kafka_broker_list = 'kafka1:19092',
+            kafka_topic_list = 'test_issue26643',
+            kafka_group_name = 'test_issue26643_group',
+            kafka_format = 'Protobuf',
+            kafka_schema = 'message_with_repeated.proto:Message',
+            kafka_num_consumers = 4,
+            kafka_skip_broken_messages = 10000;
+
+        SET allow_suspicious_low_cardinality_types=1;
+
+        CREATE TABLE test.log
+        (
+            `tnow` DateTime CODEC(DoubleDelta, LZ4),
+            `server` LowCardinality(String),
+            `client` LowCardinality(String),
+            `sPort` LowCardinality(UInt16),
+            `cPort` UInt16 CODEC(T64, LZ4),
+            `r.name` Array(String),
+            `r.class` Array(LowCardinality(UInt16)),
+            `r.type` Array(LowCardinality(UInt16)),
+            `r.ttl` Array(LowCardinality(UInt32)),
+            `r.data` Array(String),
+            `method` LowCardinality(String)
+        )
+        ENGINE = MergeTree
+        PARTITION BY toYYYYMMDD(tnow)
+        ORDER BY (tnow, server)
+        TTL toDate(tnow) + toIntervalMonth(1000)
+        SETTINGS index_granularity = 16384, merge_with_ttl_timeout = 7200;
+
+        CREATE MATERIALIZED VIEW test.test_consumer TO test.log AS
+        SELECT
+            toDateTime(a.tnow) AS tnow,
+            a.server AS server,
+            a.client AS client,
+            a.sPort AS sPort,
+            a.cPort AS cPort,
+            a.`r.name` AS `r.name`,
+            a.`r.class` AS `r.class`,
+            a.`r.type` AS `r.type`,
+            a.`r.ttl` AS `r.ttl`,
+            a.`r.data` AS `r.data`,
+            a.method AS method
+        FROM test.test_queue AS a;
+        ''')
+
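+    # Expected: three rows. The first Kafka message carries two protobuf
+    # messages and the second re-sends the last of them. The `client` column
+    # stays empty because the .proto field is named `clien`, not `client`,
+    # and Protobuf fields are matched to columns by name. The timestamps below
+    # assume the instance runs in a UTC+3 timezone (1629000000 is
+    # 2021-08-15 04:00:00 UTC).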
+    instance.wait_for_log_line("Committed offset")
+    result = instance.query('SELECT * FROM test.log')
+
+    expected = '''\
+2021-08-15 07:00:00	server1		443	50000	['1','2']	[0,0]	[444,0]	[123123,0]	['adsfasd','']	GET
+2021-08-15 07:00:02			0	0	[]	[]	[]	[]	[]	
+2021-08-15 07:00:02			0	0	[]	[]	[]	[]	[]	
+'''
+    assert TSV(result) == TSV(expected)
+
+    # kafka_cluster.open_bash_shell('instance')
+
 
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")
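# Note on message_with_repeated_pb2.py: judging by its "# source:" header, the
# module is protoc output for the .proto above, generated in place and then
# moved next to test.py. Assuming a protoc on PATH, something like the
# following should reproduce it (a newer protoc emits different but
# compatible code):
#
#   cd tests/integration/test_storage_kafka
#   protoc --python_out=. clickhouse_path/format_schemas/message_with_repeated.proto
#   mv clickhouse_path/format_schemas/message_with_repeated_pb2.py .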