Merge pull request #16762 from filimonov/kafka-protobuf-issue12615

Test for the issue #12615: reading Protobuf-formatted messages from Kafka when the first field of the message is a string. The added integration test produces length-delimited User messages (string username = 1; int32 timestamp = 2;) and checks that a Kafka engine table parses them correctly.
commit 712a35bb7b by alexey-milovidov, 2020-11-07 13:40:19 +03:00 (committed via GitHub)
3 changed files with 176 additions and 1 deletion
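
For context, the framing the new test exercises is length-delimited Protobuf: each User message (with the string field first) is serialized and prefixed with its varint-encoded length, and the frames are concatenated into a single Kafka message, which is what kafka_format = 'Protobuf' consumes. A minimal sketch, assuming the generated social_pb2 module added in this PR is importable; the helper name delimited_users is illustrative (the test's own producer is kafka_produce_protobuf_social, shown further down):

from google.protobuf.internal.encoder import _VarintBytes

import social_pb2  # generated from social.proto below; assumed importable for this sketch

def delimited_users(start_index, num_messages):
    # Build [varint length][serialized User] frames and concatenate them into one payload,
    # mirroring what the test's kafka_produce_protobuf_social helper sends to Kafka.
    data = b''
    for i in range(start_index, start_index + num_messages):
        msg = social_pb2.User()
        msg.username = 'John Doe {}'.format(i)  # string field in first position (issue #12615)
        msg.timestamp = 1000000 + i
        payload = msg.SerializeToString()
        data += _VarintBytes(len(payload)) + payload
    return data

The test then reads these frames back through a Kafka engine table declared with kafka_schema = 'social:User'.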

social.proto

@@ -0,0 +1,6 @@
syntax = "proto3";

message User {
  string username = 1;
  int32 timestamp = 2;
}

social_pb2.py

@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: social.proto
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
  name='social.proto',
  package='',
  syntax='proto3',
  serialized_options=None,
  serialized_pb=b'\n\x0csocial.proto\"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3'
)

_USER = _descriptor.Descriptor(
  name='User',
  full_name='User',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='username', full_name='User.username', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='timestamp', full_name='User.timestamp', index=1,
      number=2, type=5, cpp_type=1, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=16,
  serialized_end=59,
)

DESCRIPTOR.message_types_by_name['User'] = _USER
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

User = _reflection.GeneratedProtocolMessageType('User', (_message.Message,), {
  'DESCRIPTOR' : _USER,
  '__module__' : 'social_pb2'
  # @@protoc_insertion_point(class_scope:User)
  })
_sym_db.RegisterMessage(User)
# @@protoc_insertion_point(module_scope)
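
A quick sanity check of the generated bindings (a sketch, assuming the social_pb2 module above is importable): because username is field 1 and strings are length-delimited on the wire, the serialized message starts with tag byte 0x0A, i.e. (1 << 3) | 2 — exactly the "string field on first position" layout the test below targets.

import social_pb2  # the generated module above; assumed importable for this sketch

msg = social_pb2.User(username='John Doe 0', timestamp=1000000)
raw = msg.SerializeToString()

# Field 1 is a string, so the first byte is the tag (1 << 3) | 2 == 0x0A.
assert raw[0] == 0x0A

# Round-trip back into a User message.
decoded = social_pb2.User.FromString(raw)
assert decoded.username == 'John Doe 0' and decoded.timestamp == 1000000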

tests/integration/test_storage_kafka/test.py

@@ -30,6 +30,8 @@ libprotoc 3.0.0
protoc --python_out=. kafka.proto
"""
from . import kafka_pb2
from . import social_pb2

# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.

@@ -115,6 +117,20 @@ def kafka_produce_protobuf_messages_no_delimeters(topic, start_index, num_messag
    producer.flush()
    print("Produced {} messages for topic {}".format(num_messages, topic))

def kafka_produce_protobuf_social(topic, start_index, num_messages):
    data = b''
    for i in range(start_index, start_index + num_messages):
        msg = social_pb2.User()
        msg.username='John Doe {}'.format(i)
        msg.timestamp=1000000+i
        serialized_msg = msg.SerializeToString()
        data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
    producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=producer_serializer)
    producer.send(topic=topic, value=data)
    producer.flush()
    print(("Produced {} messages for topic {}".format(num_messages, topic)))

def avro_confluent_message(schema_registry_client, value):
    # type: (CachedSchemaRegistryClient, dict) -> str

@@ -982,6 +998,84 @@ def test_kafka_protobuf(kafka_cluster):
    kafka_check_result(result, True)

@pytest.mark.timeout(180)
def test_kafka_string_field_on_first_position_in_protobuf(kafka_cluster):
    # https://github.com/ClickHouse/ClickHouse/issues/12615
    instance.query('''
CREATE TABLE test.kafka (
    username String,
    timestamp Int32
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka1:19092',
    kafka_topic_list = 'string_field_on_first_position_in_protobuf',
    kafka_group_name = 'string_field_on_first_position_in_protobuf',
    kafka_format = 'Protobuf',
    kafka_schema = 'social:User';

SELECT * FROM test.kafka;
    ''')

    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 0, 20)
    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 20, 1)
    kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 21, 29)

    result = instance.query('SELECT * FROM test.kafka', ignore_error=True)
    expected = '''\
John Doe 0	1000000
John Doe 1	1000001
John Doe 2	1000002
John Doe 3	1000003
John Doe 4	1000004
John Doe 5	1000005
John Doe 6	1000006
John Doe 7	1000007
John Doe 8	1000008
John Doe 9	1000009
John Doe 10	1000010
John Doe 11	1000011
John Doe 12	1000012
John Doe 13	1000013
John Doe 14	1000014
John Doe 15	1000015
John Doe 16	1000016
John Doe 17	1000017
John Doe 18	1000018
John Doe 19	1000019
John Doe 20	1000020
John Doe 21	1000021
John Doe 22	1000022
John Doe 23	1000023
John Doe 24	1000024
John Doe 25	1000025
John Doe 26	1000026
John Doe 27	1000027
John Doe 28	1000028
John Doe 29	1000029
John Doe 30	1000030
John Doe 31	1000031
John Doe 32	1000032
John Doe 33	1000033
John Doe 34	1000034
John Doe 35	1000035
John Doe 36	1000036
John Doe 37	1000037
John Doe 38	1000038
John Doe 39	1000039
John Doe 40	1000040
John Doe 41	1000041
John Doe 42	1000042
John Doe 43	1000043
John Doe 44	1000044
John Doe 45	1000045
John Doe 46	1000046
John Doe 47	1000047
John Doe 48	1000048
John Doe 49	1000049
'''
    assert TSV(result) == TSV(expected)

@pytest.mark.timeout(30)
def test_kafka_protobuf_no_delimiter(kafka_cluster):
    instance.query('''

@@ -2117,7 +2211,7 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
        kafka_format = 'JSONEachRow',
        kafka_max_block_size = 20,
        kafka_flush_interval_ms = 1000;
        SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take different time, test rely on timing, so can flap otherwise) */
    ''')