#pragma once #include "config_formats.h" #include "config_core.h" #if USE_AVRO #include #include #include #include #include #include #include #include #include namespace DB { class AvroDeserializer { public: AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema); void deserializeRow(MutableColumns & columns, avro::Decoder & decoder); private: using DeserializeFn = std::function; using SkipFn = std::function; static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); static SkipFn createSkipFn(avro::NodePtr root_node); std::vector field_mapping; std::vector skip_fns; std::vector deserialize_fns; }; class AvroRowInputFormat : public IRowInputFormat { public: AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "AvroRowInputFormat"; } private: avro::DataFileReaderBase file_reader; AvroDeserializer deserializer; }; #if USE_POCO_JSON class AvroConfluentRowInputFormat : public IRowInputFormat { public: AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_); virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "AvroConfluentRowInputFormat"; } private: const ColumnsWithTypeAndName header_columns; class SchemaRegistry; std::unique_ptr schema_registry; using SchemaId = uint32_t; std::unordered_map deserializer_cache; AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); avro::InputStreamPtr input_stream; avro::DecoderPtr decoder; }; #endif } #endif