AvroConfluent + Kafla: Skip malformed messages that do not contain at least the AvroConfluent magic number and the schema id definition.

This commit is contained in:
Gervasio Varela 2020-08-21 11:11:41 +02:00
parent 8f1ba521a0
commit a6a18b62f9

View File

@ -697,8 +697,15 @@ static uint32_t readConfluentSchemaId(ReadBuffer & in)
uint8_t magic;
uint32_t schema_id;
readBinaryBigEndian(magic, in);
readBinaryBigEndian(schema_id, in);
try
{
readBinaryBigEndian(magic, in);
readBinaryBigEndian(schema_id, in);
}
catch(Exception & e) {
/* empty or incomplete message without Avro Confluent magic number or schema id */
throw Exception("Missing AvroConfluent magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA);
}
if (magic != 0x00)
{