kssenii 2023-04-05 20:32:37 +02:00
parent be13ce76f4
commit c59d7a4bb3
3 changed files with 35 additions and 29 deletions

View File

@@ -31,7 +31,6 @@ from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-USER_FILES_PATH = os.path.join(SCRIPT_DIR, "_instances/node1/database/user_files")
 @pytest.fixture(scope="module")
@@ -119,7 +118,9 @@ def create_delta_table(node, table_name):
     )
-def create_initial_data_file(node, query, table_name, compression_method="none"):
+def create_initial_data_file(
+    cluster, node, query, table_name, compression_method="none"
+):
     node.query(
         f"""
             INSERT INTO TABLE FUNCTION
@@ -129,17 +130,13 @@ def create_initial_data_file(node, query, table_name, compression_method="none")
                 s3_truncate_on_insert=1 {query}
             FORMAT Parquet"""
     )
-    result_path = f"{USER_FILES_PATH}/{table_name}.parquet"
+    user_files_path = os.path.join(
+        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
+    )
+    result_path = f"{user_files_path}/{table_name}.parquet"
     return result_path
-def print_recursive(path):
-    for root, dirs, files in os.walk(path):
-        for basename in files:
-            filename = os.path.join(root, basename)
-            print(f"Found file {filename}")
 def test_single_log_file(started_cluster):
     instance = started_cluster.instances["node1"]
     minio_client = started_cluster.minio_client
@@ -148,9 +145,10 @@ def test_single_log_file(started_cluster):
     TABLE_NAME = "test_single_log_file"
     inserted_data = "SELECT number, toString(number + 1) FROM numbers(100)"
-    parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
+    parquet_data_path = create_initial_data_file(
+        started_cluster, instance, inserted_data, TABLE_NAME
+    )
-    print_recursive(SCRIPT_DIR)
     write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
     files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
     assert len(files) == 2  # 1 metadata files + 1 data file
@@ -304,7 +302,10 @@ def test_metadata(started_cluster):
     TABLE_NAME = "test_metadata"
     parquet_data_path = create_initial_data_file(
-        instance, "SELECT number, toString(number) FROM numbers(100)", TABLE_NAME
+        started_cluster,
+        instance,
+        "SELECT number, toString(number) FROM numbers(100)",
+        TABLE_NAME,
     )
     write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
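Across all three files the module-level USER_FILES_PATH constant is replaced by a path built from cluster.instances_dir_name inside create_initial_data_file, presumably because the instances directory is not always the literal "_instances" (its name can vary between runs). A minimal sketch of the difference; the "_instances-0" value, the _FakeCluster stub, and the old_/new_user_files_path helper names are invented for illustration and are not part of the patch:

```python
import os

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def old_user_files_path():
    # Before the change: the instances directory name is baked in at import time.
    return os.path.join(SCRIPT_DIR, "_instances/node1/database/user_files")


def new_user_files_path(cluster):
    # After the change: the directory name comes from the running cluster object,
    # so the path stays correct even when the instances dir is not "_instances".
    return os.path.join(
        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
    )


class _FakeCluster:
    # Minimal stand-in for the test cluster fixture; only the attribute the
    # patched helper reads (instances_dir_name) is modeled here.
    instances_dir_name = "_instances-0"


print(old_user_files_path())                # .../_instances/node1/database/user_files
print(new_user_files_path(_FakeCluster()))  # .../_instances-0/node1/database/user_files
```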

View File

@@ -26,7 +26,6 @@ from pyspark.sql.window import Window
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
 @pytest.fixture(scope="module")
@@ -145,7 +144,9 @@ def create_hudi_table(node, table_name):
     )
-def create_initial_data_file(node, query, table_name, compression_method="none"):
+def create_initial_data_file(
+    cluster, node, query, table_name, compression_method="none"
+):
     node.query(
         f"""
             INSERT INTO TABLE FUNCTION
@@ -155,7 +156,10 @@ def create_initial_data_file(node, query, table_name, compression_method="none")
                 s3_truncate_on_insert=1 {query}
             FORMAT Parquet"""
     )
-    result_path = f"{USER_FILES_PATH}/{table_name}.parquet"
+    user_files_path = os.path.join(
+        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
+    )
+    result_path = f"{user_files_path}/{table_name}.parquet"
     return result_path
@@ -167,7 +171,9 @@ def test_single_hudi_file(started_cluster):
     TABLE_NAME = "test_single_hudi_file"
     inserted_data = "SELECT number as a, toString(number) as b FROM numbers(100)"
-    parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
+    parquet_data_path = create_initial_data_file(
+        started_cluster, instance, inserted_data, TABLE_NAME
+    )
     write_hudi_from_file(spark, TABLE_NAME, parquet_data_path, f"/{TABLE_NAME}")
     files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")

View File

@@ -29,7 +29,6 @@ from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
 from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-USER_FILES_PATH = os.path.join(SCRIPT_DIR, "./_instances/node1/database/user_files")
 @pytest.fixture(scope="module")
@@ -140,7 +139,9 @@ def create_iceberg_table(node, table_name):
     )
-def create_initial_data_file(node, query, table_name, compression_method="none"):
+def create_initial_data_file(
+    cluster, node, query, table_name, compression_method="none"
+):
     node.query(
         f"""
             INSERT INTO TABLE FUNCTION
@@ -150,17 +151,13 @@ def create_initial_data_file(node, query, table_name, compression_method="none")
                 s3_truncate_on_insert=1 {query}
             FORMAT Parquet"""
     )
-    result_path = f"{USER_FILES_PATH}/{table_name}.parquet"
+    user_files_path = os.path.join(
+        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
+    )
+    result_path = f"{user_files_path}/{table_name}.parquet"
     return result_path
-def print_recursive(path):
-    for root, dirs, files in os.walk(path):
-        for basename in files:
-            filename = os.path.join(root, basename)
-            print(f"Found file {filename}")
 @pytest.mark.parametrize("format_version", ["1", "2"])
 def test_single_iceberg_file(started_cluster, format_version):
     instance = started_cluster.instances["node1"]
@@ -170,8 +167,10 @@ def test_single_iceberg_file(started_cluster, format_version):
     TABLE_NAME = "test_single_iceberg_file_" + format_version
     inserted_data = "SELECT number, toString(number) FROM numbers(100)"
-    parquet_data_path = create_initial_data_file(instance, inserted_data, TABLE_NAME)
-    print_recursive(SCRIPT_DIR)
+    parquet_data_path = create_initial_data_file(
+        started_cluster, instance, inserted_data, TABLE_NAME
+    )
     write_iceberg_from_file(
         spark, parquet_data_path, TABLE_NAME, format_version=format_version
    )
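The call sites in all three test files change accordingly: the started_cluster fixture is now passed through as the first argument so the helper can resolve the run-specific user_files directory itself. A self-contained sketch of the updated calling convention, using SimpleNamespace stand-ins (fake_cluster, a stubbed node.query) in place of the real fixtures and reproducing only the path derivation, not the actual INSERT INTO TABLE FUNCTION that the real helper issues:

```python
import os
from types import SimpleNamespace

# Hypothetical stand-ins for the real fixtures: only the attributes the
# patched helper touches (instances_dir_name, node.query) are modeled.
fake_cluster = SimpleNamespace(
    instances_dir_name="_instances-0",
    instances={"node1": SimpleNamespace(query=lambda sql: None)},
)

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def create_initial_data_file(
    cluster, node, query, table_name, compression_method="none"
):
    # The real helper materializes the query into <table_name>.parquet via the
    # file() table function on the node; this stub only issues a placeholder
    # query and reproduces the new path derivation.
    node.query(f"-- would insert the result of: {query}")
    user_files_path = os.path.join(
        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
    )
    return f"{user_files_path}/{table_name}.parquet"


# Call sites now pass the cluster object first, mirroring the updated tests.
path = create_initial_data_file(
    fake_cluster,
    fake_cluster.instances["node1"],
    "SELECT number, toString(number + 1) FROM numbers(100)",
    "test_single_log_file",
)
print(path)  # ends with _instances-0/node1/database/user_files/test_single_log_file.parquet
```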