Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions
Enable cluster table functions for DataLake Storages
Commit: 44b4bd38b9

@@ -49,4 +49,4 @@ LIMIT 2

**See Also**

- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

docs/en/sql-reference/table-functions/deltalakeCluster.md (new file, 30 lines)
@@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---

This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.

Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.

- The description of all other arguments coincides with the description of the arguments in the equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.

**Returned value**

A table with the specified structure for reading data from the cluster in the specified Delta Lake table in S3.
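
**Examples**

A minimal usage sketch, mirroring the `icebergS3Cluster` example added in this change; the bucket URL and credentials below are placeholders, not a real endpoint:

``` sql
SELECT * FROM deltaLakeCluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```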

**See Also**

- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)

@@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl

**See Also**

- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

docs/en/sql-reference/table-functions/hudiCluster.md (new file, 30 lines)
@@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---

This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.

Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.

- The description of all other arguments coincides with the description of the arguments in the equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.

**Returned value**

A table with the specified structure for reading data from the cluster in the specified Hudi table in S3.
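
**Examples**

A minimal usage sketch, following the same pattern as the other cluster table functions in this change; the bucket URL and credentials below are placeholders, not a real endpoint:

``` sql
SELECT * FROM hudiCluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```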

**See Also**

- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)

@@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.

**See Also**

- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

docs/en/sql-reference/table-functions/icebergCluster.md (new file, 43 lines)
@@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---

This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.

Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) tables in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. On the worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.

**Syntax**

``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])

icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])

icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```

**Arguments**

- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.

- The description of all other arguments coincides with the description of the arguments in the equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.

**Returned value**

A table with the specified structure for reading data from the cluster in the specified Iceberg table.

**Examples**

```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
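
The Azure and HDFS variants follow the same pattern; a sketch with placeholder endpoints, container and table paths (illustrative values only, not tested infrastructure):

```sql
SELECT * FROM icebergAzureCluster('cluster_simple', 'http://azurite1:10000/devstoreaccount1', 'test-container', 'iceberg_data/default/test_table/', 'devstoreaccount1', 'account-key')

SELECT * FROM icebergHDFSCluster('cluster_simple', 'hdfs://hdfs1:9000/iceberg_data/default/test_table/')
```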

**See Also**

- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

@@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;

#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif

#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif

#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif

#if USE_PARQUET && USE_AWS_S3
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif

#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif

#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

@@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
    {
        .documentation = {
            .description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
            .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
        .allow_readonly = false
    }
);

@@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
    UNUSED(factory);
}

#if USE_AVRO
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
    UNUSED(factory);

#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
    factory.registerFunction<TableFunctionIcebergS3Cluster>(
        {.documentation
             = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
                .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
                .categories{"DataLake"}},
         .allow_readonly = false});
#endif

#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
    factory.registerFunction<TableFunctionIcebergAzureCluster>(
        {.documentation
             = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
                .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
                .categories{"DataLake"}},
         .allow_readonly = false});
#endif

#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
    factory.registerFunction<TableFunctionIcebergHDFSCluster>(
        {.documentation
             = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
                .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
                .categories{"DataLake"}},
         .allow_readonly = false});
#endif
}
#endif

#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionDeltaLakeCluster>(
        {.documentation
             = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
                .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
                .categories{"DataLake"}},
         .allow_readonly = false});
}
#endif

void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionHudiCluster>(
        {.documentation
             = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
                .examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
                .categories{"DataLake"}},
         .allow_readonly = false});
}
#endif

void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
    UNUSED(factory);
#if USE_AVRO
    registerTableFunctionIcebergCluster(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
    registerTableFunctionDeltaLakeCluster(factory);
#endif
    registerTableFunctionHudiCluster(factory);
#endif
}

}
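
As a quick way to confirm the registrations above took effect, the new function names can be looked up in `system.table_functions`; a hedged sketch, assuming a build with the corresponding storage support (USE_AVRO, USE_AWS_S3, and so on) enabled:

```sql
SELECT name
FROM system.table_functions
WHERE name IN ('icebergS3Cluster', 'icebergAzureCluster', 'icebergHDFSCluster', 'deltaLakeCluster', 'hudiCluster')
ORDER BY name
```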

@@ -33,6 +33,36 @@ struct HDFSClusterDefinition
    static constexpr auto storage_type_name = "HDFSCluster";
};

struct IcebergS3ClusterDefinition
{
    static constexpr auto name = "icebergS3Cluster";
    static constexpr auto storage_type_name = "IcebergS3Cluster";
};

struct IcebergAzureClusterDefinition
{
    static constexpr auto name = "icebergAzureCluster";
    static constexpr auto storage_type_name = "IcebergAzureCluster";
};

struct IcebergHDFSClusterDefinition
{
    static constexpr auto name = "icebergHDFSCluster";
    static constexpr auto storage_type_name = "IcebergHDFSCluster";
};

struct DeltaLakeClusterDefinition
{
    static constexpr auto name = "deltaLakeCluster";
    static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};

struct HudiClusterDefinition
{
    static constexpr auto name = "hudiCluster";
    static constexpr auto storage_type_name = "HudiS3Cluster";
};

/**
 * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
 * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.

@@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif

#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif

#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif

#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif

#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif

#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif

}

@@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
    registerTableFunctionObjectStorage(factory);
    registerTableFunctionObjectStorageCluster(factory);
    registerDataLakeTableFunctions(factory);
    registerDataLakeClusterTableFunctions(factory);
}

}

@@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);

void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

@@ -0,0 +1,20 @@
<clickhouse>
    <remote_servers>
        <cluster_simple>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_simple>
    </remote_servers>
</clickhouse>
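
With a `remote_servers` section like this loaded, the test cluster can be inspected from any node; a small sanity-check sketch against the standard `system.clusters` table:

```sql
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'cluster_simple'
```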

@@ -0,0 +1,6 @@
<clickhouse>
    <query_log>
        <database>system</database>
        <table>query_log</table>
    </query_log>
</clickhouse>

@@ -73,14 +73,38 @@ def started_cluster():
    cluster.add_instance(
        "node1",
        main_configs=[
            "configs/config.d/query_log.xml",
            "configs/config.d/cluster.xml",
            "configs/config.d/named_collections.xml",
            "configs/config.d/filesystem_caches.xml",
        ],
        user_configs=["configs/users.d/users.xml"],
        with_minio=True,
        with_azurite=True,
        stay_alive=True,
        with_hdfs=with_hdfs,
        stay_alive=True,
    )
    cluster.add_instance(
        "node2",
        main_configs=[
            "configs/config.d/query_log.xml",
            "configs/config.d/cluster.xml",
            "configs/config.d/named_collections.xml",
            "configs/config.d/filesystem_caches.xml",
        ],
        user_configs=["configs/users.d/users.xml"],
        stay_alive=True,
    )
    cluster.add_instance(
        "node3",
        main_configs=[
            "configs/config.d/query_log.xml",
            "configs/config.d/cluster.xml",
            "configs/config.d/named_collections.xml",
            "configs/config.d/filesystem_caches.xml",
        ],
        user_configs=["configs/users.d/users.xml"],
        stay_alive=True,
    )

    logging.info("Starting cluster...")

@@ -182,6 +206,7 @@ def get_creation_expression(
    cluster,
    format="Parquet",
    table_function=False,
    run_on_cluster=False,
    **kwargs,
):
    if storage_type == "s3":

@@ -189,35 +214,56 @@ def get_creation_expression(
            bucket = kwargs["bucket"]
        else:
            bucket = cluster.minio_bucket
        print(bucket)
        if table_function:
            return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"

        if run_on_cluster:
            assert table_function
            return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
        else:
            return f"""
            DROP TABLE IF EXISTS {table_name};
            CREATE TABLE {table_name}
            ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
            if table_function:
                return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
            else:
                return f"""
                DROP TABLE IF EXISTS {table_name};
                CREATE TABLE {table_name}
                ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""

    elif storage_type == "azure":
        if table_function:
        if run_on_cluster:
            assert table_function
            return f"""
            icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
            icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
            """
        else:
            return f"""
            DROP TABLE IF EXISTS {table_name};
            CREATE TABLE {table_name}
            ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
            if table_function:
                return f"""
                icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
                """
            else:
                return f"""
                DROP TABLE IF EXISTS {table_name};
                CREATE TABLE {table_name}
                ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""

    elif storage_type == "hdfs":
        if table_function:
        if run_on_cluster:
            assert table_function
            return f"""
            icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
            icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
            """
        else:
            return f"""
            DROP TABLE IF EXISTS {table_name};
            CREATE TABLE {table_name}
            ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
            if table_function:
                return f"""
                icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
                """
            else:
                return f"""
                DROP TABLE IF EXISTS {table_name};
                CREATE TABLE {table_name}
                ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""

    elif storage_type == "local":
        assert not run_on_cluster

        if table_function:
            return f"""
            icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})

@@ -227,6 +273,7 @@ def get_creation_expression(
            DROP TABLE IF EXISTS {table_name};
            CREATE TABLE {table_name}
            ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""

    else:
        raise Exception(f"Unknown iceberg storage type: {storage_type}")

@@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
    )


@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
    if is_arm() and storage_type == "hdfs":
        pytest.skip("Disabled test IcebergHDFS for aarch64")

    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session

    TABLE_NAME = (
        "test_iceberg_cluster_"
        + format_version
        + "_"
        + storage_type
        + "_"
        + get_uuid_str()
    )

    def add_df(mode):
        write_iceberg_from_df(
            spark,
            generate_data(spark, 0, 100),
            TABLE_NAME,
            mode=mode,
            format_version=format_version,
        )

        files = default_upload_directory(
            started_cluster,
            storage_type,
            f"/iceberg_data/default/{TABLE_NAME}/",
            f"/iceberg_data/default/{TABLE_NAME}/",
        )

        logging.info(f"Adding another dataframe. result files: {files}")

        return files

    files = add_df(mode="overwrite")
    for i in range(1, len(started_cluster.instances)):
        files = add_df(mode="append")

    logging.info(f"Setup complete. files: {files}")
    assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)

    clusters = instance.query(f"SELECT * FROM system.clusters")
    logging.info(f"Clusters setup: {clusters}")

    # Regular Query only node1
    table_function_expr = get_creation_expression(
        storage_type, TABLE_NAME, started_cluster, table_function=True
    )
    select_regular = (
        instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
    )

    # Cluster Query with node1 as coordinator
    table_function_expr_cluster = get_creation_expression(
        storage_type,
        TABLE_NAME,
        started_cluster,
        table_function=True,
        run_on_cluster=True,
    )
    select_cluster = (
        instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
    )

    # Simple size check
    assert len(select_regular) == 600
    assert len(select_cluster) == 600

    # Actual check
    assert select_cluster == select_regular

    # Check query_log
    for replica in started_cluster.instances.values():
        replica.query("SYSTEM FLUSH LOGS")

    for node_name, replica in started_cluster.instances.items():
        cluster_secondary_queries = (
            replica.query(
                f"""
                SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
                WHERE
                    type = 'QueryStart' AND
                    positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
                    position(query, '{TABLE_NAME}') != 0 AND
                    position(query, 'system.query_log') = 0 AND
                    NOT is_initial_query
                """
            )
            .strip()
            .split("\n")
        )

        logging.info(
            f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
        )
        assert len(cluster_secondary_queries) == 1


@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type):

@@ -244,7 +244,10 @@ Deduplication
DefaultTableEngine
DelayedInserts
DeliveryTag
Deltalake
DeltaLake
deltalakeCluster
deltaLakeCluster
Denormalize
DestroyAggregatesThreads
DestroyAggregatesThreadsActive

@@ -377,10 +380,15 @@ Homebrew's
HorizontalDivide
Hostname
HouseOps
hudi
Hudi
hudiCluster
HudiCluster
HyperLogLog
Hypot
IANA
icebergCluster
IcebergCluster
IDE
IDEs
IDNA