diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 52ceaf063b7..567f94d19fe 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -102,10 +102,6 @@ private: ReadBuffer & in; }; -static void deserializeNoop(IColumn &, avro::Decoder &) -{ -} - /// Insert value with conversion to the column of target type. template static void insertNumber(IColumn & column, WhichDataType type, T value) @@ -441,8 +437,43 @@ AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node) } } +void AvroDeserializer::createActions(const Block & header, const avro::NodePtr& node, std::string current_path) +{ + if (node->type() == avro::AVRO_RECORD) + { + for (size_t i = 0; i < node->leaves(); ++i) + { + const auto & field_node = node->leafAt(i); + const auto & field_name = node->nameAt(i); + auto field_path = current_path.empty() ? field_name : current_path + "." + field_name; + createActions(header, field_node, field_path); + } + } + else + { + if (header.has(current_path)) + { + auto target_column_idx = header.getPositionByName(current_path); + const auto & column = header.getByPosition(target_column_idx); + try + { + actions.emplace_back(target_column_idx, createDeserializeFn(node, column.type)); + } + catch (Exception & e) + { + e.addMessage("column " + column.name); + throw; + } + column_found[target_column_idx] = true; + } + else + { + actions.emplace_back(createSkipFn(node)); + } + } +} -AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema) +AvroDeserializer::AvroDeserializer(const Block & header, avro::ValidSchema schema) { const auto & schema_root = schema.root(); if (schema_root->type() != avro::AVRO_RECORD) @@ -450,48 +481,23 @@ AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro: throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH); } - field_mapping.resize(schema_root->leaves(), -1); + column_found.resize(header.columns()); + createActions(header, schema_root); - for (size_t i = 0; i < schema_root->leaves(); ++i) + for (size_t i = 0; i < header.columns(); ++i) { - skip_fns.push_back(createSkipFn(schema_root->leafAt(i))); - deserialize_fns.push_back(&deserializeNoop); - } - - for (size_t i = 0; i < columns.size(); ++i) - { - const auto & column = columns[i]; - size_t field_index = 0; - if (!schema_root->nameIndex(column.name, field_index)) + if (!column_found[i]) { - throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN); + throw Exception("Field " + header.getByPosition(i).name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN); } - auto field_schema = schema_root->leafAt(field_index); - try - { - deserialize_fns[field_index] = createDeserializeFn(field_schema, column.type); - } - catch (Exception & e) - { - e.addMessage("column " + column.name); - throw; - } - field_mapping[field_index] = i; } } void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const { - for (size_t i = 0; i < field_mapping.size(); i++) + for (const auto& action : actions) { - if (field_mapping[i] >= 0) - { - deserialize_fns[i](*columns[field_mapping[i]], decoder); - } - else - { - skip_fns[i](decoder); - } + action.execute(columns, decoder); } } @@ -499,7 +505,7 @@ void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) : IRowInputFormat(header_, in_, params_) , file_reader(std::make_unique(in_)) - , deserializer(header_.getColumnsWithTypeAndName(), file_reader.dataSchema()) + , deserializer(output.getHeader(), file_reader.dataSchema()) { file_reader.init(); } @@ -626,8 +632,7 @@ static uint32_t readConfluentSchemaId(ReadBuffer & in) AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) - : IRowInputFormat(header_.cloneEmpty(), in_, params_) - , header_columns(header_.getColumnsWithTypeAndName()) + : IRowInputFormat(header_, in_, params_) , schema_registry(getConfluentSchemaRegistry(format_settings_)) , input_stream(std::make_unique(in)) , decoder(avro::binaryDecoder()) @@ -655,7 +660,7 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc if (it == deserializer_cache.end()) { auto schema = schema_registry->getSchema(schema_id); - AvroDeserializer deserializer(header_columns, schema); + AvroDeserializer deserializer(output.getHeader(), schema); it = deserializer_cache.emplace(schema_id, deserializer).first; } return it->second; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index b54c8ecede5..6245d704e74 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -22,7 +22,7 @@ namespace DB class AvroDeserializer { public: - AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema); + AvroDeserializer(const Block & header, avro::ValidSchema schema); void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const; private: @@ -31,15 +31,46 @@ private: static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); SkipFn createSkipFn(avro::NodePtr root_node); - /// Map from field index in Avro schema to column number in block header. Or -1 if there is no corresponding column. - std::vector field_mapping; + struct Action + { + enum Type { Deserialize, Skip }; + Type type; + /// Deserialize + int target_column_idx; + DeserializeFn deserialize_fn; + /// Skip + SkipFn skip_fn; - /// How to skip the corresponding field in Avro schema. - std::vector skip_fns; + Action(int target_column_idx_, DeserializeFn deserialize_fn_) + : type(Deserialize) + , target_column_idx(target_column_idx_) + , deserialize_fn(deserialize_fn_) {} - /// How to deserialize the corresponding field in Avro schema. - std::vector deserialize_fns; + Action(SkipFn skip_fn_) + : type(Skip) + , skip_fn(skip_fn_) {} + void execute(MutableColumns & columns, avro::Decoder & decoder) const + { + switch (type) + { + case Deserialize: + deserialize_fn(*columns[target_column_idx], decoder); + break; + case Skip: + skip_fn(decoder); + break; + } + } + }; + + /// Populate actions by recursively traversing root schema + void createActions(const Block & header, const avro::NodePtr& node, std::string current_path = ""); + + /// Bitmap of columns found in Avro schema + std::vector column_found; + /// Deserialize/Skip actions for a row + std::vector actions; /// Map from name of named Avro type (record, enum, fixed) to SkipFn. /// This is to avoid infinite recursion when Avro schema contains self-references. e.g. LinkedList std::map symbolic_skip_fn_map; @@ -73,7 +104,6 @@ public: class SchemaRegistry; private: - const ColumnsWithTypeAndName header_columns; std::shared_ptr schema_registry; using SchemaId = uint32_t; std::unordered_map deserializer_cache; diff --git a/tests/queries/0_stateless/01060_avro.reference b/tests/queries/0_stateless/01060_avro.reference index a21e7a3a101..0550967a224 100644 --- a/tests/queries/0_stateless/01060_avro.reference +++ b/tests/queries/0_stateless/01060_avro.reference @@ -19,6 +19,10 @@ = references "a1","c1" "a2","c2" += nested +1,"b1",2.2,2.3,"c3" +2.3,"b1",1,"c3" +not found = compression 1000 1000 diff --git a/tests/queries/0_stateless/01060_avro.sh b/tests/queries/0_stateless/01060_avro.sh index 15e97abfa52..a64b2884731 100755 --- a/tests/queries/0_stateless/01060_avro.sh +++ b/tests/queries/0_stateless/01060_avro.sh @@ -27,6 +27,11 @@ cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --out echo = references cat $DATA_DIR/references.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a String, c String" -q 'select * from table' +echo = nested +cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64, "b.a" String, "b.b" Double, "b.c" Double, c String' -q 'select * from table' +cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b.c" Double, "b.a" String, a Int64, c String' -q 'select * from table' +cat $DATA_DIR/nested.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S '"b" Double' -q 'select * from table' 2>&1 | grep -i 'not found' -o + echo = compression cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' @@ -68,4 +73,4 @@ ${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro ${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table' # type supported via conversion -${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c \ No newline at end of file +${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c | tr -d ' ' \ No newline at end of file diff --git a/tests/queries/0_stateless/data_avro/generate_avro.sh b/tests/queries/0_stateless/data_avro/generate_avro.sh index 6ec26efc049..b6ec75ad4dd 100755 --- a/tests/queries/0_stateless/data_avro/generate_avro.sh +++ b/tests/queries/0_stateless/data_avro/generate_avro.sh @@ -8,6 +8,7 @@ avro-tools fromjson --schema-file complex.avsc complex.json > complex.avro avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logical_types.avro avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro avro-tools fromjson --schema-file references.avsc references.json > references.avro +avro-tools fromjson --schema-file nested.avsc nested.json > nested.avro #compression avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro diff --git a/tests/queries/0_stateless/data_avro/nested.avro b/tests/queries/0_stateless/data_avro/nested.avro new file mode 100644 index 00000000000..1415c45d328 Binary files /dev/null and b/tests/queries/0_stateless/data_avro/nested.avro differ diff --git a/tests/queries/0_stateless/data_avro/nested.avsc b/tests/queries/0_stateless/data_avro/nested.avsc new file mode 100644 index 00000000000..966dc6defb3 --- /dev/null +++ b/tests/queries/0_stateless/data_avro/nested.avsc @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "main", + "fields": [ + {"name": "a", "type": "long"}, + {"name": "b", "type": { + "type": "record", + "name": "sub1", + "fields": [ + {"name": "a", "type": "string"}, + {"name": "b", "type": "double"}, + {"name": "c", "type": "double"} + ] + }}, + {"name": "c", "type": "string"} + ] + } \ No newline at end of file diff --git a/tests/queries/0_stateless/data_avro/nested.json b/tests/queries/0_stateless/data_avro/nested.json new file mode 100644 index 00000000000..63a7bc40e4b --- /dev/null +++ b/tests/queries/0_stateless/data_avro/nested.json @@ -0,0 +1 @@ +{"a":1, "b": {"a":"b1", "b": 2.2, "c": 2.3}, "c": "c3"}