enable cluster versions for datalake storages

This commit is contained in:
Mikhail Artemenko 2024-11-18 17:35:21 +00:00
parent 84828120b3
commit d2efae7511
5 changed files with 151 additions and 6 deletions

View File

@ -226,6 +226,27 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
#endif #endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>; template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergClusterDefinition, StorageS3IcebergConfiguration>;
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
#if USE_AVRO #if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory) void registerTableFunctionIceberg(TableFunctionFactory & factory)
{ {

View File

@ -105,15 +105,79 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
UNUSED(factory); UNUSED(factory);
} }
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>; void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO && USE_AWS_S3
factory.registerFunction<TableFunctionIcebergCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster. Alias to icebergS3)",
.examples{{"icebergCluster", "SELECT * FROM icebergCluster(cluster_name, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster_name, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif #endif
#if USE_AZURE_BLOB_STORAGE #if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>; factory.registerFunction<TableFunctionIcebergAzureCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif #endif
#if USE_HDFS #if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>; factory.registerFunction<TableFunctionIcebergHDFSCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(url)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif #endif
} }
void registerTableFunctionDeltaLakeCluster([[maybe_unused]] TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3 && USE_PARQUET
factory.registerFunction<TableFunctionDeltaLakeCluster>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
void registerTableFunctionHudiCluster([[maybe_unused]] TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
factory.registerFunction<TableFunctionHudiCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
registerTableFunctionIcebergCluster(factory);
registerTableFunctionHudiCluster(factory);
registerTableFunctionDeltaLakeCluster(factory);
}
}

View File

@ -33,6 +33,42 @@ struct HDFSClusterDefinition
static constexpr auto storage_type_name = "HDFSCluster"; static constexpr auto storage_type_name = "HDFSCluster";
}; };
struct IcebergClusterDefinition
{
static constexpr auto name = "icebergCluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergS3ClusterDefinition
{
static constexpr auto name = "icebergS3Cluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergAzureClusterDefinition
{
static constexpr auto name = "icebergAzureCluster";
static constexpr auto storage_type_name = "IcebergAzureCluster";
};
struct IcebergHDFSClusterDefinition
{
static constexpr auto name = "icebergHDFSCluster";
static constexpr auto storage_type_name = "IcebergHDFSCluster";
};
struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};
struct HudiClusterDefinition
{
static constexpr auto name = "hudiCluster";
static constexpr auto storage_type_name = "HudiS3Cluster";
};
/** /**
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions, * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster. * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@ -79,4 +115,26 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClu
#if USE_HDFS #if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>; using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif #endif
#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster<IcebergClusterDefinition, StorageS3IcebergConfiguration>;
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
} }

View File

@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory); registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory); registerTableFunctionObjectStorageCluster(factory);
registerDataLakeTableFunctions(factory); registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
} }
} }

View File

@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory); void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory); void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory); void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
void registerTableFunctionTimeSeries(TableFunctionFactory & factory); void registerTableFunctionTimeSeries(TableFunctionFactory & factory);