AvroConfluent: skip broken messages

This commit is contained in:
Andrew Onyshchuk 2020-08-02 17:55:57 -05:00
parent e472c83601
commit 2883831564
2 changed files with 9 additions and 0 deletions

View File

@ -739,6 +739,12 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
return true; return true;
} }
void AvroConfluentRowInputFormat::syncAfterError()
{
// skip until the end of current kafka message
in.tryIgnore(in.available());
}
const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id) const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)
{ {
auto it = deserializer_cache.find(schema_id); auto it = deserializer_cache.find(schema_id);

View File

@ -129,6 +129,9 @@ public:
String getName() const override { return "AvroConfluentRowInputFormat"; } String getName() const override { return "AvroConfluentRowInputFormat"; }
class SchemaRegistry; class SchemaRegistry;
protected:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private: private:
std::shared_ptr<SchemaRegistry> schema_registry; std::shared_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t; using SchemaId = uint32_t;