diff --git a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
index 2b8b7082515..40a81b59c36 100644
--- a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -304,7 +304,9 @@ MutableColumns parseAvro(
 
 /**
  * Each version of table metadata is stored in a `metadata` directory and
- * has format: v<V>.metadata.json, where V - metadata version.
+ * has one of 2 formats:
+ * 1) v<V>.metadata.json, where V - metadata version.
+ * 2) <V>-<random-uuid>.metadata.json, where V - metadata version
  */
 std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration)
 {
@@ -322,7 +324,14 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
     for (const auto & path : metadata_files)
     {
         String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
-        String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
+        String version_str;
+        /// v<V>.metadata.json
+        if (file_name.starts_with('v'))
+            version_str = String(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
+        /// <V>-<random-uuid>.metadata.json
+        else
+            version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));
+
         if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
         metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 11198a7175b..f6bea26cb15 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -9,6 +9,8 @@ import json
 import pytest
 import time
 import glob
+import uuid
+import os
 
 from pyspark.sql.types import (
     StructType,
@@ -515,3 +517,37 @@ def test_metadata_file_selection(started_cluster, format_version):
     create_iceberg_table(instance, TABLE_NAME)
 
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
+
+
+@pytest.mark.parametrize("format_version", ["1", "2"])
+def test_metadata_file_format_with_uuid(started_cluster, format_version):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    TABLE_NAME = "test_metadata_selection_" + format_version
+
+    spark.sql(
+        f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
+    )
+
+    for i in range(50):
+        spark.sql(
+            f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
+        )
+
+    print(os.listdir(f"/iceberg_data/default/{TABLE_NAME}/metadata/"))
+    for i in range(50):
+        os.rename(
+            f"/iceberg_data/default/{TABLE_NAME}/metadata/v{i + 1}.metadata.json",
+            f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{uuid.uuid4()}.metadata.json",
+        )
+    print(os.listdir(f"/iceberg_data/default/{TABLE_NAME}/metadata/"))
+
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    create_iceberg_table(instance, TABLE_NAME)
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500