Better

commit 18a9a670c3
parent 9b3d0ec86d
@@ -6,6 +6,7 @@
 #if USE_AWS_S3
+#include <Storages/DataLakes/S3MetadataReader.h>
 #include <Storages/StorageS3.h>
 #include <ranges>
 
 namespace DB
 {
@@ -71,19 +72,19 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
      * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
      * "
      */
-    void handleJSON(const JSON & json, std::set<String> & result)
+    void handleJSON(const JSON & json, const String & prefix, std::set<String> & result)
     {
         if (json.has("add"))
         {
             const auto path = json["add"]["path"].getString();
-            const auto [_, inserted] = result.insert(path);
+            const auto [_, inserted] = result.insert(fs::path(prefix) / path);
             if (!inserted)
                 throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
         }
         else if (json.has("remove"))
         {
             const auto path = json["remove"]["path"].getString();
-            const bool erase = result.erase(path);
+            const bool erase = result.erase(fs::path(prefix) / path);
             if (!erase)
                 throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
         }
@@ -120,7 +121,7 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
                     continue;
 
                 const JSON json(json_str);
-                handleJSON(json, result);
+                handleJSON(json, configuration.getPath(), result);
             }
         }
         return result;
@@ -136,8 +137,8 @@ DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataPar
 template <typename Configuration, typename MetadataReadHelper>
 Strings DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
 {
-    auto data_files = impl->processMetadataFiles(configuration, context);
-    return Strings(data_files.begin(), data_files.end());
+    auto result = impl->processMetadataFiles(configuration, context);
+    return Strings(result.begin(), result.end());
 }
 
 template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser();
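
For orientation, a minimal Python sketch (illustrative only, not the ClickHouse API) of the folding the parser now performs over Delta Lake _delta_log entries: each "add"/"remove" action carries a path relative to the table, including any partition directory such as a=0/..., and that path is joined onto the table prefix inside the parser itself. The helper name and sample log are made up.

import json
from pathlib import PurePosixPath


def collect_data_files(log_lines, prefix):
    """Fold Delta Lake log actions into the current set of data files.

    log_lines: JSON strings read from _delta_log/*.json
    prefix: the table path (configuration.getPath() in the C++ code above)
    """
    result = set()
    for line in log_lines:
        action = json.loads(line)
        if "add" in action:
            key = str(PurePosixPath(prefix) / action["add"]["path"])
            if key in result:
                raise ValueError(f"File already exists {key}")
            result.add(key)
        elif "remove" in action:
            key = str(PurePosixPath(prefix) / action["remove"]["path"])
            if key not in result:
                raise ValueError(f"File doesn't exist {key}")
            result.remove(key)
    return result


# Partitioned tables keep the partition directory in the logged path:
log = [
    '{"add": {"path": "a=0/part-00000.parquet"}}',
    '{"add": {"path": "a=1/part-00001.parquet"}}',
    '{"remove": {"path": "a=0/part-00000.parquet"}}',
]
print(sorted(collect_data_files(log, "bucket/table")))
# ['bucket/table/a=1/part-00001.parquet']
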
@@ -72,7 +72,7 @@ private:
         LOG_TRACE(
             &Poco::Logger::get("DataLake"),
             "New configuration path: {}, keys: {}",
-            configuration.getPath(), fmt::join(keys, ", "));
+            configuration.getPath(), fmt::join(configuration.keys, ", "));
 
         configuration.connect(local_context);
         return configuration;
@@ -80,10 +80,7 @@ private:
 
     static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
     {
-        auto files = MetadataParser().getFiles(configuration, local_context);
-        for (auto & file : files)
-            file = std::filesystem::path(configuration.getPath()) / file;
-        return files;
+        return MetadataParser().getFiles(configuration, local_context);
     }
 
     void updateConfigurationImpl(ContextPtr local_context)
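
The simplification above follows from the parser change: the Delta parser now returns keys that already carry the table prefix, so joining configuration.getPath() onto them again here would mangle them. A tiny Python illustration of what the old prefixing would now produce (paths are made up):

from pathlib import PurePosixPath

key_from_parser = "bucket/table/a=0/part-00000.parquet"   # already prefixed by the parser
print(PurePosixPath("bucket/table") / key_from_parser)    # what the old getDataFiles would yield
# bucket/table/bucket/table/a=0/part-00000.parquet        # a doubled prefix
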
@@ -79,7 +79,7 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
      * {
      *     "format-version" : 1,
      *     "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
-     *     "location" : "/iceberg_data/default/test_single_iceberg_file",
+     *     "location" : "/iceberg_data/db/table_name",
      *     "last-updated-ms" : 1680206743150,
      *     "last-column-id" : 2,
      *     "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
@@ -98,7 +98,7 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
      *         "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
      *         "total-position-deletes" : "0", "total-equality-deletes" : "0"
      *     },
-     *     "manifest-list" : "/iceberg_data/default/test_single_iceberg_file/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
+     *     "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
      *     "schema-id" : 0
      * } ],
      * "statistics" : [ ],
@@ -148,7 +148,7 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
      *
      * `manifest list` has the following contents:
      * ┌─manifest_path────────────────────────────────────────────────────────────────────────────────────────┬─manifest_length─┬─partition_spec_id─┬───added_snapshot_id─┬─added_data_files_count─┬─existing_data_files_count─┬─deleted_data_files_count─┬─partitions─┬─added_rows_count─┬─existing_rows_count─┬─deleted_rows_count─┐
-     * │ /iceberg_data/default/test_single_iceberg_file/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │
+     * │ /iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │
      * └──────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┴───────────────────┴─────────────────────┴────────────────────────┴───────────────────────────┴──────────────────────────┴────────────┴──────────────────┴─────────────────────┴────────────────────┘
      */
     void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context)
@@ -180,17 +180,23 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
     }
 
     /**
-     * Manifest file has the following format: '/iceberg_data/default/test_single_iceberg_file/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
+     * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
      *
      * `manifest file` is different in format version V1 and V2 and has the following contents:
      * Format version V1:
      * ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
-     * │ 1 │ 2819310504515118887 │ ('/iceberg_data/default/test_single_iceberg_file/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
+     * │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
      * └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
      * Format version V2:
      * ┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
-     * │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/default/test_single_iceberg_file/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
+     * │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
      * └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
+     * In case of partitioned data we'll have extra directory partition=value:
+     * ┌─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
+     * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │
+     * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │
+     * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │
+     * └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
      */
     Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context)
     {
@@ -236,12 +242,8 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
         }
 
         const auto * str_col = assert_cast<const ColumnString *>(col_str.get());
-        size_t col_size = str_col->size();
-        for (size_t i = 0; i < col_size; ++i)
-        {
-            std::filesystem::path path(str_col->getDataAt(i).toView());
-            keys.emplace_back(path.parent_path().filename() / path.filename()); /// partition/file name
-        }
+        for (size_t i = 0; i < str_col->size(); ++i)
+            keys.emplace_back(str_col->getDataAt(i).toView());
     }
 
     return keys;
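
To see what that key change does for a partitioned Iceberg table, here is a small Python sketch (illustrative, not ClickHouse code). The removed loop reduced each data_file path from the manifest to just "partition dir/file name"; the new loop keeps the path exactly as recorded in the manifest, so nothing above the partition directory is lost.

from pathlib import PurePosixPath

# A data_file path as recorded in an Iceberg manifest for a partitioned table:
data_file = "/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet"
path = PurePosixPath(data_file)

old_key = str(PurePosixPath(path.parent.name) / path.name)  # what the removed loop produced
new_key = str(path)                                         # what the new loop keeps

print(old_key)  # a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet
print(new_key)  # /iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet
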
@@ -74,10 +74,15 @@ def write_delta_from_file(spark, path, result_path, mode="overwrite"):
     ).option("delta.columnMapping.mode", "name").save(result_path)
 
 
-def write_delta_from_df(spark, df, result_path, mode="overwrite"):
-    df.write.mode(mode).option("compression", "none").format("delta").option(
-        "delta.columnMapping.mode", "name"
-    ).save(result_path)
+def write_delta_from_df(spark, df, result_path, mode="overwrite", partition_by=None):
+    if partition_by is None:
+        df.write.mode(mode).option("compression", "none").format("delta").option(
+            "delta.columnMapping.mode", "name"
+        ).save(result_path)
+    else:
+        df.write.mode(mode).option("compression", "none").format("delta").option(
+            "delta.columnMapping.mode", "name"
+        ).partitionBy(partition_by).save(result_path)
 
 
 def generate_data(spark, start, end):
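
As a rough picture of what partition_by="a" changes on disk (a sketch inferred from the assertions in the tests that follow, with hypothetical, abbreviated file names): Spark writes one a=<value>/ directory per distinct partition value next to the Delta _delta_log directory.

# Expected object keys for the table written by test_partition_by below:
# generate_data(spark, 0, 10) presumably yields 10 distinct values of a, hence 10 partitions.
expected = ["_delta_log/00000000000000000000.json"] + [
    f"a={i}/part-00000-<uuid>-c000.snappy.parquet" for i in range(10)
]
print(len(expected))  # 11, matching `assert len(files) == 11`
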
@@ -149,6 +154,28 @@ def test_single_log_file(started_cluster):
     )
 
 
+def test_partition_by(started_cluster):
+    instance = started_cluster.instances["node1"]
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    spark = get_spark()
+    TABLE_NAME = "test_partition_by"
+
+    write_delta_from_df(
+        spark,
+        generate_data(spark, 0, 10),
+        f"/{TABLE_NAME}",
+        mode="overwrite",
+        partition_by="a",
+    )
+
+    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    assert len(files) == 11  # 10 partitions and 1 metadata file
+
+    create_delta_table(instance, TABLE_NAME)
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
+
+
 def test_multiple_log_files(started_cluster):
     instance = started_cluster.instances["node1"]
     minio_client = started_cluster.minio_client
@@ -83,21 +83,33 @@ def get_spark():
 
 
 def write_iceberg_from_file(
-    spark, path, table_name, mode="overwrite", format_version="1"
+    spark, path, table_name, mode="overwrite", format_version="1", partition_by=None
 ):
     if mode == "overwrite":
-        spark.read.load(f"file://{path}").writeTo(table_name).tableProperty(
-            "format-version", format_version
-        ).using("iceberg").create()
+        if partition_by is None:
+            spark.read.load(f"file://{path}").writeTo(table_name).tableProperty(
+                "format-version", format_version
+            ).using("iceberg").create()
+        else:
+            spark.read.load(f"file://{path}").writeTo(table_name).partitionedBy(
+                partition_by
+            ).tableProperty("format-version", format_version).using("iceberg").create()
     else:
         spark.read.load(f"file://{path}").writeTo(table_name).append()
 
 
-def write_iceberg_from_df(spark, df, table_name, mode="overwrite", format_version="1"):
+def write_iceberg_from_df(
+    spark, df, table_name, mode="overwrite", format_version="1", partition_by=None
+):
     if mode == "overwrite":
-        df.writeTo(table_name).tableProperty("format-version", format_version).using(
-            "iceberg"
-        ).create()
+        if partition_by is None:
+            df.writeTo(table_name).tableProperty(
+                "format-version", format_version
+            ).using("iceberg").create()
+        else:
+            df.writeTo(table_name).tableProperty(
+                "format-version", format_version
+            ).partitionedBy(partition_by).using("iceberg").create()
     else:
         df.writeTo(table_name).append()
 
@@ -165,6 +177,32 @@ def test_single_iceberg_file(started_cluster, format_version):
     )
 
 
+@pytest.mark.parametrize("format_version", ["1", "2"])
+def test_partition_by(started_cluster, format_version):
+    instance = started_cluster.instances["node1"]
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    spark = get_spark()
+    TABLE_NAME = "test_partition_by_" + format_version
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 0, 10),
+        TABLE_NAME,
+        mode="overwrite",
+        format_version=format_version,
+        partition_by="a",
+    )
+
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+    assert len(files) == 14  # 10 partitions + 4 metadata files
+
+    create_iceberg_table(instance, TABLE_NAME)
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
 def test_multiple_iceberg_files(started_cluster, format_version):
     instance = started_cluster.instances["node1"]
@@ -174,7 +212,11 @@ def test_multiple_iceberg_files(started_cluster, format_version):
     TABLE_NAME = "test_multiple_iceberg_files_" + format_version
 
     write_iceberg_from_df(
-        spark, generate_data(spark, 0, 100), TABLE_NAME, mode="overwrite", format_version=format_version,
+        spark,
+        generate_data(spark, 0, 100),
+        TABLE_NAME,
+        mode="overwrite",
+        format_version=format_version,
     )
 
     files = upload_directory(
@@ -191,7 +233,11 @@ def test_multiple_iceberg_files(started_cluster, format_version):
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
 
     write_iceberg_from_df(
-        spark, generate_data(spark, 100, 200), TABLE_NAME, mode="append", format_version=format_version
+        spark,
+        generate_data(spark, 100, 200),
+        TABLE_NAME,
+        mode="append",
+        format_version=format_version,
     )
     files = upload_directory(
         minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", ""
@@ -232,7 +278,9 @@ def test_types(started_cluster, format_version):
     )
     df = spark.createDataFrame(data=data, schema=schema)
     df.printSchema()
-    write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version=format_version)
+    write_iceberg_from_df(
+        spark, df, TABLE_NAME, mode="overwrite", format_version=format_version
+    )
 
     upload_directory(minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", "")
 