Merge pull request #68836 from ClickHouse/fix-delta-lake-bug-in-schema-parsing

Fix complex types metadata parsing in DeltaLake
This commit is contained in:
Kseniia Sumarokova 2024-08-26 17:44:22 +00:00 committed by GitHub
commit 8b2db6276c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 105 additions and 6 deletions

View File

@ -112,3 +112,5 @@ wadllib==1.3.6
websocket-client==0.59.0 websocket-client==0.59.0
wheel==0.37.1 wheel==0.37.1
zipp==1.0.0 zipp==1.0.0
deltalake==0.16.0

View File

@ -425,8 +425,9 @@ struct DeltaLakeMetadataImpl
{ {
auto field = fields->getObject(static_cast<Int32>(i)); auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name")); element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field, "type", required)); auto is_nullable = field->getValue<bool>("nullable");
element_types.push_back(getFieldType(field, "type", is_nullable));
} }
return std::make_shared<DataTypeTuple>(element_types, element_names); return std::make_shared<DataTypeTuple>(element_types, element_names);
@ -434,16 +435,16 @@ struct DeltaLakeMetadataImpl
if (type_name == "array") if (type_name == "array")
{ {
bool is_nullable = type->getValue<bool>("containsNull"); bool element_nullable = type->getValue<bool>("containsNull");
auto element_type = getFieldType(type, "elementType", is_nullable); auto element_type = getFieldType(type, "elementType", element_nullable);
return std::make_shared<DataTypeArray>(element_type); return std::make_shared<DataTypeArray>(element_type);
} }
if (type_name == "map") if (type_name == "map")
{ {
bool is_nullable = type->getValue<bool>("containsNull");
auto key_type = getFieldType(type, "keyType", /* is_nullable */false); auto key_type = getFieldType(type, "keyType", /* is_nullable */false);
auto value_type = getFieldType(type, "valueType", is_nullable); bool value_nullable = type->getValue<bool>("valueContainsNull");
auto value_type = getFieldType(type, "valueType", value_nullable);
return std::make_shared<DataTypeMap>(key_type, value_type); return std::make_shared<DataTypeMap>(key_type, value_type);
} }

View File

@ -29,6 +29,9 @@ from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window from pyspark.sql.window import Window
from minio.deleteobjects import DeleteObject from minio.deleteobjects import DeleteObject
import pyarrow as pa
import pyarrow.parquet as pq
from deltalake.writer import write_deltalake
from helpers.s3_tools import ( from helpers.s3_tools import (
prepare_s3_bucket, prepare_s3_bucket,
@ -728,3 +731,96 @@ SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.mini
) )
== 1 == 1
) )
def test_complex_types(started_cluster):
node = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
schema = pa.schema(
[
("id", pa.int32()),
("name", pa.string()),
(
"address",
pa.struct(
[
("street", pa.string()),
("city", pa.string()),
("state", pa.string()),
]
),
),
("interests", pa.list_(pa.string())),
(
"metadata",
pa.map_(
pa.string(), pa.string()
), # Map with string keys and string values
),
]
)
# Create sample data
data = [
pa.array([1, 2, 3], type=pa.int32()),
pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()),
pa.array(
[
{"street": "123 Elm St", "city": "Springfield", "state": "IL"},
{"street": "456 Maple St", "city": "Shelbyville", "state": "IL"},
{"street": "789 Oak St", "city": "Ogdenville", "state": "IL"},
],
type=schema.field("address").type,
),
pa.array(
[
pa.array(["dancing", "coding", "hiking"]),
pa.array(["dancing", "coding", "hiking"]),
pa.array(["dancing", "coding", "hiking"]),
],
type=schema.field("interests").type,
),
pa.array(
[
{"key1": "value1", "key2": "value2"},
{"key1": "value3", "key2": "value4"},
{"key1": "value5", "key2": "value6"},
],
type=schema.field("metadata").type,
),
]
endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
aws_access_key_id = "minio"
aws_secret_access_key = "minio123"
table_name = randomize_table_name("test_complex_types")
storage_options = {
"AWS_ENDPOINT_URL": endpoint_url,
"AWS_ACCESS_KEY_ID": aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
path = f"s3://root/{table_name}"
table = pa.Table.from_arrays(data, schema=schema)
write_deltalake(path, table, storage_options=storage_options)
assert "1\n2\n3\n" in node.query(
f"SELECT id FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
assert (
"('123 Elm St','Springfield','IL')\n('456 Maple St','Shelbyville','IL')\n('789 Oak St','Ogdenville','IL')"
in node.query(
f"SELECT address FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
)
assert (
"{'key1':'value1','key2':'value2'}\n{'key1':'value3','key2':'value4'}\n{'key1':'value5','key2':'value6'}"
in node.query(
f"SELECT metadata FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
)