From 13f29a72423d2006278a701a2ab7fec1539c5a6d Mon Sep 17 00:00:00 2001
From: kssenii
Date: Tue, 28 Mar 2023 15:39:59 +0200
Subject: [PATCH] Better

---
 src/Storages/IStorageDataLake.cpp             |  45 ++++
 src/Storages/IStorageDataLake.h               | 127 +++++---
 src/Storages/S3DataLakeMetadataReadHelper.cpp |  51 +---
 src/Storages/S3DataLakeMetadataReadHelper.h   |   9 +-
 src/Storages/StorageDeltaLake.cpp             |  33 ---
 src/Storages/StorageDeltaLake.h               |   2 +-
 src/Storages/StorageHudi.cpp                  |  33 ---
 src/Storages/StorageIceberg.cpp               |  20 +-
 src/Storages/StorageS3.cpp                    | 244 +++++++++---------
 src/Storages/StorageS3.h                      |  44 ++--
 src/Storages/StorageS3Cluster.cpp             |  11 +-
 src/Storages/StorageS3Cluster.h               |   4 +
 src/TableFunctions/ITableFunctionDataLake.h   |  23 +-
 src/TableFunctions/TableFunctionS3.cpp        |   5 +-
 src/TableFunctions/TableFunctionS3Cluster.cpp |   2 +-
 .../configs/config.d/named_collections.xml    |   9 +
 16 files changed, 311 insertions(+), 351 deletions(-)
 create mode 100644 src/Storages/IStorageDataLake.cpp
 delete mode 100644 src/Storages/StorageDeltaLake.cpp
 delete mode 100644 src/Storages/StorageHudi.cpp
 create mode 100644 tests/integration/test_storage_delta/configs/config.d/named_collections.xml

diff --git a/src/Storages/IStorageDataLake.cpp b/src/Storages/IStorageDataLake.cpp
new file mode 100644
index 00000000000..828e614d0d2
--- /dev/null
+++ b/src/Storages/IStorageDataLake.cpp
@@ -0,0 +1,45 @@
+#include <Storages/IStorageDataLake.h>
+#include "config.h"
+
+#if USE_AWS_S3
+
+#include <Storages/StorageDeltaLake.h>
+#include <Storages/StorageIceberg.h>
+#include <Storages/StorageHudi.h>
+
+
+namespace DB
+{
+
+#define REGISTER_DATA_LAKE_STORAGE(STORAGE, NAME) \
+    factory.registerStorage( \
+        NAME, \
+        [](const StorageFactory::Arguments & args) \
+        { \
+            return createDataLakeStorage<STORAGE>(args);\
+        }, \
+        { \
+            .supports_settings = false, \
+            .supports_schema_inference = true, \
+            .source_access_type = AccessType::S3, \
+        });
+
+
+void registerStorageDeltaLake(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageDeltaLake, StorageDeltaLakeName::name)
+}
+
+void registerStorageIceberg(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageIceberg, StorageIcebergName::name)
+}
+
+void registerStorageHudi(StorageFactory & factory)
+{
+    REGISTER_DATA_LAKE_STORAGE(StorageHudi, StorageHudiName::name)
+}
+
+}
+
+#endif
diff --git a/src/Storages/IStorageDataLake.h b/src/Storages/IStorageDataLake.h
index afd9cb7d5bf..64dc70a57b7 100644
--- a/src/Storages/IStorageDataLake.h
+++ b/src/Storages/IStorageDataLake.h
@@ -6,6 +6,9 @@
 #include
 #include
+#include
+#include
+#include
 
 
 namespace DB
@@ -15,78 +18,88 @@
 template <typename Storage, typename Name, typename MetadataParser>
 class IStorageDataLake : public Storage
 {
 public:
-    using IConfiguration = typename Storage::Configuration;
-    using ConfigurationPtr = std::unique_ptr<IConfiguration>;
+    static constexpr auto name = Name::name;
+    using Configuration = typename Storage::Configuration;
 
     template <class ...Args>
-    explicit IStorageDataLake(
-        ConfigurationPtr configuration_,
-        ContextPtr context_,
-        Args && ...args)
-        : Storage(createConfigurationForDataRead(context_, *configuration_), context_, std::forward<Args>(args)...)
-        , base_configuration(std::move(configuration_))
-        , log(&Poco::Logger::get(getName()))
-    {
-    }
+    explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
+        : Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
+        , base_configuration(configuration_)
+        , log(&Poco::Logger::get(getName())) {}
 
-    struct Configuration : public Storage::Configuration
-    {
-        template <class ...Args>
-        explicit Configuration(Args && ...args) : Storage::Configuration(std::forward<Args>(args)...) {}
-
-        bool update(ContextPtr /* context */) override
-        {
-            return false;
-        }
-    };
-
-    static constexpr auto name = Name::name;
 
     String getName() const override { return name; }
 
-    static ConfigurationPtr getConfiguration(ASTs & engine_args, ContextPtr local_context)
+    static ColumnsDescription getTableStructureFromData(
+        Configuration & base_configuration,
+        const std::optional<FormatSettings> & format_settings,
+        ContextPtr local_context)
     {
-        auto configuration = Storage::getConfiguration(engine_args, local_context, false /* get_format_from_file */);
-        /// Data lakes have default format as parquet.
-        if (configuration->format == "auto")
-            configuration->format = "Parquet";
+        auto configuration = getConfigurationForDataRead(base_configuration, local_context);
+        return Storage::getTableStructureFromData(configuration, format_settings, local_context);
+    }
+
+    static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
+    {
+        return Storage::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
+    }
+
+    void updateConfigurationIfChanged(ContextPtr local_context) override
+    {
+        const bool updated = base_configuration.update(local_context);
+        auto new_keys = getDataFiles(base_configuration, local_context);
+
+        if (!updated && new_keys == Storage::getConfiguration().keys)
+            return;
+
+        Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
+    }
+
+private:
+    static Configuration getConfigurationForDataRead(
+        const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
+    {
+        auto configuration{base_configuration};
+        configuration.update(local_context);
+        configuration.static_configuration = true;
+
+        if (keys.empty())
+            configuration.keys = getDataFiles(configuration, local_context);
+        else
+            configuration.keys = keys;
+
+        LOG_TRACE(&Poco::Logger::get("DataLake"), "New configuration path: {}", configuration.getPath());
+
+        configuration.connect(local_context);
         return configuration;
     }
 
-    static ColumnsDescription getTableStructureFromData(
-        IConfiguration & base_configuration, const std::optional<FormatSettings> & format_settings, ContextPtr local_context)
+    static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
     {
-        auto configuration = createConfigurationForDataRead(local_context, base_configuration);
-        return Storage::getTableStructureFromData(*configuration, format_settings, local_context, /*object_infos*/ nullptr);
+        auto files = MetadataParser(configuration, local_context).getFiles();
+        for (auto & file : files)
+            file = std::filesystem::path(configuration.getPath()) / file;
+        return files;
     }
 
-    // void updateConfiguration(ContextPtr local_context, Configuration & configuration) override
-    // {
-    //     configuration = createConfigurationForDataRead(local_context, base_configuration);
-    // }
-
-private:
-    static ConfigurationPtr createConfigurationForDataRead(ContextPtr local_context, const IConfiguration & base_configuration)
-    {
-        Poco::Logger * log = &Poco::Logger::get("IStorageDataLake");
-
-        auto new_configuration = std::make_unique<Configuration>(base_configuration);
-        new_configuration->update(local_context);
-
-        MetadataParser parser{*new_configuration, local_context};
-        auto keys = parser.getFiles();
-        auto files = MetadataParser::generateQueryFromKeys(keys, new_configuration->format);
-
-        LOG_TEST(log, "FILES: {}, ", fmt::join(files, ", "));
-        new_configuration->appendToPath(files);
-
-        LOG_DEBUG(log, "URL for read: {}", new_configuration->getPath());
-        return new_configuration;
-    }
-
-    ConfigurationPtr base_configuration;
+    Configuration base_configuration;
     Poco::Logger * log;
 };
+
+template <typename DataLake>
+static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
+{
+    auto configuration = DataLake::getConfiguration(args.engine_args, args.getLocalContext());
+
+    /// Data lakes use parquet format, no need for schema inference.
+    if (configuration.format == "auto")
+        configuration.format = "Parquet";
+
+    return std::make_shared<DataLake>(
+        configuration, args.getContext(), args.table_id, args.columns, args.constraints,
+        args.comment, getFormatSettings(args.getContext()));
+}
+
 }
 
 #endif
diff --git a/src/Storages/S3DataLakeMetadataReadHelper.cpp b/src/Storages/S3DataLakeMetadataReadHelper.cpp
index 76024ceb4ea..f346cbc1cec 100644
--- a/src/Storages/S3DataLakeMetadataReadHelper.cpp
+++ b/src/Storages/S3DataLakeMetadataReadHelper.cpp
@@ -33,8 +33,8 @@ S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr co
         context->getReadSettings());
 }
 
-std::vector<String> S3DataLakeMetadataReadHelper::listFilesMatchSuffix(
-    const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix)
+std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
+    const StorageS3::Configuration & base_configuration, const String & prefix, const String & suffix)
 {
     const auto & table_path = base_configuration.url.key;
     const auto & bucket = base_configuration.url.bucket;
@@ -48,7 +48,7 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFilesMatchSuffix(
 
     request.SetBucket(bucket);
 
-    request.SetPrefix(std::filesystem::path(table_path) / directory);
+    request.SetPrefix(std::filesystem::path(table_path) / prefix);
 
     while (!is_finished)
     {
@@ -72,54 +72,13 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFilesMatchSuffix(
         }
 
         request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
-
         is_finished = !outcome.GetResult().GetIsTruncated();
     }
 
+    LOG_TRACE(&Poco::Logger::get("S3DataLakeMetadataReadHelper"), "Listed {} files", res.size());
+
     return res;
 }
 
-std::vector<String> S3DataLakeMetadataReadHelper::listFiles(const StorageS3::Configuration & configuration)
-{
-    const auto & client = configuration.client;
-    const auto & table_path = configuration.url.key;
-    const auto & bucket = configuration.url.bucket;
-
-    std::vector<String> keys;
-    S3::ListObjectsV2Request request;
-    Aws::S3::Model::ListObjectsV2Outcome outcome;
-
-    bool is_finished{false};
-
-    request.SetBucket(bucket);
-    request.SetPrefix(table_path);
-
-    while (!is_finished)
-    {
-        outcome = client->ListObjectsV2(request);
-        if (!outcome.IsSuccess())
-            throw Exception(
-                ErrorCodes::S3_ERROR,
-                "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
-                quoteString(bucket),
-                quoteString(table_path),
-                backQuote(outcome.GetError().GetExceptionName()),
-                quoteString(outcome.GetError().GetMessage()));
-
-        const auto & result_batch = outcome.GetResult().GetContents();
-        for (const auto & obj : result_batch)
-        {
-            const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
- keys.push_back(filename); - } - - request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); - is_finished = !outcome.GetResult().GetIsTruncated(); - } - - LOG_TRACE(&Poco::Logger::get("S3DataLakeMetadataReadHelper"), "Listed {} files", keys.size()); - - return keys; -} } #endif diff --git a/src/Storages/S3DataLakeMetadataReadHelper.h b/src/Storages/S3DataLakeMetadataReadHelper.h index 2187ffa3eda..c9ee88068e7 100644 --- a/src/Storages/S3DataLakeMetadataReadHelper.h +++ b/src/Storages/S3DataLakeMetadataReadHelper.h @@ -13,13 +13,10 @@ namespace DB struct S3DataLakeMetadataReadHelper { - static std::shared_ptr - createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration); + static std::shared_ptr createReadBuffer( + const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration); - static std::vector - listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix); - - static std::vector listFiles(const StorageS3::Configuration & configuration); + static std::vector listFiles(const StorageS3::Configuration & configuration, const std::string & prefix = "", const std::string & suffix = ""); }; } diff --git a/src/Storages/StorageDeltaLake.cpp b/src/Storages/StorageDeltaLake.cpp deleted file mode 100644 index bd6fbf55d4a..00000000000 --- a/src/Storages/StorageDeltaLake.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "config.h" - -#if USE_AWS_S3 - -#include -#include -#include - - -namespace DB -{ - -void registerStorageDeltaLake(StorageFactory & factory) -{ - factory.registerStorage( - "DeltaLake", - [](const StorageFactory::Arguments & args) - { - auto configuration = StorageDeltaLake::getConfiguration(args.engine_args, args.getLocalContext()); - return std::make_shared( - std::move(configuration), args.getContext(), args.table_id, args.columns, args.constraints, - args.comment, getFormatSettings(args.getContext())); - }, - { - .supports_settings = false, - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); -} - -} - -#endif diff --git a/src/Storages/StorageDeltaLake.h b/src/Storages/StorageDeltaLake.h index 1ca3b09aa71..6d83a385777 100644 --- a/src/Storages/StorageDeltaLake.h +++ b/src/Storages/StorageDeltaLake.h @@ -61,7 +61,7 @@ private: static constexpr auto deltalake_metadata_directory = "_delta_log"; static constexpr auto metadata_file_suffix = ".json"; - return MetadataReadHelper::listFilesMatchSuffix( + return MetadataReadHelper::listFiles( storage_configuration, deltalake_metadata_directory, metadata_file_suffix); } diff --git a/src/Storages/StorageHudi.cpp b/src/Storages/StorageHudi.cpp deleted file mode 100644 index 72e9c73abfd..00000000000 --- a/src/Storages/StorageHudi.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "config.h" - -#if USE_AWS_S3 - -#include -#include -#include - - -namespace DB -{ - -void registerStorageHudi(StorageFactory & factory) -{ - factory.registerStorage( - "Hudi", - [](const StorageFactory::Arguments & args) - { - auto configuration = StorageHudi::getConfiguration(args.engine_args, args.getLocalContext()); - return std::make_shared( - std::move(configuration), args.getContext(), args.table_id, args.columns, args.constraints, - args.comment, getFormatSettings(args.getContext())); - }, - { - .supports_settings = false, - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); -} - -} - -#endif diff --git a/src/Storages/StorageIceberg.cpp 
b/src/Storages/StorageIceberg.cpp index a8662470efe..8f519259e3d 100644 --- a/src/Storages/StorageIceberg.cpp +++ b/src/Storages/StorageIceberg.cpp @@ -60,7 +60,7 @@ String IcebergMetadataParser::fetchMetadataFi /// newest version has the max version name, so we should list all of them, /// then find the newest metadata. static constexpr auto meta_file_suffix = ".json"; - auto metadata_files = MetadataReadHelper::listFilesMatchSuffix(base_configuration, metadata_directory, meta_file_suffix); + auto metadata_files = MetadataReadHelper::listFiles(base_configuration, metadata_directory, meta_file_suffix); if (metadata_files.empty()) throw Exception( @@ -243,24 +243,6 @@ IcebergMetadataParser::g template std::vector IcebergMetadataParser::getFilesForRead(const std::vector & manifest_files) const; -void registerStorageIceberg(StorageFactory & factory) -{ - factory.registerStorage( - "Iceberg", - [](const StorageFactory::Arguments & args) - { - auto configuration = StorageIceberg::getConfiguration(args.engine_args, args.getLocalContext()); - auto format_settings = getFormatSettings(args.getContext()); - return std::make_shared( - std::move(configuration), args.getContext(), args.table_id, args.columns, args.constraints, args.comment, format_settings); - }, - { - .supports_settings = false, - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); -} - } #endif diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index d52702dc7bb..32f5e5ccbe6 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -74,8 +74,6 @@ namespace ProfileEvents namespace DB { -static const String PARTITION_ID_WILDCARD = "{_partition_id}"; - static const std::unordered_set required_configuration_keys = { "url", }; @@ -764,7 +762,7 @@ public: ContextPtr context, std::optional format_settings_, const CompressionMethod compression_method, - const StorageS3::Configuration & s3_configuration_, + const StorageS3::Configuration & configuration_, const String & bucket, const String & key) : SinkToStorage(sample_block_) @@ -773,10 +771,10 @@ public: { write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique( - s3_configuration_.client, + configuration_.client, bucket, key, - s3_configuration_.request_settings, + configuration_.request_settings, std::nullopt, DBMS_DEFAULT_BUFFER_SIZE, threadPoolCallbackRunner(IOThreadPool::get(), "S3ParallelRead"), @@ -855,7 +853,7 @@ public: ContextPtr context_, std::optional format_settings_, const CompressionMethod compression_method_, - const StorageS3::Configuration & s3_configuration_, + const StorageS3::Configuration & configuration_, const String & bucket_, const String & key_) : PartitionedSink(partition_by, context_, sample_block_) @@ -863,7 +861,7 @@ public: , sample_block(sample_block_) , context(context_) , compression_method(compression_method_) - , s3_configuration(s3_configuration_) + , configuration(configuration_) , bucket(bucket_) , key(key_) , format_settings(format_settings_) @@ -884,7 +882,7 @@ public: context, format_settings, compression_method, - s3_configuration, + configuration, partition_bucket, partition_key ); @@ -895,7 +893,7 @@ private: const Block sample_block; ContextPtr context; const CompressionMethod compression_method; - const StorageS3::Configuration & s3_configuration; + const StorageS3::Configuration & configuration; const String bucket; const String key; std::optional format_settings; @@ -930,7 +928,7 @@ private: StorageS3::StorageS3( - StorageS3::ConfigurationPtr configuration_, + const 
Configuration & configuration_, ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, @@ -940,21 +938,21 @@ StorageS3::StorageS3( bool distributed_processing_, ASTPtr partition_by_) : IStorage(table_id_) - , s3_configuration(std::move(configuration_)) - , keys({s3_configuration->url.key}) - , name(s3_configuration->url.storage_name) + , configuration(configuration_) + , name(configuration.url.storage_name) , distributed_processing(distributed_processing_) , format_settings(format_settings_) , partition_by(partition_by_) { - FormatFactory::instance().checkFormatName(s3_configuration->format); - context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration->url.uri); - StorageInMemoryMetadata storage_metadata; + updateConfigurationIfChanged(context_); - s3_configuration->update(context_); + FormatFactory::instance().checkFormatName(configuration.format); + context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri); + + StorageInMemoryMetadata storage_metadata; if (columns_.empty()) { - auto columns = getTableStructureFromDataImpl(*s3_configuration, format_settings, context_, &object_infos); + auto columns = getTableStructureFromDataImpl(configuration, format_settings, context_, &object_infos); storage_metadata.setColumns(columns); } else @@ -975,8 +973,7 @@ StorageS3::StorageS3( } std::shared_ptr StorageS3::createFileIterator( - const Configuration & s3_configuration, - const std::vector & keys, + const Configuration & configuration, bool distributed_processing, ContextPtr local_context, ASTPtr query, @@ -988,30 +985,30 @@ std::shared_ptr StorageS3::createFileIterator( { return std::make_shared(local_context->getReadTaskCallback()); } - else if (s3_configuration.withGlobs()) + else if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file return std::make_shared( - *s3_configuration.client, s3_configuration.url, query, virtual_block, - local_context, object_infos, read_keys, s3_configuration.request_settings); + *configuration.client, configuration.url, query, virtual_block, + local_context, object_infos, read_keys, configuration.request_settings); } else { return std::make_shared( - *s3_configuration.client, s3_configuration.url.version_id, keys, - s3_configuration.url.bucket, s3_configuration.request_settings, query, virtual_block, local_context, + *configuration.client, configuration.url.version_id, configuration.keys, + configuration.url.bucket, configuration.request_settings, query, virtual_block, local_context, object_infos, read_keys); } } bool StorageS3::supportsSubcolumns() const { - return FormatFactory::instance().checkIfFormatSupportsSubcolumns(s3_configuration->format); + return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format); } bool StorageS3::supportsSubsetOfColumns() const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(s3_configuration->format); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); } Pipe StorageS3::read( @@ -1023,12 +1020,10 @@ Pipe StorageS3::read( size_t max_block_size, size_t num_streams) { - bool has_wildcards = s3_configuration->url.bucket.find(PARTITION_ID_WILDCARD) != String::npos - || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; - if (partition_by && has_wildcards) + if (partition_by && configuration.withWildcard()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet"); - 
s3_configuration->update(local_context); + updateConfigurationIfChanged(local_context); Pipes pipes; @@ -1042,8 +1037,7 @@ Pipe StorageS3::read( } std::shared_ptr iterator_wrapper = createFileIterator( - *s3_configuration, - keys, + configuration, false, local_context, query_info.query, @@ -1078,18 +1072,18 @@ Pipe StorageS3::read( { pipes.emplace_back(std::make_shared( requested_virtual_columns, - s3_configuration->format, + configuration.format, getName(), block_for_format, local_context, format_settings, columns_description, max_block_size, - s3_configuration->request_settings, - s3_configuration->compression_method, - s3_configuration->client, - s3_configuration->url.bucket, - s3_configuration->url.version_id, + configuration.request_settings, + configuration.compression_method, + configuration.client, + configuration.url.bucket, + configuration.url.version_id, iterator_wrapper, max_download_threads)); } @@ -1102,89 +1096,90 @@ Pipe StorageS3::read( SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) { - s3_configuration->update(local_context); + updateConfigurationIfChanged(local_context); auto sample_block = metadata_snapshot->getSampleBlock(); - auto chosen_compression_method = chooseCompressionMethod(keys.back(), s3_configuration->compression_method); - bool has_wildcards = s3_configuration->url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; + auto chosen_compression_method = chooseCompressionMethod(configuration.keys.back(), configuration.compression_method); auto insert_query = std::dynamic_pointer_cast(query); auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; - bool is_partitioned_implementation = partition_by_ast && has_wildcards; + bool is_partitioned_implementation = partition_by_ast && configuration.withWildcard(); if (is_partitioned_implementation) { return std::make_shared( partition_by_ast, - s3_configuration->format, + configuration.format, sample_block, local_context, format_settings, chosen_compression_method, - *s3_configuration, - s3_configuration->url.bucket, - keys.back()); + configuration, + configuration.url.bucket, + configuration.keys.back()); } else { - if (s3_configuration->withGlobs()) + if (configuration.withGlobs()) throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, - "S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration->url.key); + "S3 key '{}' contains globs, so the table is in readonly mode", configuration.url.key); bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; - if (!truncate_in_insert && S3::objectExists(*s3_configuration->client, s3_configuration->url.bucket, keys.back(), s3_configuration->url.version_id, s3_configuration->request_settings)) + if (!truncate_in_insert && S3::objectExists(*configuration.client, configuration.url.bucket, configuration.keys.back(), configuration.url.version_id, configuration.request_settings)) { if (local_context->getSettingsRef().s3_create_new_file_on_insert) { - size_t index = keys.size(); - auto pos = keys[0].find_first_of('.'); + size_t index = configuration.keys.size(); + const auto & first_key = configuration.keys[0]; + auto pos = first_key.find_first_of('.'); String new_key; do { - new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos)); + new_key = first_key.substr(0, pos) + "." 
+ std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
+                ++index;
+            }
-            while (S3::objectExists(*s3_configuration->client, s3_configuration->url.bucket, new_key, s3_configuration->url.version_id, s3_configuration->request_settings));
-            keys.push_back(new_key);
+            while (S3::objectExists(*configuration.client, configuration.url.bucket, new_key, configuration.url.version_id, configuration.request_settings));
+            configuration.keys.push_back(new_key);
         }
         else
+        {
             throw Exception(
-                ErrorCodes::BAD_ARGUMENTS,
-                "Object in bucket {} with key {} already exists. "
-                "If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
-                "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
-                s3_configuration->url.bucket,
-                keys.back());
+                ErrorCodes::BAD_ARGUMENTS,
+                "Object in bucket {} with key {} already exists. "
+                "If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
+                "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
+                configuration.url.bucket, configuration.keys.back());
+        }
     }
 
     return std::make_shared<StorageS3Sink>(
-        s3_configuration->format,
+        configuration.format,
         sample_block,
         local_context,
         format_settings,
         chosen_compression_method,
-        *s3_configuration,
-        s3_configuration->url.bucket,
-        keys.back());
+        configuration,
+        configuration.url.bucket,
+        configuration.keys.back());
 }
 
 void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
 {
-    s3_configuration->update(local_context);
+    updateConfigurationIfChanged(local_context);
 
-    if (s3_configuration->withGlobs())
+    if (configuration.withGlobs())
     {
         throw Exception(
             ErrorCodes::DATABASE_ACCESS_DENIED,
             "S3 key '{}' contains globs, so the table is in readonly mode",
-            s3_configuration->url.key);
+            configuration.url.key);
     }
 
     Aws::S3::Model::Delete delkeys;
 
-    for (const auto & key : keys)
+    for (const auto & key : configuration.keys)
     {
         Aws::S3::Model::ObjectIdentifier obj;
         obj.SetKey(key);
@@ -1193,10 +1188,10 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
 
     ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
     S3::DeleteObjectsRequest request;
-    request.SetBucket(s3_configuration->url.bucket);
+    request.SetBucket(configuration.url.bucket);
     request.SetDelete(delkeys);
 
-    auto response = s3_configuration->client->DeleteObjects(request);
+    auto response = configuration.client->DeleteObjects(request);
     if (!response.IsSuccess())
     {
         const auto & err = response.GetError();
@@ -1207,24 +1202,32 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
         LOG_WARNING(&Poco::Logger::get("StorageS3"), "Failed to delete {}, error: {}", error.GetKey(), error.GetMessage());
 }
 
+void StorageS3::updateConfigurationIfChanged(ContextPtr local_context)
+{
+    configuration.update(local_context);
+}
+
+void StorageS3::useConfiguration(const Configuration & new_configuration)
+{
+    configuration = new_configuration;
+}
+
 bool StorageS3::Configuration::update(ContextPtr context)
 {
-    auto settings = context->getStorageS3Settings().getSettings(url.uri.toString());
-    request_settings = settings.request_settings;
+    auto s3_settings = context->getStorageS3Settings().getSettings(url.uri.toString());
+    request_settings = s3_settings.request_settings;
     request_settings.updateFromSettings(context->getSettings());
 
-    if (client)
-    {
-        if
(static_configuration) - return false; + if (client && (static_configuration || s3_settings.auth_settings == auth_settings)) + return false; - if (settings.auth_settings == auth_settings) - return false; - } - - auth_settings.updateFrom(settings.auth_settings); + auth_settings.updateFrom(s3_settings.auth_settings); + connect(context); + return true; +} +void StorageS3::Configuration::connect(ContextPtr context) +{ S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( auth_settings.region, context->getRemoteHostFilter(), @@ -1236,12 +1242,11 @@ bool StorageS3::Configuration::update(ContextPtr context) client_configuration.endpointOverride = url.endpoint; client_configuration.maxConnections = static_cast(request_settings.max_connections); - - auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key); auto headers = auth_settings.headers; if (!headers_from_ast.empty()) headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end()); + auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key); client = S3::ClientFactory::instance().create( client_configuration, url.is_virtual_hosted_style, @@ -1250,9 +1255,8 @@ bool StorageS3::Configuration::update(ContextPtr context) auth_settings.server_side_encryption_customer_key_base64, std::move(headers), auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", false)), - auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false))); - - return true; + auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)), + auth_settings.expiration_window_seconds.value_or(context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS))); } void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection) @@ -1276,13 +1280,13 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur configuration.request_settings = S3Settings::RequestSettings(collection); } -StorageS3::ConfigurationPtr StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file) +StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file) { - auto configuration = std::make_unique(); + StorageS3::Configuration configuration; - if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args)) + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) { - processNamedCollectionResult(*configuration, *named_collection); + processNamedCollectionResult(configuration, *named_collection); } else { @@ -1301,7 +1305,7 @@ StorageS3::ConfigurationPtr StorageS3::getConfiguration(ASTs & engine_args, Cont "Storage S3 requires 1 to 5 arguments: " "url, [access_key_id, secret_access_key], name of used format and [compression_method]"); - auto * header_it = StorageURL::collectHeaders(engine_args, configuration->headers_from_ast, local_context); + auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context); if (header_it != engine_args.end()) engine_args.erase(header_it); @@ -1336,25 +1340,27 @@ StorageS3::ConfigurationPtr StorageS3::getConfiguration(ASTs 
& engine_args, Cont } /// This argument is always the first - configuration->url = S3::URI(checkAndGetLiteralArgument(engine_args[0], "url")); + configuration.url = S3::URI(checkAndGetLiteralArgument(engine_args[0], "url")); if (engine_args_to_idx.contains("format")) - configuration->format = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["format"]], "format"); + configuration.format = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["format"]], "format"); if (engine_args_to_idx.contains("compression_method")) - configuration->compression_method = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); + configuration.compression_method = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); if (engine_args_to_idx.contains("access_key_id")) - configuration->auth_settings.access_key_id = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); + configuration.auth_settings.access_key_id = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); if (engine_args_to_idx.contains("secret_access_key")) - configuration->auth_settings.secret_access_key = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); + configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); } - configuration->static_configuration = !configuration->auth_settings.access_key_id.empty(); + configuration.static_configuration = !configuration.auth_settings.access_key_id.empty(); - if (configuration->format == "auto" && get_format_from_file) - configuration->format = FormatFactory::instance().getFormatFromFileName(configuration->url.key, true); + configuration.keys = {configuration.url.key}; + + if (configuration.format == "auto" && get_format_from_file) + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url.key, true); return configuration; } @@ -1370,7 +1376,7 @@ ColumnsDescription StorageS3::getTableStructureFromData( } ColumnsDescription StorageS3::getTableStructureFromDataImpl( - const Configuration & s3_configuration, + const Configuration & configuration, const std::optional & format_settings, ContextPtr ctx, ObjectInfos * object_infos) @@ -1378,16 +1384,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( std::vector read_keys; auto file_iterator = createFileIterator( - s3_configuration, - {s3_configuration.url.key}, - false, - ctx, nullptr, - {}, object_infos, &read_keys); + configuration, false, ctx, nullptr, {}, object_infos, &read_keys); std::optional columns_from_cache; size_t prev_read_keys_size = read_keys.size(); if (ctx->getSettingsRef().schema_inference_use_cache_for_s3) - columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos, format_settings, ctx); + columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), configuration, object_infos, format_settings, ctx); ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr { @@ -1399,7 +1401,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( throw Exception( ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file, because there are no files with provided path " - "in S3. 
You must specify table structure manually", s3_configuration.format); + "in S3. You must specify table structure manually", configuration.format); return nullptr; } @@ -1408,7 +1410,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) { columns_from_cache = tryGetColumnsFromCache( - read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos, format_settings, ctx); + read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, object_infos, format_settings, ctx); prev_read_keys_size = read_keys.size(); if (columns_from_cache) { @@ -1421,8 +1423,8 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( int zstd_window_log_max = static_cast(ctx->getSettingsRef().zstd_window_log_max); return wrapReadBufferWithCompressionMethod( std::make_unique( - s3_configuration.client, s3_configuration.url.bucket, key, s3_configuration.url.version_id, s3_configuration.request_settings, ctx->getReadSettings()), - chooseCompressionMethod(key, s3_configuration.compression_method), + configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings, ctx->getReadSettings()), + chooseCompressionMethod(key, configuration.compression_method), zstd_window_log_max); }; @@ -1430,10 +1432,10 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( if (columns_from_cache) columns = *columns_from_cache; else - columns = readSchemaFromFormat(s3_configuration.format, format_settings, read_buffer_iterator, s3_configuration.withGlobs(), ctx); + columns = readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx); if (ctx->getSettingsRef().schema_inference_use_cache_for_s3) - addColumnsToCache(read_keys, s3_configuration, columns, s3_configuration.format, format_settings, ctx); + addColumnsToCache(read_keys, configuration, columns, configuration.format, format_settings, ctx); return columns; } @@ -1531,7 +1533,7 @@ SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) std::optional StorageS3::tryGetColumnsFromCache( const Strings::const_iterator & begin, const Strings::const_iterator & end, - const Configuration & s3_configuration, + const Configuration & configuration, ObjectInfos * object_infos, const std::optional & format_settings, const ContextPtr & ctx) @@ -1539,7 +1541,7 @@ std::optional StorageS3::tryGetColumnsFromCache( auto & schema_cache = getSchemaCache(ctx); for (auto it = begin; it < end; ++it) { - String path = fs::path(s3_configuration.url.bucket) / *it; + String path = fs::path(configuration.url.bucket) / *it; auto get_last_mod_time = [&]() -> std::optional { S3::ObjectInfo info; @@ -1553,8 +1555,8 @@ std::optional StorageS3::tryGetColumnsFromCache( /// but schema cache will handle this case and won't return columns from cache /// because we can't say that it's valid without last modification time. 
info = S3::getObjectInfo( - *s3_configuration.client, s3_configuration.url.bucket, *it, s3_configuration.url.version_id, - s3_configuration.request_settings, {}, {}, /* throw_on_error= */ false); + *configuration.client, configuration.url.bucket, *it, configuration.url.version_id, + configuration.request_settings, {}, {}, /* throw_on_error= */ false); if (object_infos) (*object_infos)[path] = info; @@ -1566,8 +1568,8 @@ std::optional StorageS3::tryGetColumnsFromCache( return std::nullopt; }; - String source = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / path; - auto cache_key = getKeyForSchemaCache(source, s3_configuration.format, format_settings, ctx); + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path; + auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx); auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); if (columns) return columns; @@ -1578,13 +1580,13 @@ std::optional StorageS3::tryGetColumnsFromCache( void StorageS3::addColumnsToCache( const Strings & keys, - const Configuration & s3_configuration, + const Configuration & configuration, const ColumnsDescription & columns, const String & format_name, const std::optional & format_settings, const ContextPtr & ctx) { - auto host_and_bucket = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / s3_configuration.url.bucket; + auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; Strings sources; sources.reserve(keys.size()); std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; }); diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 07016c35368..a66ff8a46dc 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -241,11 +241,10 @@ public: struct Configuration : public StatelessTableEngineConfiguration { Configuration() = default; - virtual ~Configuration() = default; - Configuration(const Configuration &) = default; S3::URI url; std::shared_ptr client; + std::vector keys; S3::AuthSettings auth_settings; S3Settings::RequestSettings request_settings; /// If s3 configuration was passed from ast, then it is static. @@ -254,21 +253,29 @@ public: /// Headers from ast is a part of static configuration. 
HTTPHeaderEntries headers_from_ast; - String getPath() const { return url.uri.toString(); } /// For logging - - bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; } + String getPath() const { return url.key; } void appendToPath(const String & suffix) { url = S3::URI{std::filesystem::path(url.uri.toString()) / suffix}; } - virtual bool update(ContextPtr context); + bool update(ContextPtr context); + + void connect(ContextPtr context); + + bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; } + + bool withWildcard() const + { + static const String PARTITION_ID_WILDCARD = "{_partition_id}"; + return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos + || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; + } }; - using ConfigurationPtr = std::unique_ptr; StorageS3( - StorageS3::ConfigurationPtr configuration_, + const Configuration & configuration_, ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, @@ -306,7 +313,7 @@ public: static SchemaCache & getSchemaCache(const ContextPtr & ctx); - static StorageS3::ConfigurationPtr getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true); + static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true); static ColumnsDescription getTableStructureFromData( StorageS3::Configuration & configuration, @@ -314,12 +321,18 @@ public: ContextPtr ctx, ObjectInfos * object_infos = nullptr); +protected: + virtual void updateConfigurationIfChanged(ContextPtr local_context); + + void useConfiguration(const Configuration & new_configuration); + + const Configuration & getConfiguration() { return configuration; } + private: friend class StorageS3Cluster; friend class TableFunctionS3Cluster; - const ConfigurationPtr s3_configuration; - std::vector keys; + Configuration configuration; NamesAndTypesList virtual_columns; Block virtual_block; @@ -331,8 +344,7 @@ private: ObjectInfos object_infos; static std::shared_ptr createFileIterator( - const Configuration & s3_configuration, - const std::vector & keys, + const Configuration & configuration, bool distributed_processing, ContextPtr local_context, ASTPtr query, @@ -341,7 +353,7 @@ private: Strings * read_keys = nullptr); static ColumnsDescription getTableStructureFromDataImpl( - const Configuration & s3_configuration, + const Configuration & configuration, const std::optional & format_settings, ContextPtr ctx, ObjectInfos * object_infos = nullptr); @@ -353,14 +365,14 @@ private: static std::optional tryGetColumnsFromCache( const Strings::const_iterator & begin, const Strings::const_iterator & end, - const Configuration & s3_configuration, + const Configuration & configuration, ObjectInfos * object_infos, const std::optional & format_settings, const ContextPtr & ctx); static void addColumnsToCache( const Strings & keys, - const Configuration & s3_configuration, + const Configuration & configuration, const ColumnsDescription & columns, const String & format_name, const std::optional & format_settings, diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 2b40d04b793..b9019ba83ae 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -49,6 +49,7 @@ StorageS3Cluster::StorageS3Cluster( ContextPtr context_, bool structure_argument_was_provided_) : IStorageCluster(table_id_) + , log(&Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + 
")")) , s3_configuration{configuration_} , cluster_name(configuration_.cluster_name) , format_name(configuration_.format) @@ -57,7 +58,7 @@ StorageS3Cluster::StorageS3Cluster( { context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri); StorageInMemoryMetadata storage_metadata; - s3_configuration.update(context_); + updateConfigurationIfChanged(context_); if (columns_.empty()) { @@ -83,6 +84,11 @@ StorageS3Cluster::StorageS3Cluster( virtual_block.insert({column.type->createColumn(), column.type, column.name}); } +void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context) +{ + s3_configuration.update(local_context); +} + /// The code executes on initiator Pipe StorageS3Cluster::read( const Names & column_names, @@ -93,7 +99,7 @@ Pipe StorageS3Cluster::read( size_t /*max_block_size*/, size_t /*num_streams*/) { - s3_configuration.update(context); + updateConfigurationIfChanged(context); auto cluster = getCluster(context); auto extension = getTaskIteratorExtension(query_info.query, context); @@ -152,6 +158,7 @@ Pipe StorageS3Cluster::read( processed_stage, extension); + remote_query_executor->setLogger(log); pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false)); } } diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h index e55382c66b2..2896ab57f49 100644 --- a/src/Storages/StorageS3Cluster.h +++ b/src/Storages/StorageS3Cluster.h @@ -47,7 +47,11 @@ public: RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override; ClusterPtr getCluster(ContextPtr context) const override; +protected: + void updateConfigurationIfChanged(ContextPtr local_context); + private: + Poco::Logger * log; StorageS3::Configuration s3_configuration; String cluster_name; String format_name; diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index 9f046d756a1..1a20632430b 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -23,26 +23,24 @@ class ITableFunctionDataLake : public ITableFunction { public: static constexpr auto name = Name::name; - std::string getName() const override - { - return name; - } + std::string getName() const override { return name; } protected: - StoragePtr - executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) - const override + StoragePtr executeImpl( + const ASTPtr & /*ast_function*/, + ContextPtr context, + const std::string & table_name, + ColumnsDescription /*cached_columns*/) const override { ColumnsDescription columns; if (configuration.structure != "auto") columns = parseColumnsListFromString(configuration.structure, context); StoragePtr storage = std::make_shared( - std::make_unique(configuration), context, - StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, std::nullopt); + configuration, context, StorageID(getDatabaseName(), table_name), + columns, ConstraintsDescription{}, String{}, std::nullopt); storage->startup(); - return storage; } @@ -61,11 +59,10 @@ protected: void parseArguments(const ASTPtr & ast_function, ContextPtr context) override { - /// Parse args ASTs & args_func = ast_function->children; - const auto message - = fmt::format("The signature of table function '{}' could be the following:\n{}", getName(), TableFunction::signature); + const auto message = fmt::format( + "The signature of table function '{}' 
could be the following:\n{}", getName(), TableFunction::signature);
 
         if (args_func.size() != 1)
             throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments", getName());
diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp
index febd10383a8..5cca51ef650 100644
--- a/src/TableFunctions/TableFunctionS3.cpp
+++ b/src/TableFunctions/TableFunctionS3.cpp
@@ -32,7 +32,7 @@ namespace ErrorCodes
 void TableFunctionS3::parseArgumentsImpl(
     const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration, bool get_format_from_file)
 {
-    if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
+    if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
     {
         StorageS3::processNamedCollectionResult(s3_configuration, *named_collection);
     }
@@ -152,9 +152,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
     else if (!structure_hint.empty())
         columns = structure_hint;
 
-    auto storage_configuration = std::make_unique<StorageS3::Configuration>(configuration);
     StoragePtr storage = std::make_shared<StorageS3>(
-        std::move(storage_configuration),
+        configuration,
         context,
         StorageID(getDatabaseName(), table_name),
         columns,
diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp
index 079d8ee207a..cca316d0c95 100644
--- a/src/TableFunctions/TableFunctionS3Cluster.cpp
+++ b/src/TableFunctions/TableFunctionS3Cluster.cpp
@@ -108,7 +108,7 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
     {
         /// On worker node this filename won't contain globs
         storage = std::make_shared<StorageS3>(
-            std::make_unique<StorageS3::Configuration>(configuration),
+            configuration,
             context,
             StorageID(getDatabaseName(), table_name),
             columns,
diff --git a/tests/integration/test_storage_delta/configs/config.d/named_collections.xml b/tests/integration/test_storage_delta/configs/config.d/named_collections.xml
new file mode 100644
index 00000000000..d4c54e2d13d
--- /dev/null
+++ b/tests/integration/test_storage_delta/configs/config.d/named_collections.xml
@@ -0,0 +1,9 @@
+<clickhouse>
+    <named_collections>
+        <s3>
+            <url>http://minio1:9001/root/</url>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+        </s3>
+    </named_collections>
+</clickhouse>
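
Note on the pattern this patch introduces: `IStorageDataLake<Storage, Name, MetadataParser>` derives from the base storage (StorageS3), resolves the table's data files once through the metadata parser, and hands the base class a frozen, pre-connected copy of the configuration; later reads go through `updateConfigurationIfChanged`, which swaps in a fresh configuration via `useConfiguration` only when the credentials or the resolved key list actually changed. The following is a minimal, self-contained sketch of that composition, not the real ClickHouse API: `BaseStorage`, `DataLake`, `DeltaName`, and `DeltaParser` are stand-ins invented for this example.

    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified stand-in for StorageS3::Configuration (hypothetical).
    struct Configuration
    {
        std::string path;
        std::vector<std::string> keys;
        bool static_configuration = false;
        bool update() { return false; }  // pretend credentials did not change
    };

    // Simplified stand-in for the base storage (StorageS3 in the patch).
    class BaseStorage
    {
    public:
        explicit BaseStorage(Configuration configuration_) : configuration(std::move(configuration_)) {}
        virtual ~BaseStorage() = default;
        virtual std::string getName() const { return "Base"; }
        const Configuration & getConfiguration() const { return configuration; }
        void useConfiguration(Configuration new_configuration) { configuration = std::move(new_configuration); }
    private:
        Configuration configuration;
    };

    // The pattern from the patch: a template that derives from the base storage
    // and constructs it with a configuration whose keys were resolved up front
    // from data lake metadata.
    template <typename Storage, typename Name, typename MetadataParser>
    class DataLake : public Storage
    {
    public:
        static constexpr auto name = Name::name;

        explicit DataLake(const Configuration & base_configuration_)
            : Storage(getConfigurationForDataRead(base_configuration_))
            , base_configuration(base_configuration_) {}

        std::string getName() const override { return name; }

    private:
        static Configuration getConfigurationForDataRead(const Configuration & base)
        {
            Configuration configuration{base};
            configuration.update();
            configuration.static_configuration = true;                      // freeze credentials
            configuration.keys = MetadataParser{}.getFiles(configuration);  // resolve data files
            return configuration;
        }

        Configuration base_configuration;  // kept so a later refresh can re-run the parser
    };

    struct DeltaName { static constexpr auto name = "DeltaLake"; };
    struct DeltaParser
    {
        std::vector<std::string> getFiles(const Configuration & c) { return {c.path + "/part-0.parquet"}; }
    };

    int main()
    {
        DataLake<BaseStorage, DeltaName, DeltaParser> lake(Configuration{"s3://bucket/table", {}, false});
        std::cout << lake.getName() << " reads " << lake.getConfiguration().keys.front() << '\n';
    }

The design point the sketch mirrors: `static_configuration = true` on the derived copy stops the base storage from re-resolving credentials or re-expanding keys on its own, so the data lake wrapper stays the single place where the file list is computed and refreshed.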