diff --git a/docker/test/integration/runner/requirements.txt b/docker/test/integration/runner/requirements.txt index 8a77d8abf77..7ca15ea86b0 100644 --- a/docker/test/integration/runner/requirements.txt +++ b/docker/test/integration/runner/requirements.txt @@ -111,3 +111,5 @@ wadllib==1.3.6 websocket-client==0.59.0 wheel==0.37.1 zipp==1.0.0 +deltalake==0.16.0 + diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index c896a760597..14b18809a0d 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -422,8 +422,9 @@ struct DeltaLakeMetadataImpl { auto field = fields->getObject(static_cast(i)); element_names.push_back(field->getValue("name")); - auto required = field->getValue("required"); - element_types.push_back(getFieldType(field, "type", required)); + + auto is_nullable = field->getValue("nullable"); + element_types.push_back(getFieldType(field, "type", is_nullable)); } return std::make_shared(element_types, element_names); @@ -431,16 +432,16 @@ struct DeltaLakeMetadataImpl if (type_name == "array") { - bool is_nullable = type->getValue("containsNull"); - auto element_type = getFieldType(type, "elementType", is_nullable); + bool element_nullable = type->getValue("containsNull"); + auto element_type = getFieldType(type, "elementType", element_nullable); return std::make_shared(element_type); } if (type_name == "map") { - bool is_nullable = type->getValue("containsNull"); auto key_type = getFieldType(type, "keyType", /* is_nullable */false); - auto value_type = getFieldType(type, "valueType", is_nullable); + bool value_nullable = type->getValue("valueContainsNull"); + auto value_type = getFieldType(type, "valueType", value_nullable); return std::make_shared(key_type, value_type); } diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index d3dd7cfe52a..ff2a4614e49 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -27,6 +27,9 @@ from datetime import datetime from pyspark.sql.functions import monotonically_increasing_id, row_number from pyspark.sql.window import Window from minio.deleteobjects import DeleteObject +import pyarrow as pa +import pyarrow.parquet as pq +from deltalake.writer import write_deltalake from helpers.s3_tools import ( prepare_s3_bucket, @@ -709,3 +712,96 @@ SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.mini ) == 1 ) + + +def test_complex_types(started_cluster): + node = started_cluster.instances["node1"] + minio_client = started_cluster.minio_client + bucket = started_cluster.minio_bucket + + schema = pa.schema( + [ + ("id", pa.int32()), + ("name", pa.string()), + ( + "address", + pa.struct( + [ + ("street", pa.string()), + ("city", pa.string()), + ("state", pa.string()), + ] + ), + ), + ("interests", pa.list_(pa.string())), + ( + "metadata", + pa.map_( + pa.string(), pa.string() + ), # Map with string keys and string values + ), + ] + ) + + # Create sample data + data = [ + pa.array([1, 2, 3], type=pa.int32()), + pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()), + pa.array( + [ + {"street": "123 Elm St", "city": "Springfield", "state": "IL"}, + {"street": "456 Maple St", "city": "Shelbyville", "state": "IL"}, + {"street": "789 Oak St", "city": "Ogdenville", "state": "IL"}, + ], + type=schema.field("address").type, + ), + pa.array( + [ + pa.array(["dancing", "coding", "hiking"]), + pa.array(["dancing", "coding", "hiking"]), + pa.array(["dancing", "coding", "hiking"]), + ], + type=schema.field("interests").type, + ), + pa.array( + [ + {"key1": "value1", "key2": "value2"}, + {"key1": "value3", "key2": "value4"}, + {"key1": "value5", "key2": "value6"}, + ], + type=schema.field("metadata").type, + ), + ] + + endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}" + aws_access_key_id = "minio" + aws_secret_access_key = "minio123" + table_name = randomize_table_name("test_complex_types") + + storage_options = { + "AWS_ENDPOINT_URL": endpoint_url, + "AWS_ACCESS_KEY_ID": aws_access_key_id, + "AWS_SECRET_ACCESS_KEY": aws_secret_access_key, + "AWS_ALLOW_HTTP": "true", + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } + path = f"s3://root/{table_name}" + table = pa.Table.from_arrays(data, schema=schema) + + write_deltalake(path, table, storage_options=storage_options) + + assert "1\n2\n3\n" in node.query( + f"SELECT id FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')" + ) + assert ( + "('123 Elm St','Springfield','IL')\n('456 Maple St','Shelbyville','IL')\n('789 Oak St','Ogdenville','IL')" + in node.query( + f"SELECT address FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')" + ) + ) + assert ( + "{'key1':'value1','key2':'value2'}\n{'key1':'value3','key2':'value4'}\n{'key1':'value5','key2':'value6'}" + in node.query( + f"SELECT metadata FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')" + ) + )