Fixed error in Avro format

This commit is contained in:
Alexey Milovidov 2020-01-23 05:01:58 +03:00
parent 9f0230d4ba
commit a75ce4477b
2 changed files with 17 additions and 10 deletions

View File

@ -78,8 +78,10 @@ private:
};
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type)
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment)
{
++type_name_increment;
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
@ -169,7 +171,8 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}};
case TypeIndex::FixedString:
{
auto schema = avro::FixedSchema(data_type->getSizeOfValueInMemory(), "fixed");
auto size = data_type->getSizeOfValueInMemory();
auto schema = avro::FixedSchema(size, "fixed" + toString(size));
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const StringRef & s = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
@ -178,7 +181,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}
case TypeIndex::Enum8:
{
auto schema = avro::EnumSchema("enum8");
auto schema = avro::EnumSchema("enum8_" + toString(type_name_increment)); /// type names must be different for different types.
std::unordered_map<DataTypeEnum8::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum8 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
@ -194,7 +197,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}
case TypeIndex::Enum16:
{
auto schema = avro::EnumSchema("enum16");
auto schema = avro::EnumSchema("enum16" + toString(type_name_increment));
std::unordered_map<DataTypeEnum16::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum16 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
@ -211,7 +214,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::Array:
{
const auto & array_type = assert_cast<const DataTypeArray &>(*data_type);
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType());
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType(), type_name_increment);
auto schema = avro::ArraySchema(nested_mapping.schema);
return {schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
@ -237,7 +240,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return nested_mapping;
@ -266,7 +269,7 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
case TypeIndex::LowCardinality:
{
const auto & nested_type = removeLowCardinality(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment);
return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const auto & col = assert_cast<const ColumnLowCardinality &>(column);
@ -285,11 +288,13 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns)
{
avro::RecordSchema record_schema("row");
size_t type_name_increment = 0;
for (auto & column : columns)
{
try
{
auto field_mapping = createSchemaWithSerializeFn(column.type);
auto field_mapping = createSchemaWithSerializeFn(column.type, type_name_increment);
serialize_fns.push_back(field_mapping.serialize);
//TODO: verify name starts with A-Za-z_
record_schema.addField(column.name, field_mapping.schema);
@ -312,7 +317,7 @@ void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro:
}
}
static avro::Codec getCodec(const std::string& codec_name)
static avro::Codec getCodec(const std::string & codec_name)
{
if (codec_name == "")
{

View File

@ -32,7 +32,9 @@ private:
avro::Schema schema;
SerializeFn serialize;
};
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type);
/// Type names for different complex types (e.g. enums, fixed strings) must be unique. We use simple incremental number to give them different names.
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment);
std::vector<SerializeFn> serialize_fns;
avro::ValidSchema schema;