From d2efae7511dd5cef218791b94a06eca356cc64cc Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Mon, 18 Nov 2024 17:35:21 +0000 Subject: [PATCH 1/8] enable cluster versions for datalake storages --- .../TableFunctionObjectStorage.cpp | 21 +++++ .../TableFunctionObjectStorageCluster.cpp | 76 +++++++++++++++++-- .../TableFunctionObjectStorageCluster.h | 58 ++++++++++++++ src/TableFunctions/registerTableFunctions.cpp | 1 + src/TableFunctions/registerTableFunctions.h | 1 + 5 files changed, 151 insertions(+), 6 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 12de08afad0..9917b9438d5 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -226,6 +226,27 @@ template class TableFunctionObjectStorage; +#if USE_AVRO && USE_AWS_S3 +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +template class TableFunctionObjectStorage; +#endif + +#if USE_AVRO && USE_HDFS +template class TableFunctionObjectStorage; +#endif + +#if USE_AWS_S3 && USE_PARQUET +template class TableFunctionObjectStorage; +#endif + +#if USE_AWS_S3 +template class TableFunctionObjectStorage; +#endif + #if USE_AVRO void registerTableFunctionIceberg(TableFunctionFactory & factory) { diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 5ca26aabe32..19f7a23af4d 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -105,15 +105,79 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) UNUSED(factory); } -#if USE_AWS_S3 -template class TableFunctionObjectStorageCluster; + +void registerTableFunctionIcebergCluster(TableFunctionFactory & factory) +{ + UNUSED(factory); + +#if USE_AVRO && USE_AWS_S3 + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster. 
Alias to icebergS3)", + .examples{{"icebergCluster", "SELECT * FROM icebergCluster(cluster_name, url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); + + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)", + .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster_name, url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif -#if USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorageCluster; +#if USE_AVRO && USE_AZURE_BLOB_STORAGE + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)", + .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif -#if USE_HDFS -template class TableFunctionObjectStorageCluster; +#if USE_AVRO && USE_HDFS + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)", + .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(url)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif } + +void registerTableFunctionDeltaLakeCluster([[maybe_unused]] TableFunctionFactory & factory) +{ + UNUSED(factory); + +#if USE_AWS_S3 && USE_PARQUET + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)", + .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +#endif +} + +void registerTableFunctionHudiCluster([[maybe_unused]] TableFunctionFactory & factory) +{ + UNUSED(factory); + +#if USE_AWS_S3 + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)", + .examples{{"hudiCluster", "SELECT * FROM hudiCluster(url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +#endif +} + +void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory) +{ + registerTableFunctionIcebergCluster(factory); + registerTableFunctionHudiCluster(factory); + registerTableFunctionDeltaLakeCluster(factory); +} + +} diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 11e6c1fde82..2e031b45684 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -33,6 +33,42 @@ struct HDFSClusterDefinition static constexpr auto storage_type_name = "HDFSCluster"; }; +struct IcebergClusterDefinition +{ + static constexpr auto name = "icebergCluster"; + static constexpr auto storage_type_name = "IcebergS3Cluster"; +}; + +struct IcebergS3ClusterDefinition +{ + static constexpr auto name = "icebergS3Cluster"; + static constexpr auto storage_type_name = "IcebergS3Cluster"; +}; + +struct 
IcebergAzureClusterDefinition +{ + static constexpr auto name = "icebergAzureCluster"; + static constexpr auto storage_type_name = "IcebergAzureCluster"; +}; + +struct IcebergHDFSClusterDefinition +{ + static constexpr auto name = "icebergHDFSCluster"; + static constexpr auto storage_type_name = "IcebergHDFSCluster"; +}; + +struct DeltaLakeClusterDefinition +{ + static constexpr auto name = "deltaLakeCluster"; + static constexpr auto storage_type_name = "DeltaLakeS3Cluster"; +}; + +struct HudiClusterDefinition +{ + static constexpr auto name = "hudiCluster"; + static constexpr auto storage_type_name = "HudiS3Cluster"; +}; + /** * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions, * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster. @@ -79,4 +115,26 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; #endif + +#if USE_AVRO && USE_AWS_S3 +using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AVRO && USE_HDFS +using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AWS_S3 && USE_PARQUET +using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AWS_S3 +using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; +#endif + } diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index fbe2c7c59ed..131ca783f73 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]] registerTableFunctionObjectStorage(factory); registerTableFunctionObjectStorageCluster(factory); registerDataLakeTableFunctions(factory); + registerDataLakeClusterTableFunctions(factory); } } diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index e22ba7346fa..1168a8ef739 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory); void registerTableFunctionObjectStorage(TableFunctionFactory & factory); void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory); void registerDataLakeTableFunctions(TableFunctionFactory & factory); +void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory); void registerTableFunctionTimeSeries(TableFunctionFactory & factory); From a29ded494186165469ab207c5fdbc9edb31d9c2a Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Mon, 18 Nov 2024 17:37:24 +0000 Subject: [PATCH 2/8] add test for iceberg --- .../configs/config.d/cluster.xml | 20 ++ .../configs/config.d/query_log.xml | 6 + .../integration/test_storage_iceberg/test.py | 175 ++++++++++++++++-- 3 files changed, 181 insertions(+), 20 deletions(-) create mode 100644 tests/integration/test_storage_iceberg/configs/config.d/cluster.xml create mode 100644 tests/integration/test_storage_iceberg/configs/config.d/query_log.xml diff --git a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml new file mode 100644 index 00000000000..54c08b27abe --- /dev/null +++ 
b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
@@ -0,0 +1,20 @@
+<clickhouse>
+    <remote_servers>
+        <cluster_simple>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node3</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_simple>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml
new file mode 100644
index 00000000000..a63e91f41fb
--- /dev/null
+++ b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml
@@ -0,0 +1,6 @@
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+    </query_log>
+</clickhouse>
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 36aba550dbd..2d22208b477 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -73,14 +73,38 @@ def started_cluster(): cluster.add_instance( "node1", main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", "configs/config.d/named_collections.xml", "configs/config.d/filesystem_caches.xml", ], user_configs=["configs/users.d/users.xml"], with_minio=True, with_azurite=True, - stay_alive=True, with_hdfs=with_hdfs, + stay_alive=True, + ) + cluster.add_instance( + "node2", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + "configs/config.d/filesystem_caches.xml", + ], + user_configs=["configs/users.d/users.xml"], + stay_alive=True, + ) + cluster.add_instance( + "node3", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + "configs/config.d/filesystem_caches.xml", + ], + user_configs=["configs/users.d/users.xml"], + stay_alive=True, ) logging.info("Starting cluster...") @@ -182,6 +206,7 @@ def get_creation_expression( cluster, format="Parquet", table_function=False, + run_on_cluster=False, **kwargs, ): if storage_type == "s3": @@ -189,35 +214,56 @@ def get_creation_expression( bucket = kwargs["bucket"] else: bucket = cluster.minio_bucket - print(bucket) - if table_function: - return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" + + if run_on_cluster: + assert table_function + return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" + if table_function: + return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" + elif storage_type == "azure": - if table_function: + if run_on_cluster: + assert table_function return f""" - icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format}) + icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format}) """ else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" + if table_function: + return f""" + icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = 
'/iceberg_data/default/{table_name}/', format={format}) + """ + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" + elif storage_type == "hdfs": - if table_function: + if run_on_cluster: + assert table_function return f""" - icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') + icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') """ else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');""" + if table_function: + return f""" + icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') + """ + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');""" + elif storage_type == "local": + assert not run_on_cluster + if table_function: return f""" icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format}) @@ -227,6 +273,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});""" + else: raise Exception(f"Unknown iceberg storage type: {storage_type}") @@ -492,6 +539,94 @@ def test_types(started_cluster, format_version, storage_type): ) +@pytest.mark.parametrize("format_version", ["1", "2"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"]) +def test_cluster_table_function(started_cluster, format_version, storage_type): + if is_arm() and storage_type == "hdfs": + pytest.skip("Disabled test IcebergHDFS for aarch64") + + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = ( + "test_iceberg_cluster_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + def add_df(mode): + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), + TABLE_NAME, + mode=mode, + format_version=format_version, + ) + + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + logging.info(f"Adding another dataframe. result files: {files}") + + return files + + files = add_df(mode="overwrite") + for i in range(1, len(started_cluster.instances)): + files = add_df(mode="append") + + logging.info(f"Setup complete. 
files: {files}") + assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1) + + clusters = instance.query(f"SELECT * FROM system.clusters") + logging.info(f"Clusters setup: {clusters}") + + # Regular Query only node1 + table_function_expr = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True + ) + select_regular = instance.query(f"SELECT * FROM {table_function_expr}").strip().split() + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True + ) + select_cluster = instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split() + + # Simple size check + assert len(select_regular) == 600 + assert len(select_cluster) == 600 + + # Actual check + assert select_cluster == select_regular + + # Check query_log + for replica in started_cluster.instances.values(): + replica.query("SYSTEM FLUSH LOGS") + + for node_name, replica in started_cluster.instances.items(): + cluster_secondary_queries = replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryStart' AND + positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND + position(query, '{TABLE_NAME}') != 0 AND + position(query, 'system.query_log') = 0 AND + NOT is_initial_query + """ + ).strip().split("\n") + + logging.info(f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}") + assert len(cluster_secondary_queries) == 1 + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"]) def test_delete_files(started_cluster, format_version, storage_type): From 014608fb6b68e3abe10c0827a7609e756ff6cdfd Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Mon, 18 Nov 2024 17:51:41 +0000 Subject: [PATCH 3/8] Automatic style fix --- .../integration/test_storage_iceberg/test.py | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 2d22208b477..b36347aa510 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -591,13 +591,21 @@ def test_cluster_table_function(started_cluster, format_version, storage_type): table_function_expr = get_creation_expression( storage_type, TABLE_NAME, started_cluster, table_function=True ) - select_regular = instance.query(f"SELECT * FROM {table_function_expr}").strip().split() + select_regular = ( + instance.query(f"SELECT * FROM {table_function_expr}").strip().split() + ) # Cluster Query with node1 as coordinator table_function_expr_cluster = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True + storage_type, + TABLE_NAME, + started_cluster, + table_function=True, + run_on_cluster=True, + ) + select_cluster = ( + instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split() ) - select_cluster = instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split() # Simple size check assert len(select_regular) == 600 @@ -611,8 +619,9 @@ def test_cluster_table_function(started_cluster, format_version, storage_type): replica.query("SYSTEM FLUSH LOGS") for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = replica.query( - f""" + cluster_secondary_queries = ( + 
replica.query( + f""" SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log WHERE type = 'QueryStart' AND @@ -621,9 +630,14 @@ def test_cluster_table_function(started_cluster, format_version, storage_type): position(query, 'system.query_log') = 0 AND NOT is_initial_query """ - ).strip().split("\n") + ) + .strip() + .split("\n") + ) - logging.info(f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}") + logging.info( + f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" + ) assert len(cluster_secondary_queries) == 1 From 6894e280b282a737c6a3f68b9d5b3012ab5bff68 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 19 Nov 2024 12:34:42 +0000 Subject: [PATCH 4/8] fix pr issues --- .../TableFunctionObjectStorage.cpp | 4 +- .../TableFunctionObjectStorageCluster.cpp | 47 ++++++++++--------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 9917b9438d5..02f886436b2 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -235,11 +235,11 @@ template class TableFunctionObjectStorage; #endif -#if USE_AVRO && USE_HDFS +#if USE_AVRO && USE_HDFS template class TableFunctionObjectStorage; #endif -#if USE_AWS_S3 && USE_PARQUET +#if USE_PARQUET && USE_AWS_S3 template class TableFunctionObjectStorage; #endif diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 19f7a23af4d..c7aa2b3cc2c 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) { .documentation = { .description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)", - .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}}, + .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}}, .allow_readonly = false } ); @@ -106,78 +106,83 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) } +#if USE_AVRO void registerTableFunctionIcebergCluster(TableFunctionFactory & factory) { UNUSED(factory); -#if USE_AVRO && USE_AWS_S3 +#if USE_AWS_S3 factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster. 
Alias to icebergS3)", - .examples{{"icebergCluster", "SELECT * FROM icebergCluster(cluster_name, url, access_key_id, secret_access_key)", ""}}, + .examples{{"icebergCluster", "SELECT * FROM icebergCluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)", - .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster_name, url, access_key_id, secret_access_key)", ""}}, + .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); #endif -#if USE_AVRO && USE_AZURE_BLOB_STORAGE +#if USE_AZURE_BLOB_STORAGE factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)", - .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(url, access_key_id, secret_access_key)", ""}}, + .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); #endif -#if USE_AVRO && USE_HDFS +#if USE_HDFS factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)", - .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(url)", ""}}, + .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); #endif } +#endif -void registerTableFunctionDeltaLakeCluster([[maybe_unused]] TableFunctionFactory & factory) +#if USE_AWS_S3 +#if USE_PARQUET +void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory) { - UNUSED(factory); - -#if USE_AWS_S3 && USE_PARQUET factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)", - .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(url, access_key_id, secret_access_key)", ""}}, + .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); -#endif } +#endif -void registerTableFunctionHudiCluster([[maybe_unused]] TableFunctionFactory & factory) +void registerTableFunctionHudiCluster(TableFunctionFactory & factory) { - UNUSED(factory); - -#if USE_AWS_S3 factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)", - .examples{{"hudiCluster", "SELECT * FROM hudiCluster(url, access_key_id, secret_access_key)", ""}}, + .examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}}, .categories{"DataLake"}}, .allow_readonly = false}); -#endif } +#endif 
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory) { + UNUSED(factory); +#if USE_AVRO registerTableFunctionIcebergCluster(factory); - registerTableFunctionHudiCluster(factory); +#endif +#if USE_AWS_S3 +#if USE_PARQUET registerTableFunctionDeltaLakeCluster(factory); +#endif + registerTableFunctionHudiCluster(factory); +#endif } } From a367de9977d07994d38bf710c89be67f57e75561 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 19 Nov 2024 12:49:59 +0000 Subject: [PATCH 5/8] add docs --- .../table-functions/deltalake.md | 2 +- .../table-functions/deltalakeCluster.md | 30 ++++++++++++ docs/en/sql-reference/table-functions/hudi.md | 2 +- .../table-functions/hudiCluster.md | 30 ++++++++++++ .../sql-reference/table-functions/iceberg.md | 1 + .../table-functions/icebergCluster.md | 47 +++++++++++++++++++ 6 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 docs/en/sql-reference/table-functions/deltalakeCluster.md create mode 100644 docs/en/sql-reference/table-functions/hudiCluster.md create mode 100644 docs/en/sql-reference/table-functions/icebergCluster.md diff --git a/docs/en/sql-reference/table-functions/deltalake.md b/docs/en/sql-reference/table-functions/deltalake.md index 885d8df6a1e..4f8515a539f 100644 --- a/docs/en/sql-reference/table-functions/deltalake.md +++ b/docs/en/sql-reference/table-functions/deltalake.md @@ -49,4 +49,4 @@ LIMIT 2 **See Also** - [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md) - +- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md) diff --git a/docs/en/sql-reference/table-functions/deltalakeCluster.md b/docs/en/sql-reference/table-functions/deltalakeCluster.md new file mode 100644 index 00000000000..49c2264823f --- /dev/null +++ b/docs/en/sql-reference/table-functions/deltalakeCluster.md @@ -0,0 +1,30 @@ +--- +slug: /en/sql-reference/table-functions/deltalakeCluster +sidebar_position: 46 +sidebar_label: deltaLakeCluster +title: "deltaLakeCluster Table Function" +--- +This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function. + +Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished. + +**Syntax** + +``` sql +deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression]) +``` + +**Arguments** + +- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers. + +- Description of all other arguments coincides with description of arguments in equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function. + +**Returned value** + +A table with the specified structure for reading data from cluster in the specified Delta Lake table in S3. 
+ +**See Also** + +- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md) +- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md) diff --git a/docs/en/sql-reference/table-functions/hudi.md b/docs/en/sql-reference/table-functions/hudi.md index 959a32fe26d..f4cdb0bf948 100644 --- a/docs/en/sql-reference/table-functions/hudi.md +++ b/docs/en/sql-reference/table-functions/hudi.md @@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl **See Also** - [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md) - +- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md) diff --git a/docs/en/sql-reference/table-functions/hudiCluster.md b/docs/en/sql-reference/table-functions/hudiCluster.md new file mode 100644 index 00000000000..985b7479f66 --- /dev/null +++ b/docs/en/sql-reference/table-functions/hudiCluster.md @@ -0,0 +1,30 @@ +--- +slug: /en/sql-reference/table-functions/hudiCluster +sidebar_position: 86 +sidebar_label: hudiCluster +title: "hudiCluster Table Function" +--- +This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function. + +Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished. + +**Syntax** + +``` sql +hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression]) +``` + +**Arguments** + +- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers. + +- Description of all other arguments coincides with description of arguments in equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function. + +**Returned value** + +A table with the specified structure for reading data from cluster in the specified Hudi table in S3. + +**See Also** + +- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md) +- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md) diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md index 4f54b2cd440..28063330008 100644 --- a/docs/en/sql-reference/table-functions/iceberg.md +++ b/docs/en/sql-reference/table-functions/iceberg.md @@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now. **See Also** - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md) +- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md) diff --git a/docs/en/sql-reference/table-functions/icebergCluster.md b/docs/en/sql-reference/table-functions/icebergCluster.md new file mode 100644 index 00000000000..fb0d9d4cb51 --- /dev/null +++ b/docs/en/sql-reference/table-functions/icebergCluster.md @@ -0,0 +1,47 @@ +--- +slug: /en/sql-reference/table-functions/icebergCluster +sidebar_position: 91 +sidebar_label: icebergCluster +title: "icebergCluster Table Function" +--- +This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function. + +Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in parallel from many nodes in a specified cluster. 
On initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished. + +**Syntax** + +``` sql +icebergS3(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) +icebergS3(cluster_name, named_collection[, option=value [,..]]) + +icebergAzure(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) +icebergAzure(cluster_name, named_collection[, option=value [,..]]) + +icebergHDFS(cluster_name, path_to_table, [,format] [,compression_method]) +icebergHDFS(cluster_name, named_collection[, option=value [,..]]) +``` + +**Arguments** + +- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers. + +- Description of all other arguments coincides with description of arguments in equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function. + +**Returned value** + +A table with the specified structure for reading data from cluster in the specified Iceberg table. + +**Examples** + +```sql +SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test') +``` + +**Aliases** + +Table function `icebergCluster` is an alias to `icebergS3Cluster` now. + +**See Also** + +- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md) +- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md) From 0951991c1dc97c46ef287714ec45571f9fd284a7 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 19 Nov 2024 13:10:42 +0000 Subject: [PATCH 6/8] update aspell-dict.txt --- utils/check-style/aspell-ignore/en/aspell-dict.txt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index a53104581bb..03e4d149a8d 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -244,7 +244,10 @@ Deduplication DefaultTableEngine DelayedInserts DeliveryTag +Deltalake DeltaLake +deltalakeCluster +deltaLakeCluster Denormalize DestroyAggregatesThreads DestroyAggregatesThreadsActive @@ -377,10 +380,15 @@ Homebrew's HorizontalDivide Hostname HouseOps +hudi Hudi +hudiCluster +HudiCluster HyperLogLog Hypot IANA +icebergCluster +IcebergCluster IDE IDEs IDNA From 99177c0daf734d09f1eb19fa61eed92e002a49db Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 20 Nov 2024 11:15:12 +0000 Subject: [PATCH 7/8] remove icebergCluster alias --- src/TableFunctions/TableFunctionObjectStorage.cpp | 1 - src/TableFunctions/TableFunctionObjectStorageCluster.cpp | 7 ------- src/TableFunctions/TableFunctionObjectStorageCluster.h | 7 ------- 3 files changed, 15 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 02f886436b2..1ed803ae5ce 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -227,7 +227,6 @@ template class TableFunctionObjectStorage; #if USE_AVRO && USE_AWS_S3 -template class TableFunctionObjectStorage; template class TableFunctionObjectStorage; #endif diff --git 
a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index c7aa2b3cc2c..be7603f18e6 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -112,13 +112,6 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory) UNUSED(factory); #if USE_AWS_S3 - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster. Alias to icebergS3)", - .examples{{"icebergCluster", "SELECT * FROM icebergCluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}}, - .categories{"DataLake"}}, - .allow_readonly = false}); - factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)", diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 2e031b45684..d1fac5fdc2c 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -33,12 +33,6 @@ struct HDFSClusterDefinition static constexpr auto storage_type_name = "HDFSCluster"; }; -struct IcebergClusterDefinition -{ - static constexpr auto name = "icebergCluster"; - static constexpr auto storage_type_name = "IcebergS3Cluster"; -}; - struct IcebergS3ClusterDefinition { static constexpr auto name = "icebergS3Cluster"; @@ -117,7 +111,6 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster; using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; #endif From 4ccebd9a2493c2a126761b6caf5d569d67081c12 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 20 Nov 2024 11:15:39 +0000 Subject: [PATCH 8/8] fix syntax for iceberg in docs --- .../table-functions/icebergCluster.md | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/docs/en/sql-reference/table-functions/icebergCluster.md b/docs/en/sql-reference/table-functions/icebergCluster.md index fb0d9d4cb51..bc444f361d5 100644 --- a/docs/en/sql-reference/table-functions/icebergCluster.md +++ b/docs/en/sql-reference/table-functions/icebergCluster.md @@ -11,14 +11,14 @@ Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) in pa **Syntax** ``` sql -icebergS3(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) -icebergS3(cluster_name, named_collection[, option=value [,..]]) +icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) +icebergS3Cluster(cluster_name, named_collection[, option=value [,..]]) -icebergAzure(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) -icebergAzure(cluster_name, named_collection[, option=value [,..]]) +icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) +icebergAzureCluster(cluster_name, named_collection[, option=value [,..]]) -icebergHDFS(cluster_name, path_to_table, [,format] [,compression_method]) -icebergHDFS(cluster_name, named_collection[, 
option=value [,..]]) +icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method]) +icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]]) ``` **Arguments** @@ -37,10 +37,6 @@ A table with the specified structure for reading data from cluster in the specif SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test') ``` -**Aliases** - -Table function `icebergCluster` is an alias to `icebergS3Cluster` now. - **See Also** - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)