kssenii 2023-03-29 20:01:21 +02:00
parent 0300142888
commit 71d02d76f5
6 changed files with 205 additions and 55 deletions

View File

@@ -39,7 +39,7 @@ namespace
* To find needed parts we need to find out latest part file for every partition.
* Part format is usually parquet, but can differ.
*/
Strings processMetadataFiles(const std::vector<String> & keys, const String & format)
Strings processMetadataFiles(const std::vector<String> & keys, const std::string & base_directory)
{
auto * log = &Poco::Logger::get("HudiMetadataParser");
@@ -50,32 +50,23 @@ namespace
};
std::unordered_map<String, FileInfo> latest_parts; /// Partition path (directory) -> latest part file info.
/// Make format lowercase.
const auto expected_extension = "." + Poco::toLower(format);
/// Filter only files with specific format.
auto keys_filter = [&](const String & key) { return std::filesystem::path(key).extension() == expected_extension; };
/// For each partition path take only latest file.
for (const auto & key : keys | std::views::filter(keys_filter))
for (const auto & key : keys)
{
const auto key_path = std::filesystem::path(key);
/// Every filename contains metadata split by "_", timestamp is after last "_".
const auto delim = key.find_last_of('_') + 1;
if (delim == std::string::npos)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format of metadata files");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);
const auto timestamp = parse<UInt64>(key.substr(delim + 1));
const auto file_path = key.substr(base_directory.size());
const auto [it, inserted] = latest_parts.emplace(/* partition_path */key_path.parent_path(), FileInfo{});
LOG_TEST(log, "Having path {}", file_path);
const auto [it, inserted] = latest_parts.emplace(/* partition_path */fs::path(key).parent_path(), FileInfo{});
if (inserted)
{
it->second = FileInfo{key_path.filename(), timestamp};
}
it->second = FileInfo{file_path, timestamp};
else if (it->second.timestamp < timestamp)
{
it->second = {key_path.filename(), timestamp};
}
it->second = {file_path, timestamp};
}
LOG_TRACE(log, "Having {} result partitions", latest_parts.size());
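
For intuition, here is a minimal Python sketch of the selection logic above: take the text after the last "_" in each data file name as the commit timestamp and keep only the newest file per partition directory. The keys are invented Hudi-style examples, not taken from this commit.

```python
import os

# Hypothetical Hudi part file keys: <partition dir>/<file id>_<write token>_<commit ts>.parquet
keys = [
    "tbl/part=a/fileid1-0_0-10-100_20230329165920572.parquet",
    "tbl/part=a/fileid1-0_0-12-130_20230329170011000.parquet",  # newer commit, same partition
    "tbl/part=b/fileid2-0_0-11-111_20230329165920572.parquet",
]

latest = {}  # partition directory -> (timestamp, key)
for key in keys:
    stem, _ext = os.path.splitext(os.path.basename(key))
    timestamp = int(stem.rsplit("_", 1)[1])  # everything after the last "_"
    partition = os.path.dirname(key)
    if partition not in latest or latest[partition][0] < timestamp:
        latest[partition] = (timestamp, key)

print(sorted(name for _, name in latest.values()))
# ['tbl/part=a/fileid1-0_0-12-130_20230329170011000.parquet',
#  'tbl/part=b/fileid2-0_0-11-111_20230329165920572.parquet']
```
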
@@ -90,8 +81,8 @@ namespace
template <typename Configuration, typename MetadataReadHelper>
Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
{
const Strings files = MetadataReadHelper::listFiles(configuration);
return processMetadataFiles(files, "parquet");
const Strings files = MetadataReadHelper::listFiles(configuration, "", Poco::toLower(configuration.format));
return processMetadataFiles(files, configuration.getPath());
}
#if USE_AWS_S3

View File

@@ -67,7 +67,10 @@ private:
else
configuration.keys = keys;
LOG_TRACE(&Poco::Logger::get("DataLake"), "New configuration path: {}", configuration.getPath());
LOG_TRACE(
&Poco::Logger::get("DataLake"),
"New configuration path: {}, keys: {}",
configuration.getPath(), fmt::join(keys, ", "));
configuration.connect(local_context);
return configuration;

View File

@@ -44,12 +44,10 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
request.SetBucket(bucket);
request.SetPrefix(std::filesystem::path(table_path) / prefix);
bool is_finished{false};
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
@@ -66,8 +64,7 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
if (std::filesystem::path(filename).extension() == suffix)
if (filename.ends_with(suffix))
res.push_back(filename);
}
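
The switch from an extension comparison to `ends_with` matters because getFiles now passes the bare lowercased format name (no leading dot) as the suffix. A small sketch with made-up keys, in Python for brevity:

```python
from pathlib import Path

keys = [  # hypothetical S3 keys under a Hudi table path
    "tbl/part=a/data_20230329165920572.parquet",
    "tbl/.hoodie/hoodie.properties",
]
suffix = "parquet"  # what getFiles now passes: the lowercased format, without a dot

print([k for k in keys if Path(k).suffix == suffix])  # [] -- Path.suffix keeps the dot
print([k for k in keys if k.endswith(suffix)])        # only the .parquet key matches
```
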

View File

@@ -18,12 +18,13 @@ def upload_directory(minio_client, bucket_name, local_path, s3_path):
)
result_files.append(result_s3_path)
else:
upload_directory(
files = upload_directory(
minio_client,
bucket_name,
os.path.join(local_path, local_file),
os.path.join(s3_path, local_file),
)
result_files.extend(files)
return result_files
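
For context, a sketch of the whole helper with this fix applied. Only the else branch above is verbatim from the diff; the single-file branch (the fput_object call and path handling) is an assumption about the surrounding code.

```python
import os

def upload_directory(minio_client, bucket_name, local_path, s3_path):
    result_files = []
    for local_file in os.listdir(local_path):
        full_path = os.path.join(local_path, local_file)
        if os.path.isfile(full_path):
            # Assumed branch: upload a single file and record its S3 key.
            result_s3_path = os.path.join(s3_path, local_file)
            minio_client.fput_object(bucket_name, result_s3_path, full_path)
            result_files.append(result_s3_path)
        else:
            # The fix: keys uploaded by the recursive call used to be discarded;
            # now they are collected into the caller's result list.
            files = upload_directory(
                minio_client,
                bucket_name,
                full_path,
                os.path.join(s3_path, local_file),
            )
            result_files.extend(files)
    return result_files
```
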

View File

@@ -0,0 +1,9 @@
<clickhouse>
<named_collections>
<s3>
<url>http://minio1:9001/root/</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</named_collections>
</clickhouse>
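
A short sketch of what this named collection buys the tests: the DDL only names the collection and a path prefix (mirroring create_hudi_table below), instead of spelling out the endpoint and credentials as the older test did. The table name here is just an example value.

```python
table_name = "test_hudi_table"  # example value

# With the `s3` named collection: endpoint and credentials come from the XML above.
ddl_with_collection = f"CREATE TABLE {table_name} ENGINE=Hudi(s3, filename = '{table_name}/')"

# Roughly equivalent explicit form, as used in the older version of the test:
ddl_explicit = (
    f"CREATE TABLE {table_name} ENGINE=Hudi("
    f"'http://minio1:9001/root/{table_name}/', 'minio', 'minio123')"
)

print(ddl_with_collection)
print(ddl_explicit)
```
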

View File

@@ -9,9 +9,23 @@ from helpers.test_tools import TSV
from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
import pyspark
from pyspark.sql.types import (
StructType,
StructField,
StringType,
IntegerType,
DateType,
TimestampType,
BooleanType,
ArrayType,
)
from pyspark.sql.functions import current_timestamp
from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TABLE_NAME = "test_hudi_table"
USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
@@ -19,7 +33,11 @@ USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_fil
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", with_minio=True)
cluster.add_instance(
"node1",
main_configs=["configs/config.d/named_collections.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
@@ -63,18 +81,20 @@ def get_spark():
return builder.master("local").getOrCreate()
def write_hudi(spark, path, result_path):
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
spark.read.load(f"file://{path}").write.mode("overwrite").option(
def write_hudi_from_df(spark, table_name, df, result_path, mode="overwrite"):
if mode == "overwrite":
hudi_write_mode = "insert_overwrite"
else:
hudi_write_mode = "upsert"
df.write.mode(mode).option("compression", "none").option(
"compression", "none"
).option("compression", "none").format("hudi").option(
"hoodie.table.name", TABLE_NAME
).option(
).format("hudi").option("hoodie.table.name", table_name).option(
"hoodie.datasource.write.partitionpath.field", "partitionpath"
).option(
"hoodie.datasource.write.table.name", TABLE_NAME
"hoodie.datasource.write.table.name", table_name
).option(
"hoodie.datasource.write.operation", "insert_overwrite"
"hoodie.datasource.write.operation", hudi_write_mode
).option(
"hoodie.parquet.compression.codec", "snappy"
).option(
@@ -88,32 +108,161 @@ def write_hudi(spark, path, result_path):
)
def test_basic(started_cluster):
instance = started_cluster.instances["node1"]
def write_hudi_from_file(spark, table_name, path, result_path):
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
df = spark.read.load(f"file://{path}")
write_hudi_from_df(spark, table_name, df, result_path)
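
A hedged usage sketch of the two helpers within this test module: "overwrite" maps to Hudi's insert_overwrite operation, any other mode to upsert. The table name and paths are placeholders, and generate_data is defined just below.

```python
# Example only: relies on get_spark(), write_hudi_from_file, write_hudi_from_df and
# generate_data defined in this file; names and paths are placeholders.
spark = get_spark()
write_hudi_from_file(spark, "example_table", "/tmp/example_table.parquet", "/example_table")
write_hudi_from_df(
    spark, "example_table", generate_data(spark, 0, 10), "/example_table", mode="append"
)
```
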
data_path = f"/var/lib/clickhouse/user_files/{TABLE_NAME}.parquet"
inserted_data = "SELECT number, toString(number) FROM numbers(100)"
instance.query(
f"INSERT INTO TABLE FUNCTION file('{data_path}', 'Parquet', 'a Int32, b String') SETTINGS output_format_parquet_compression_method='none' {inserted_data} FORMAT Parquet"
def generate_data(spark, start, end):
a = spark.range(start, end, 1).toDF("a")
b = spark.range(start + 1, end + 1, 1).toDF("b")
b = b.withColumn("b", b["b"].cast(StringType()))
a = a.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
b = b.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
data_path = f"{USER_FILES_PATH}/{TABLE_NAME}.parquet"
result_path = f"/{TABLE_NAME}_result"
df = a.join(b, on=["row_index"]).drop("row_index")
return df
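
As a sanity check on what generate_data pairs up (pure Python, no Spark): row i carries a = start + i and b = str(start + i + 1), which is what the later comparison against "SELECT number, toString(number + 1) FROM numbers(N)" relies on.

```python
start, end = 0, 3  # example bounds
expected = [(a, str(a + 1)) for a in range(start, end)]
print(expected)  # [(0, '1'), (1, '2'), (2, '3')]
```
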
spark = get_spark()
write_hudi(spark, data_path, result_path)
def create_hudi_table(node, table_name):
node.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=Hudi(s3, filename = '{table_name}/')"""
)
def create_initial_data_file(node, query, table_name, compression_method="none"):
node.query(
f"""
INSERT INTO TABLE FUNCTION
file('{table_name}.parquet')
SETTINGS
output_format_parquet_compression_method='{compression_method}',
s3_truncate_on_insert=1 {query}
FORMAT Parquet"""
)
result_path = f"{USER_FILES_PATH}/{table_name}.parquet"
return result_path
def test_single_hudi_file(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
paths = upload_directory(minio_client, bucket, result_path, "")
assert len(paths) == 1
assert paths[0].endswith(".parquet")
spark = get_spark()
TABLE_NAME = "test_single_hudi_file"
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} ENGINE=Hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{TABLE_NAME}_result/__HIVE_DEFAULT_PARTITION__/', 'minio', 'minio123')"""
)
inserted_data = "SELECT number as a, toString(number) as b FROM numbers(100)"
parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
write_hudi_from_file(spark, TABLE_NAME, parquet_data_path, f"/{TABLE_NAME}")
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
assert files[0].endswith(".parquet")
create_hudi_table(instance, TABLE_NAME)
assert instance.query(f"SELECT a, b FROM {TABLE_NAME}") == instance.query(
inserted_data
)
def test_multiple_hudi_files(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_multiple_hudi_files"
write_hudi_from_df(spark, TABLE_NAME, generate_data(spark, 0, 100), f"/{TABLE_NAME}")
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
create_hudi_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
write_hudi_from_df(
spark, TABLE_NAME, generate_data(spark, 100, 200), f"/{TABLE_NAME}", mode="append"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 2
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 200
assert instance.query(f"SELECT a, b FROM {TABLE_NAME} ORDER BY 1") == instance.query(
"SELECT number, toString(number + 1) FROM numbers(200)"
)
write_hudi_from_df(
spark, TABLE_NAME, generate_data(spark, 100, 300), f"/{TABLE_NAME}", mode="append"
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
#assert len(files) == 3
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
assert instance.query(f"SELECT a, b FROM {TABLE_NAME} ORDER BY 1") == instance.query(
"SELECT number, toString(number + 1) FROM numbers(300)"
)
def test_types(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_types"
data = [
(
123,
"string",
datetime.strptime("2000-01-01", "%Y-%m-%d"),
["str1", "str2"],
True,
)
]
schema = StructType(
[
StructField("a", IntegerType()),
StructField("b", StringType()),
StructField("c", DateType()),
StructField("d", ArrayType(StringType())),
StructField("e", BooleanType()),
]
)
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
write_hudi_from_df(
spark, TABLE_NAME, df, f"/{TABLE_NAME}", mode="overwrite"
)
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
create_hudi_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1
assert (
instance.query(f"SELECT * FROM {TABLE_NAME}").strip()
== "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
)
#table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{TABLE_NAME}/', 'minio', 'minio123')"
#assert (
# instance.query(f"SELECT * FROM {table_function}").strip()
# == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
#)
#assert instance.query(f"DESCRIBE {table_function} FORMAT TSV") == TSV(
# [
# ["a", "Nullable(Int32)"],
# ["b", "Nullable(String)"],
# ["c", "Nullable(Date32)"],
# ["d", "Array(Nullable(String))"],
# ["e", "Nullable(Bool)"],
# ]
#)