AvroConfluent: skip tombstone records

This commit is contained in:
Andrew Onyshchuk 2020-07-30 23:42:24 -05:00
parent d4266d9619
commit 99c183da0f

View File

@ -727,6 +727,11 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
{
return false;
}
// skip tombstone records (kafka messages with null value)
if (in.available() == 0)
{
return false;
}
SchemaId schema_id = readConfluentSchemaId(in);
const auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder, ext);