Support Iceberg metadata files for metastore tables

This commit is contained in:
avogar 2023-11-15 17:45:07 +00:00
parent 99a69d98de
commit 28522d56b7
2 changed files with 47 additions and 2 deletions

View File

@ -304,7 +304,9 @@ MutableColumns parseAvro(
/** /**
* Each version of table metadata is stored in a `metadata` directory and * Each version of table metadata is stored in a `metadata` directory and
* has format: v<V>.metadata.json, where V - metadata version. * has one of 2 formats:
* 1) v<V>.metadata.json, where V - metadata version.
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
*/ */
std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration) std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration)
{ {
@ -322,7 +324,14 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
for (const auto & path : metadata_files) for (const auto & path : metadata_files)
{ {
String file_name(path.begin() + path.find_last_of('/') + 1, path.end()); String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.')); String version_str;
/// v<V>.metadata.json
if (file_name.starts_with('v'))
version_str = String(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
/// <V>-<random-uuid>.metadata.json
else
version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));
if (!std::all_of(version_str.begin(), version_str.end(), isdigit)) if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path); metadata_files_with_versions.emplace_back(std::stoi(version_str), path);

View File

@ -9,6 +9,8 @@ import json
import pytest import pytest
import time import time
import glob import glob
import uuid
import os
from pyspark.sql.types import ( from pyspark.sql.types import (
StructType, StructType,
@ -515,3 +517,37 @@ def test_metadata_file_selection(started_cluster, format_version):
create_iceberg_table(instance, TABLE_NAME) create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500 assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_format_with_uuid(started_cluster, format_version):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata_selection_" + format_version
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
)
for i in range(50):
spark.sql(
f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
)
print(os.listdir(f"/iceberg_data/default/{TABLE_NAME}/metadata/"))
for i in range(50):
os.rename(
f"/iceberg_data/default/{TABLE_NAME}/metadata/v{i + 1}.metadata.json",
f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{uuid.uuid4()}.metadata.json",
)
print(os.listdir(f"/iceberg_data/default/{TABLE_NAME}/metadata/"))
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500