Merge pull request #47434 from Avogar/avro-improvements

Support Decimals and Date32 in Avro format
This commit is contained in:
Alexey Milovidov 2023-03-18 22:16:34 +03:00 committed by GitHub
commit 26c17b61fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 176 additions and 33 deletions

View File

@ -1808,23 +1808,26 @@ ClickHouse Avro format supports reading and writing [Avro data files](https://av
The table below shows supported data types and how they match ClickHouse [data types](/docs/en/sql-reference/data-types/index.md) in `INSERT` and `SELECT` queries.
| Avro data type `INSERT` | ClickHouse data type | Avro data type `SELECT` |
|---------------------------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------------------|
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\ | 16\ |32)](/docs/en/sql-reference/data-types/int-uint.md), [UInt(8\|16\|32)](/docs/en/sql-reference/data-types/int-uint.md) | `int` |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/docs/en/sql-reference/data-types/int-uint.md), [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `long` |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/docs/en/sql-reference/data-types/float.md) | `float` |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/docs/en/sql-reference/data-types/float.md) | `double` |
| `bytes`, `string`, `fixed`, `enum` | [String](/docs/en/sql-reference/data-types/string.md) | `bytes` or `string` \* |
| `bytes`, `string`, `fixed` | [FixedString(N)](/docs/en/sql-reference/data-types/fixedstring.md) | `fixed(N)` |
| `enum` | [Enum(8\ | 16)](/docs/en/sql-reference/data-types/enum.md) | `enum` |
| `array(T)` | [Array(T)](/docs/en/sql-reference/data-types/array.md) | `array(T)` |
| `union(null, T)`, `union(T, null)` | [Nullable(T)](/docs/en/sql-reference/data-types/date.md) | `union(null, T)` |
| `null` | [Nullable(Nothing)](/docs/en/sql-reference/data-types/special-data-types/nothing.md) | `null` |
| `int (date)` \** | [Date](/docs/en/sql-reference/data-types/date.md) | `int (date)` \** |
| `long (timestamp-millis)` \** | [DateTime64(3)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-millis)` \* |
| `long (timestamp-micros)` \** | [DateTime64(6)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-micros)` \* |
| `int` | [IPv4](/docs/en/sql-reference/data-types/domains/ipv4.md) | `int` |
| `fixed(16)` | [IPv6](/docs/en/sql-reference/data-types/domains/ipv6.md) | `fixed(16)` |
| Avro data type `INSERT` | ClickHouse data type | Avro data type `SELECT` |
|---------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|-------------------------------|
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\16\32)](/docs/en/sql-reference/data-types/int-uint.md), [UInt(8\16\32)](/docs/en/sql-reference/data-types/int-uint.md) | `int` |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/docs/en/sql-reference/data-types/int-uint.md), [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `long` |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/docs/en/sql-reference/data-types/float.md) | `float` |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/docs/en/sql-reference/data-types/float.md) | `double` |
| `bytes`, `string`, `fixed`, `enum` | [String](/docs/en/sql-reference/data-types/string.md) | `bytes` or `string` \* |
| `bytes`, `string`, `fixed` | [FixedString(N)](/docs/en/sql-reference/data-types/fixedstring.md) | `fixed(N)` |
| `enum` | [Enum(8\16)](/docs/en/sql-reference/data-types/enum.md) | `enum` |
| `array(T)` | [Array(T)](/docs/en/sql-reference/data-types/array.md) | `array(T)` |
| `union(null, T)`, `union(T, null)` | [Nullable(T)](/docs/en/sql-reference/data-types/date.md) | `union(null, T)` |
| `null` | [Nullable(Nothing)](/docs/en/sql-reference/data-types/special-data-types/nothing.md) | `null` |
| `int (date)` \** | [Date](/docs/en/sql-reference/data-types/date.md), [Date32](docs/en/sql-reference/data-types/date32.md) | `int (date)` \** |
| `long (timestamp-millis)` \** | [DateTime64(3)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-millis)` \** |
| `long (timestamp-micros)` \** | [DateTime64(6)](/docs/en/sql-reference/data-types/datetime.md) | `long (timestamp-micros)` \** |
| `int` | [IPv4](/docs/en/sql-reference/data-types/domains/ipv4.md) | `int` |
| `fixed(16)` | [IPv6](/docs/en/sql-reference/data-types/domains/ipv6.md) | `fixed(16)` |
| `bytes (decimal)` \** | [Decimal(P, S)](/docs/en/sql-reference/data-types/decimal.md) | `bytes (decimal)` \** |
| `string (uuid)` \** | [UUID](/docs/en/sql-reference/data-types/uuid.md) | `string (uuid)` \** |
\* `bytes` is default, controlled by [output_format_avro_string_column_pattern](/docs/en/operations/settings/settings-formats.md/#output_format_avro_string_column_pattern)
\** [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)

View File

@ -1473,6 +1473,7 @@ In Avro format ClickHouse reads its schema from the data and converts it to Clic
|------------------------------------|--------------------------------------------------------------------------------|
| `boolean` | [Bool](../sql-reference/data-types/boolean.md) |
| `int` | [Int32](../sql-reference/data-types/int-uint.md) |
| `int (date)` \* | [Date32](../sql-reference/data-types/date32.md) |
| `long` | [Int64](../sql-reference/data-types/int-uint.md) |
| `float` | [Float32](../sql-reference/data-types/float.md) |
| `double` | [Float64](../sql-reference/data-types/float.md) |
@ -1482,6 +1483,10 @@ In Avro format ClickHouse reads its schema from the data and converts it to Clic
| `array(T)` | [Array(T)](../sql-reference/data-types/array.md) |
| `union(null, T)`, `union(T, null)` | [Nullable(T)](../sql-reference/data-types/date.md) |
| `null` | [Nullable(Nothing)](../sql-reference/data-types/special-data-types/nothing.md) |
| `string (uuid)` \* | [UUID](../sql-reference/data-types/uuid.md) |
| `binary (decimal)` \* | [Decimal(P, S)](../sql-reference/data-types/decimal.md) |
\* [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)
Other Avro types are not supported.

View File

@ -11,11 +11,14 @@
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBufferFromString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeNothing.h>
@ -124,6 +127,7 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
case TypeIndex::Int16:
assert_cast<ColumnInt16 &>(column).insertValue(static_cast<Int16>(value));
break;
case TypeIndex::Date32: [[fallthrough]];
case TypeIndex::Int32:
assert_cast<ColumnInt32 &>(column).insertValue(static_cast<Int32>(value));
break;
@ -153,6 +157,40 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
}
}
template <typename DecimalType>
static AvroDeserializer::DeserializeFn createDecimalDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type)
{
auto logical_type = root_node->logicalType();
const auto & decimal_type = assert_cast<const DecimalType &>(*target_type);
if (decimal_type.getScale() != static_cast<UInt32>(logical_type.scale()) || decimal_type.getPrecision() != static_cast<UInt32>(logical_type.precision()))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot insert Avro decimal with scale {} and precision {} to ClickHouse Decimal with scale {} and precision {}",
logical_type.scale(),
logical_type.precision(),
decimal_type.getScale(),
decimal_type.getPrecision());
return [tmp = std::string(), target_type](IColumn & column, avro::Decoder & decoder) mutable
{
static constexpr size_t field_type_size = sizeof(typename DecimalType::FieldType);
decoder.decodeString(tmp);
if (tmp.size() != field_type_size)
throw ParsingException(
ErrorCodes::CANNOT_PARSE_UUID,
"Cannot parse type {}, expected binary data with size {}, got {}",
target_type->getName(),
field_type_size,
tmp.size());
typename DecimalType::FieldType field;
ReadBufferFromString buf(tmp);
readBinaryBigEndian(field.value, buf);
assert_cast<typename DecimalType::ColumnType &>(column).insertValue(field);
return true;
};
}
static std::string nodeToJson(avro::NodePtr root_node)
{
std::ostringstream ss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -169,7 +207,7 @@ static std::string nodeName(avro::NodePtr node)
return avro::toString(node->type());
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type)
{
if (target_type->lowCardinality())
{
@ -214,6 +252,14 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
return true;
};
}
if (target.isDecimal32())
return createDecimalDeserializeFn<DataTypeDecimal32>(root_node, target_type);
if (target.isDecimal64())
return createDecimalDeserializeFn<DataTypeDecimal64>(root_node, target_type);
if (target.isDecimal128())
return createDecimalDeserializeFn<DataTypeDecimal128>(root_node, target_type);
if (target.isDecimal256())
return createDecimalDeserializeFn<DataTypeDecimal256>(root_node, target_type);
break;
case avro::AVRO_INT:
if (target_type->isValueRepresentedByNumber())
@ -526,7 +572,7 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
target_type->getName(), avro::toString(root_node->type()), nodeToJson(root_node));
}
AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(const avro::NodePtr & root_node)
{
switch (root_node->type())
{
@ -1042,19 +1088,40 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
switch (node->type())
{
case avro::Type::AVRO_INT:
{
if (node->logicalType().type() == avro::LogicalType::DATE)
return {std::make_shared<DataTypeDate32>()};
return {std::make_shared<DataTypeInt32>()};
}
case avro::Type::AVRO_LONG:
{
auto logical_type = node->logicalType();
if (logical_type.type() == avro::LogicalType::TIMESTAMP_MILLIS)
return {std::make_shared<DataTypeDateTime64>(3)};
if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS)
return {std::make_shared<DataTypeDateTime64>(6)};
return std::make_shared<DataTypeInt64>();
}
case avro::Type::AVRO_BOOL:
return DataTypeFactory::instance().get("Bool");
case avro::Type::AVRO_FLOAT:
return std::make_shared<DataTypeFloat32>();
case avro::Type::AVRO_DOUBLE:
return std::make_shared<DataTypeFloat64>();
case avro::Type::AVRO_STRING:
return std::make_shared<DataTypeString>();
case avro::Type::AVRO_STRING: [[fallthrough]];
case avro::Type::AVRO_BYTES:
{
auto logical_type = node->logicalType();
if (logical_type.type() == avro::LogicalType::UUID)
return std::make_shared<DataTypeUUID>();
if (logical_type.type() == avro::LogicalType::DECIMAL)
return createDecimal<DataTypeDecimal>(logical_type.precision(), logical_type.scale());
return std::make_shared<DataTypeString>();
}
case avro::Type::AVRO_ENUM:
{
if (node->names() < 128)

View File

@ -51,13 +51,13 @@ public:
AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;
private:
using DeserializeFn = std::function<bool(IColumn & column, avro::Decoder & decoder)>;
using DeserializeNestedFn = std::function<bool(IColumn & column, avro::Decoder & decoder)>;
private:
using SkipFn = std::function<void(avro::Decoder & decoder)>;
DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
SkipFn createSkipFn(avro::NodePtr root_node);
DeserializeFn createDeserializeFn(const avro::NodePtr & root_node, const DataTypePtr & target_type);
SkipFn createSkipFn(const avro::NodePtr & root_node);
struct Action
{
@ -86,14 +86,14 @@ private:
: type(Skip)
, skip_fn(skip_fn_) {}
Action(std::vector<size_t> nested_column_indexes_, std::vector<DeserializeFn> nested_deserializers_)
Action(const std::vector<size_t> & nested_column_indexes_, const std::vector<DeserializeFn> & nested_deserializers_)
: type(Nested)
, nested_column_indexes(nested_column_indexes_)
, nested_deserializers(nested_deserializers_) {}
static Action recordAction(std::vector<Action> field_actions) { return Action(Type::Record, field_actions); }
static Action recordAction(const std::vector<Action> & field_actions) { return Action(Type::Record, field_actions); }
static Action unionAction(std::vector<Action> branch_actions) { return Action(Type::Union, branch_actions); }
static Action unionAction(const std::vector<Action> & branch_actions) { return Action(Type::Union, branch_actions); }
void execute(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
@ -88,8 +89,30 @@ private:
WriteBuffer & out;
};
namespace
{
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment, const String & column_name)
template <typename DecimalType>
AvroSerializer::SchemaWithSerializeFn createDecimalSchemaWithSerializeFn(const DataTypePtr & data_type)
{
auto schema = avro::BytesSchema();
const auto & provided_type = assert_cast<const DecimalType &>(*data_type);
auto logical_type = avro::LogicalType(avro::LogicalType::DECIMAL);
logical_type.setScale(provided_type.getScale());
logical_type.setPrecision(provided_type.getPrecision());
schema.root()->setLogicalType(logical_type);
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const auto & col = assert_cast<const typename DecimalType::ColumnType &>(column);
WriteBufferFromOwnString buf;
writeBinaryBigEndian(col.getElement(row_num).value, buf);
encoder.encodeBytes(reinterpret_cast<const uint8_t *>(buf.str().data()), buf.str().size());
}};
}
}
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(const DataTypePtr & data_type, size_t & type_name_increment, const String & column_name)
{
++type_name_increment;
@ -167,6 +190,16 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
encoder.encodeInt(date);
}};
}
case TypeIndex::Date32:
{
auto schema = avro::IntSchema();
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::DATE));
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
Int32 date = assert_cast<const ColumnInt32 &>(column).getElement(row_num);
encoder.encodeInt(date);
}};
}
case TypeIndex::DateTime64:
{
auto schema = avro::LongSchema();
@ -185,6 +218,22 @@ AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeF
encoder.encodeLong(col.getElement(row_num));
}};
}
case TypeIndex::Decimal32:
{
return createDecimalSchemaWithSerializeFn<DataTypeDecimal32>(data_type);
}
case TypeIndex::Decimal64:
{
return createDecimalSchemaWithSerializeFn<DataTypeDecimal64>(data_type);
}
case TypeIndex::Decimal128:
{
return createDecimalSchemaWithSerializeFn<DataTypeDecimal128>(data_type);
}
case TypeIndex::Decimal256:
{
return createDecimalSchemaWithSerializeFn<DataTypeDecimal256>(data_type);
}
case TypeIndex::String:
if (traits->isStringAsString(column_name))
return {avro::StringSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)

View File

@ -27,7 +27,6 @@ public:
const avro::ValidSchema & getSchema() const { return valid_schema; }
void serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder);
private:
using SerializeFn = std::function<void(const IColumn & column, size_t row_num, avro::Encoder & encoder)>;
struct SchemaWithSerializeFn
{
@ -35,8 +34,9 @@ private:
SerializeFn serialize;
};
private:
/// Type names for different complex types (e.g. enums, fixed strings) must be unique. We use simple incremental number to give them different names.
SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment, const String & column_name);
SchemaWithSerializeFn createSchemaWithSerializeFn(const DataTypePtr & data_type, size_t & type_name_increment, const String & column_name);
std::vector<SerializeFn> serialize_fns;
avro::ValidSchema valid_schema;

View File

@ -422,9 +422,9 @@ float32 Float32
float64 Float64
0 0
1.2 0.7692307692307692
date Int32
0
1
date Date32
1970-01-01
1970-01-02
str String
fixed_string FixedString(3)
Str: 0 100

View File

@ -0,0 +1,5 @@
Date32 1942-08-16
Decimal(9, 4) 4242.4242
Decimal(18, 14) 4242.4242
Decimal(38, 34) 4242.4242
Decimal(76, 64) 4242.4242

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "select toInt32(-10000)::Date32 as d format Avro" | $CLICKHOUSE_LOCAL --input-format=Avro -q "select toTypeName(d), d from table"
$CLICKHOUSE_LOCAL -q "select 4242.4242::Decimal32(4) as d format Avro" | $CLICKHOUSE_LOCAL --input-format=Avro -q "select toTypeName(d), d from table"
$CLICKHOUSE_LOCAL -q "select 4242.4242::Decimal64(14) as d format Avro" | $CLICKHOUSE_LOCAL --input-format=Avro -q "select toTypeName(d), d from table"
$CLICKHOUSE_LOCAL -q "select 4242.4242::Decimal128(34) as d format Avro" | $CLICKHOUSE_LOCAL --input-format=Avro -q "select toTypeName(d), d from table"
$CLICKHOUSE_LOCAL -q "select 4242.4242::Decimal256(64) as d format Avro" | $CLICKHOUSE_LOCAL --input-format=Avro -q "select toTypeName(d), d from table"