Fixes for hudi

kssenii 2023-03-30 18:25:54 +02:00
parent c6837c6801
commit 60efa3cae7
4 changed files with 69 additions and 29 deletions


@@ -18,7 +18,7 @@ namespace ErrorCodes
namespace
{
/**
* Documentation links:
* Useful links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/


@@ -21,8 +21,9 @@ namespace ErrorCodes
namespace
{
/**
* Documentation links:
* Useful links:
* - https://hudi.apache.org/tech-specs/
* - https://hudi.apache.org/docs/file_layouts/
*/
/**
@@ -35,44 +36,54 @@ namespace
* Data file name format:
* [File Id]_[File Write Token]_[Transaction timestamp].[File Extension]
*
* To find needed parts we need to find out latest part file for every partition.
* Part format is usually parquet, but can differ.
* To find the needed parts we have to take the latest part file for every file group of every partition.
* Why:
* Hudi reads in and overwrites the entire table/partition with each update.
* Hudi controls the number of file groups in a single partition via the
* hoodie.parquet.max.file.size option: once a single parquet file grows too large, Hudi creates a second file group.
* Each file group is identified by its File Id.
*/
Strings processMetadataFiles(const std::vector<String> & keys, const std::string & base_directory)
{
auto * log = &Poco::Logger::get("HudiMetadataParser");
using Partition = std::string;
using FileID = std::string;
struct FileInfo
{
String filename;
UInt64 timestamp;
String key;
UInt64 timestamp = 0;
};
std::unordered_map<String, FileInfo> latest_parts; /// Partition path (directory) -> latest part file info.
std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files;
/// For each partition path take only latest file.
for (const auto & key : keys)
{
const auto delim = key.find_last_of('_') + 1;
if (delim == std::string::npos)
auto key_file = std::filesystem::path(key);
Strings file_parts;
splitInto<'_'>(file_parts, key_file.stem());
if (file_parts.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);
const auto timestamp = parse<UInt64>(key.substr(delim + 1));
const auto file_path = key.substr(base_directory.size());
const auto partition = key_file.parent_path().stem();
const auto file_id = file_parts[0];
const auto timestamp = parse<UInt64>(file_parts[2]);
LOG_TEST(log, "Having path {}", file_path);
const auto [it, inserted] = latest_parts.emplace(/* partition_path */std::filesystem::path(key).parent_path(), FileInfo{});
if (inserted)
it->second = FileInfo{file_path, timestamp};
else if (it->second.timestamp < timestamp)
it->second = {file_path, timestamp};
auto & file_info = data_files[partition][file_id];
if (file_info.timestamp == 0 || file_info.timestamp < timestamp)
{
file_info.key = key.substr(base_directory.size());
file_info.timestamp = timestamp;
}
}
LOG_TRACE(log, "Having {} result partitions", latest_parts.size());
Strings result;
for (const auto & [_, file_info] : latest_parts)
result.push_back(file_info.filename);
for (const auto & [partition, partition_data] : data_files)
{
LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size());
for (const auto & [file_id, file_data] : partition_data)
result.push_back(file_data.key);
}
return result;
}
}
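
For illustration, here is a rough Python sketch of the selection logic the parser above implements: split the data file's stem on '_' into [File Id, File Write Token, Transaction timestamp] and keep only the newest file per file group per partition. The sample keys and the base_directory value are made up; only the parsing scheme is taken from the code above.

import os
from collections import defaultdict

def latest_data_files(keys, base_directory):
    """Keep only the newest data file per (partition, file id)."""
    latest = defaultdict(dict)  # partition -> file id -> (timestamp, relative key)
    for key in keys:
        stem = os.path.splitext(os.path.basename(key))[0]
        parts = stem.split("_")  # [File Id, File Write Token, Transaction timestamp]
        if len(parts) != 3:
            raise ValueError(f"Unexpected format for file: {key}")
        file_id, timestamp = parts[0], int(parts[2])
        partition = os.path.basename(os.path.dirname(key))
        current = latest[partition].get(file_id)
        if current is None or current[0] < timestamp:
            latest[partition][file_id] = (timestamp, key[len(base_directory):])
    return [k for files in latest.values() for _, k in files.values()]

# Hypothetical keys: two versions of file group "fg1" in partition "p=1".
keys = [
    "bucket/table/p=1/fg1_0-1-0_20230330101010.parquet",
    "bucket/table/p=1/fg1_0-2-0_20230330111111.parquet",
    "bucket/table/p=2/fg2_0-1-0_20230330101010.parquet",
]
print(latest_data_files(keys, "bucket/table"))
# ['/p=1/fg1_0-2-0_20230330111111.parquet', '/p=2/fg2_0-1-0_20230330101010.parquet']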


@@ -139,7 +139,7 @@ def test_single_log_file(started_cluster):
write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
assert len(files) == 2 # 1 metadata file + 1 data file
create_delta_table(instance, TABLE_NAME)
@@ -160,7 +160,7 @@ def test_multiple_log_files(started_cluster):
spark, generate_data(spark, 0, 100), f"/{TABLE_NAME}", mode="overwrite"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
assert len(files) == 2 # 1 metadata file + 1 data file
s3_objects = list(
minio_client.list_objects(bucket, f"/{TABLE_NAME}/_delta_log/", recursive=True)
@@ -174,7 +174,7 @@ def test_multiple_log_files(started_cluster):
spark, generate_data(spark, 100, 200), f"/{TABLE_NAME}", mode="append"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 2
assert len(files) == 4 # 2 metadata files + 2 data files
s3_objects = list(
minio_client.list_objects(bucket, f"/{TABLE_NAME}/_delta_log/", recursive=True)


@ -95,6 +95,12 @@ def write_hudi_from_df(spark, table_name, df, result_path, mode="overwrite"):
"hoodie.datasource.write.table.name", table_name
).option(
"hoodie.datasource.write.operation", hudi_write_mode
).option(
"hoodie.datasource.compaction.async.enable", "true"
).option(
"hoodie.compact.inline", "false"
).option(
"hoodie.compact.inline.max.delta.commits", "10"
).option(
"hoodie.parquet.compression.codec", "snappy"
).option(
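
The chained .option(...) calls above set the Hudi writer configuration; roughly speaking, the added settings disable inline compaction, enable asynchronous compaction, and schedule a compaction only every 10 delta commits. The same settings can be gathered into a dict and passed with a single .options(**...) call. The sketch below is not the test's write_hudi_from_df: write_hudi is a hypothetical helper, the option keys mirror the ones in the diff, and the .format("hudi")/.save(...) calls are the standard Spark datasource API rather than copied from the truncated hunk.

def write_hudi(df, table_name, result_path, mode="append", operation="upsert"):
    """Sketch: same Hudi writer settings as above, passed via one options() call."""
    hudi_options = {
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.compaction.async.enable": "true",
        "hoodie.compact.inline": "false",
        "hoodie.compact.inline.max.delta.commits": "10",
        "hoodie.parquet.compression.codec": "snappy",
    }
    # Equivalent to chaining .option(...) per key; "hudi" is the Spark datasource format name.
    df.write.format("hudi").options(**hudi_options).mode(mode).save(result_path)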
@@ -114,9 +120,9 @@ def write_hudi_from_file(spark, table_name, path, result_path):
write_hudi_from_df(spark, table_name, df, result_path)
def generate_data(spark, start, end):
def generate_data(spark, start, end, append=1):
a = spark.range(start, end, 1).toDF("a")
b = spark.range(start + 1, end + 1, 1).toDF("b")
b = spark.range(start + append, end + append, 1).toDF("b")
b = b.withColumn("b", b["b"].cast(StringType()))
a = a.withColumn(
@@ -213,13 +219,36 @@ def test_multiple_hudi_files(started_cluster):
mode="append",
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
# assert len(files) == 3
assert len(files) == 3
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
assert instance.query(
f"SELECT a, b FROM {TABLE_NAME} ORDER BY 1"
) == instance.query("SELECT number, toString(number + 1) FROM numbers(300)")
assert int(instance.query(f"SELECT b FROM {TABLE_NAME} WHERE a = 100")) == 101
write_hudi_from_df(
spark,
TABLE_NAME,
generate_data(spark, 100, 101, append=0),
f"/{TABLE_NAME}",
mode="append",
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
assert int(instance.query(f"SELECT b FROM {TABLE_NAME} WHERE a = 100")) == 100
write_hudi_from_df(
spark,
TABLE_NAME,
generate_data(spark, 100, 1000000, append=0),
f"/{TABLE_NAME}",
mode="append",
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1000000
def test_types(started_cluster):
instance = started_cluster.instances["node1"]