Merge pull request #10502 from oandrew/avro-nested2

Avro nested fields - support complex types
This commit is contained in:
alexey-milovidov 2020-04-27 13:44:58 +03:00 committed by GitHub
commit 436e4e3f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 64 deletions

View File

@ -119,7 +119,6 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
case TypeIndex::UInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(value);
break;
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::UInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
@ -141,6 +140,15 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
case TypeIndex::Float64:
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
case TypeIndex::Decimal32:
assert_cast<ColumnDecimal<Decimal32> &>(column).insertValue(value);
break;
case TypeIndex::Decimal64:
assert_cast<ColumnDecimal<Decimal64> &>(column).insertValue(value);
break;
case TypeIndex::DateTime64:
assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value);
break;
default:
throw Exception("Type is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN);
}
@ -153,6 +161,14 @@ static std::string nodeToJson(avro::NodePtr root_node)
return ss.str();
}
static std::string nodeName(avro::NodePtr node)
{
if (node->hasName())
return node->name().simpleName();
else
return avro::toString(node->type());
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
WhichDataType target(target_type);
@ -170,25 +186,16 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
break;
case avro::AVRO_INT:
return [target](IColumn & column, avro::Decoder & decoder)
if (target_type->isValueRepresentedByNumber())
{
insertNumber(column, target, decoder.decodeInt());
};
case avro::AVRO_LONG:
if (target.isDateTime64())
{
auto date_time_scale = assert_cast<const DataTypeDateTime64 &>(*target_type).getScale();
auto logical_type = root_node->logicalType().type();
if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3)
|| (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6))
return [target](IColumn & column, avro::Decoder & decoder)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(decoder.decodeLong());
};
}
insertNumber(column, target, decoder.decodeInt());
};
}
else
break;
case avro::AVRO_LONG:
if (target_type->isValueRepresentedByNumber())
{
return [target](IColumn & column, avro::Decoder & decoder)
{
@ -197,20 +204,32 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
break;
case avro::AVRO_FLOAT:
return [target](IColumn & column, avro::Decoder & decoder)
if (target_type->isValueRepresentedByNumber())
{
insertNumber(column, target, decoder.decodeFloat());
};
return [target](IColumn & column, avro::Decoder & decoder)
{
insertNumber(column, target, decoder.decodeFloat());
};
}
break;
case avro::AVRO_DOUBLE:
return [target](IColumn & column, avro::Decoder & decoder)
if (target_type->isValueRepresentedByNumber())
{
insertNumber(column, target, decoder.decodeDouble());
};
return [target](IColumn & column, avro::Decoder & decoder)
{
insertNumber(column, target, decoder.decodeDouble());
};
}
break;
case avro::AVRO_BOOL:
return [target](IColumn & column, avro::Decoder & decoder)
if (target_type->isValueRepresentedByNumber())
{
insertNumber(column, target, decoder.decodeBool());
};
return [target](IColumn & column, avro::Decoder & decoder)
{
insertNumber(column, target, decoder.decodeBool());
};
}
break;
case avro::AVRO_ARRAY:
if (target.isArray())
{
@ -337,6 +356,17 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
break;
}
if (target.isNullable())
{
auto nested_deserialize = createDeserializeFn(root_node, removeNullable(target_type));
return [nested_deserialize](IColumn & column, avro::Decoder & decoder)
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
nested_deserialize(col.getNestedColumn(), decoder);
col.getNullMapData().push_back(0);
};
}
throw Exception(
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node),
ErrorCodes::ILLEGAL_COLUMN);
@ -437,39 +467,68 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
}
}
void AvroDeserializer::createActions(const Block & header, const avro::NodePtr& node, std::string current_path)
static inline std::string concatPath(const std::string & a, const std::string & b)
{
if (node->type() == avro::AVRO_RECORD)
return a.empty() ? b : a + "." + b;
}
AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path)
{
if (node->type() == avro::AVRO_SYMBOLIC)
{
/// continue traversal only if some column name starts with current_path
auto keep_going = std::any_of(header.begin(), header.end(), [&current_path](const ColumnWithTypeAndName & col)
{
return col.name.starts_with(current_path);
});
auto resolved_node = avro::resolveSymbol(node);
if (keep_going)
return createAction(header, resolved_node, current_path);
else
return AvroDeserializer::Action(createSkipFn(resolved_node));
}
if (header.has(current_path))
{
auto target_column_idx = header.getPositionByName(current_path);
const auto & column = header.getByPosition(target_column_idx);
try
{
AvroDeserializer::Action action(target_column_idx, createDeserializeFn(node, column.type));
column_found[target_column_idx] = true;
return action;
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
}
else if (node->type() == avro::AVRO_RECORD)
{
std::vector<AvroDeserializer::Action> field_actions(node->leaves());
for (size_t i = 0; i < node->leaves(); ++i)
{
const auto & field_node = node->leafAt(i);
const auto & field_name = node->nameAt(i);
auto field_path = current_path.empty() ? field_name : current_path + "." + field_name;
createActions(header, field_node, field_path);
field_actions[i] = createAction(header, field_node, concatPath(current_path, field_name));
}
return AvroDeserializer::Action::recordAction(field_actions);
}
else if (node->type() == avro::AVRO_UNION)
{
std::vector<AvroDeserializer::Action> branch_actions(node->leaves());
for (size_t i = 0; i < node->leaves(); ++i)
{
const auto & branch_node = node->leafAt(i);
const auto & branch_name = nodeName(branch_node);
branch_actions[i] = createAction(header, branch_node, concatPath(current_path, branch_name));
}
return AvroDeserializer::Action::unionAction(branch_actions);
}
else
{
if (header.has(current_path))
{
auto target_column_idx = header.getPositionByName(current_path);
const auto & column = header.getByPosition(target_column_idx);
try
{
actions.emplace_back(target_column_idx, createDeserializeFn(node, column.type));
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
throw;
}
column_found[target_column_idx] = true;
}
else
{
actions.emplace_back(createSkipFn(node));
}
return AvroDeserializer::Action(createSkipFn(node));
}
}
@ -482,7 +541,7 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem
}
column_found.resize(header.columns());
createActions(header, schema_root);
row_action = createAction(header, schema_root);
for (size_t i = 0; i < header.columns(); ++i)
{
@ -493,11 +552,16 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem
}
}
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const
{
for (const auto& action : actions)
ext.read_columns.assign(columns.size(), false);
row_action.execute(columns, decoder, ext);
for (size_t i = 0; i < ext.read_columns.size(); ++i)
{
action.execute(columns, decoder);
if (!ext.read_columns[i])
{
columns[i]->insertDefault();
}
}
}
@ -510,12 +574,12 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
file_reader.init();
}
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext)
{
if (file_reader.hasMore())
{
file_reader.decr();
deserializer.deserializeRow(columns, file_reader.decoder());
deserializer.deserializeRow(columns, file_reader.decoder(), ext);
return true;
}
return false;
@ -640,7 +704,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
decoder->init(*input_stream);
}
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (in.eof())
{
@ -648,7 +712,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
}
SchemaId schema_id = readConfluentSchemaId(in);
const auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder);
deserializer.deserializeRow(columns, *decoder, ext);
decoder->drain();
return true;
}

View File

@ -23,7 +23,7 @@ class AvroDeserializer
{
public:
AvroDeserializer(const Block & header, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const;
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
@ -33,13 +33,18 @@ private:
struct Action
{
enum Type { Deserialize, Skip };
enum Type {Noop, Deserialize, Skip, Record, Union};
Type type;
/// Deserialize
int target_column_idx;
DeserializeFn deserialize_fn;
/// Skip
SkipFn skip_fn;
/// Record | Union
std::vector<Action> actions;
Action() : type(Noop) {}
Action(int target_column_idx_, DeserializeFn deserialize_fn_)
: type(Deserialize)
@ -50,27 +55,46 @@ private:
: type(Skip)
, skip_fn(skip_fn_) {}
void execute(MutableColumns & columns, avro::Decoder & decoder) const
static Action recordAction(std::vector<Action> field_actions) { return Action(Type::Record, field_actions); }
static Action unionAction(std::vector<Action> branch_actions) { return Action(Type::Union, branch_actions); }
void execute(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const
{
switch (type)
{
case Noop:
break;
case Deserialize:
deserialize_fn(*columns[target_column_idx], decoder);
ext.read_columns[target_column_idx] = true;
break;
case Skip:
skip_fn(decoder);
break;
case Record:
for (const auto & action : actions)
action.execute(columns, decoder, ext);
break;
case Union:
actions[decoder.decodeUnionIndex()].execute(columns, decoder, ext);
break;
}
}
private:
Action(Type type_, std::vector<Action> actions_)
: type(type_)
, actions(actions_) {}
};
/// Populate actions by recursively traversing root schema
void createActions(const Block & header, const avro::NodePtr& node, std::string current_path = "");
AvroDeserializer::Action createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path = "");
/// Bitmap of columns found in Avro schema
std::vector<bool> column_found;
/// Deserialize/Skip actions for a row
std::vector<Action> actions;
Action row_action;
/// Map from name of named Avro type (record, enum, fixed) to SkipFn.
/// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList
std::map<avro::Name, SkipFn> symbolic_skip_fn_map;

View File

@ -22,7 +22,23 @@
= nested
1,"b1",2.2,2.3,"c3"
2.3,"b1",1,"c3"
not found
not compatible
= nested_complex
\N
"b2_str"
"b2_str"
"default"
"b2_str"
"b2_str"
\N
"b2_str"
"b2_str"
"b3_str",\N
\N,\N
\N,3.1
\N
"level2_b2_str"
\N
= compression
1000
1000

View File

@ -30,7 +30,19 @@ cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output
echo = nested
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table'
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not found' -o
cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not compatible' -o
echo = nested_complex
# special case union(null, T)
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.b2_null_str" Nullable(String)' -q 'select * from table'
# union branch to non-null with default
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" String default 'default'" -q 'select * from table'
# union branch to nullable
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" Nullable(String)" -q 'select * from table'
# multiple union branches simultaneously
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b3_null_str_double.string\" Nullable(String), \"b.b3_null_str_double.double\" Nullable(Double)" -q 'select * from table'
# and even nested recursive structures!
cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b4_null_sub1.sub1.b2_null_str\" Nullable(String)" -q 'select * from table'
echo = compression
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'

View File

@ -9,6 +9,7 @@ avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logic
avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro
avro-tools fromjson --schema-file references.avsc references.json > references.avro
avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro
avro-tools fromjson --schema-file nested_complex.avsc nested_complex.json > nested_complex.avro
#compression
avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro

View File

@ -0,0 +1,17 @@
{
"type": "record",
"name": "main",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": {
"type": "record",
"name": "sub1",
"fields": [
{"name": "b1_str", "type": "string"},
{"name": "b2_null_str", "type": ["null", "string"]},
{"name": "b3_null_str_double", "type": ["null", "string", "double"]},
{"name": "b4_null_sub1", "type": ["null", "sub1"]}
]
}}
]
}

View File

@ -0,0 +1,3 @@
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": null, "b3_null_str_double": {"string": "b3_str"}, "b4_null_sub1": null}}
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": null, "b4_null_sub1": {"sub1": {"b1_str": "level2_b1_str", "b2_null_str": {"string": "level2_b2_str"}, "b3_null_str_double": null, "b4_null_sub1": null}}}}
{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": {"double": 3.1}, "b4_null_sub1": null}}