Add different iceberg tables

This commit is contained in:
divanik 2024-07-22 09:50:47 +00:00
parent 57181a5a48
commit c59949d057
3 changed files with 96 additions and 18 deletions

View File

@ -2,10 +2,12 @@
#if USE_AWS_S3
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
namespace DB
@ -22,6 +24,54 @@ void registerStorageIceberg(StorageFactory & factory)
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
@ -29,7 +79,7 @@ void registerStorageIceberg(StorageFactory & factory)
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
.source_access_type = AccessType::FILE,
});
}

View File

@ -76,6 +76,21 @@ struct TableFunctionIcebergName
static constexpr auto name = "iceberg";
};
struct TableFunctionIcebergS3Name
{
static constexpr auto name = "icebergS3";
};
struct TableFunctionIcebergAzureName
{
static constexpr auto name = "icebergAzure";
};
struct TableFunctionIcebergLocalName
{
static constexpr auto name = "icebergLocal";
};
struct TableFunctionDeltaLakeName
{
static constexpr auto name = "deltaLake";
@ -86,14 +101,20 @@ struct TableFunctionHudiName
static constexpr auto name = "hudi";
};
#if USE_AWS_S3
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
using TableFunctionIcebergS3 = ITableFunctionDataLake<TableFunctionIcebergS3Name, StorageIceberg, TableFunctionS3>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = ITableFunctionDataLake<TableFunctionIcebergAzureName, StorageIceberg, TableFunctionAzureBlob>;
# endif
using TableFunctionIcebergLocal = ITableFunctionDataLake<TableFunctionIcebergLocalName, StorageIceberg, TableFunctionLocal>;
#endif
#if USE_PARQUET
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
#endif
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
#endif
}

View File

@ -4,24 +4,31 @@
namespace DB
{
#if USE_AWS_S3
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
# if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{
.documentation =
{
.description=R"(The table function can be used to read the Iceberg table stored on object store.)",
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_AZURE_BLOB_STORAGE
# endif
}
#endif
#if USE_PARQUET
#if USE_AWS_S3
# if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(