From a228e4fa895979ee1d5bf6de71242ece82bc21e6 Mon Sep 17 00:00:00 2001 From: divanik Date: Thu, 24 Oct 2024 13:28:32 +0000 Subject: [PATCH] Fix issues with tests --- .../DataLakes/DataLakeConfiguration.h | 14 +++++++ .../ObjectStorage/StorageObjectStorage.cpp | 39 +++++++++++++++---- .../ObjectStorage/StorageObjectStorage.h | 11 ++++-- .../registerStorageObjectStorage.cpp | 22 ++++++++++- .../TableFunctionObjectStorage.cpp | 25 ++++-------- .../TableFunctionObjectStorage.h | 9 +++++ 6 files changed, 89 insertions(+), 31 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index d19b7f65640..c01e615acd9 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -10,6 +10,7 @@ # include # include # include +# include # include # include # include @@ -46,6 +47,18 @@ public: BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns()); } + std::optional tryGetTableStructureFromMetadata() const override + { + if (!current_metadata) + return std::nullopt; + auto schema_from_metadata = current_metadata->getTableSchema(); + if (!schema_from_metadata.empty()) + { + return ColumnsDescription(std::move(schema_from_metadata)); + } + return std::nullopt; + } + private: DataLakeMetadataPtr current_metadata; @@ -77,6 +90,7 @@ private: using StorageS3IcebergConfiguration = DataLakeConfiguration; using StorageAzureIcebergConfiguration = DataLakeConfiguration; using StorageLocalIcebergConfiguration = DataLakeConfiguration; +using StorageHDFSIcebergConfiguration = DataLakeConfiguration; using StorageS3DeltaLakeConfiguration = DataLakeConfiguration; using StorageS3HudiConfiguration = DataLakeConfiguration; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 86630b897d0..f24f152ecb4 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -14,14 +14,15 @@ #include #include -#include #include -#include -#include #include +#include #include #include -#include +#include +#include +#include +#include "Storages/ColumnsDescription.h" namespace DB @@ -252,6 +253,11 @@ ReadFromFormatInfo StorageObjectStorage::Configuration::prepareReadingFromFormat return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns); } +std::optional StorageObjectStorage::Configuration::tryGetTableStructureFromMetadata() const +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration"); +} + void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, @@ -409,6 +415,16 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( std::string & sample_path, const ContextPtr & context) { + if (configuration->isDataLakeConfiguration()) + { + configuration->update(object_storage, context); + auto table_structure = configuration->tryGetTableStructureFromMetadata(); + if (table_structure) + { + return table_structure.value(); + } + } + ObjectInfos read_keys; auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context); @@ -489,10 +505,17 @@ void StorageObjectStorage::Configuration::initialize( if (configuration.format == "auto") { - configuration.format = FormatFactory::instance().tryGetFormatFromFileName( - configuration.isArchive() - ? configuration.getPathInArchive() - : configuration.getPath()).value_or("auto"); + if (configuration.isDataLakeConfiguration()) + { + configuration.format = "Parquet"; + } + else + { + configuration.format + = FormatFactory::instance() + .tryGetFormatFromFileName(configuration.isArchive() ? configuration.getPathInArchive() : configuration.getPath()) + .value_or("auto"); + } } else FormatFactory::instance().checkFormatName(configuration.format); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 9781d5dbe6e..21a6cdeba6f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -1,12 +1,13 @@ #pragma once -#include -#include #include -#include +#include #include -#include #include +#include #include +#include +#include +#include "Storages/ColumnsDescription.h" namespace DB { @@ -208,6 +209,8 @@ public: bool supports_subset_of_columns, ContextPtr local_context); + virtual std::optional tryGetTableStructureFromMetadata() const; + String format = "auto"; String compression_method = "auto"; String structure = "auto"; diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 570e888da91..1e231a8e3e4 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -153,6 +153,7 @@ void registerStorageObjectStorage(StorageFactory & factory) void registerStorageIceberg(StorageFactory & factory) { +#if USE_AWS_S3 factory.registerStorage( "Iceberg", [&](const StorageFactory::Arguments & args) @@ -182,7 +183,8 @@ void registerStorageIceberg(StorageFactory & factory) .supports_schema_inference = true, .source_access_type = AccessType::S3, }); - +#endif +#if USE_AZURE_BLOB_STORAGE factory.registerStorage( "IcebergAzure", [&](const StorageFactory::Arguments & args) @@ -197,7 +199,7 @@ void registerStorageIceberg(StorageFactory & factory) .supports_schema_inference = true, .source_access_type = AccessType::AZURE, }); - +#endif factory.registerStorage( "IcebergLocal", [&](const StorageFactory::Arguments & args) @@ -212,6 +214,22 @@ void registerStorageIceberg(StorageFactory & factory) .supports_schema_inference = true, .source_access_type = AccessType::FILE, }); +#if USE_HDFS + factory.registerStorage( + "IcebergHDFS", + [&](const StorageFactory::Arguments & args) + { + auto configuration = std::make_shared(); + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); + + return createStorageObjectStorage(args, configuration, args.getLocalContext()); + }, + { + .supports_settings = false, + .supports_schema_inference = true, + .source_access_type = AccessType::HDFS, + }); +#endif } #endif diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index ecfc1e462f0..509ef92e8b2 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -251,6 +251,14 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory) .categories{"DataLake"}}, .allow_readonly = false}); # endif +# if USE_HDFS + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)", + .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +# endif factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored locally.)", @@ -297,21 +305,4 @@ void registerDataLakeTableFunctions(TableFunctionFactory & factory) registerTableFunctionHudi(factory); #endif } - -#if USE_AVRO -# if USE_AWS_S3 -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; -# endif -# if USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorage; -# endif -template class TableFunctionObjectStorage; -#endif -#if USE_AWS_S3 -# if USE_PARQUET -template class TableFunctionObjectStorage; -# endif -template class TableFunctionObjectStorage; -#endif } diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 3cf86f982d1..19cd637bd80 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -86,6 +86,12 @@ struct IcebergLocalDefinition static constexpr auto storage_type_name = "Local"; }; +struct IcebergHDFSDefinition +{ + static constexpr auto name = "icebergHDFS"; + static constexpr auto storage_type_name = "HDFS"; +}; + struct DeltaLakeDefinition { static constexpr auto name = "deltaLake"; @@ -184,6 +190,9 @@ using TableFunctionIcebergS3 = TableFunctionObjectStorage; # endif +# if USE_HDFS +using TableFunctionIcebergHDFS = TableFunctionObjectStorage; +# endif using TableFunctionIcebergLocal = TableFunctionObjectStorage; #endif #if USE_AWS_S3