Avro nested fields - support complex types

This commit is contained in:
Andrew Onyshchuk 2020-04-25 17:30:17 -05:00
parent 8b1e504b19
commit e7ba75ce98
8 changed files with 192 additions and 63 deletions

View File

@ -153,6 +153,14 @@ static std::string nodeToJson(avro::NodePtr root_node)
return ss.str(); return ss.str();
} }
static std::string nodeName(avro::NodePtr node)
{
if (node->hasName())
return node->name().simpleName();
else
return avro::toString(node->type());
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type) AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{ {
WhichDataType target(target_type); WhichDataType target(target_type);
@ -170,25 +178,16 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
} }
break; break;
case avro::AVRO_INT: case avro::AVRO_INT:
return [target](IColumn & column, avro::Decoder & decoder) if (target_type->isValueRepresentedByNumber())
{ {
insertNumber(column, target, decoder.decodeInt()); return [target](IColumn & column, avro::Decoder & decoder)
};
case avro::AVRO_LONG:
if (target.isDateTime64())
{
auto date_time_scale = assert_cast<const DataTypeDateTime64 &>(*target_type).getScale();
auto logical_type = root_node->logicalType().type();
if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3)
|| (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6))
{ {
return [](IColumn & column, avro::Decoder & decoder) insertNumber(column, target, decoder.decodeInt());
{ };
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(decoder.decodeLong());
};
}
} }
else break;
case avro::AVRO_LONG:
if (target_type->isValueRepresentedByNumber())
{ {
return [target](IColumn & column, avro::Decoder & decoder) return [target](IColumn & column, avro::Decoder & decoder)
{ {
@ -197,20 +196,32 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
} }
break; break;
case avro::AVRO_FLOAT: case avro::AVRO_FLOAT:
return [target](IColumn & column, avro::Decoder & decoder) if (target_type->isValueRepresentedByNumber())
{ {
insertNumber(column, target, decoder.decodeFloat()); return [target](IColumn & column, avro::Decoder & decoder)
}; {
insertNumber(column, target, decoder.decodeFloat());
};
}
break;
case avro::AVRO_DOUBLE: case avro::AVRO_DOUBLE:
return [target](IColumn & column, avro::Decoder & decoder) if (target_type->isValueRepresentedByNumber())
{ {
insertNumber(column, target, decoder.decodeDouble()); return [target](IColumn & column, avro::Decoder & decoder)
}; {
insertNumber(column, target, decoder.decodeDouble());
};
}
break;
case avro::AVRO_BOOL: case avro::AVRO_BOOL:
return [target](IColumn & column, avro::Decoder & decoder) if (target_type->isValueRepresentedByNumber())
{ {
insertNumber(column, target, decoder.decodeBool()); return [target](IColumn & column, avro::Decoder & decoder)
}; {
insertNumber(column, target, decoder.decodeBool());
};
}
break;
case avro::AVRO_ARRAY: case avro::AVRO_ARRAY:
if (target.isArray()) if (target.isArray())
{ {
@ -337,6 +348,17 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
break; break;
} }
if (target.isNullable())
{
auto nested_deserialize = createDeserializeFn(root_node, removeNullable(target_type));
return [nested_deserialize](IColumn & column, avro::Decoder & decoder)
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
nested_deserialize(col.getNestedColumn(), decoder);
col.getNullMapData().push_back(0);
};
}
throw Exception( throw Exception(
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node), "Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node),
ErrorCodes::ILLEGAL_COLUMN); ErrorCodes::ILLEGAL_COLUMN);
@ -437,39 +459,68 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
} }
} }
void AvroDeserializer::createActions(const Block & header, const avro::NodePtr& node, std::string current_path) static inline std::string concatPath(const std::string & a, const std::string & b)
{ {
if (node->type() == avro::AVRO_RECORD) return a.empty() ? b : a + "." + b;
}
AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path)
{
if (node->type() == avro::AVRO_SYMBOLIC)
{ {
/// continue traversal only if some column name starts with current_path
auto keep_going = std::any_of(header.begin(), header.end(), [&current_path](const ColumnWithTypeAndName & col)
{
return col.name.starts_with(current_path);
});
auto resolved_node = avro::resolveSymbol(node);
if (keep_going)
return createAction(header, resolved_node, current_path);
else
return AvroDeserializer::Action(createSkipFn(resolved_node));
}
if (header.has(current_path))
{
auto target_column_idx = header.getPositionByName(current_path);
const auto & column = header.getByPosition(target_column_idx);
try
{
AvroDeserializer::Action action(target_column_idx, createDeserializeFn(node, column.type));
column_found[target_column_idx] = true;
return action;
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
}
else if (node->type() == avro::AVRO_RECORD)
{
std::vector<AvroDeserializer::Action> field_actions(node->leaves());
for (size_t i = 0; i < node->leaves(); ++i) for (size_t i = 0; i < node->leaves(); ++i)
{ {
const auto & field_node = node->leafAt(i); const auto & field_node = node->leafAt(i);
const auto & field_name = node->nameAt(i); const auto & field_name = node->nameAt(i);
auto field_path = current_path.empty() ? field_name : current_path + "." + field_name; field_actions[i] = createAction(header, field_node, concatPath(current_path, field_name));
createActions(header, field_node, field_path);
} }
return AvroDeserializer::Action::recordAction(field_actions);
}
else if (node->type() == avro::AVRO_UNION)
{
std::vector<AvroDeserializer::Action> branch_actions(node->leaves());
for (size_t i = 0; i < node->leaves(); ++i)
{
const auto & branch_node = node->leafAt(i);
const auto & branch_name = nodeName(branch_node);
branch_actions[i] = createAction(header, branch_node, concatPath(current_path, branch_name));
}
return AvroDeserializer::Action::unionAction(branch_actions);
} }
else else
{ {
if (header.has(current_path)) return AvroDeserializer::Action(createSkipFn(node));
{
auto target_column_idx = header.getPositionByName(current_path);
const auto & column = header.getByPosition(target_column_idx);
try
{
actions.emplace_back(target_column_idx, createDeserializeFn(node, column.type));
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
column_found[target_column_idx] = true;
}
else
{
actions.emplace_back(createSkipFn(node));
}
} }
} }
@ -482,7 +533,7 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem
} }
column_found.resize(header.columns()); column_found.resize(header.columns());
createActions(header, schema_root); row_action = createAction(header, schema_root);
for (size_t i = 0; i < header.columns(); ++i) for (size_t i = 0; i < header.columns(); ++i)
{ {
@ -493,11 +544,16 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem
} }
} }
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const
{ {
for (const auto& action : actions) ext.read_columns.assign(columns.size(), false);
row_action.execute(columns, decoder, ext);
for (size_t i = 0; i < ext.read_columns.size(); ++i)
{ {
action.execute(columns, decoder); if (!ext.read_columns[i])
{
columns[i]->insertDefault();
}
} }
} }
@ -510,12 +566,12 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
file_reader.init(); file_reader.init();
} }
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext)
{ {
if (file_reader.hasMore()) if (file_reader.hasMore())
{ {
file_reader.decr(); file_reader.decr();
deserializer.deserializeRow(columns, file_reader.decoder()); deserializer.deserializeRow(columns, file_reader.decoder(), ext);
return true; return true;
} }
return false; return false;
@ -640,7 +696,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
decoder->init(*input_stream); decoder->init(*input_stream);
} }
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{ {
if (in.eof()) if (in.eof())
{ {
@ -648,7 +704,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
} }
SchemaId schema_id = readConfluentSchemaId(in); SchemaId schema_id = readConfluentSchemaId(in);
const auto & deserializer = getOrCreateDeserializer(schema_id); const auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder); deserializer.deserializeRow(columns, *decoder, ext);
decoder->drain(); decoder->drain();
return true; return true;
} }

View File

@ -23,7 +23,7 @@ class AvroDeserializer
{ {
public: public:
AvroDeserializer(const Block & header, avro::ValidSchema schema); AvroDeserializer(const Block & header, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const; void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
private: private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>; using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
@ -33,13 +33,18 @@ private:
struct Action struct Action
{ {
enum Type { Deserialize, Skip }; enum Type {Noop, Deserialize, Skip, Record, Union};
Type type; Type type;
/// Deserialize /// Deserialize
int target_column_idx; int target_column_idx;
DeserializeFn deserialize_fn; DeserializeFn deserialize_fn;
/// Skip /// Skip
SkipFn skip_fn; SkipFn skip_fn;
/// Record | Union
std::vector<Action> actions;
Action() : type(Noop) {}
Action(int target_column_idx_, DeserializeFn deserialize_fn_) Action(int target_column_idx_, DeserializeFn deserialize_fn_)
: type(Deserialize) : type(Deserialize)
@ -50,27 +55,46 @@ private:
: type(Skip) : type(Skip)
, skip_fn(skip_fn_) {} , skip_fn(skip_fn_) {}
void execute(MutableColumns & columns, avro::Decoder & decoder) const static Action recordAction(std::vector<Action> field_actions) { return Action(Type::Record, field_actions); }
static Action unionAction(std::vector<Action> branch_actions) { return Action(Type::Union, branch_actions); }
void execute(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const
{ {
switch (type) switch (type)
{ {
case Noop:
break;
case Deserialize: case Deserialize:
deserialize_fn(*columns[target_column_idx], decoder); deserialize_fn(*columns[target_column_idx], decoder);
ext.read_columns[target_column_idx] = true;
break; break;
case Skip: case Skip:
skip_fn(decoder); skip_fn(decoder);
break; break;
case Record:
for (const auto& action : actions)
action.execute(columns, decoder, ext);
break;
case Union:
actions[decoder.decodeUnionIndex()].execute(columns, decoder, ext);
break;
} }
} }
private:
Action(Type type_, std::vector<Action> actions_)
: type(type_)
, actions(actions_) {}
}; };
/// Populate actions by recursively traversing root schema /// Populate actions by recursively traversing root schema
void createActions(const Block & header, const avro::NodePtr& node, std::string current_path = ""); AvroDeserializer::Action createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path = "");
/// Bitmap of columns found in Avro schema /// Bitmap of columns found in Avro schema
std::vector<bool> column_found; std::vector<bool> column_found;
/// Deserialize/Skip actions for a row /// Deserialize/Skip actions for a row
std::vector<Action> actions; Action row_action;
/// Map from name of named Avro type (record, enum, fixed) to SkipFn. /// Map from name of named Avro type (record, enum, fixed) to SkipFn.
/// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList /// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList
std::map<avro::Name, SkipFn> symbolic_skip_fn_map; std::map<avro::Name, SkipFn> symbolic_skip_fn_map;

View File

@ -22,7 +22,23 @@
= nested = nested
1,"b1",2.2,2.3,"c3" 1,"b1",2.2,2.3,"c3"
2.3,"b1",1,"c3" 2.3,"b1",1,"c3"
not found not compatible
= nested_complex
\N
"b2_str"
"b2_str"
"default"
"b2_str"
"b2_str"
\N
"b2_str"
"b2_str"
"b3_str",\N
\N,\N
\N,3.1
\N
"level2_b2_str"
\N
= compression = compression
1000 1000
1000 1000

View File

@ -30,7 +30,19 @@ cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output
echo = nested echo = nested
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table' cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table' cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not found' -o cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not compatible' -o
echo = nested_complex
# special case union(null, T)
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.b2_null_str" Nullable(String)' -q 'select * from table'
# union branch to non-null with default
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" String default 'default'" -q 'select * from table'
# union branch to nullable
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" Nullable(String)" -q 'select * from table'
# multiple union branches simultaneously
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b3_null_str_double.string\" Nullable(String), \"b.b3_null_str_double.double\" Nullable(Double)" -q 'select * from table'
# and even nested recursive structures!
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b4_null_sub1.sub1.b2_null_str\" Nullable(String)" -q 'select * from table'
echo = compression echo = compression
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'

View File

@ -9,6 +9,7 @@ avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logic
avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro
avro-tools fromjson --schema-file references.avsc references.json > references.avro avro-tools fromjson --schema-file references.avsc references.json > references.avro
avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro
avro-tools fromjson --schema-file nested_complex.avsc nested_complex.json > nested_complex.avro
#compression #compression
avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro

View File

@ -0,0 +1,17 @@
{
"type": "record",
"name": "main",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": {
"type": "record",
"name": "sub1",
"fields": [
{"name": "b1_str", "type": "string"},
{"name": "b2_null_str", "type": ["null", "string"]},
{"name": "b3_null_str_double", "type": ["null", "string", "double"]},
{"name": "b4_null_sub1", "type": ["null", "sub1"]}
]
}}
]
}

View File

@ -0,0 +1,3 @@
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": null, "b3_null_str_double": {"string": "b3_str"}, "b4_null_sub1": null}}
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": null, "b4_null_sub1": {"sub1": {"b1_str": "level2_b1_str", "b2_null_str": {"string": "level2_b2_str"}, "b3_null_str_double": null, "b4_null_sub1": null}}}}
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": {"double": 3.1}, "b4_null_sub1": null}}