From a75ce4477bc8e60c848947e2d78140650a561ba8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 23 Jan 2020 05:01:58 +0300 Subject: [PATCH] Fixed error in Avro format --- .../Formats/Impl/AvroRowOutputFormat.cpp | 23 +++++++++++-------- .../Formats/Impl/AvroRowOutputFormat.h | 4 +++- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index b0375c7e2ae..c32f46552b1 100644 --- a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -78,8 +78,10 @@ private: }; -AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type) +AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment) { + ++type_name_increment; + switch (data_type->getTypeId()) { case TypeIndex::UInt8: @@ -169,7 +171,8 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF }}; case TypeIndex::FixedString: { - auto schema = avro::FixedSchema(data_type->getSizeOfValueInMemory(), "fixed"); + auto size = data_type->getSizeOfValueInMemory(); + auto schema = avro::FixedSchema(size, "fixed" + toString(size)); return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) { const StringRef & s = assert_cast(column).getDataAt(row_num); @@ -178,7 +181,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF } case TypeIndex::Enum8: { - auto schema = avro::EnumSchema("enum8"); + auto schema = avro::EnumSchema("enum8_" + toString(type_name_increment)); /// type names must be different for different types. std::unordered_map enum_mapping; const auto & enum_values = assert_cast(*data_type).getValues(); for (size_t i = 0; i < enum_values.size(); ++i) @@ -194,7 +197,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF } case TypeIndex::Enum16: { - auto schema = avro::EnumSchema("enum16"); + auto schema = avro::EnumSchema("enum16" + toString(type_name_increment)); std::unordered_map enum_mapping; const auto & enum_values = assert_cast(*data_type).getValues(); for (size_t i = 0; i < enum_values.size(); ++i) @@ -211,7 +214,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF case TypeIndex::Array: { const auto & array_type = assert_cast(*data_type); - auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType()); + auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType(), type_name_increment); auto schema = avro::ArraySchema(nested_mapping.schema); return {schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) { @@ -237,7 +240,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF case TypeIndex::Nullable: { auto nested_type = removeNullable(data_type); - auto nested_mapping = createSchemaWithSerializeFn(nested_type); + auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment); if (nested_type->getTypeId() == TypeIndex::Nothing) { return nested_mapping; @@ -266,7 +269,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF case TypeIndex::LowCardinality: { const auto & nested_type = removeLowCardinality(data_type); - auto nested_mapping = createSchemaWithSerializeFn(nested_type); + auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment); return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) { const auto & col = assert_cast(column); @@ -285,11 +288,13 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns) { avro::RecordSchema record_schema("row"); + + size_t type_name_increment = 0; for (auto & column : columns) { try { - auto field_mapping = createSchemaWithSerializeFn(column.type); + auto field_mapping = createSchemaWithSerializeFn(column.type, type_name_increment); serialize_fns.push_back(field_mapping.serialize); //TODO: verify name starts with A-Za-z_ record_schema.addField(column.name, field_mapping.schema); @@ -312,7 +317,7 @@ void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro: } } -static avro::Codec getCodec(const std::string& codec_name) +static avro::Codec getCodec(const std::string & codec_name) { if (codec_name == "") { diff --git a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h index efe63c1a72f..4d404337d74 100644 --- a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -32,7 +32,9 @@ private: avro::Schema schema; SerializeFn serialize; }; - static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type); + + /// Type names for different complex types (e.g. enums, fixed strings) must be unique. We use simple incremental number to give them different names. + static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment); std::vector serialize_fns; avro::ValidSchema schema;