From 99c183da0f3454c4a0104086015e35f5be95f0d3 Mon Sep 17 00:00:00 2001 From: Andrew Onyshchuk Date: Thu, 30 Jul 2020 23:42:24 -0500 Subject: [PATCH] AvroConfluent: skip tombstone records --- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f1ca709e163..d985ee1cb54 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -727,6 +727,11 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } + // skip tombstone records (kafka messages with null value) + if (in.available() == 0) + { + return false; + } SchemaId schema_id = readConfluentSchemaId(in); const auto & deserializer = getOrCreateDeserializer(schema_id); deserializer.deserializeRow(columns, *decoder, ext);