diff --git a/src/Storages/StorageIceberg.cpp b/src/Storages/StorageIceberg.cpp index 93aa16cd29a..ec5284adf94 100644 --- a/src/Storages/StorageIceberg.cpp +++ b/src/Storages/StorageIceberg.cpp @@ -46,12 +46,35 @@ namespace ErrorCodes extern const int ILLEGAL_COLUMN; } -IcebergMetaParser::IcebergMetaParser(const StorageS3::Configuration & configuration_, ContextPtr context_) +std::shared_ptr +S3MetaReadHelper::createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration) +{ + S3Settings::RequestSettings request_settings; + request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries; + return std::make_shared( + base_configuration.client, + base_configuration.url.bucket, + key, + base_configuration.url.version_id, + request_settings, + context->getReadSettings()); +} +std::vector +S3MetaReadHelper::listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix) +{ + const auto & table_path = base_configuration.url.key; + return S3::listFiles( + *base_configuration.client, base_configuration.url.bucket, table_path, std::filesystem::path(table_path) / directory, suffix); +} + +template +IcebergMetaParser::IcebergMetaParser(const Configuration & configuration_, ContextPtr context_) : base_configuration(configuration_), context(context_) { } -std::vector IcebergMetaParser::getFiles() const +template +std::vector IcebergMetaParser::getFiles() const { auto metadata = getNewestMetaFile(); auto manifest_list = getManiFestList(metadata); @@ -66,29 +89,27 @@ std::vector IcebergMetaParser::getFiles() const return getFilesForRead(manifest_files); } -String IcebergMetaParser::getNewestMetaFile() const +template +String IcebergMetaParser::getNewestMetaFile() const { /// Iceberg stores all the metadata.json in metadata directory, and the /// newest version has the max version name, so we should list all of them /// then find the newest metadata. - const auto & table_path = base_configuration.url.key; - std::vector metadata_files = S3::listFiles( - *base_configuration.client, - base_configuration.url.bucket, - table_path, - std::filesystem::path(table_path) / metadata_directory, - ".json"); + constexpr auto meta_file_suffix = ".json"; + auto metadata_files = MetaReadHelper::listFilesMatchSuffix(base_configuration, metadata_directory, meta_file_suffix); if (metadata_files.empty()) - throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path); + throw Exception( + ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", base_configuration.url.key); auto it = std::max_element(metadata_files.begin(), metadata_files.end()); return *it; } -String IcebergMetaParser::getManiFestList(const String & metadata_name) const +template +String IcebergMetaParser::getManiFestList(const String & metadata_name) const { - auto buffer = createS3ReadBuffer(metadata_name); + auto buffer = MetaReadHelper::createReadBuffer(metadata_name, context, base_configuration); String json_str; readJSONObjectPossiblyInvalid(json_str, *buffer); @@ -132,9 +153,10 @@ parseAvro(const std::unique_ptr & file_reader, const D return columns; } -std::vector IcebergMetaParser::getManifestFiles(const String & manifest_list) const +template +std::vector IcebergMetaParser::getManifestFiles(const String & manifest_list) const { - auto buffer = createS3ReadBuffer(manifest_list); + auto buffer = MetaReadHelper::createReadBuffer(manifest_list, context, base_configuration); auto file_reader = std::make_unique(std::make_unique(*buffer)); @@ -168,12 +190,13 @@ std::vector IcebergMetaParser::getManifestFiles(const String & manifest_ col->getFamilyName()); } -std::vector IcebergMetaParser::getFilesForRead(const std::vector & manifest_files) const +template +std::vector IcebergMetaParser::getFilesForRead(const std::vector & manifest_files) const { std::vector keys; for (const auto & manifest_file : manifest_files) { - auto buffer = createS3ReadBuffer(manifest_file); + auto buffer = MetaReadHelper::createReadBuffer(manifest_file, context, base_configuration); auto file_reader = std::make_unique(std::make_unique(*buffer)); @@ -222,27 +245,27 @@ std::vector IcebergMetaParser::getFilesForRead(const std::vector return keys; } -std::shared_ptr IcebergMetaParser::createS3ReadBuffer(const String & key) const -{ - S3Settings::RequestSettings request_settings; - request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries; - return std::make_shared( - base_configuration.client, - base_configuration.url.bucket, - key, - base_configuration.url.version_id, - request_settings, - context->getReadSettings()); -} - // generateQueryFromKeys constructs query from all parquet filenames // for underlying StorageS3 engine -String IcebergMetaParser::generateQueryFromKeys(const std::vector & keys, const String &) +template +String IcebergMetaParser::generateQueryFromKeys(const std::vector & keys, const String &) { std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); return new_query; } +template IcebergMetaParser::IcebergMetaParser( + const StorageS3::Configuration & configuration_, ContextPtr context_); +template std::vector IcebergMetaParser::getFiles() const; +template String IcebergMetaParser::generateQueryFromKeys( + const std::vector & keys, const String & format); +template String IcebergMetaParser::getNewestMetaFile() const; +template String IcebergMetaParser::getManiFestList(const String & metadata_name) const; +template std::vector +IcebergMetaParser::getManifestFiles(const String & manifest_list) const; +template std::vector +IcebergMetaParser::getFilesForRead(const std::vector & manifest_files) const; + void registerStorageIceberg(StorageFactory & factory) { factory.registerStorage( diff --git a/src/Storages/StorageIceberg.h b/src/Storages/StorageIceberg.h index d351f073c8b..8a535643fce 100644 --- a/src/Storages/StorageIceberg.h +++ b/src/Storages/StorageIceberg.h @@ -12,16 +12,26 @@ namespace DB { +struct S3MetaReadHelper +{ + static std::shared_ptr + createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration); + + static std::vector + listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix); +}; + // Class to parse iceberg metadata and find files needed for query in table // Iceberg table directory outlook: // table/ // data/ // metadata/ // The metadata has three layers: metadata -> manifest list -> manifest files +template class IcebergMetaParser { public: - IcebergMetaParser(const StorageS3::Configuration & configuration_, ContextPtr context_); + IcebergMetaParser(const Configuration & configuration_, ContextPtr context_); std::vector getFiles() const; @@ -29,7 +39,7 @@ public: private: static constexpr auto metadata_directory = "metadata"; - StorageS3::Configuration base_configuration; + Configuration base_configuration; ContextPtr context; /// Just get file name @@ -47,7 +57,7 @@ struct StorageIcebergName static constexpr auto data_directory_prefix = "data"; }; -using StorageIceberg = IStorageDataLake; +using StorageIceberg = IStorageDataLake>; } #endif diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index f9e5b35dd60..40ebb6456fe 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -39,6 +39,8 @@ template class IStorageDataLake; struct StorageIcebergName; +struct S3MetaReadHelper; +template class IcebergMetaParser; struct StorageDeltaLakeName; @@ -317,7 +319,7 @@ private: friend class TableFunctionS3Cluster; friend class IStorageDataLake; friend class IStorageDataLake; - friend class IStorageDataLake; + friend class IStorageDataLake>; Configuration s3_configuration; std::vector keys; diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index e1ad1f2f665..0ce8cd1f8df 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -4,9 +4,7 @@ #if USE_AWS_S3 -#include -#include - +# include # include # include # include @@ -15,21 +13,19 @@ # include # include # include +# include # include # include +# include namespace DB { - namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -class Context; -class TableFunctionS3Cluster; - -template +template class ITableFunctionDataLake : public ITableFunction { public: @@ -44,8 +40,6 @@ protected: executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const override { - S3::URI s3_uri(configuration.url); - ColumnsDescription columns; if (configuration.structure != "auto") columns = parseColumnsListFromString(configuration.structure, context); @@ -58,7 +52,7 @@ protected: return storage; } - const char * getStorageTypeName() const override { return name; } + const char * getStorageTypeName() const override { return Storage::name; } ColumnsDescription getActualTableStructure(ContextPtr context) const override { @@ -93,85 +87,14 @@ protected: auto & args = args_func.at(0)->children; - parseArgumentsImpl(message, args, context, configuration); + TableFunctionS3::parseArgumentsImpl(message, args, context, configuration); + + if (configuration.format == "auto") + configuration.format = "Parquet"; } - - static void - parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration) - { - if (args.empty() || args.size() > 6) - throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context); - if (header_it != args.end()) - args.erase(header_it); - - for (auto & arg : args) - arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); - - /// Size -> argument indexes - static auto size_to_args = std::map>{ - {1, {{}}}, - {2, {{"format", 1}}}, - {5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}}, - {6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}}; - - std::map args_to_idx; - /// For 4 arguments we support 2 possible variants: - /// dataLake(source, format, structure, compression_method) and iceberg(source, access_key_id, access_key_id, format) - /// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not. - if (args.size() == 4) - { - auto second_arg = checkAndGetLiteralArgument(args[1], "format/access_key_id"); - if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) - args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}}; - - else - args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}}; - } - /// For 3 arguments we support 2 possible variants: - /// dataLake(source, format, structure) and iceberg(source, access_key_id, access_key_id) - /// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not. - else if (args.size() == 3) - { - auto second_arg = checkAndGetLiteralArgument(args[1], "format/access_key_id"); - if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg)) - args_to_idx = {{"format", 1}, {"structure", 2}}; - else - args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}}; - } - else - { - args_to_idx = size_to_args[args.size()]; - } - - /// This argument is always the first - base_configuration.url = S3::URI(checkAndGetLiteralArgument(args[0], "url")); - - if (args_to_idx.contains("format")) - base_configuration.format = checkAndGetLiteralArgument(args[args_to_idx["format"]], "format"); - else - base_configuration.format = "Parquet"; - - if (args_to_idx.contains("structure")) - base_configuration.structure = checkAndGetLiteralArgument(args[args_to_idx["structure"]], "structure"); - - if (args_to_idx.contains("compression_method")) - base_configuration.compression_method - = checkAndGetLiteralArgument(args[args_to_idx["compression_method"]], "compression_method"); - - if (args_to_idx.contains("access_key_id")) - base_configuration.auth_settings.access_key_id - = checkAndGetLiteralArgument(args[args_to_idx["access_key_id"]], "access_key_id"); - - if (args_to_idx.contains("secret_access_key")) - base_configuration.auth_settings.secret_access_key - = checkAndGetLiteralArgument(args[args_to_idx["secret_access_key"]], "secret_access_key"); - } - - mutable StorageS3::Configuration configuration; - }; + mutable Configuration configuration; +}; } #endif diff --git a/src/TableFunctions/TableFunctionDeltaLake.cpp b/src/TableFunctions/TableFunctionDeltaLake.cpp index d968c58bf94..a5602814abc 100644 --- a/src/TableFunctions/TableFunctionDeltaLake.cpp +++ b/src/TableFunctions/TableFunctionDeltaLake.cpp @@ -15,7 +15,7 @@ struct TableFunctionDeltaLakeName static constexpr auto name = "deltaLake"; }; -using TableFunctionDeltaLake = ITableFunctionDataLake; +using TableFunctionDeltaLake = ITableFunctionDataLake; void registerTableFunctionDeltaLake(TableFunctionFactory & factory) { diff --git a/src/TableFunctions/TableFunctionHudi.cpp b/src/TableFunctions/TableFunctionHudi.cpp index 1139b4d4a2e..d7ddc49c1de 100644 --- a/src/TableFunctions/TableFunctionHudi.cpp +++ b/src/TableFunctions/TableFunctionHudi.cpp @@ -14,7 +14,7 @@ struct TableFunctionHudiName { static constexpr auto name = "hudi"; }; -using TableFunctionHudi = ITableFunctionDataLake; +using TableFunctionHudi = ITableFunctionDataLake; void registerTableFunctionHudi(TableFunctionFactory & factory) { diff --git a/src/TableFunctions/TableFunctionIceberg.cpp b/src/TableFunctions/TableFunctionIceberg.cpp index 2555b197d30..eb25a115055 100644 --- a/src/TableFunctions/TableFunctionIceberg.cpp +++ b/src/TableFunctions/TableFunctionIceberg.cpp @@ -16,7 +16,7 @@ struct TableFunctionIcebergName static constexpr auto name = "iceberg"; }; -using TableFunctionIceberg = ITableFunctionDataLake; +using TableFunctionIceberg = ITableFunctionDataLake; void registerTableFunctionIceberg(TableFunctionFactory & factory) { diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp index 0abddfef3e4..8f54b04f550 100644 --- a/src/TableFunctions/TableFunctionS3.cpp +++ b/src/TableFunctions/TableFunctionS3.cpp @@ -29,7 +29,9 @@ namespace ErrorCodes /// This is needed to avoid copy-pase. Because s3Cluster arguments only differ in additional argument (first) - cluster name -void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration) +template +void TableFunctionS3::parseArgumentsImpl( + const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration) { if (auto named_collection = tryGetNamedCollectionWithOverrides(args)) { @@ -105,10 +107,14 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar s3_configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument(args[args_to_idx["secret_access_key"]], "secret_access_key"); } - if (s3_configuration.format == "auto") + /// For DataLake table functions, we should specify default format. + if (s3_configuration.format == "auto" && get_format_from_file) s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url.uri.getPath(), true); } +template void TableFunctionS3::parseArgumentsImpl( + const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration); + void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context) { /// Parse args diff --git a/src/TableFunctions/TableFunctionS3.h b/src/TableFunctions/TableFunctionS3.h index 2adddef538c..5f197a6e058 100644 --- a/src/TableFunctions/TableFunctionS3.h +++ b/src/TableFunctions/TableFunctionS3.h @@ -12,7 +12,6 @@ namespace DB { class Context; -class TableFunctionS3Cluster; /* s3(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary storage for a file in S3. */ @@ -37,8 +36,10 @@ public: return {"_path", "_file"}; } + template + static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration); + protected: - friend class TableFunctionS3Cluster; StoragePtr executeImpl( const ASTPtr & ast_function, @@ -51,8 +52,6 @@ protected: ColumnsDescription getActualTableStructure(ContextPtr context) const override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; - static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration); - mutable StorageS3::Configuration configuration; ColumnsDescription structure_hint; }; diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp index 6efe686ca5d..9b18bac536a 100644 --- a/src/TableFunctions/TableFunctionS3Cluster.cpp +++ b/src/TableFunctions/TableFunctionS3Cluster.cpp @@ -73,7 +73,7 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args)); /// StorageS3ClusterConfiguration inherints from StorageS3::Configuration, so it is safe to upcast it. - TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast(configuration)); + TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast(configuration)); }