def avro_message(value):
    # type: (dict) -> bytes
    """Encode a single row as Avro binary and return the raw bytes.

    The row is written with ``avro.io.DatumWriter`` through an
    ``avro.io.BinaryEncoder`` into an in-memory buffer, using a fixed record
    schema named ``row`` with the fields ``id`` (long), ``blockNo`` (int),
    ``val1`` (string), ``val2`` (float) and ``val3`` (int).

    :param value: mapping holding one entry per schema field.
    :return: the bytes the encoder produced for this one datum.
    """
    schema = avro.schema.make_avsc_object({
        'name': 'row',
        'type': 'record',
        'fields': [
            {'name': 'id', 'type': 'long'},
            {'name': 'blockNo', 'type': 'int'},
            {'name': 'val1', 'type': 'string'},
            {'name': 'val2', 'type': 'float'},
            {'name': 'val3', 'type': 'int'}
        ]
    })

    # NOTE(review): only the datum itself is written (no DataFileWriter is
    # involved), so the result presumably carries no container header or
    # embedded schema -- confirm the consuming 'Avro' format accepts that.
    buffer = io.BytesIO()
    avro.io.DatumWriter(schema).write(value, avro.io.BinaryEncoder(buffer))
    return buffer.getvalue()
a table: Invalid: File is too small: 0, Stack trace (when copying this message, always include the lines below):