Test for issue #26643

This commit is contained in:
Mikhail Filimonov 2021-08-18 16:55:15 +02:00 committed by mergify-bot
parent dd2c4c2c3b
commit ea320c96d5
3 changed files with 317 additions and 0 deletions

View File

@ -0,0 +1,19 @@
syntax = "proto3";
option optimize_for = SPEED;
message Message {
uint32 tnow = 1;
string server = 2;
string clien = 3;
uint32 sPort = 4;
uint32 cPort = 5;
repeated dd r = 6;
string method = 7;
}
message dd {
string name = 1;
uint32 class = 2;
uint32 type = 3;
uint64 ttl = 4;
bytes data = 5;
}

View File

@ -0,0 +1,180 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/message_with_repeated.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/message_with_repeated.proto',
package='',
syntax='proto3',
serialized_options=_b('H\001'),
serialized_pb=_b('\n:clickhouse_path/format_schemas/message_with_repeated.proto\"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t\"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3')
)
_MESSAGE = _descriptor.Descriptor(
name='Message',
full_name='Message',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='tnow', full_name='Message.tnow', index=0,
number=1, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='server', full_name='Message.server', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='clien', full_name='Message.clien', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='sPort', full_name='Message.sPort', index=3,
number=4, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cPort', full_name='Message.cPort', index=4,
number=5, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='r', full_name='Message.r', index=5,
number=6, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='method', full_name='Message.method', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=62,
serialized_end=178,
)
_DD = _descriptor.Descriptor(
name='dd',
full_name='dd',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='dd.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='class', full_name='dd.class', index=1,
number=2, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='type', full_name='dd.type', index=2,
number=3, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='ttl', full_name='dd.ttl', index=3,
number=4, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='data', full_name='dd.data', index=4,
number=5, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=180,
serialized_end=254,
)
_MESSAGE.fields_by_name['r'].message_type = _DD
DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE
DESCRIPTOR.message_types_by_name['dd'] = _DD
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict(
DESCRIPTOR = _MESSAGE,
__module__ = 'clickhouse_path.format_schemas.message_with_repeated_pb2'
# @@protoc_insertion_point(class_scope:Message)
))
_sym_db.RegisterMessage(Message)
dd = _reflection.GeneratedProtocolMessageType('dd', (_message.Message,), dict(
DESCRIPTOR = _DD,
__module__ = 'clickhouse_path.format_schemas.message_with_repeated_pb2'
# @@protoc_insertion_point(class_scope:dd)
))
_sym_db.RegisterMessage(dd)
DESCRIPTOR._options = None
# @@protoc_insertion_point(module_scope)

View File

@ -35,6 +35,7 @@ from kafka.admin import NewTopic
from . import kafka_pb2 from . import kafka_pb2
from . import social_pb2 from . import social_pb2
from . import message_with_repeated_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side. # TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
@ -3077,6 +3078,123 @@ def test_kafka_consumer_failover(kafka_cluster):
kafka_delete_topic(admin_client, topic_name) kafka_delete_topic(admin_client, topic_name)
# https://github.com/ClickHouse/ClickHouse/issues/26643
def test_issue26643(kafka_cluster):
# for backporting:
# admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
producer = KafkaProducer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port), value_serializer=producer_serializer)
topic_list = []
topic_list.append(NewTopic(name="test_issue26643", num_partitions=4, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
msg = message_with_repeated_pb2.Message(
tnow=1629000000,
server='server1',
clien='host1',
sPort=443,
cPort=50000,
r=[
message_with_repeated_pb2.dd(name='1', type=444, ttl=123123, data=b'adsfasd'),
message_with_repeated_pb2.dd(name='2')
],
method='GET'
)
data = b''
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
msg = message_with_repeated_pb2.Message(
tnow=1629000002
)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
producer.send(topic="test_issue26643", value=data)
data = _VarintBytes(len(serialized_msg)) + serialized_msg
producer.send(topic="test_issue26643", value=data)
producer.flush()
instance.query('''
CREATE TABLE IF NOT EXISTS test.test_queue
(
`tnow` UInt32,
`server` String,
`client` String,
`sPort` UInt16,
`cPort` UInt16,
`r.name` Array(String),
`r.class` Array(UInt16),
`r.type` Array(UInt16),
`r.ttl` Array(UInt32),
`r.data` Array(String),
`method` String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'test_issue26643',
kafka_group_name = 'test_issue26643_group',
kafka_format = 'Protobuf',
kafka_schema = 'message_with_repeated.proto:Message',
kafka_num_consumers = 4,
kafka_skip_broken_messages = 10000;
SET allow_suspicious_low_cardinality_types=1;
CREATE TABLE test.log
(
`tnow` DateTime CODEC(DoubleDelta, LZ4),
`server` LowCardinality(String),
`client` LowCardinality(String),
`sPort` LowCardinality(UInt16),
`cPort` UInt16 CODEC(T64, LZ4),
`r.name` Array(String),
`r.class` Array(LowCardinality(UInt16)),
`r.type` Array(LowCardinality(UInt16)),
`r.ttl` Array(LowCardinality(UInt32)),
`r.data` Array(String),
`method` LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(tnow)
ORDER BY (tnow, server)
TTL toDate(tnow) + toIntervalMonth(1000)
SETTINGS index_granularity = 16384, merge_with_ttl_timeout = 7200;
CREATE MATERIALIZED VIEW test.test_consumer TO test.log AS
SELECT
toDateTime(a.tnow) AS tnow,
a.server AS server,
a.client AS client,
a.sPort AS sPort,
a.cPort AS cPort,
a.`r.name` AS `r.name`,
a.`r.class` AS `r.class`,
a.`r.type` AS `r.type`,
a.`r.ttl` AS `r.ttl`,
a.`r.data` AS `r.data`,
a.method AS method
FROM test.test_queue AS a;
''')
instance.wait_for_log_line("Committed offset")
result = instance.query('SELECT * FROM test.log')
expected = '''\
2021-08-15 07:00:00 server1 443 50000 ['1','2'] [0,0] [444,0] [123123,0] ['adsfasd',''] GET
2021-08-15 07:00:02 0 0 [] [] [] [] []
2021-08-15 07:00:02 0 0 [] [] [] [] []
'''
assert TSV(result) == TSV(expected)
# kafka_cluster.open_bash_shell('instance')
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")