From a29ded494186165469ab207c5fdbc9edb31d9c2a Mon Sep 17 00:00:00 2001
From: Mikhail Artemenko
Date: Mon, 18 Nov 2024 17:37:24 +0000
Subject: [PATCH] add test for iceberg

---
 .../configs/config.d/cluster.xml              |  20 ++
 .../configs/config.d/query_log.xml            |   6 +
 .../integration/test_storage_iceberg/test.py  | 175 ++++++++++++++++--
 3 files changed, 181 insertions(+), 20 deletions(-)
 create mode 100644 tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
 create mode 100644 tests/integration/test_storage_iceberg/configs/config.d/query_log.xml

diff --git a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
new file mode 100644
index 00000000000..54c08b27abe
--- /dev/null
+++ b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
@@ -0,0 +1,20 @@
+<clickhouse>
+    <remote_servers>
+        <cluster_simple>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node3</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_simple>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml
new file mode 100644
index 00000000000..a63e91f41fb
--- /dev/null
+++ b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml
@@ -0,0 +1,6 @@
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+    </query_log>
+</clickhouse>
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 36aba550dbd..2d22208b477 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -73,14 +73,38 @@ def started_cluster():
         cluster.add_instance(
             "node1",
             main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
                 "configs/config.d/named_collections.xml",
                 "configs/config.d/filesystem_caches.xml",
             ],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
             with_azurite=True,
-            stay_alive=True,
             with_hdfs=with_hdfs,
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node2",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
+        )
+        cluster.add_instance(
+            "node3",
+            main_configs=[
+                "configs/config.d/query_log.xml",
+                "configs/config.d/cluster.xml",
+                "configs/config.d/named_collections.xml",
+                "configs/config.d/filesystem_caches.xml",
+            ],
+            user_configs=["configs/users.d/users.xml"],
+            stay_alive=True,
         )

         logging.info("Starting cluster...")
@@ -182,6 +206,7 @@ def get_creation_expression(
     cluster,
     format="Parquet",
     table_function=False,
+    run_on_cluster=False,
     **kwargs,
 ):
     if storage_type == "s3":
@@ -189,35 +214,56 @@ def get_creation_expression(
             bucket = kwargs["bucket"]
         else:
             bucket = cluster.minio_bucket
-        print(bucket)
-        if table_function:
-            return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+
+        if run_on_cluster:
+            assert table_function
+            return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+            if table_function:
+                return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+
     elif storage_type == "azure":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-            icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+            icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+            if table_function:
+                return f"""
+                icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+                """
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+
     elif storage_type == "hdfs":
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-            icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+            icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
             """
         else:
-            return f"""
-            DROP TABLE IF EXISTS {table_name};
-            CREATE TABLE {table_name}
-            ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
+            if table_function:
+                return f"""
+                icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
+                """
+            else:
+                return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
+
     elif storage_type == "local":
+        assert not run_on_cluster
+
         if table_function:
             return f"""
             icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@@ -227,6 +273,7 @@ def get_creation_expression(
             DROP TABLE IF EXISTS {table_name};
             CREATE TABLE {table_name}
             ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
+
     else:
         raise Exception(f"Unknown iceberg storage type: {storage_type}")

@@ -492,6 +539,94 @@ def test_types(started_cluster, format_version, storage_type):
     )


+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
+def test_cluster_table_function(started_cluster, format_version, storage_type):
+    if is_arm() and storage_type == "hdfs":
+        pytest.skip("Disabled test IcebergHDFS for aarch64")
+
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+
+    TABLE_NAME = (
+        "test_iceberg_cluster_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    def add_df(mode):
+        write_iceberg_from_df(
+            spark,
+            generate_data(spark, 0, 100),
+            TABLE_NAME,
+            mode=mode,
+            format_version=format_version,
+        )
+
+        files = default_upload_directory(
+            started_cluster,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+
+        logging.info(f"Adding another dataframe. result files: {files}")
+
+        return files
+
+    files = add_df(mode="overwrite")
+    for i in range(1, len(started_cluster.instances)):
+        files = add_df(mode="append")
+
+    logging.info(f"Setup complete. files: {files}")
+    assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
+
+    clusters = instance.query(f"SELECT * FROM system.clusters")
+    logging.info(f"Clusters setup: {clusters}")
+
+    # Regular Query only node1
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+    select_regular = instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
+
+    # Cluster Query with node1 as coordinator
+    table_function_expr_cluster = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True
+    )
+    select_cluster = instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
+
+    # Simple size check
+    assert len(select_regular) == 600
+    assert len(select_cluster) == 600
+
+    # Actual check
+    assert select_cluster == select_regular
+
+    # Check query_log
+    for replica in started_cluster.instances.values():
+        replica.query("SYSTEM FLUSH LOGS")
+
+    for node_name, replica in started_cluster.instances.items():
+        cluster_secondary_queries = replica.query(
+            f"""
+            SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
+            WHERE
+                type = 'QueryStart' AND
+                positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
+                position(query, '{TABLE_NAME}') != 0 AND
+                position(query, 'system.query_log') = 0 AND
+                NOT is_initial_query
+            """
+        ).strip().split("\n")
+
+        logging.info(f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}")
+        assert len(cluster_secondary_queries) == 1
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
 def test_delete_files(started_cluster, format_version, storage_type):