Add Avro nested fields insert support

This commit is contained in:
Andrew Onyshchuk 2020-04-18 15:15:39 -05:00
parent abbeb13cf0
commit 14647ed03b
8 changed files with 113 additions and 50 deletions

View File

@ -102,10 +102,6 @@ private:
ReadBuffer & in;
};
static void deserializeNoop(IColumn &, avro::Decoder &)
{
}
/// Insert value with conversion to the column of target type.
template <typename T>
static void insertNumber(IColumn & column, WhichDataType type, T value)
@ -441,8 +437,43 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
}
}
void AvroDeserializer::createActions(const Block & header, const avro::NodePtr& node, std::string current_path)
{
if (node->type() == avro::AVRO_RECORD)
{
for (size_t i = 0; i < node->leaves(); ++i)
{
const auto& field_node = node->leafAt(i);
const auto& field_name = node->nameAt(i);
auto field_path = current_path.empty() ? field_name : current_path + "." + field_name;
createActions(header, field_node, field_path);
}
}
else
{
if (header.has(current_path))
{
auto target_column_idx = header.getPositionByName(current_path);
const auto& column = header.getByPosition(target_column_idx);
try
{
actions.emplace_back(target_column_idx, createDeserializeFn(node, column.type));
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
column_found[target_column_idx] = true;
}
else
{
actions.emplace_back(createSkipFn(node));
}
}
}
AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema)
AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema)
{
const auto & schema_root = schema.root();
if (schema_root->type() != avro::AVRO_RECORD)
@ -450,48 +481,23 @@ AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro:
throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH);
}
field_mapping.resize(schema_root->leaves(), -1);
column_found.resize(header.columns());
createActions(header, schema_root);
for (size_t i = 0; i < schema_root->leaves(); ++i)
for (size_t i = 0; i < header.columns(); ++i)
{
skip_fns.push_back(createSkipFn(schema_root->leafAt(i)));
deserialize_fns.push_back(&deserializeNoop);
}
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column = columns[i];
size_t field_index = 0;
if (!schema_root->nameIndex(column.name, field_index))
if (!column_found[i])
{
throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN);
throw Exception("Field " + header.getByPosition(i).name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN);
}
auto field_schema = schema_root->leafAt(field_index);
try
{
deserialize_fns[field_index] = createDeserializeFn(field_schema, column.type);
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
field_mapping[field_index] = i;
}
}
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const
{
for (size_t i = 0; i < field_mapping.size(); i++)
for (const auto& action : actions)
{
if (field_mapping[i] >= 0)
{
deserialize_fns[i](*columns[field_mapping[i]], decoder);
}
else
{
skip_fns[i](decoder);
}
action.execute(columns, decoder);
}
}
@ -499,7 +505,7 @@ void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder &
AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, params_)
, file_reader(std::make_unique<InputStreamReadBufferAdapter>(in_))
, deserializer(header_.getColumnsWithTypeAndName(), file_reader.dataSchema())
, deserializer(output.getHeader(), file_reader.dataSchema())
{
file_reader.init();
}
@ -626,8 +632,7 @@ static uint32_t readConfluentSchemaId(ReadBuffer & in)
AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_.cloneEmpty(), in_, params_)
, header_columns(header_.getColumnsWithTypeAndName())
: IRowInputFormat(header_, in_, params_)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(in))
, decoder(avro::binaryDecoder())
@ -655,7 +660,7 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc
if (it == deserializer_cache.end())
{
auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(header_columns, schema);
AvroDeserializer deserializer(output.getHeader(), schema);
it = deserializer_cache.emplace(schema_id, deserializer).first;
}
return it->second;

View File

@ -22,7 +22,7 @@ namespace DB
class AvroDeserializer
{
public:
AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema);
AvroDeserializer(const Block & header, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const;
private:
@ -31,15 +31,46 @@ private:
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
SkipFn createSkipFn(avro::NodePtr root_node);
/// Map from field index in Avro schema to column number in block header. Or -1 if there is no corresponding column.
std::vector<int> field_mapping;
struct Action
{
enum Type { Deserialize, Skip };
Type type;
/// Deserialize
int target_column_idx;
DeserializeFn deserialize_fn;
/// Skip
SkipFn skip_fn;
/// How to skip the corresponding field in Avro schema.
std::vector<SkipFn> skip_fns;
Action(int target_column_idx_, DeserializeFn deserialize_fn_)
: type(Deserialize)
, target_column_idx(target_column_idx_)
, deserialize_fn(deserialize_fn_) {}
/// How to deserialize the corresponding field in Avro schema.
std::vector<DeserializeFn> deserialize_fns;
Action(SkipFn skip_fn_)
: type(Skip)
, skip_fn(skip_fn_) {}
void execute(MutableColumns & columns, avro::Decoder & decoder) const
{
switch(type)
{
case Deserialize:
deserialize_fn(*columns[target_column_idx], decoder);
break;
case Skip:
skip_fn(decoder);
break;
}
}
};
/// Populate actions by recursively traversing root schema
void createActions(const Block & header, const avro::NodePtr& node, std::string current_path = "");
/// Bitmap of columns found in Avro schema
std::vector<bool> column_found;
/// Deserialize/Skip actions for a row
std::vector<Action> actions;
/// Map from name of named Avro type (record, enum, fixed) to SkipFn.
/// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList
std::map<avro::Name, SkipFn> symbolic_skip_fn_map;
@ -73,7 +104,6 @@ public:
class SchemaRegistry;
private:
const ColumnsWithTypeAndName header_columns;
std::shared_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t;
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;

View File

@ -19,6 +19,10 @@
= references
"a1","c1"
"a2","c2"
= nested
1,"b1",2.2,2.3,"c3"
2.3,"b1",1,"c3"
not found
= compression
1000
1000

View File

@ -27,6 +27,11 @@ cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --out
echo = references
cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a String, c String" -q 'select * from table'
echo = nested
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not found' -o
echo = compression
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
@ -68,4 +73,4 @@ ${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
# type supported via conversion
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c | tr -d ' '

View File

@ -8,6 +8,7 @@ avro-tools fromjson --schema-file complex.avsc complex.json > complex.avro
avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logical_types.avro
avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro
avro-tools fromjson --schema-file references.avsc references.json > references.avro
avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro
#compression
avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro

Binary file not shown.

View File

@ -0,0 +1,17 @@
{
"type": "record",
"name": "main",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": {
"type": "record",
"name": "sub1",
"fields": [
{"name": "a", "type": "string"},
{"name": "b", "type": "double"},
{"name": "c", "type": "double"}
]
}},
{"name": "c", "type": "string"}
]
}

View File

@ -0,0 +1 @@
{"a":1, "b": {"a":"b1", "b": 2.2, "c": 2.3}, "c": "c3"}