diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f1ca709e163..d985ee1cb54 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -727,6 +727,11 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } + // skip tombstone records (kafka messages with null value) + if (in.available() == 0) + { + return false; + } SchemaId schema_id = readConfluentSchemaId(in); const auto & deserializer = getOrCreateDeserializer(schema_id); deserializer.deserializeRow(columns, *decoder, ext);