2020-01-08 09:13:12 +00:00
|
|
|
#pragma once
|
|
|
|
#include "config_formats.h"
|
2020-01-18 19:29:53 +00:00
|
|
|
#include "config_core.h"
|
2020-01-08 09:13:12 +00:00
|
|
|
#if USE_AVRO
|
|
|
|
|
|
|
|
#include <unordered_map>
|
2020-02-01 04:13:12 +00:00
|
|
|
#include <map>
|
2020-01-08 09:13:12 +00:00
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
#include <Core/Block.h>
|
|
|
|
#include <Formats/FormatSchemaInfo.h>
|
|
|
|
#include <Processors/Formats/IRowInputFormat.h>
|
|
|
|
|
|
|
|
#include <avro/DataFile.hh>
|
|
|
|
#include <avro/Decoder.hh>
|
|
|
|
#include <avro/Schema.hh>
|
|
|
|
#include <avro/ValidSchema.hh>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
class AvroDeserializer
|
|
|
|
{
|
|
|
|
public:
|
2020-01-18 20:15:49 +00:00
|
|
|
AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema);
|
2020-02-02 00:53:11 +00:00
|
|
|
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const;
|
2020-01-08 09:13:12 +00:00
|
|
|
|
|
|
|
private:
|
|
|
|
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
|
|
|
|
using SkipFn = std::function<void(avro::Decoder & decoder)>;
|
|
|
|
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
|
2020-02-01 04:13:12 +00:00
|
|
|
SkipFn createSkipFn(avro::NodePtr root_node);
|
2020-01-08 09:13:12 +00:00
|
|
|
|
2020-01-23 02:12:11 +00:00
|
|
|
/// Map from field index in Avro schema to column number in block header. Or -1 if there is no corresponding column.
|
2020-01-08 09:13:12 +00:00
|
|
|
std::vector<int> field_mapping;
|
2020-01-23 02:12:11 +00:00
|
|
|
|
|
|
|
/// How to skip the corresponding field in Avro schema.
|
2020-01-08 09:13:12 +00:00
|
|
|
std::vector<SkipFn> skip_fns;
|
2020-01-23 02:12:11 +00:00
|
|
|
|
|
|
|
/// How to deserialize the corresponding field in Avro schema.
|
2020-01-08 09:13:12 +00:00
|
|
|
std::vector<DeserializeFn> deserialize_fns;
|
2020-02-01 04:13:12 +00:00
|
|
|
|
2020-02-01 17:13:50 +00:00
|
|
|
/// Map from name of named Avro type (record, enum, fixed) to SkipFn.
|
2020-02-01 04:13:12 +00:00
|
|
|
/// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList
|
2020-02-01 17:13:50 +00:00
|
|
|
std::map<avro::Name, SkipFn> symbolic_skip_fn_map;
|
2020-01-08 09:13:12 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
class AvroRowInputFormat : public IRowInputFormat
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
|
|
|
|
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
|
|
|
|
String getName() const override { return "AvroRowInputFormat"; }
|
|
|
|
|
|
|
|
private:
|
|
|
|
avro::DataFileReaderBase file_reader;
|
|
|
|
AvroDeserializer deserializer;
|
|
|
|
};
|
|
|
|
|
2020-01-18 19:29:53 +00:00
|
|
|
#if USE_POCO_JSON
|
2020-02-02 00:53:11 +00:00
|
|
|
/// Confluent framing + Avro binary datum encoding. Mainly used for Kafka.
|
|
|
|
/// Uses 3 caches:
|
|
|
|
/// 1. global: schema registry cache (base_url -> SchemaRegistry)
|
|
|
|
/// 2. SchemaRegistry: schema cache (schema_id -> schema)
|
|
|
|
/// 3. AvroConfluentRowInputFormat: deserializer cache (schema_id -> AvroDeserializer)
|
|
|
|
/// This is needed because KafkaStorage creates a new instance of InputFormat per a batch of messages
|
2020-01-08 09:13:12 +00:00
|
|
|
class AvroConfluentRowInputFormat : public IRowInputFormat
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
|
|
|
|
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
|
|
|
|
String getName() const override { return "AvroConfluentRowInputFormat"; }
|
|
|
|
|
2020-02-02 00:53:11 +00:00
|
|
|
class SchemaRegistry;
|
2020-01-08 09:13:12 +00:00
|
|
|
private:
|
2020-01-18 20:15:49 +00:00
|
|
|
const ColumnsWithTypeAndName header_columns;
|
2020-02-02 00:53:11 +00:00
|
|
|
std::shared_ptr<SchemaRegistry> schema_registry;
|
2020-01-08 09:13:12 +00:00
|
|
|
using SchemaId = uint32_t;
|
|
|
|
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;
|
2020-02-02 00:53:11 +00:00
|
|
|
const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);
|
2020-01-08 09:13:12 +00:00
|
|
|
|
|
|
|
avro::InputStreamPtr input_stream;
|
|
|
|
avro::DecoderPtr decoder;
|
|
|
|
};
|
2020-01-10 22:46:48 +00:00
|
|
|
#endif
|
|
|
|
|
2020-01-08 09:13:12 +00:00
|
|
|
}
|
|
|
|
#endif
|