Implement full Avro Union support for Avro Format SerDe

Jiří Kozlovský 2024-09-18 00:06:59 +02:00
parent 72f6af4fa1
commit d9689ea43f
8 changed files with 247 additions and 55 deletions

View File

@ -2108,35 +2108,40 @@ ClickHouse Avro format supports reading and writing [Avro data files](https://av
The table below shows supported data types and how they match ClickHouse [data types](/docs/en/sql-reference/data-types/index.md) in `INSERT` and `SELECT` queries.
| Avro data type `INSERT` | ClickHouse data type | Avro data type `SELECT` |
|---------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|-------------------------------|
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\|16\|32)](/docs/en/sql-reference/data-types/int-uint.md), [UInt(8\|16\|32)](/docs/en/sql-reference/data-types/int-uint.md) | `int` |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/docs/en/sql-reference/data-types/int-uint.md), [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `long` |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/docs/en/sql-reference/data-types/float.md) | `float` |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/docs/en/sql-reference/data-types/float.md) | `double` |
| `bytes`, `string`, `fixed`, `enum` | [String](/docs/en/sql-reference/data-types/string.md) | `bytes` or `string` \* |
| `bytes`, `string`, `fixed` | [FixedString(N)](/docs/en/sql-reference/data-types/fixedstring.md) | `fixed(N)` |
| `enum` | [Enum(8\|16)](/docs/en/sql-reference/data-types/enum.md) | `enum` |
| `array(T)` | [Array(T)](/docs/en/sql-reference/data-types/array.md) | `array(T)` |
| `map(V, K)` | [Map(V, K)](/docs/en/sql-reference/data-types/map.md) | `map(string, K)` |
| `union(null, T)`, `union(T, null)` | [Nullable(T)](/docs/en/sql-reference/data-types/date.md) | `union(null, T)` |
| `null` | [Nullable(Nothing)](/docs/en/sql-reference/data-types/special-data-types/nothing.md) | `null` |
| `int (date)` \** | [Date](/docs/en/sql-reference/data-types/date.md), [Date32](docs/en/sql-reference/data-types/date32.md) | `int (date)` \** |
| `long (timestamp-millis)` \** | [DateTime64(3)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-millis)` \** |
| `long (timestamp-micros)` \** | [DateTime64(6)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-micros)` \** |
| `bytes (decimal)` \** | [DateTime64(N)](/docs/en/sql-reference/data-types/datetime.md) | `bytes (decimal)` \** |
| `int` | [IPv4](/docs/en/sql-reference/data-types/ipv4.md) | `int` |
| `fixed(16)` | [IPv6](/docs/en/sql-reference/data-types/ipv6.md) | `fixed(16)` |
| `bytes (decimal)` \** | [Decimal(P, S)](/docs/en/sql-reference/data-types/decimal.md) | `bytes (decimal)` \** |
| `string (uuid)` \** | [UUID](/docs/en/sql-reference/data-types/uuid.md) | `string (uuid)` \** |
| `fixed(16)` | [Int128/UInt128](/docs/en/sql-reference/data-types/int-uint.md) | `fixed(16)` |
| `fixed(32)` | [Int256/UInt256](/docs/en/sql-reference/data-types/int-uint.md) | `fixed(32)` |
| `record` | [Tuple](/docs/en/sql-reference/data-types/tuple.md) | `record` |
| Avro data type `INSERT` | ClickHouse data type | Avro data type `SELECT` |
|---------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\|16\|32)](/docs/en/sql-reference/data-types/int-uint.md), [UInt(8\|16\|32)](/docs/en/sql-reference/data-types/int-uint.md) | `int` |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/docs/en/sql-reference/data-types/int-uint.md), [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `long` |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/docs/en/sql-reference/data-types/float.md) | `float` |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/docs/en/sql-reference/data-types/float.md) | `double` |
| `bytes`, `string`, `fixed`, `enum` | [String](/docs/en/sql-reference/data-types/string.md) | `bytes` or `string` \* |
| `bytes`, `string`, `fixed` | [FixedString(N)](/docs/en/sql-reference/data-types/fixedstring.md) | `fixed(N)` |
| `enum` | [Enum(8\|16)](/docs/en/sql-reference/data-types/enum.md) | `enum` |
| `array(T)` | [Array(T)](/docs/en/sql-reference/data-types/array.md) | `array(T)` |
| `map(V, K)` | [Map(V, K)](/docs/en/sql-reference/data-types/map.md) | `map(string, K)` |
| `union(null, T)`, `union(T, null)` | [Nullable(T)](/docs/en/sql-reference/data-types/nullable.md) | `union(null, T)` |
| `union(T1, T2, …)` \** | [Variant(T1, T2, …)](/docs/en/sql-reference/data-types/variant.md) | `union(T1, T2, …)` \** |
| `null` | [Nullable(Nothing)](/docs/en/sql-reference/data-types/special-data-types/nothing.md) | `null` |
| `int (date)` \**\* | [Date](/docs/en/sql-reference/data-types/date.md), [Date32](/docs/en/sql-reference/data-types/date32.md) | `int (date)` \**\* |
| `long (timestamp-millis)` \**\* | [DateTime64(3)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-millis)` \**\* |
| `long (timestamp-micros)` \**\* | [DateTime64(6)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-micros)` \**\* |
| `bytes (decimal)` \**\* | [DateTime64(N)](/docs/en/sql-reference/data-types/datetime.md) | `bytes (decimal)` \**\* |
| `int` | [IPv4](/docs/en/sql-reference/data-types/ipv4.md) | `int` |
| `fixed(16)` | [IPv6](/docs/en/sql-reference/data-types/ipv6.md) | `fixed(16)` |
| `bytes (decimal)` \**\* | [Decimal(P, S)](/docs/en/sql-reference/data-types/decimal.md) | `bytes (decimal)` \**\* |
| `string (uuid)` \**\* | [UUID](/docs/en/sql-reference/data-types/uuid.md) | `string (uuid)` \**\* |
| `fixed(16)` | [Int128/UInt128](/docs/en/sql-reference/data-types/int-uint.md) | `fixed(16)` |
| `fixed(32)` | [Int256/UInt256](/docs/en/sql-reference/data-types/int-uint.md) | `fixed(32)` |
| `record` | [Tuple](/docs/en/sql-reference/data-types/tuple.md) | `record` |
\* `bytes` is default, controlled by [output_format_avro_string_column_pattern](/docs/en/operations/settings/settings-formats.md/#output_format_avro_string_column_pattern)
\** [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)
\** The [Variant type](/docs/en/sql-reference/data-types/variant) implicitly accepts `null` as a field value, so, for example, the Avro `union(T1, T2, null)` is converted to `Variant(T1, T2)`.
As a result, when producing Avro from ClickHouse, we always have to include the `null` type in the Avro `union` type set, because during schema inference we don't know whether any value is actually `null`.
\**\* [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)
Unsupported Avro logical data types: `time-millis`, `time-micros`, `duration`
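
To make the mapping concrete, here is a minimal sketch using the same `avro` Python package as the test generator in this commit; the file name, field name, and the expected inferred type `Variant(Int64, String)` are illustrative assumptions based on the table above, not part of the commit.

```python
import avro.schema
import avro.datafile
import avro.io

# A single field whose type is a three-branch Avro union: ["long", "string", "null"].
schema = avro.schema.parse(
    """
    {
        "type": "record",
        "name": "Example",
        "fields": [
            {"name": "value", "type": ["long", "string", "null"]}
        ]
    }
    """
)

with open("union_example.avro", "wb") as f:
    writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema)
    writer.append({"value": 42})       # "long" branch   -> Variant value of type Int64
    writer.append({"value": "hello"})  # "string" branch -> Variant value of type String
    writer.append({"value": None})     # "null" branch   -> NULL (folded into the Variant)
    writer.close()
```

Per the table above, `DESC file('union_example.avro')` should then report `value Variant(Int64, String)`, and writing that column back out is expected to produce the Avro union `["long", "string", "null"]`.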

View File

@ -1,5 +1,4 @@
#include "AvroRowInputFormat.h"
#include "DataTypes/DataTypeLowCardinality.h"
#if USE_AVRO
#include <numeric>
@ -21,20 +20,20 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFixedString.h>
#include "DataTypes/DataTypeLowCardinality.h"
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include "DataTypes/DataTypeVariant.h"
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnTuple.h>
@ -44,12 +43,10 @@
#include <DataFile.hh>
#include <Decoder.hh>
#include <Node.hh>
#include <NodeConcepts.hh>
#include <NodeImpl.hh>
#include <Types.hh>
#include <ValidSchema.hh>
#include <Poco/Buffer.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/Net/HTTPBasicCredentials.h>
@ -388,7 +385,6 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
return true;
};
}
/// FIXME Support UNION has more than two datatypes.
else if (
root_node->leaves() == 2
&& (root_node->leafAt(0)->type() == avro::AVRO_NULL || root_node->leafAt(1)->type() == avro::AVRO_NULL))
@ -438,6 +434,64 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
target_type->getName());
}
}
if (target.isVariant())
{
    const auto & variant_type = assert_cast<const DataTypeVariant &>(*target_type);
    const auto & nested_types = variant_type.getVariants();

    using AvroUnionIndex = size_t;
    std::map<AvroUnionIndex, ColumnVariant::Discriminator> union_index_to_global_discriminator;

    std::vector<DeserializeFn> nested_deserializers;
    nested_deserializers.reserve(root_node->leaves());

    bool union_has_null = false;
    for (size_t i = 0; i != root_node->leaves(); ++i)
    {
        const auto & avro_node = root_node->leafAt(static_cast<int>(i));
        if (avro_node->type() == avro::AVRO_NULL)
        {
            union_has_null = true;
            nested_deserializers.emplace_back();
            union_index_to_global_discriminator.insert_or_assign(i, ColumnVariant::NULL_DISCRIMINATOR);
            continue;
        }

        const auto variant = AvroSchemaReader::avroNodeToDataType(avro_node);
        nested_deserializers.emplace_back(createDeserializeFn(avro_node, variant));

        auto corresponding_discriminator = variant_type.tryGetVariantDiscriminator(variant->getName());
        if (!corresponding_discriminator)
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "Destination {} and Avro Union are not compatible. The {} type is missing from the destination Variant types. "
                "If this is an issue, let the input format infer the schema from the Avro message instead of providing a custom Variant type.",
                variant_type.getName(),
                variant->getName());

        union_index_to_global_discriminator.insert_or_assign(i, std::move(corresponding_discriminator.value()));
    }

    if (root_node->leaves() != nested_types.size() + (union_has_null ? 1 : 0))
        throw Exception(
            ErrorCodes::INCORRECT_DATA,
            "The number of (non-null) union types in the Avro record ({}) doesn't match the number of types in the destination Variant type ({}).",
            root_node->leaves() - (union_has_null ? 1 : 0),
            nested_types.size());

    return [union_has_null,
            deserializers = std::move(nested_deserializers),
            discriminators_map = std::move(union_index_to_global_discriminator)](IColumn & column, avro::Decoder & decoder)
    {
        auto & column_variant = assert_cast<ColumnVariant &>(column);
        const AvroUnionIndex union_index = decoder.decodeUnionIndex();
        const auto global_discriminator = discriminators_map.at(union_index);
        if (union_has_null && global_discriminator == ColumnVariant::NULL_DISCRIMINATOR)
        {
            column_variant.insertDefault();
            return true;
        }

        const auto local_discriminator = column_variant.localDiscriminatorByGlobal(global_discriminator);
        auto & variant = column_variant.getVariantByLocalDiscriminator(local_discriminator);
        deserializers[union_index](variant, decoder);
        column_variant.getLocalDiscriminators().push_back(local_discriminator);
        column_variant.getOffsets().push_back(variant.size() - 1);
        return true;
    };
}
break;
}
case avro::AVRO_NULL:
@ -1258,11 +1312,15 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
case avro::Type::AVRO_NULL:
return std::make_shared<DataTypeNothing>();
case avro::Type::AVRO_UNION:
{
    // Treat union[T] as just T
    if (node->leaves() == 1)
    {
        return avroNodeToDataType(node->leafAt(0));
    }
    else if (
    // Treat union[T, NULL] and union[NULL, T] as Nullable(T)
    if (
        node->leaves() == 2
        && (node->leafAt(0)->type() == avro::Type::AVRO_NULL || node->leafAt(1)->type() == avro::Type::AVRO_NULL))
    {
@ -1270,8 +1328,23 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
        auto nested_type = avroNodeToDataType(node->leafAt(nested_leaf_index));
        return nested_type->canBeInsideNullable() ? makeNullable(nested_type) : nested_type;
    }

    /// FIXME Support UNION has more than two datatypes.
    throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Avro type UNION is not supported for inserting.");

    // Treat union[T1, T2, …] as Variant(T1, T2, …)
    const int avro_union_size = static_cast<int>(node->leaves());

    DataTypes nested_types;
    nested_types.reserve(avro_union_size);
    for (int i = 0; i != avro_union_size; ++i)
    {
        // We skip the null union type in Variant, since it is encoded using the null discriminator (implicitly, all Variants can "contain null").
        if (node->leafAt(i)->type() == avro::Type::AVRO_NULL)
            continue;
        const auto & avro_node = node->leafAt(i);
        nested_types.push_back(avroNodeToDataType(avro_node));
    }
    return std::make_shared<DataTypeVariant>(nested_types);
}
case avro::Type::AVRO_SYMBOLIC:
return avroNodeToDataType(avro::resolveSymbol(node));
case avro::Type::AVRO_RECORD:
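
For readers following the deserializer above: it builds a map from the Avro union branch index (what `decodeUnionIndex()` returns) to the destination Variant's global discriminator, with the `null` branch mapping to the NULL discriminator. Below is a simplified, hypothetical Python model of that mapping, kept in the same language as the test generator in this commit; it is not ClickHouse code, and the type names are hard-coded purely for illustration.

```python
NULL_DISCRIMINATOR = None  # stand-in for ColumnVariant::NULL_DISCRIMINATOR

def build_discriminator_map(avro_union_branches, variant_types):
    """Map Avro union branch index -> Variant global discriminator."""
    # Assumed, partial Avro -> ClickHouse type mapping, just for the example.
    avro_to_clickhouse = {"double": "Float64", "string": "String", "long": "Int64"}
    mapping = {}
    for index, branch in enumerate(avro_union_branches):
        if branch == "null":
            mapping[index] = NULL_DISCRIMINATOR  # decoded as an empty (NULL) Variant row
            continue
        ch_type = avro_to_clickhouse[branch]
        if ch_type not in variant_types:
            raise ValueError(f"{ch_type} is missing from the destination Variant types")
        mapping[index] = variant_types.index(ch_type)
    return mapping

# union(double, string, null) read into Variant(Float64, String):
print(build_discriminator_map(["double", "string", "null"], ["Float64", "String"]))
# {0: 0, 1: 1, 2: None}
```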

View File

@ -16,6 +16,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeVariant.h>
#include <DataTypes/DataTypeMap.h>
#include <Columns/ColumnArray.h>
@ -126,7 +127,6 @@ AvroSerializer::SchemaWithSerializeFn createBigIntegerSchemaWithSerializeFn(cons
}
}
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(const DataTypePtr & data_type, size_t & type_name_increment, const String & column_name)
{
++type_name_increment;
@ -368,26 +368,23 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
{
return nested_mapping;
}
else
{
    avro::UnionSchema union_schema;
    union_schema.addType(avro::NullSchema());
    union_schema.addType(nested_mapping.schema);
    return {union_schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
    {
        const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
        if (!col.isNullAt(row_num))
        {
            encoder.encodeUnionIndex(1);
            nested_mapping.serialize(col.getNestedColumn(), row_num, encoder);
        }
        else
        {
            encoder.encodeUnionIndex(0);
            encoder.encodeNull();
        }
    }};
}
avro::UnionSchema union_schema;
union_schema.addType(avro::NullSchema());
union_schema.addType(nested_mapping.schema);
return {union_schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
    const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
    if (!col.isNullAt(row_num))
    {
        encoder.encodeUnionIndex(1);
        nested_mapping.serialize(col.getNestedColumn(), row_num, encoder);
    }
    else
    {
        encoder.encodeUnionIndex(0);
        encoder.encodeNull();
    }
}};
}
case TypeIndex::LowCardinality:
{
@ -401,6 +398,45 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
}
case TypeIndex::Nothing:
return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }};
case TypeIndex::Variant:
{
    const auto & variant_type = assert_cast<const DataTypeVariant &>(*data_type);
    avro::UnionSchema union_schema;
    const auto & nested_types = variant_type.getVariants();

    std::vector<SerializeFn> nested_serializers;
    nested_serializers.reserve(nested_types.size());
    for (const auto & nested_type : nested_types)
    {
        const auto [schema, serialize] = createSchemaWithSerializeFn(nested_type, type_name_increment, column_name);
        union_schema.addType(schema);
        nested_serializers.push_back(serialize);
    }

    // Since Variant has no schema-guaranteed nullability, we always add null as one of the options in the Avro union:
    // a Variant value is NULL when none of its variants is set.
    const auto nullUnionIndex = nested_types.size();
    union_schema.addType(avro::NullSchema());

    return {static_cast<avro::Schema>(union_schema),
            [serializers = std::move(nested_serializers), nullUnionIndex](const IColumn & column, const size_t row_num, avro::Encoder & encoder)
    {
        const auto & col = assert_cast<const ColumnVariant &>(column);
        const auto global_discriminator = col.globalDiscriminatorAt(row_num);
        if (global_discriminator == ColumnVariant::NULL_DISCRIMINATOR)
        {
            encoder.encodeUnionIndex(nullUnionIndex);
            encoder.encodeNull();
        }
        else
        {
            encoder.encodeUnionIndex(global_discriminator);
            serializers[global_discriminator](col.getVariantByGlobalDiscriminator(global_discriminator), row_num, encoder);
        }
    }};
}
case TypeIndex::Tuple:
{
const auto & tuple_type = assert_cast<const DataTypeTuple &>(*data_type);

View File

@ -1,12 +1,10 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <unordered_map>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <DataFile.hh>

View File

@ -0,0 +1,8 @@
name String
union Variant(Float64, String)
union_with_null Variant(Float64, String)
union_in_array Array(Variant(Float64, String))
union_in_map Map(String, Variant(Float64, String))
record1 42 \N [42,'test1'] {'key1':42,'key2':'value1'}
record2 variant_string -3.1415926535 ['test2',23.5] {'key3':'value2','key4':15.7}
record3 15.5 not null [11,'test3'] {'key5':100,'key6':'value3'}

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_DIR=$CUR_DIR/data_avro
$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/union_in_complex_types.avro')"
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/union_in_complex_types.avro')"

View File

@ -0,0 +1,59 @@
import avro.schema
import avro.datafile
import avro.io
import io

# Define the schema
schema = avro.schema.parse(
    """
    {
        "type": "record",
        "name": "TestRecord",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "union", "type": ["double", "string"]},
            {"name": "union_with_null", "type": ["double", "string", "null"]},
            {"name": "union_in_array", "type": {
                "type": "array",
                "items": ["double", "string"]
            }},
            {"name": "union_in_map", "type": {
                "type": "map",
                "values": ["double", "string"]
            }}
        ]
    }
    """
)

# Create sample data
records = [
    {
        "name": "record1",
        "union": 42.0,
        "union_with_null": None,
        "union_in_array": [42.0, "test1"],
        "union_in_map": {"key1": 42.0, "key2": "value1"},
    },
    {
        "name": "record2",
        "union": "variant_string",
        "union_with_null": -3.1415926535,
        "union_in_array": ["test2", 23.5],
        "union_in_map": {"key3": "value2", "key4": 15.7},
    },
    {
        "name": "record3",
        "union": 15.5,
        "union_with_null": "not null",
        "union_in_array": [11.0, "test3"],
        "union_in_map": {"key5": 100.0, "key6": "value3"},
    },
]

# Write the data to an Avro file
with open("union_in_complex_types.avro", "wb") as avro_file:
    writer = avro.datafile.DataFileWriter(avro_file, avro.io.DatumWriter(), schema)
    for record in records:
        writer.append(record)
    writer.close()