kssenii 2023-04-12 14:38:39 +02:00
parent e32c98e412
commit 18723b1a44
3 changed files with 16 additions and 21 deletions


@@ -43,7 +43,7 @@ def get_spark():
         .master("local")
     )
-    return configure_spark_with_delta_pip(builder).master("local").getOrCreate()
+    return builder.master("local").getOrCreate()
 
 
 @pytest.fixture(scope="module")
@@ -61,9 +61,11 @@ def started_cluster():
         prepare_s3_bucket(cluster)
 
-        if cluster.spark_session is not None:
-            cluster.spark_session.stop()
-            cluster.spark_session._instantiatedContext = None
+        pyspark.sql.SparkSession.builder.appName("spark_test").config(
+            "spark.jars.packages",
+            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,io.delta:delta-core_2.12:2.2.0,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
+        ).master("local").getOrCreate().stop()
+
         cluster.spark_session = get_spark()
 
         yield cluster
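
All three fixtures in this commit receive the same replacement, so the shared pattern is worth one standalone sketch. Assumptions: the helper name reset_spark_session and the JAR_PACKAGES constant are mine, and the commit inlines the equivalent statements directly in each started_cluster fixture; only the builder chain and the package string come from the hunks.

import pyspark.sql

# Union of the jar packages used by the delta, hudi and iceberg tests,
# copied verbatim from the hunks in this commit.
JAR_PACKAGES = (
    "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,"
    "io.delta:delta-core_2.12:2.2.0,"
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0"
)


def reset_spark_session(cluster, get_spark):
    # getOrCreate() returns the currently active local session if one is
    # still alive (e.g. left over from another test module), so calling
    # .stop() on the result reliably tears it down before a fresh session
    # is built for this module.
    pyspark.sql.SparkSession.builder.appName("spark_test").config(
        "spark.jars.packages", JAR_PACKAGES
    ).master("local").getOrCreate().stop()
    cluster.spark_session = get_spark()

This also drops the reliance on the private _instantiatedContext attribute that the old cleanup poked at.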


@@ -31,10 +31,6 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 def get_spark():
     builder = (
         pyspark.sql.SparkSession.builder.appName("spark_test")
-        .config(
-            "spark.jars.packages",
-            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
-        )
         .config(
             "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
         )
@@ -45,13 +41,13 @@ def get_spark():
         .config("spark.driver.memory", "20g")
         .master("local")
     )
-    return builder.master("local").getOrCreate()
+    return builder.getOrCreate()
 
 
 @pytest.fixture(scope="module")
 def started_cluster():
+    cluster = ClickHouseCluster(__file__)
     try:
-        cluster = ClickHouseCluster(__file__)
         cluster.add_instance(
             "node1",
             main_configs=["configs/config.d/named_collections.xml"],
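
A side change in the hunk above moves cluster = ClickHouseCluster(__file__) out of the try block. A plausible reading, sketched under the assumption that the fixture ends in a finally: cluster.shutdown() (standard in these integration tests, but not shown in the hunk; the with_minio flag is likewise assumed):

from helpers.cluster import ClickHouseCluster  # integration-test helper

# Bind `cluster` before entering try: if the constructor raises, nothing
# was started and no teardown is needed.
cluster = ClickHouseCluster(__file__)
try:
    cluster.add_instance(
        "node1",
        main_configs=["configs/config.d/named_collections.xml"],
        with_minio=True,  # assumed; these tests talk to MinIO/S3
    )
    cluster.start()
finally:
    # With the old placement, an exception on the ClickHouseCluster(...)
    # line would reach this finally with `cluster` unbound, and the
    # resulting NameError would mask the original failure.
    cluster.shutdown()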
@@ -64,9 +60,10 @@ def started_cluster():
         prepare_s3_bucket(cluster)
         logging.info("S3 bucket created")
 
-        if cluster.spark_session is not None:
-            cluster.spark_session.stop()
-            cluster.spark_session._instantiatedContext = None
+        pyspark.sql.SparkSession.builder.appName("spark_test").config(
+            "spark.jars.packages",
+            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,io.delta:delta-core_2.12:2.2.0,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
+        ).master("local").getOrCreate().stop()
         cluster.spark_session = get_spark()


@@ -34,10 +34,6 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 def get_spark():
     builder = (
         pyspark.sql.SparkSession.builder.appName("spark_test")
-        .config(
-            "spark.jars.packages",
-            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
-        )
         .config(
             "spark.sql.catalog.spark_catalog",
             "org.apache.iceberg.spark.SparkSessionCatalog",
@@ -66,9 +62,10 @@ def started_cluster():
         prepare_s3_bucket(cluster)
         logging.info("S3 bucket created")
 
-        if cluster.spark_session is not None:
-            cluster.spark_session.stop()
-            cluster.spark_session._instantiatedContext = None
+        pyspark.sql.SparkSession.builder.appName("spark_test").config(
+            "spark.jars.packages",
+            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,io.delta:delta-core_2.12:2.2.0,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0",
+        ).master("local").getOrCreate().stop()
         cluster.spark_session = get_spark()
@@ -180,7 +177,6 @@ def test_single_iceberg_file(started_cluster, format_version):
     write_iceberg_from_file(
         spark, parquet_data_path, TABLE_NAME, format_version=format_version
     )
-    time.sleep(500)
 
     files = upload_directory(
         minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""