diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 4bd3d5c2ada..b7015a11af9 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -119,7 +119,6 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::UInt32: assert_cast(column).insertValue(value); break; - case TypeIndex::DateTime64: [[fallthrough]]; case TypeIndex::UInt64: assert_cast(column).insertValue(value); break; @@ -141,6 +140,15 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::Float64: assert_cast(column).insertValue(value); break; + case TypeIndex::Decimal32: + assert_cast &>(column).insertValue(value); + break; + case TypeIndex::Decimal64: + assert_cast &>(column).insertValue(value); + break; + case TypeIndex::DateTime64: + assert_cast &>(column).insertValue(value); + break; default: throw Exception("Type is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN); } @@ -153,6 +161,14 @@ static std::string nodeToJson(avro::NodePtr root_node) return ss.str(); } +static std::string nodeName(avro::NodePtr node) +{ + if (node->hasName()) + return node->name().simpleName(); + else + return avro::toString(node->type()); +} + AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type) { WhichDataType target(target_type); @@ -170,25 +186,16 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node } break; case avro::AVRO_INT: - return [target](IColumn & column, avro::Decoder & decoder) + if (target_type->isValueRepresentedByNumber()) { - insertNumber(column, target, decoder.decodeInt()); - }; - case avro::AVRO_LONG: - if (target.isDateTime64()) - { - auto date_time_scale = assert_cast(*target_type).getScale(); - auto logical_type = root_node->logicalType().type(); - if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3) - || (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6)) + return [target](IColumn & column, avro::Decoder & decoder) { - return [](IColumn & column, avro::Decoder & decoder) - { - assert_cast(column).insertValue(decoder.decodeLong()); - }; - } + insertNumber(column, target, decoder.decodeInt()); + }; } - else + break; + case avro::AVRO_LONG: + if (target_type->isValueRepresentedByNumber()) { return [target](IColumn & column, avro::Decoder & decoder) { @@ -197,20 +204,32 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node } break; case avro::AVRO_FLOAT: - return [target](IColumn & column, avro::Decoder & decoder) + if (target_type->isValueRepresentedByNumber()) { - insertNumber(column, target, decoder.decodeFloat()); - }; + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeFloat()); + }; + } + break; case avro::AVRO_DOUBLE: - return [target](IColumn & column, avro::Decoder & decoder) + if (target_type->isValueRepresentedByNumber()) { - insertNumber(column, target, decoder.decodeDouble()); - }; + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeDouble()); + }; + } + break; case avro::AVRO_BOOL: - return [target](IColumn & column, avro::Decoder & decoder) + if (target_type->isValueRepresentedByNumber()) { - insertNumber(column, target, decoder.decodeBool()); - }; + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeBool()); + }; + } + break; case avro::AVRO_ARRAY: if (target.isArray()) { @@ -337,6 +356,17 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node break; } + if (target.isNullable()) + { + auto nested_deserialize = createDeserializeFn(root_node, removeNullable(target_type)); + return [nested_deserialize](IColumn & column, avro::Decoder & decoder) + { + ColumnNullable & col = assert_cast(column); + nested_deserialize(col.getNestedColumn(), decoder); + col.getNullMapData().push_back(0); + }; + } + throw Exception( "Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node), ErrorCodes::ILLEGAL_COLUMN); @@ -437,39 +467,68 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node) } } -void AvroDeserializer::createActions(const Block & header, const avro::NodePtr& node, std::string current_path) +static inline std::string concatPath(const std::string & a, const std::string & b) { - if (node->type() == avro::AVRO_RECORD) + return a.empty() ? b : a + "." + b; +} + +AvroDeserializer::Action AvroDeserializer::createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path) +{ + if (node->type() == avro::AVRO_SYMBOLIC) { + /// continue traversal only if some column name starts with current_path + auto keep_going = std::any_of(header.begin(), header.end(), [¤t_path](const ColumnWithTypeAndName & col) + { + return col.name.starts_with(current_path); + }); + auto resolved_node = avro::resolveSymbol(node); + if (keep_going) + return createAction(header, resolved_node, current_path); + else + return AvroDeserializer::Action(createSkipFn(resolved_node)); + } + + if (header.has(current_path)) + { + auto target_column_idx = header.getPositionByName(current_path); + const auto & column = header.getByPosition(target_column_idx); + try + { + AvroDeserializer::Action action(target_column_idx, createDeserializeFn(node, column.type)); + column_found[target_column_idx] = true; + return action; + } + catch (Exception & e) + { + e.addMessage("column " + column.name); + throw; + } + } + else if (node->type() == avro::AVRO_RECORD) + { + std::vector field_actions(node->leaves()); for (size_t i = 0; i < node->leaves(); ++i) { const auto & field_node = node->leafAt(i); const auto & field_name = node->nameAt(i); - auto field_path = current_path.empty() ? field_name : current_path + "." + field_name; - createActions(header, field_node, field_path); + field_actions[i] = createAction(header, field_node, concatPath(current_path, field_name)); } + return AvroDeserializer::Action::recordAction(field_actions); + } + else if (node->type() == avro::AVRO_UNION) + { + std::vector branch_actions(node->leaves()); + for (size_t i = 0; i < node->leaves(); ++i) + { + const auto & branch_node = node->leafAt(i); + const auto & branch_name = nodeName(branch_node); + branch_actions[i] = createAction(header, branch_node, concatPath(current_path, branch_name)); + } + return AvroDeserializer::Action::unionAction(branch_actions); } else { - if (header.has(current_path)) - { - auto target_column_idx = header.getPositionByName(current_path); - const auto & column = header.getByPosition(target_column_idx); - try - { - actions.emplace_back(target_column_idx, createDeserializeFn(node, column.type)); - } - catch (Exception & e) - { - e.addMessage("column " + column.name); - throw; - } - column_found[target_column_idx] = true; - } - else - { - actions.emplace_back(createSkipFn(node)); - } + return AvroDeserializer::Action(createSkipFn(node)); } } @@ -482,7 +541,7 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem } column_found.resize(header.columns()); - createActions(header, schema_root); + row_action = createAction(header, schema_root); for (size_t i = 0; i < header.columns(); ++i) { @@ -493,11 +552,16 @@ AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schem } } -void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const +void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const { - for (const auto& action : actions) + ext.read_columns.assign(columns.size(), false); + row_action.execute(columns, decoder, ext); + for (size_t i = 0; i < ext.read_columns.size(); ++i) { - action.execute(columns, decoder); + if (!ext.read_columns[i]) + { + columns[i]->insertDefault(); + } } } @@ -510,12 +574,12 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, file_reader.init(); } -bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext) { if (file_reader.hasMore()) { file_reader.decr(); - deserializer.deserializeRow(columns, file_reader.decoder()); + deserializer.deserializeRow(columns, file_reader.decoder(), ext); return true; } return false; @@ -640,7 +704,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( decoder->init(*input_stream); } -bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) { if (in.eof()) { @@ -648,7 +712,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten } SchemaId schema_id = readConfluentSchemaId(in); const auto & deserializer = getOrCreateDeserializer(schema_id); - deserializer.deserializeRow(columns, *decoder); + deserializer.deserializeRow(columns, *decoder, ext); decoder->drain(); return true; } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 6245d704e74..38d44812f7e 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -23,7 +23,7 @@ class AvroDeserializer { public: AvroDeserializer(const Block & header, avro::ValidSchema schema); - void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const; + void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const; private: using DeserializeFn = std::function; @@ -33,13 +33,18 @@ private: struct Action { - enum Type { Deserialize, Skip }; + enum Type {Noop, Deserialize, Skip, Record, Union}; Type type; /// Deserialize int target_column_idx; DeserializeFn deserialize_fn; /// Skip SkipFn skip_fn; + /// Record | Union + std::vector actions; + + + Action() : type(Noop) {} Action(int target_column_idx_, DeserializeFn deserialize_fn_) : type(Deserialize) @@ -50,27 +55,46 @@ private: : type(Skip) , skip_fn(skip_fn_) {} - void execute(MutableColumns & columns, avro::Decoder & decoder) const + static Action recordAction(std::vector field_actions) { return Action(Type::Record, field_actions); } + + static Action unionAction(std::vector branch_actions) { return Action(Type::Union, branch_actions); } + + + void execute(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const { switch (type) { + case Noop: + break; case Deserialize: deserialize_fn(*columns[target_column_idx], decoder); + ext.read_columns[target_column_idx] = true; break; case Skip: skip_fn(decoder); break; + case Record: + for (const auto & action : actions) + action.execute(columns, decoder, ext); + break; + case Union: + actions[decoder.decodeUnionIndex()].execute(columns, decoder, ext); + break; } } + private: + Action(Type type_, std::vector actions_) + : type(type_) + , actions(actions_) {} }; /// Populate actions by recursively traversing root schema - void createActions(const Block & header, const avro::NodePtr& node, std::string current_path = ""); + AvroDeserializer::Action createAction(const Block & header, const avro::NodePtr & node, const std::string & current_path = ""); /// Bitmap of columns found in Avro schema std::vector column_found; /// Deserialize/Skip actions for a row - std::vector actions; + Action row_action; /// Map from name of named Avro type (record, enum, fixed) to SkipFn. /// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList std::map symbolic_skip_fn_map; diff --git a/tests/queries/0_stateless/01060_avro.reference b/tests/queries/0_stateless/01060_avro.reference index 0550967a224..192a86ca9bb 100644 --- a/tests/queries/0_stateless/01060_avro.reference +++ b/tests/queries/0_stateless/01060_avro.reference @@ -22,7 +22,23 @@ = nested 1,"b1",2.2,2.3,"c3" 2.3,"b1",1,"c3" -not found +not compatible += nested_complex +\N +"b2_str" +"b2_str" +"default" +"b2_str" +"b2_str" +\N +"b2_str" +"b2_str" +"b3_str",\N +\N,\N +\N,3.1 +\N +"level2_b2_str" +\N = compression 1000 1000 diff --git a/tests/queries/0_stateless/01060_avro.sh b/tests/queries/0_stateless/01060_avro.sh index a64b2884731..71f27c8491a 100755 --- a/tests/queries/0_stateless/01060_avro.sh +++ b/tests/queries/0_stateless/01060_avro.sh @@ -30,7 +30,19 @@ cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output echo = nested cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table' cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table' -cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not found' -o +cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not compatible' -o + +echo = nested_complex +# special case union(null, T) +cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.b2_null_str" Nullable(String)' -q 'select * from table' +# union branch to non-null with default +cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" String default 'default'" -q 'select * from table' +# union branch to nullable +cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b2_null_str.string\" Nullable(String)" -q 'select * from table' +# multiple union branches simultaneously +cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b3_null_str_double.string\" Nullable(String), \"b.b3_null_str_double.double\" Nullable(Double)" -q 'select * from table' +# and even nested recursive structures! +cat $DATA_DIR/nested_complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "\"b.b4_null_sub1.sub1.b2_null_str\" Nullable(String)" -q 'select * from table' echo = compression cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' diff --git a/tests/queries/0_stateless/data_avro/generate_avro.sh b/tests/queries/0_stateless/data_avro/generate_avro.sh index b6ec75ad4dd..0bd8dad773b 100755 --- a/tests/queries/0_stateless/data_avro/generate_avro.sh +++ b/tests/queries/0_stateless/data_avro/generate_avro.sh @@ -9,6 +9,7 @@ avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logic avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro avro-tools fromjson --schema-file references.avsc references.json > references.avro avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro +avro-tools fromjson --schema-file nested_complex.avsc nested_complex.json > nested_complex.avro #compression avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro diff --git a/tests/queries/0_stateless/data_avro/nested_complex.avro b/tests/queries/0_stateless/data_avro/nested_complex.avro new file mode 100644 index 00000000000..47caae12c40 Binary files /dev/null and b/tests/queries/0_stateless/data_avro/nested_complex.avro differ diff --git a/tests/queries/0_stateless/data_avro/nested_complex.avsc b/tests/queries/0_stateless/data_avro/nested_complex.avsc new file mode 100644 index 00000000000..522392863a8 --- /dev/null +++ b/tests/queries/0_stateless/data_avro/nested_complex.avsc @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "main", + "fields": [ + {"name": "a", "type": "long"}, + {"name": "b", "type": { + "type": "record", + "name": "sub1", + "fields": [ + {"name": "b1_str", "type": "string"}, + {"name": "b2_null_str", "type": ["null", "string"]}, + {"name": "b3_null_str_double", "type": ["null", "string", "double"]}, + {"name": "b4_null_sub1", "type": ["null", "sub1"]} + ] + }} + ] + } \ No newline at end of file diff --git a/tests/queries/0_stateless/data_avro/nested_complex.json b/tests/queries/0_stateless/data_avro/nested_complex.json new file mode 100644 index 00000000000..068a6a2e110 --- /dev/null +++ b/tests/queries/0_stateless/data_avro/nested_complex.json @@ -0,0 +1,3 @@ +{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": null, "b3_null_str_double": {"string": "b3_str"}, "b4_null_sub1": null}} +{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": null, "b4_null_sub1": {"sub1": {"b1_str": "level2_b1_str", "b2_null_str": {"string": "level2_b2_str"}, "b3_null_str_double": null, "b4_null_sub1": null}}}} +{"a": 1, "b": {"b1_str": "b1_str", "b2_null_str": {"string": "b2_str"}, "b3_null_str_double": {"double": 3.1}, "b4_null_sub1": null}} \ No newline at end of file