#!/usr/bin/env python3

import os
import sys
import pyspark
from delta import configure_spark_with_delta_pip  # pip install delta-spark

# Usage (file and result paths must be absolute; supports iceberg, delta, hudi):
# ./data-lakes-importer.py iceberg /path/to/data.parquet /path/to/result_path


def get_spark_for_iceberg(result_path):
    """Create a local SparkSession with an Iceberg Hadoop catalog at result_path."""
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_test")
        .config(
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
        )
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog",
        )
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hadoop")
        .config("spark.sql.catalog.spark_catalog.warehouse", result_path)
        .master("local")
    )
    return builder.getOrCreate()
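

# Hedged sketch, not called by main(): one way to sanity-check an Iceberg import
# by reading the freshly created table back in the same session. The table name
# "iceberg_table" matches the one created in main() below.
def verify_iceberg(spark):
    # The name should resolve through the Iceberg-enabled spark_catalog configured above.
    spark.table("iceberg_table").show(5)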


def get_spark_for_delta():
    """Create a local SparkSession with the Delta Lake extension and catalog."""
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_test")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .master("local")
    )

    return configure_spark_with_delta_pip(builder).getOrCreate()
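

# Hedged sketch, not called by main(): read the Delta output back from result_path
# to confirm the write; path-based format("delta") reads are standard Delta Lake usage.
def verify_delta(spark, result_path):
    spark.read.format("delta").load(result_path).show(5)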


def get_spark_for_hudi():
    """Create a local SparkSession configured for Hudi writes."""
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_test")
        .config(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
        )
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        )
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config(
            "spark.sql.catalog.local", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
        )
        .config("spark.driver.memory", "20g")
        # .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .master("local")
    )
    return builder.getOrCreate()
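

# Hedged sketch, not called by main(): read the Hudi output back from result_path;
# path-based format("hudi") reads are standard Hudi usage.
def verify_hudi(spark, result_path):
    spark.read.format("hudi").load(result_path).show(5)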


def main():
    if len(sys.argv) != 4:
        print("Usage: ./data-lakes-importer.py <iceberg|delta|hudi> <file> <result>")
        exit(1)

    data_lake_name = sys.argv[1].strip()
    file_path = sys.argv[2]
    result_path = sys.argv[3]

    if not os.path.isabs(file_path):
        print(f"Expected absolute path, got relative: {file_path}")
        exit(1)

    if not os.path.isabs(result_path):
        print(f"Expected absolute path, got relative: {result_path}")
        exit(1)

    spark = None
    if data_lake_name == "iceberg":
        spark = get_spark_for_iceberg(result_path)
        spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
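        # writeTo(...).using("iceberg").create() creates a new Iceberg table; with the
        # Hadoop-type spark_catalog above, its data should land under result_path.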
        spark.read.load(f"file://{file_path}").writeTo("iceberg_table").using(
            "iceberg"
        ).create()
    elif data_lake_name == "delta":
        spark = get_spark_for_delta()
        spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
        spark.read.load(f"file://{file_path}").write.mode("overwrite").option(
            "compression", "none"
        ).format("delta").option("delta.columnMapping.mode", "name").save(result_path)
    elif data_lake_name == "hudi":
        spark = get_spark_for_hudi()
        spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
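        # The record key, precombine, and partition path fields below ("ts",
        # "partitionpath") assume the input parquet already has columns with
        # those names; adjust them to the actual schema if needed.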
        spark.read.load(f"file://{file_path}").write.mode("overwrite").option(
            "compression", "none"
        ).format("hudi").option("hoodie.table.name", "hudi").option(
            "hoodie.datasource.write.partitionpath.field", "partitionpath"
        ).option(
            "hoodie.datasource.write.table.name", "hudi"
        ).option(
            "hoodie.datasource.write.recordkey.field", "ts"
        ).option(
            "hoodie.datasource.write.precombine.field", "ts"
        ).option(
            "hoodie.datasource.write.operation", "insert_overwrite"
        ).save(
            result_path
        )
    else:
        print(
            f"Unknown data lake name {data_lake_name}. Supported: 'iceberg', 'delta', 'hudi'"
        )
        exit(1)


if __name__ == "__main__":
    main()