Hudi tests

This commit is contained in:
kssenii 2023-03-21 12:51:14 +01:00
parent f776f4ff46
commit eceb54b001
44 changed files with 82 additions and 966 deletions

View File

@ -1,75 +1,25 @@
import logging
import pytest
import os
import json
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
import pyspark
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
def prepare_s3_bucket(started_cluster):
    """Apply an allow-all read/write policy to the cluster's MinIO bucket.

    The policy lets any principal locate and list the bucket and get and put
    objects inside it. It is serialized to JSON and installed through
    ``started_cluster.minio_client.set_bucket_policy``.

    Args:
        started_cluster: Object exposing ``minio_client`` (providing
            ``set_bucket_policy(bucket, policy_json)``) and ``minio_bucket``
            (the name of the bucket the policy is applied to).
    """
    bucket = started_cluster.minio_bucket
    # Derive the ARNs from the bucket the policy is actually applied to, so
    # the policy and its target cannot drift apart.
    bucket_arn = f"arn:aws:s3:::{bucket}"
    objects_arn = f"{bucket_arn}/*"

    # Bucket-level actions target the bucket ARN; object-level actions target
    # every key below it.
    grants = (
        ("s3:GetBucketLocation", bucket_arn),
        ("s3:ListBucket", bucket_arn),
        ("s3:GetObject", objects_arn),
        ("s3:PutObject", objects_arn),
    )

    bucket_read_write_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": action,
                "Resource": resource,
            }
            for action, resource in grants
        ],
    }

    started_cluster.minio_client.set_bucket_policy(
        bucket, json.dumps(bucket_read_write_policy)
    )
def upload_test_table(started_cluster):
    """Upload every file under ``SCRIPT_DIR/test_table`` to the MinIO bucket.

    Each file is stored under an object name formed from its directory path
    with the ``SCRIPT_DIR`` prefix removed, joined with the file name.

    Args:
        started_cluster: Object exposing ``minio_client`` (providing
            ``fput_object(bucket, object_name, file_path)``) and
            ``minio_bucket``.
    """
    client = started_cluster.minio_client
    target_bucket = started_cluster.minio_bucket
    prefix_length = len(SCRIPT_DIR)

    for current_dir, _subdirs, filenames in os.walk(SCRIPT_DIR + "/test_table"):
        # Drop the script directory so object names are relative to it.
        object_dir = current_dir[prefix_length:]
        for filename in filenames:
            client.fput_object(
                target_bucket,
                os.path.join(object_dir, filename),
                os.path.join(current_dir, filename),
            )
TABLE_NAME = "test_hudi_table"
USER_FILES_PATH = "/ClickHouse/tests/integration/test_storage_hudi/_instances/node1/database/user_files"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("main_server", with_minio=True)
cluster.add_instance("node1", with_minio=True)
logging.info("Starting cluster...")
cluster.start()
@ -77,9 +27,6 @@ def started_cluster():
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
upload_test_table(cluster)
logging.info("Test table uploaded")
yield cluster
finally:
@ -96,108 +43,75 @@ def run_query(instance, query, stdin=None, settings=None):
return result
def test_create_query(started_cluster):
instance = started_cluster.instances["main_server"]
def get_spark():
    """Return a local Spark session set up with the Hudi bundle.

    The session is named ``spark_test``, pulls the Hudi Spark 3.3 bundle via
    ``spark.jars.packages``, uses the Kryo serializer, registers
    ``HoodieCatalog`` as the ``local`` catalog, and runs with a ``local``
    master. An already-running session is reused by ``getOrCreate``.

    Returns:
        The ``SparkSession`` produced by ``getOrCreate()``.
    """
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_test")
        .config(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
        )
        # NOTE(review): a single-argument
        # .config("org.apache.spark.sql.hudi.catalog.HoodieCatalog") call was
        # removed here; it used the class name as an option key with no value.
        # Presumably "spark.sql.catalog.spark_catalog" was intended - confirm
        # against the Hudi docs before adding it back.
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config(
            "spark.sql.catalog.local", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
        )
        .config("spark.driver.memory", "20g")
        .master("local")
    )
    # The master is already set on the builder; no need to set it twice.
    return builder.getOrCreate()
def write_hudi(spark, path, result_path):
    """Load the data at local ``path`` and write it out as a Hudi table.

    The input is read from ``file://{path}`` with Spark's default loader and
    saved in ``hudi`` format to ``result_path`` in overwrite mode, using
    ``TABLE_NAME`` as the Hudi table name and column ``a`` as both record key
    and precombine field.

    Args:
        spark: Spark session used for reading and writing.
        path: Local filesystem path of the input data.
        result_path: Destination path for the Hudi table.
    """
    spark.conf.set("spark.sql.debug.maxToStringFields", 100000)

    # Options are applied in this order; "compression" used to be set twice
    # with the same value, which was redundant.
    write_options = {
        "compression": "none",
        "hoodie.table.name": TABLE_NAME,
        "hoodie.datasource.write.partitionpath.field": "partitionpath",
        "hoodie.datasource.write.table.name": TABLE_NAME,
        "hoodie.datasource.write.operation": "insert_overwrite",
        "hoodie.parquet.compression.codec": "snappy",
        "hoodie.hfile.compression.algorithm": "uncompressed",
        "hoodie.datasource.write.recordkey.field": "a",
        "hoodie.datasource.write.precombine.field": "a",
    }

    writer = spark.read.load(f"file://{path}").write.mode("overwrite").format("hudi")
    for option_name, option_value in write_options.items():
        writer = writer.option(option_name, option_value)
    writer.save(result_path)
def test_basic(started_cluster):
instance = started_cluster.instances["node1"]
data_path = f"/var/lib/clickhouse/user_files/{TABLE_NAME}.parquet"
inserted_data = "SELECT number, toString(number) FROM numbers(100)"
instance.query(
f"INSERT INTO TABLE FUNCTION file('{data_path}', 'Parquet', 'a Int32, b String') {inserted_data} FORMAT Parquet SETTINGS output_format_parquet_compression_method='snappy'"
)
data_path = f"{USER_FILES_PATH}/{TABLE_NAME}.parquet"
result_path = f"/{TABLE_NAME}_result"
spark = get_spark()
write_hudi(spark, data_path, result_path)
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
upload_directory(minio_client, bucket, result_path, "")
create_query = f"""CREATE TABLE hudi ENGINE=Hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123')"""
run_query(instance, create_query)
def test_select_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
columns = [
"_hoodie_commit_time",
"_hoodie_commit_seqno",
"_hoodie_record_key",
"_hoodie_partition_path",
"_hoodie_file_name",
"begin_lat",
"begin_lon",
"driver",
"end_lat",
"end_lon",
"fare",
"partitionpath",
"rider",
"ts",
"uuid",
]
# create query in case table doesn't exist
create_query = f"""CREATE TABLE IF NOT EXISTS hudi ENGINE=Hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123')"""
run_query(instance, create_query)
select_query = "SELECT {} FROM hudi FORMAT TSV"
select_table_function_query = "SELECT {col} FROM hudi('http://{ip}:{port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV"
for column_name in columns:
result = run_query(instance, select_query.format(column_name)).splitlines()
assert len(result) > 0
for column_name in columns:
result = run_query(
instance,
select_table_function_query.format(
col=column_name,
ip=started_cluster.minio_ip,
port=started_cluster.minio_port,
bucket=bucket,
),
).splitlines()
assert len(result) > 0
# test that all partition paths are present in the result
distinct_select_query = (
"SELECT DISTINCT partitionpath FROM hudi ORDER BY partitionpath FORMAT TSV"
instance.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} ENGINE=Hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{TABLE_NAME}_result/', 'minio', 'minio123')"""
)
distinct_select_table_function_query = "SELECT DISTINCT partitionpath FROM hudi('http://{ip}:{port}/{bucket}/test_table/', 'minio', 'minio123') ORDER BY partitionpath FORMAT TSV"
result = run_query(instance, distinct_select_query)
result_table_function = run_query(
instance,
distinct_select_table_function_query.format(
ip=started_cluster.minio_ip, port=started_cluster.minio_port, bucket=bucket
),
)
expected = [
"americas/brazil/sao_paulo",
"americas/united_states/san_francisco",
"asia/india/chennai",
]
assert TSV(result) == TSV(expected)
assert TSV(result_table_function) == TSV(expected)
def test_describe_query(started_cluster):
instance = started_cluster.instances["main_server"]
bucket = started_cluster.minio_bucket
result = instance.query(
f"DESCRIBE hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
)
assert result == TSV(
[
["_hoodie_commit_time", "Nullable(String)"],
["_hoodie_commit_seqno", "Nullable(String)"],
["_hoodie_record_key", "Nullable(String)"],
["_hoodie_partition_path", "Nullable(String)"],
["_hoodie_file_name", "Nullable(String)"],
["begin_lat", "Nullable(Float64)"],
["begin_lon", "Nullable(Float64)"],
["driver", "Nullable(String)"],
["end_lat", "Nullable(Float64)"],
["end_lon", "Nullable(Float64)"],
["fare", "Nullable(Float64)"],
["partitionpath", "Nullable(String)"],
["rider", "Nullable(String)"],
["ts", "Nullable(Int64)"],
["uuid", "Nullable(String)"],
]
assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
inserted_data
)

View File

@ -1,165 +0,0 @@
{
"partitionToWriteStats" : {
"americas/brazil/sao_paulo" : [ {
"fileId" : "8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0",
"path" : "americas/brazil/sao_paulo/8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0_0-73-83_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 3,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 3,
"totalWriteBytes" : 437831,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "americas/brazil/sao_paulo",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 437831,
"minEventTime" : null,
"maxEventTime" : null
} ],
"americas/united_states/san_francisco" : [ {
"fileId" : "34b1b177-f0af-467b-9214-473ead268e55-0",
"path" : "americas/united_states/san_francisco/34b1b177-f0af-467b-9214-473ead268e55-0_1-73-84_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 5,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 5,
"totalWriteBytes" : 438186,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "americas/united_states/san_francisco",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 438186,
"minEventTime" : null,
"maxEventTime" : null
} ],
"asia/india/chennai" : [ {
"fileId" : "92aa634e-d83f-4057-a385-ea3b22e5d6e1-0",
"path" : "asia/india/chennai/92aa634e-d83f-4057-a385-ea3b22e5d6e1-0_2-73-85_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 2,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 2,
"totalWriteBytes" : 437623,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "asia/india/chennai",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 437623,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"test_table_record\",\"namespace\":\"hoodie.test_table\",\"fields\":[{\"name\":\"begin_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"begin_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"end_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"end_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"partitionpath\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"uuid\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
},
"operationType" : "UPSERT",
"writeStats" : [ {
"fileId" : "8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0",
"path" : "americas/brazil/sao_paulo/8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0_0-73-83_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 3,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 3,
"totalWriteBytes" : 437831,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "americas/brazil/sao_paulo",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 437831,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "34b1b177-f0af-467b-9214-473ead268e55-0",
"path" : "americas/united_states/san_francisco/34b1b177-f0af-467b-9214-473ead268e55-0_1-73-84_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 5,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 5,
"totalWriteBytes" : 438186,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "americas/united_states/san_francisco",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 438186,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "92aa634e-d83f-4057-a385-ea3b22e5d6e1-0",
"path" : "asia/india/chennai/92aa634e-d83f-4057-a385-ea3b22e5d6e1-0_2-73-85_20220830083647456.parquet",
"prevCommit" : "null",
"numWrites" : 2,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 2,
"totalWriteBytes" : 437623,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "asia/india/chennai",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 437623,
"minEventTime" : null,
"maxEventTime" : null
} ],
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalLogFilesCompacted" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesSize" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 563,
"totalUpsertTime" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
},
"writePartitionPaths" : [ "americas/brazil/sao_paulo", "americas/united_states/san_francisco", "asia/india/chennai" ],
"fileIdAndRelativePaths" : {
"92aa634e-d83f-4057-a385-ea3b22e5d6e1-0" : "asia/india/chennai/92aa634e-d83f-4057-a385-ea3b22e5d6e1-0_2-73-85_20220830083647456.parquet",
"34b1b177-f0af-467b-9214-473ead268e55-0" : "americas/united_states/san_francisco/34b1b177-f0af-467b-9214-473ead268e55-0_1-73-84_20220830083647456.parquet",
"8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0" : "americas/brazil/sao_paulo/8a9a08bb-8cbc-4ec9-a2d4-8a6cdcaebbad-0_0-73-83_20220830083647456.parquet"
}
}

View File

@ -1,161 +0,0 @@
{
"partitionToWriteStats" : {
"americas/brazil/sao_paulo" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 3,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ],
"americas/united_states/san_francisco" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 5,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ],
"asia/india/chennai" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 2,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : { },
"operationType" : "UPSERT",
"writeStats" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 3,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 5,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 2,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ],
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalLogFilesCompacted" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesSize" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 0,
"totalUpsertTime" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
},
"writePartitionPaths" : [ "americas/brazil/sao_paulo", "americas/united_states/san_francisco", "asia/india/chennai" ],
"fileIdAndRelativePaths" : {
"" : null
}
}

View File

@ -1,21 +0,0 @@
#Updated at 2022-08-30T08:36:49.089844Z
#Tue Aug 30 08:36:49 UTC 2022
hoodie.table.type=COPY_ON_WRITE
hoodie.table.metadata.partitions=files
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=partitionpath
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.checksum=2702201862
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.timeline.timezone=LOCAL
hoodie.table.recordkey.fields=uuid
hoodie.table.name=test_table
hoodie.partition.metafile.use.base.format=false
hoodie.datasource.write.hive_style_partitioning=false
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.database.name=
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.version=5

View File

@ -1,116 +0,0 @@
{
"partitionToWriteStats" : {
"files" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "files-0000",
"path" : null,
"prevCommit" : "00000000000000",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 1,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : { },
"operationType" : "UPSERT_PREPPED",
"writeStats" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "files-0000",
"path" : null,
"prevCommit" : "00000000000000",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 1,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ],
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalLogFilesCompacted" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesSize" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 0,
"totalUpsertTime" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
},
"writePartitionPaths" : [ "files" ],
"fileIdAndRelativePaths" : {
"" : null,
"files-0000" : null
}
}

View File

@ -1,116 +0,0 @@
{
"partitionToWriteStats" : {
"files" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "files-0000",
"path" : null,
"prevCommit" : "00000000000000",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 4,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
"extraMetadata" : { },
"operationType" : "UPSERT_PREPPED",
"writeStats" : [ {
"fileId" : "",
"path" : null,
"prevCommit" : "null",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
}, {
"fileId" : "files-0000",
"path" : null,
"prevCommit" : "00000000000000",
"numWrites" : 0,
"numDeletes" : 0,
"numUpdateWrites" : 4,
"numInserts" : 0,
"totalWriteBytes" : 0,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : null,
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 0,
"minEventTime" : null,
"maxEventTime" : null
} ],
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalLogFilesCompacted" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesSize" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 0,
"totalUpsertTime" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
},
"writePartitionPaths" : [ "files" ],
"fileIdAndRelativePaths" : {
"" : null,
"files-0000" : null
}
}

View File

@ -1,14 +0,0 @@
#Properties saved on 2022-08-30T08:36:47.657528Z
#Tue Aug 30 08:36:47 UTC 2022
hoodie.compaction.payload.class=org.apache.hudi.metadata.HoodieMetadataPayload
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.checksum=1983687495
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.recordkey.fields=key
hoodie.table.name=test_table_metadata
hoodie.populate.meta.fields=false
hoodie.table.keygenerator.class=org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator
hoodie.table.base.file.format=HFILE
hoodie.table.version=5

View File

@ -1,4 +0,0 @@
#partition metadata
#Tue Aug 30 08:36:48 UTC 2022
commitTime=00000000000000
partitionDepth=1

View File

@ -1,4 +0,0 @@
#partition metadata
#Tue Aug 30 08:36:50 UTC 2022
commitTime=20220830083647456
partitionDepth=3

View File

@ -1,4 +0,0 @@
#partition metadata
#Tue Aug 30 08:36:50 UTC 2022
commitTime=20220830083647456
partitionDepth=3

View File

@ -1,4 +0,0 @@
#partition metadata
#Tue Aug 30 08:36:50 UTC 2022
commitTime=20220830083647456
partitionDepth=3

View File

@ -56,8 +56,9 @@ def get_spark_for_hudi():
.config(
"spark.sql.catalog.local", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
)
.config("spark.driver.memory", "20g") \
# .config('spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
.config(
"spark.driver.memory", "20g"
) # .config('spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
.master("local")
)
return builder.master("local").getOrCreate()
@ -98,6 +99,10 @@ def main():
"hoodie.datasource.write.partitionpath.field", "partitionpath"
).option(
"hoodie.datasource.write.table.name", "hudi"
).option(
"hoodie.datasource.write.recordkey.field", "ts"
).option(
"hoodie.datasource.write.precombine.field", "ts"
).option(
"hoodie.datasource.write.operation", "insert_overwrite"
).save(