mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
121 lines
3.9 KiB
Python
Executable File
121 lines
3.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
|
|
import pyspark
|
|
from delta import * # pip install delta-spark
|
|
|
|
# Usage example:
|
|
# ./data-lakes-importer.py iceberg data.parquet result_path
|
|
|
|
|
|
def get_spark_for_iceberg(result_path):
|
|
builder = (
|
|
pyspark.sql.SparkSession.builder.appName("spark_test")
|
|
.config(
|
|
"spark.jars.packages",
|
|
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
|
|
)
|
|
.config(
|
|
"spark.sql.catalog.spark_catalog",
|
|
"org.apache.iceberg.spark.SparkSessionCatalog",
|
|
)
|
|
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
|
|
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
|
|
.config("spark.sql.catalog.spark_catalog.warehouse", result_path)
|
|
.master("local")
|
|
)
|
|
return builder.master("local").getOrCreate()
|
|
|
|
|
|
def get_spark_for_delta():
|
|
builder = (
|
|
pyspark.sql.SparkSession.builder.appName("spark_test")
|
|
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
|
|
.config(
|
|
"spark.sql.catalog.spark_catalog",
|
|
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
|
|
)
|
|
.master("local")
|
|
)
|
|
|
|
return configure_spark_with_delta_pip(builder).master("local").getOrCreate()
|
|
|
|
|
|
def get_spark_for_hudi():
|
|
builder = (
|
|
pyspark.sql.SparkSession.builder.appName("spark_test")
|
|
.config(
|
|
"spark.jars.packages",
|
|
"org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
|
|
)
|
|
.config(
|
|
"org.apache.spark.sql.hudi.catalog.HoodieCatalog",
|
|
)
|
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
|
.config(
|
|
"spark.sql.catalog.local", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
|
|
)
|
|
.config(
|
|
"spark.driver.memory", "20g"
|
|
) # .config('spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
|
|
.master("local")
|
|
)
|
|
return builder.master("local").getOrCreate()
|
|
|
|
|
|
def main():
|
|
data_lake_name = str(sys.argv[1]).strip()
|
|
file_path = sys.argv[2]
|
|
result_path = sys.argv[3]
|
|
|
|
if not file_path.startswith("/"):
|
|
print(f"Expected absolute path, got relative: {file_path}")
|
|
exit(1)
|
|
|
|
if not result_path.startswith("/"):
|
|
print(f"Expected absolute path, got relative: {result_path}")
|
|
exit(1)
|
|
|
|
spark = None
|
|
if data_lake_name == "iceberg":
|
|
spark = get_spark_for_iceberg(result_path)
|
|
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
|
|
spark.read.load(f"file://{file_path}").writeTo("iceberg_table").using(
|
|
"iceberg"
|
|
).create()
|
|
elif data_lake_name == "delta":
|
|
spark = get_spark_for_delta()
|
|
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
|
|
spark.read.load(f"file://{file_path}").write.mode("overwrite").option(
|
|
"compression", "none"
|
|
).format("delta").option("delta.columnMapping.mode", "name").save(result_path)
|
|
elif data_lake_name == "hudi":
|
|
spark = get_spark_for_hudi()
|
|
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
|
|
spark.read.load(f"file://{file_path}").write.mode("overwrite").option(
|
|
"compression", "none"
|
|
).format("hudi").option("hoodie.table.name", "hudi").option(
|
|
"hoodie.datasource.write.partitionpath.field", "partitionpath"
|
|
).option(
|
|
"hoodie.datasource.write.table.name", "hudi"
|
|
).option(
|
|
"hoodie.datasource.write.recordkey.field", "ts"
|
|
).option(
|
|
"hoodie.datasource.write.precombine.field", "ts"
|
|
).option(
|
|
"hoodie.datasource.write.operation", "insert_overwrite"
|
|
).save(
|
|
result_path
|
|
)
|
|
else:
|
|
print(
|
|
f"Unknown data lake name {data_lake_name}. Support only: 'iceberg', 'delta'"
|
|
)
|
|
exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|