ClickHouse/tests/integration/test_storage_hudi/test.py

277 lines
8.4 KiB
Python
Raw Normal View History

2022-08-30 09:14:05 +00:00
import logging
2023-03-21 11:51:14 +00:00
import pytest
2022-08-30 09:14:05 +00:00
import os
2022-08-30 17:38:57 +00:00
import json
2022-08-30 09:14:05 +00:00
import helpers.client
from helpers.cluster import ClickHouseCluster
2022-08-31 09:26:53 +00:00
from helpers.test_tools import TSV
2023-03-21 11:51:14 +00:00
from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
2022-08-30 09:14:05 +00:00
2023-03-21 11:51:14 +00:00
import pyspark
2023-03-29 18:01:21 +00:00
from pyspark.sql.types import (
StructType,
StructField,
StringType,
IntegerType,
DateType,
TimestampType,
BooleanType,
ArrayType,
)
from pyspark.sql.functions import current_timestamp
from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
2022-09-06 18:05:33 +00:00
2023-03-21 11:51:14 +00:00
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
2023-03-21 21:17:59 +00:00
USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
2022-08-30 09:14:05 +00:00
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
2023-03-29 18:01:21 +00:00
cluster.add_instance(
"node1",
main_configs=["configs/config.d/named_collections.xml"],
with_minio=True,
)
2022-08-30 09:14:05 +00:00
logging.info("Starting cluster...")
cluster.start()
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
yield cluster
2022-09-06 18:05:33 +00:00
2022-08-30 09:14:05 +00:00
finally:
cluster.shutdown()
2022-09-06 18:05:33 +00:00
2022-08-30 09:14:05 +00:00
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
2023-03-21 11:51:14 +00:00
def get_spark():
builder = (
pyspark.sql.SparkSession.builder.appName("spark_test")
.config(
"spark.jars.packages",
"org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
)
.config(
"org.apache.spark.sql.hudi.catalog.HoodieCatalog",
)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config(
"spark.sql.catalog.local", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
)
.config("spark.driver.memory", "20g")
.master("local")
)
return builder.master("local").getOrCreate()
2023-03-29 18:01:21 +00:00
def write_hudi_from_df(spark, table_name, df, result_path, mode="overwrite"):
if mode is "overwrite":
hudi_write_mode = "insert_overwrite"
else:
hudi_write_mode = "upsert"
df.write.mode(mode).option("compression", "none").option(
2023-03-21 11:51:14 +00:00
"compression", "none"
2023-03-29 18:01:21 +00:00
).format("hudi").option("hoodie.table.name", table_name).option(
2023-03-21 11:51:14 +00:00
"hoodie.datasource.write.partitionpath.field", "partitionpath"
).option(
2023-03-29 18:01:21 +00:00
"hoodie.datasource.write.table.name", table_name
2023-03-21 11:51:14 +00:00
).option(
2023-03-29 18:01:21 +00:00
"hoodie.datasource.write.operation", hudi_write_mode
2023-03-21 11:51:14 +00:00
).option(
"hoodie.parquet.compression.codec", "snappy"
).option(
"hoodie.hfile.compression.algorithm", "uncompressed"
).option(
"hoodie.datasource.write.recordkey.field", "a"
).option(
"hoodie.datasource.write.precombine.field", "a"
).save(
result_path
2022-09-06 18:05:33 +00:00
)
2022-11-09 10:04:53 +00:00
2023-03-29 18:01:21 +00:00
def write_hudi_from_file(spark, table_name, path, result_path):
spark.conf.set("spark.sql.debug.maxToStringFields", 100000)
df = spark.read.load(f"file://{path}")
write_hudi_from_df(spark, table_name, df, result_path)
def generate_data(spark, start, end):
a = spark.range(start, end, 1).toDF("a")
b = spark.range(start + 1, end + 1, 1).toDF("b")
b = b.withColumn("b", b["b"].cast(StringType()))
a = a.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
b = b.withColumn(
"row_index", row_number().over(Window.orderBy(monotonically_increasing_id()))
)
df = a.join(b, on=["row_index"]).drop("row_index")
return df
def create_hudi_table(node, table_name):
node.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=Hudi(s3, filename = '{table_name}/')"""
)
def create_initial_data_file(node, query, table_name, compression_method="none"):
node.query(
f"""
INSERT INTO TABLE FUNCTION
file('{table_name}.parquet')
SETTINGS
output_format_parquet_compression_method='{compression_method}',
s3_truncate_on_insert=1 {query}
FORMAT Parquet"""
)
result_path = f"{USER_FILES_PATH}/{table_name}.parquet"
return result_path
def test_single_hudi_file(started_cluster):
2023-03-21 11:51:14 +00:00
instance = started_cluster.instances["node1"]
2023-03-29 18:01:21 +00:00
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
spark = get_spark()
TABLE_NAME = "test_single_hudi_file"
inserted_data = "SELECT number as a, toString(number) as b FROM numbers(100)"
parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
write_hudi_from_file(spark, TABLE_NAME, parquet_data_path, f"/{TABLE_NAME}")
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
assert files[0].endswith(".parquet")
2023-03-21 11:51:14 +00:00
2023-03-29 18:01:21 +00:00
create_hudi_table(instance, TABLE_NAME)
assert instance.query(f"SELECT a, b FROM {TABLE_NAME}") == instance.query(
inserted_data
2022-11-10 03:25:40 +00:00
)
2022-08-31 09:47:46 +00:00
2023-03-29 18:01:21 +00:00
def test_multiple_hudi_files(started_cluster):
instance = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
2023-03-21 11:51:14 +00:00
spark = get_spark()
2023-03-29 18:01:21 +00:00
TABLE_NAME = "test_multiple_hudi_files"
2023-03-29 21:54:44 +00:00
write_hudi_from_df(
spark, TABLE_NAME, generate_data(spark, 0, 100), f"/{TABLE_NAME}"
)
2023-03-29 18:01:21 +00:00
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 1
create_hudi_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
2022-11-17 11:54:13 +00:00
2023-03-29 18:01:21 +00:00
write_hudi_from_df(
2023-03-29 21:54:44 +00:00
spark,
TABLE_NAME,
generate_data(spark, 100, 200),
f"/{TABLE_NAME}",
mode="append",
2023-03-29 18:01:21 +00:00
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) == 2
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 200
2023-03-29 21:54:44 +00:00
assert instance.query(
f"SELECT a, b FROM {TABLE_NAME} ORDER BY 1"
) == instance.query("SELECT number, toString(number + 1) FROM numbers(200)")
2023-03-29 18:01:21 +00:00
write_hudi_from_df(
2023-03-29 21:54:44 +00:00
spark,
TABLE_NAME,
generate_data(spark, 100, 300),
f"/{TABLE_NAME}",
mode="append",
2023-03-29 18:01:21 +00:00
)
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
2023-03-29 21:54:44 +00:00
# assert len(files) == 3
2023-03-29 18:01:21 +00:00
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
2023-03-29 21:54:44 +00:00
assert instance.query(
f"SELECT a, b FROM {TABLE_NAME} ORDER BY 1"
) == instance.query("SELECT number, toString(number + 1) FROM numbers(300)")
2023-03-29 18:01:21 +00:00
def test_types(started_cluster):
instance = started_cluster.instances["node1"]
2023-03-21 11:51:14 +00:00
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
2023-03-29 18:01:21 +00:00
spark = get_spark()
TABLE_NAME = "test_types"
2023-03-29 18:01:21 +00:00
data = [
(
123,
"string",
datetime.strptime("2000-01-01", "%Y-%m-%d"),
["str1", "str2"],
True,
)
]
schema = StructType(
[
StructField("a", IntegerType()),
StructField("b", StringType()),
StructField("c", DateType()),
StructField("d", ArrayType(StringType())),
StructField("e", BooleanType()),
]
2023-03-21 11:51:14 +00:00
)
2023-03-29 18:01:21 +00:00
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
2023-03-29 21:54:44 +00:00
write_hudi_from_df(spark, TABLE_NAME, df, f"/{TABLE_NAME}", mode="overwrite")
2023-03-29 18:01:21 +00:00
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
create_hudi_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1
assert (
instance.query(f"SELECT * FROM {TABLE_NAME}").strip()
== "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
)
2023-03-29 18:01:21 +00:00
2023-03-29 21:54:44 +00:00
# table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{TABLE_NAME}/', 'minio', 'minio123')"
# assert (
2023-03-29 18:01:21 +00:00
# instance.query(f"SELECT * FROM {table_function}").strip()
# == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
2023-03-29 21:54:44 +00:00
# )
2023-03-29 18:01:21 +00:00
2023-03-29 21:54:44 +00:00
# assert instance.query(f"DESCRIBE {table_function} FORMAT TSV") == TSV(
2023-03-29 18:01:21 +00:00
# [
# ["a", "Nullable(Int32)"],
# ["b", "Nullable(String)"],
# ["c", "Nullable(Date32)"],
# ["d", "Array(Nullable(String))"],
# ["e", "Nullable(Bool)"],
# ]
2023-03-29 21:54:44 +00:00
# )