Merge pull request #46083 from kssenii/simplify-storage-s3-configuration

Simplify code around storages s3/hudi/delta-lake
This commit is contained in:
Kseniia Sumarokova 2023-02-08 12:02:38 +01:00 committed by GitHub
commit 693006ba42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 184 additions and 237 deletions

View File

@ -30,6 +30,7 @@ struct URI
bool is_virtual_hosted_style; bool is_virtual_hosted_style;
URI() = default;
explicit URI(const std::string & uri_); explicit URI(const std::string & uri_);
static void validateBucket(const std::string & bucket, const Poco::URI & uri); static void validateBucket(const std::string & bucket, const Poco::URI & uri);

View File

@ -109,18 +109,6 @@ struct URLBasedDataSourceConfiguration
void set(const URLBasedDataSourceConfiguration & conf); void set(const URLBasedDataSourceConfiguration & conf);
}; };
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
};
struct StorageS3ClusterConfiguration : StorageS3Configuration
{
String cluster_name;
};
struct URLBasedDataSourceConfig struct URLBasedDataSourceConfig
{ {
URLBasedDataSourceConfiguration configuration; URLBasedDataSourceConfiguration configuration;

View File

@ -0,0 +1,15 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Base configuration shared by stateless table engines.
/// Each field is initialised to the literal "auto" unless the caller overrides it.
struct StatelessTableEngineConfiguration
{
    /// Format name; "auto" by default.
    String format{"auto"};

    /// Compression method name; "auto" by default.
    String compression_method{"auto"};

    /// Table structure description; "auto" by default.
    String structure{"auto"};
};
}

View File

@ -57,8 +57,8 @@ std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
return keys; return keys;
} }
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context) JsonMetadataGetter::JsonMetadataGetter(const StorageS3::Configuration & configuration_, ContextPtr context)
: base_configuration(configuration_), table_path(table_path_) : base_configuration(configuration_)
{ {
init(context); init(context);
} }
@ -94,17 +94,17 @@ void JsonMetadataGetter::init(ContextPtr context)
} }
} }
std::vector<String> JsonMetadataGetter::getJsonLogFiles() std::vector<String> JsonMetadataGetter::getJsonLogFiles() const
{ {
std::vector<String> keys;
const auto & client = base_configuration.client; const auto & client = base_configuration.client;
const auto table_path = base_configuration.url.key;
const auto bucket = base_configuration.url.bucket;
std::vector<String> keys;
S3::ListObjectsV2Request request; S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome; Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false}; bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket); request.SetBucket(bucket);
@ -154,9 +154,9 @@ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
request_settings.max_single_read_retries = 10; request_settings.max_single_read_retries = 10;
return std::make_shared<ReadBufferFromS3>( return std::make_shared<ReadBufferFromS3>(
base_configuration.client, base_configuration.client,
base_configuration.uri.bucket, base_configuration.url.bucket,
key, key,
base_configuration.uri.version_id, base_configuration.url.version_id,
request_settings, request_settings,
context->getReadSettings()); context->getReadSettings());
} }
@ -182,11 +182,6 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
namespace namespace
{ {
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
// DeltaLake stores data in parts in different files // DeltaLake stores data in parts in different files
// keys is vector of parts with latest version // keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for // generateQueryFromKeys constructs query from parts filenames for
@ -198,35 +193,25 @@ String generateQueryFromKeys(const std::vector<String> & keys)
} }
StorageS3Configuration getAdjustedS3Configuration( StorageS3::Configuration getAdjustedS3Configuration(
const ContextPtr & context, const ContextPtr & context, const StorageS3::Configuration & configuration, Poco::Logger * log)
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{ {
JsonMetadataGetter getter{base_configuration, table_path, context}; JsonMetadataGetter getter{configuration, context};
const auto keys = getter.getFiles();
auto keys = getter.getFiles(); const auto new_uri = configuration.url.uri.toString() + generateQueryFromKeys(keys);
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
// set new url in configuration // set new url in configuration
StorageS3Configuration new_configuration; StorageS3::Configuration new_configuration(configuration);
new_configuration.url = new_uri; new_configuration.url = S3::URI(new_uri);
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_uri);
return new_configuration; return new_configuration;
} }
} }
StorageDeltaLake::StorageDeltaLake( StorageDeltaLake::StorageDeltaLake(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
ColumnsDescription columns_, ColumnsDescription columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -234,14 +219,13 @@ StorageDeltaLake::StorageDeltaLake(
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_) std::optional<FormatSettings> format_settings_)
: IStorage(table_id_) : IStorage(table_id_)
, base_configuration{getBaseConfiguration(configuration_)} , base_configuration{configuration_}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")")) , log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key) , table_path(base_configuration.url.key)
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration); StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, log);
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
if (columns_.empty()) if (columns_.empty())
{ {
@ -284,14 +268,11 @@ Pipe StorageDeltaLake::read(
} }
ColumnsDescription StorageDeltaLake::getTableStructureFromData( ColumnsDescription StorageDeltaLake::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx) StorageS3::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{ {
auto base_configuration = getBaseConfiguration(configuration); StorageS3::updateS3Configuration(ctx, configuration);
StorageS3::updateS3Configuration(ctx, base_configuration); auto new_configuration = getAdjustedS3Configuration(ctx, configuration, &Poco::Logger::get("StorageDeltaLake"));
auto new_configuration = getAdjustedS3Configuration( return StorageS3::getTableStructureFromData(new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
} }
void registerStorageDeltaLake(StorageFactory & factory) void registerStorageDeltaLake(StorageFactory & factory)
@ -306,9 +287,9 @@ void registerStorageDeltaLake(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage DeltaLake requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]"); "Storage DeltaLake requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3Configuration configuration; StorageS3::Configuration configuration;
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url"); configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id"); configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key"); configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");

View File

@ -37,21 +37,20 @@ private:
class JsonMetadataGetter class JsonMetadataGetter
{ {
public: public:
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context); JsonMetadataGetter(const StorageS3::Configuration & configuration_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); } std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
private: private:
void init(ContextPtr context); void init(ContextPtr context);
std::vector<String> getJsonLogFiles(); std::vector<String> getJsonLogFiles() const;
std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context); std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
void handleJSON(const JSON & json); void handleJSON(const JSON & json);
StorageS3::S3Configuration base_configuration; StorageS3::Configuration base_configuration;
String table_path;
DeltaLakeMetadata metadata; DeltaLakeMetadata metadata;
}; };
@ -62,7 +61,7 @@ public:
// 2. Finds out parts with latest version // 2. Finds out parts with latest version
// 3. Creates url for underlying StorageS3 engine to handle reads // 3. Creates url for underlying StorageS3 engine to handle reads
StorageDeltaLake( StorageDeltaLake(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
ColumnsDescription columns_, ColumnsDescription columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -83,11 +82,12 @@ public:
size_t num_streams) override; size_t num_streams) override;
static ColumnsDescription getTableStructureFromData( static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration, StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx); ContextPtr ctx);
private: private:
StorageS3::S3Configuration base_configuration; StorageS3::Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine; std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log; Poco::Logger * log;
String table_path; String table_path;

View File

@ -30,11 +30,6 @@ namespace ErrorCodes
namespace namespace
{ {
StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
{
return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
}
/// Apache Hudi stores parts of data in different files. /// Apache Hudi stores parts of data in different files.
/// Every part file has timestamp in it. /// Every part file has timestamp in it.
/// Every partition(directory) in Apache Hudi has different versions of part. /// Every partition(directory) in Apache Hudi has different versions of part.
@ -91,17 +86,17 @@ String generateQueryFromKeys(const std::vector<std::string> & keys, const String
return "{" + list_of_keys + "}"; return "{" + list_of_keys + "}";
} }
std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_configuration, const std::string & table_path, Poco::Logger * log) std::vector<std::string> getKeysFromS3(const StorageS3::Configuration & configuration, Poco::Logger * log)
{ {
const auto & client = configuration.client;
const auto & table_path = configuration.url.key;
const auto & bucket = configuration.url.bucket;
std::vector<std::string> keys; std::vector<std::string> keys;
const auto & client = base_configuration.client;
S3::ListObjectsV2Request request; S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome; Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false}; bool is_finished{false};
const auto bucket{base_configuration.uri.bucket};
request.SetBucket(bucket); request.SetBucket(bucket);
request.SetPrefix(table_path); request.SetPrefix(table_path);
@ -134,31 +129,22 @@ std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_c
} }
StorageS3Configuration getAdjustedS3Configuration( StorageS3::Configuration getAdjustedS3Configuration(const StorageS3::Configuration & configuration, Poco::Logger * log)
StorageS3::S3Configuration & base_configuration,
const StorageS3Configuration & configuration,
const std::string & table_path,
Poco::Logger * log)
{ {
auto keys = getKeysFromS3(base_configuration, table_path, log); const auto keys = getKeysFromS3(configuration, log);
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys, configuration.format); const auto new_uri = configuration.url.uri.toString() + generateQueryFromKeys(keys, configuration.format);
LOG_DEBUG(log, "New uri: {}", new_uri); StorageS3::Configuration new_configuration(configuration);
LOG_DEBUG(log, "Table path: {}", table_path); new_configuration.url = S3::URI(new_uri);
StorageS3Configuration new_configuration;
new_configuration.url = new_uri;
new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
new_configuration.format = configuration.format;
LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_uri);
return new_configuration; return new_configuration;
} }
} }
StorageHudi::StorageHudi( StorageHudi::StorageHudi(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
ColumnsDescription columns_, ColumnsDescription columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -166,14 +152,13 @@ StorageHudi::StorageHudi(
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_) std::optional<FormatSettings> format_settings_)
: IStorage(table_id_) : IStorage(table_id_)
, base_configuration{getBaseConfiguration(configuration_)} , base_configuration{configuration_}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration); StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(base_configuration, configuration_, table_path, log); auto new_configuration = getAdjustedS3Configuration(base_configuration, log);
if (columns_.empty()) if (columns_.empty())
{ {
@ -214,14 +199,11 @@ Pipe StorageHudi::read(
} }
ColumnsDescription StorageHudi::getTableStructureFromData( ColumnsDescription StorageHudi::getTableStructureFromData(
const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx) StorageS3::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{ {
auto base_configuration = getBaseConfiguration(configuration); StorageS3::updateS3Configuration(ctx, configuration);
StorageS3::updateS3Configuration(ctx, base_configuration); auto new_configuration = getAdjustedS3Configuration(configuration, &Poco::Logger::get("StorageDeltaLake"));
auto new_configuration = getAdjustedS3Configuration( return StorageS3::getTableStructureFromData(new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
} }
void registerStorageHudi(StorageFactory & factory) void registerStorageHudi(StorageFactory & factory)
@ -236,9 +218,9 @@ void registerStorageHudi(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Hudi requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]"); "Storage Hudi requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3Configuration configuration; StorageS3::Configuration configuration;
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url"); configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id"); configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key"); configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");

View File

@ -22,7 +22,7 @@ public:
/// 2. Finds out parts with latest version. /// 2. Finds out parts with latest version.
/// 3. Creates url for underlying StorageS3 engine to handle reads. /// 3. Creates url for underlying StorageS3 engine to handle reads.
StorageHudi( StorageHudi(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
ColumnsDescription columns_, ColumnsDescription columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -44,14 +44,14 @@ public:
size_t num_streams) override; size_t num_streams) override;
static ColumnsDescription getTableStructureFromData( static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration, StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx); ContextPtr ctx);
private: private:
StorageS3::S3Configuration base_configuration; StorageS3::Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine; std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log; Poco::Logger * log;
String table_path;
}; };
} }

View File

@ -764,7 +764,7 @@ public:
ContextPtr context, ContextPtr context,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
const CompressionMethod compression_method, const CompressionMethod compression_method,
const StorageS3::S3Configuration & s3_configuration_, const StorageS3::Configuration & s3_configuration_,
const String & bucket, const String & bucket,
const String & key) const String & key)
: SinkToStorage(sample_block_) : SinkToStorage(sample_block_)
@ -855,7 +855,7 @@ public:
ContextPtr context_, ContextPtr context_,
std::optional<FormatSettings> format_settings_, std::optional<FormatSettings> format_settings_,
const CompressionMethod compression_method_, const CompressionMethod compression_method_,
const StorageS3::S3Configuration & s3_configuration_, const StorageS3::Configuration & s3_configuration_,
const String & bucket_, const String & bucket_,
const String & key_) const String & key_)
: PartitionedSink(partition_by, context_, sample_block_) : PartitionedSink(partition_by, context_, sample_block_)
@ -895,7 +895,7 @@ private:
const Block sample_block; const Block sample_block;
ContextPtr context; ContextPtr context;
const CompressionMethod compression_method; const CompressionMethod compression_method;
const StorageS3::S3Configuration & s3_configuration; const StorageS3::Configuration & s3_configuration;
const String bucket; const String bucket;
const String key; const String key;
std::optional<FormatSettings> format_settings; std::optional<FormatSettings> format_settings;
@ -930,7 +930,7 @@ private:
StorageS3::StorageS3( StorageS3::StorageS3(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -940,18 +940,18 @@ StorageS3::StorageS3(
bool distributed_processing_, bool distributed_processing_,
ASTPtr partition_by_) ASTPtr partition_by_)
: IStorage(table_id_) : IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers} , s3_configuration{configuration_}
, keys({s3_configuration.uri.key}) , keys({s3_configuration.url.key})
, format_name(configuration_.format) , format_name(configuration_.format)
, compression_method(configuration_.compression_method) , compression_method(configuration_.compression_method)
, name(s3_configuration.uri.storage_name) , name(s3_configuration.url.storage_name)
, distributed_processing(distributed_processing_) , distributed_processing(distributed_processing_)
, format_settings(format_settings_) , format_settings(format_settings_)
, partition_by(partition_by_) , partition_by(partition_by_)
, is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos) , is_key_with_globs(s3_configuration.url.key.find_first_of("*?{") != std::string::npos)
{ {
FormatFactory::instance().checkFormatName(format_name); FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri); context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.url.uri);
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, s3_configuration); updateS3Configuration(context_, s3_configuration);
@ -987,7 +987,7 @@ StorageS3::StorageS3(
} }
std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator( std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const std::vector<String> & keys, const std::vector<String> & keys,
bool is_key_with_globs, bool is_key_with_globs,
bool distributed_processing, bool distributed_processing,
@ -1005,14 +1005,14 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
{ {
/// Iterate through disclosed globs and make a source for each file /// Iterate through disclosed globs and make a source for each file
return std::make_shared<StorageS3Source::DisclosedGlobIterator>( return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, *s3_configuration.client, s3_configuration.url, query, virtual_block,
local_context, object_infos, read_keys, s3_configuration.request_settings); local_context, object_infos, read_keys, s3_configuration.request_settings);
} }
else else
{ {
return std::make_shared<StorageS3Source::KeysIterator>( return std::make_shared<StorageS3Source::KeysIterator>(
*s3_configuration.client, s3_configuration.uri.version_id, keys, *s3_configuration.client, s3_configuration.url.version_id, keys,
s3_configuration.uri.bucket, s3_configuration.request_settings, query, virtual_block, local_context, s3_configuration.url.bucket, s3_configuration.request_settings, query, virtual_block, local_context,
object_infos, read_keys); object_infos, read_keys);
} }
} }
@ -1036,7 +1036,7 @@ Pipe StorageS3::read(
size_t max_block_size, size_t max_block_size,
size_t num_streams) size_t num_streams)
{ {
bool has_wildcards = s3_configuration.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos bool has_wildcards = s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos; || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
if (partition_by && has_wildcards) if (partition_by && has_wildcards)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
@ -1102,8 +1102,8 @@ Pipe StorageS3::read(
s3_configuration.request_settings, s3_configuration.request_settings,
compression_method, compression_method,
s3_configuration.client, s3_configuration.client,
s3_configuration.uri.bucket, s3_configuration.url.bucket,
s3_configuration.uri.version_id, s3_configuration.url.version_id,
iterator_wrapper, iterator_wrapper,
max_download_threads)); max_download_threads));
} }
@ -1120,7 +1120,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
auto sample_block = metadata_snapshot->getSampleBlock(); auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method); auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
bool has_wildcards = s3_configuration.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; bool has_wildcards = s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query); auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@ -1136,18 +1136,18 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
format_settings, format_settings,
chosen_compression_method, chosen_compression_method,
s3_configuration, s3_configuration,
s3_configuration.uri.bucket, s3_configuration.url.bucket,
keys.back()); keys.back());
} }
else else
{ {
if (is_key_with_globs) if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.uri.key); "S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.url.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, keys.back(), s3_configuration.uri.version_id, s3_configuration.request_settings)) if (!truncate_in_insert && S3::objectExists(*s3_configuration.client, s3_configuration.url.bucket, keys.back(), s3_configuration.url.version_id, s3_configuration.request_settings))
{ {
if (local_context->getSettingsRef().s3_create_new_file_on_insert) if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{ {
@ -1159,7 +1159,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos)); new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index; ++index;
} }
while (S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, new_key, s3_configuration.uri.version_id, s3_configuration.request_settings)); while (S3::objectExists(*s3_configuration.client, s3_configuration.url.bucket, new_key, s3_configuration.url.version_id, s3_configuration.request_settings));
keys.push_back(new_key); keys.push_back(new_key);
} }
else else
@ -1168,7 +1168,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
"Object in bucket {} with key {} already exists. " "Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you " "If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert", "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
s3_configuration.uri.bucket, s3_configuration.url.bucket,
keys.back()); keys.back());
} }
@ -1179,7 +1179,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
format_settings, format_settings,
chosen_compression_method, chosen_compression_method,
s3_configuration, s3_configuration,
s3_configuration.uri.bucket, s3_configuration.url.bucket,
keys.back()); keys.back());
} }
} }
@ -1190,7 +1190,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
if (is_key_with_globs) if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.uri.key); "S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.url.key);
Aws::S3::Model::Delete delkeys; Aws::S3::Model::Delete delkeys;
@ -1203,7 +1203,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
ProfileEvents::increment(ProfileEvents::S3DeleteObjects); ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
S3::DeleteObjectsRequest request; S3::DeleteObjectsRequest request;
request.SetBucket(s3_configuration.uri.bucket); request.SetBucket(s3_configuration.url.bucket);
request.SetDelete(delkeys); request.SetDelete(delkeys);
auto response = s3_configuration.client->DeleteObjects(request); auto response = s3_configuration.client->DeleteObjects(request);
@ -1218,9 +1218,9 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
} }
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd) void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::Configuration & upd)
{ {
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString()); auto settings = ctx->getStorageS3Settings().getSettings(upd.url.uri.toString());
upd.request_settings = settings.request_settings; upd.request_settings = settings.request_settings;
upd.request_settings.updateFromSettings(ctx->getSettings()); upd.request_settings.updateFromSettings(ctx->getSettings());
@ -1244,7 +1244,7 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
upd.request_settings.get_request_throttler, upd.request_settings.get_request_throttler,
upd.request_settings.put_request_throttler); upd.request_settings.put_request_throttler);
client_configuration.endpointOverride = upd.uri.endpoint; client_configuration.endpointOverride = upd.url.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(upd.request_settings.max_connections); client_configuration.maxConnections = static_cast<unsigned>(upd.request_settings.max_connections);
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key); auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
@ -1254,7 +1254,7 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
upd.client = S3::ClientFactory::instance().create( upd.client = S3::ClientFactory::instance().create(
client_configuration, client_configuration,
upd.uri.is_virtual_hosted_style, upd.url.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(), credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(), credentials.GetAWSSecretKey(),
upd.auth_settings.server_side_encryption_customer_key_base64, upd.auth_settings.server_side_encryption_customer_key_base64,
@ -1263,15 +1263,15 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false))); upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
} }
void StorageS3::processNamedCollectionResult(StorageS3Configuration & configuration, const NamedCollection & collection) void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
{ {
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys); validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
configuration.url = collection.get<String>("url");
auto filename = collection.getOrDefault<String>("filename", ""); auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty()) if (!filename.empty())
configuration.url = std::filesystem::path(configuration.url) / filename; configuration.url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename);
else
configuration.url = S3::URI(collection.get<String>("url"));
configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", ""); configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", ""); configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
@ -1284,9 +1284,9 @@ void StorageS3::processNamedCollectionResult(StorageS3Configuration & configurat
configuration.request_settings = S3Settings::RequestSettings(collection); configuration.request_settings = S3Settings::RequestSettings(collection);
} }
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context) StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
{ {
StorageS3Configuration configuration; StorageS3::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args)) if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args))
{ {
@ -1307,14 +1307,14 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
"Storage S3 requires 1 to 5 arguments: " "Storage S3 requires 1 to 5 arguments: "
"url, [access_key_id, secret_access_key], name of used format and [compression_method]."); "url, [access_key_id, secret_access_key], name of used format and [compression_method].");
auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers, local_context); auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (header_it != engine_args.end()) if (header_it != engine_args.end())
engine_args.erase(header_it); engine_args.erase(header_it);
for (auto & engine_arg : engine_args) for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url"); configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
if (engine_args.size() >= 4) if (engine_args.size() >= 4)
{ {
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id"); configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
@ -1332,36 +1332,30 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
configuration.format = checkAndGetLiteralArgument<String>(engine_args.back(), "format"); configuration.format = checkAndGetLiteralArgument<String>(engine_args.back(), "format");
} }
} }
configuration.static_configuration = !configuration.auth_settings.access_key_id.empty();
if (configuration.format == "auto") if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true); configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url.key, true);
return configuration; return configuration;
} }
ColumnsDescription StorageS3::getTableStructureFromData( ColumnsDescription StorageS3::getTableStructureFromData(
const StorageS3Configuration & configuration, StorageS3::Configuration & configuration,
bool distributed_processing, bool distributed_processing,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx,
ObjectInfos * object_infos) ObjectInfos * object_infos)
{ {
S3Configuration s3_configuration{ updateS3Configuration(ctx, configuration);
configuration.url,
configuration.auth_settings,
S3Settings::RequestSettings(ctx->getSettingsRef()),
configuration.headers};
updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl( return getTableStructureFromDataImpl(
configuration.format, s3_configuration, configuration.compression_method, distributed_processing, configuration.format, configuration, configuration.compression_method, distributed_processing,
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos); configuration.url.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos);
} }
ColumnsDescription StorageS3::getTableStructureFromDataImpl( ColumnsDescription StorageS3::getTableStructureFromDataImpl(
const String & format, const String & format,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const String & compression_method, const String & compression_method,
bool distributed_processing, bool distributed_processing,
bool is_key_with_globs, bool is_key_with_globs,
@ -1373,7 +1367,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
auto file_iterator = createFileIterator( auto file_iterator = createFileIterator(
s3_configuration, s3_configuration,
{s3_configuration.uri.key}, {s3_configuration.url.key},
is_key_with_globs, is_key_with_globs,
distributed_processing, distributed_processing,
ctx, nullptr, ctx, nullptr,
@ -1415,7 +1409,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max); int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod( return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>( std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.request_settings, ctx->getReadSettings()), s3_configuration.client, s3_configuration.url.bucket, key, s3_configuration.url.version_id, s3_configuration.request_settings, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method), chooseCompressionMethod(key, compression_method),
zstd_window_log_max); zstd_window_log_max);
}; };
@ -1525,7 +1519,7 @@ SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache( std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
const Strings::const_iterator & begin, const Strings::const_iterator & begin,
const Strings::const_iterator & end, const Strings::const_iterator & end,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
ObjectInfos * object_infos, ObjectInfos * object_infos,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
@ -1534,7 +1528,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
auto & schema_cache = getSchemaCache(ctx); auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it) for (auto it = begin; it < end; ++it)
{ {
String path = fs::path(s3_configuration.uri.bucket) / *it; String path = fs::path(s3_configuration.url.bucket) / *it;
auto get_last_mod_time = [&]() -> std::optional<time_t> auto get_last_mod_time = [&]() -> std::optional<time_t>
{ {
S3::ObjectInfo info; S3::ObjectInfo info;
@ -1547,7 +1541,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
/// Note that in case of exception in getObjectInfo returned info will be empty, /// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache /// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time. /// because we can't say that it's valid without last modification time.
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, s3_configuration.request_settings, info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.url.bucket, *it, s3_configuration.url.version_id, s3_configuration.request_settings,
{}, {}, /* throw_on_error= */ false); {}, {}, /* throw_on_error= */ false);
if (object_infos) if (object_infos)
(*object_infos)[path] = info; (*object_infos)[path] = info;
@ -1559,7 +1553,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
return std::nullopt; return std::nullopt;
}; };
String source = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path; String source = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx); auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns) if (columns)
@ -1571,13 +1565,13 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
void StorageS3::addColumnsToCache( void StorageS3::addColumnsToCache(
const Strings & keys, const Strings & keys,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx) const ContextPtr & ctx)
{ {
auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket; auto host_and_bucket = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / s3_configuration.url.bucket;
Strings sources; Strings sources;
sources.reserve(keys.size()); sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; }); std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; });

View File

@ -21,6 +21,7 @@
#include <Interpreters/threadPoolCallbackRunner.h> #include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h> #include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
namespace Aws::S3 namespace Aws::S3
{ {
@ -238,8 +239,21 @@ private:
class StorageS3 : public IStorage, WithContext class StorageS3 : public IStorage, WithContext
{ {
public: public:
/// Everything StorageS3 needs to address one S3 location and talk to it.
/// `format`, `compression_method` and `structure` are inherited from
/// StatelessTableEngineConfiguration.
struct Configuration : public StatelessTableEngineConfiguration
{
/// Parsed S3 location; the storage reads bucket, key, version_id, endpoint,
/// is_virtual_hosted_style and the underlying `uri` from it.
S3::URI url;
/// Client used for all requests. Assigned by StorageS3::updateS3Configuration().
std::shared_ptr<const S3::Client> client;
/// Credentials and other auth options the client is created with.
S3::AuthSettings auth_settings;
/// Request settings. StorageS3::updateS3Configuration() reloads them from the context's
/// S3 settings for this URL and then applies the settings of the context on top.
S3Settings::RequestSettings request_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
/// StorageS3::getConfiguration() sets it to "access_key_id is not empty".
bool static_configuration = true;
/// Headers from ast are a part of static configuration.
/// Filled by StorageURL::collectHeaders() from the engine / table function arguments.
HTTPHeaderEntries headers_from_ast;
};
StorageS3( StorageS3(
const StorageS3Configuration & configuration_, const StorageS3::Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -271,45 +285,18 @@ public:
bool supportsPartitionBy() const override; bool supportsPartitionBy() const override;
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context); static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
using ObjectInfos = StorageS3Source::ObjectInfos; using ObjectInfos = StorageS3Source::ObjectInfos;
static ColumnsDescription getTableStructureFromData( static ColumnsDescription getTableStructureFromData(
const StorageS3Configuration & configuration, StorageS3::Configuration & configuration,
bool distributed_processing, bool distributed_processing,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx,
ObjectInfos * object_infos = nullptr); ObjectInfos * object_infos = nullptr);
static void processNamedCollectionResult(StorageS3Configuration & configuration, const NamedCollection & collection); static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
struct S3Configuration
{
const S3::URI uri;
std::shared_ptr<const S3::Client> client;
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HTTPHeaderEntries headers_from_ast;
S3Configuration(
const String & url_,
const S3::AuthSettings & auth_settings_,
const S3Settings::RequestSettings & request_settings_,
const HTTPHeaderEntries & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, request_settings(request_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
};
static SchemaCache & getSchemaCache(const ContextPtr & ctx); static SchemaCache & getSchemaCache(const ContextPtr & ctx);
@ -319,7 +306,7 @@ private:
friend class StorageHudi; friend class StorageHudi;
friend class StorageDeltaLake; friend class StorageDeltaLake;
S3Configuration s3_configuration; Configuration s3_configuration;
std::vector<String> keys; std::vector<String> keys;
NamesAndTypesList virtual_columns; NamesAndTypesList virtual_columns;
Block virtual_block; Block virtual_block;
@ -334,10 +321,10 @@ private:
ObjectInfos object_infos; ObjectInfos object_infos;
static void updateS3Configuration(ContextPtr, S3Configuration &); static void updateS3Configuration(ContextPtr, Configuration &);
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator( static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const std::vector<String> & keys, const std::vector<String> & keys,
bool is_key_with_globs, bool is_key_with_globs,
bool distributed_processing, bool distributed_processing,
@ -349,7 +336,7 @@ private:
static ColumnsDescription getTableStructureFromDataImpl( static ColumnsDescription getTableStructureFromDataImpl(
const String & format, const String & format,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const String & compression_method, const String & compression_method,
bool distributed_processing, bool distributed_processing,
bool is_key_with_globs, bool is_key_with_globs,
@ -364,7 +351,7 @@ private:
static std::optional<ColumnsDescription> tryGetColumnsFromCache( static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings::const_iterator & begin, const Strings::const_iterator & begin,
const Strings::const_iterator & end, const Strings::const_iterator & end,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
ObjectInfos * object_infos, ObjectInfos * object_infos,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
@ -372,7 +359,7 @@ private:
static void addColumnsToCache( static void addColumnsToCache(
const Strings & keys, const Strings & keys,
const S3Configuration & s3_configuration, const Configuration & s3_configuration,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,

View File

@ -41,26 +41,26 @@ namespace DB
{ {
StorageS3Cluster::StorageS3Cluster( StorageS3Cluster::StorageS3Cluster(
const StorageS3ClusterConfiguration & configuration_, const Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
ContextPtr context_, ContextPtr context_,
bool structure_argument_was_provided_) bool structure_argument_was_provided_)
: IStorageCluster(table_id_) : IStorageCluster(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers} , s3_configuration{configuration_}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name) , cluster_name(configuration_.cluster_name)
, format_name(configuration_.format) , format_name(configuration_.format)
, compression_method(configuration_.compression_method) , compression_method(configuration_.compression_method)
, structure_argument_was_provided(structure_argument_was_provided_) , structure_argument_was_provided(structure_argument_was_provided_)
{ {
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename}); context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri);
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, s3_configuration); StorageS3::updateS3Configuration(context_, s3_configuration);
if (columns_.empty()) if (columns_.empty())
{ {
const auto & filename = configuration_.url.uri.getPath();
const bool is_key_with_globs = filename.find_first_of("*?{") != std::string::npos; const bool is_key_with_globs = filename.find_first_of("*?{") != std::string::npos;
/// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set /// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set
@ -173,7 +173,7 @@ ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
{ {
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>( auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, context); *s3_configuration.client, s3_configuration.url, query, virtual_block, context);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; }); auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
} }

View File

@ -21,8 +21,13 @@ class Context;
class StorageS3Cluster : public IStorageCluster class StorageS3Cluster : public IStorageCluster
{ {
public: public:
/// The plain S3 configuration plus the cluster name, which is the additional (first)
/// argument of s3Cluster. The StorageS3Cluster constructor copies the StorageS3 part
/// into its own `s3_configuration` and stores `cluster_name` separately.
struct Configuration : public StorageS3::Configuration
{
/// Name of the cluster, taken from the first s3Cluster argument.
std::string cluster_name;
};
StorageS3Cluster( StorageS3Cluster(
const StorageS3ClusterConfiguration & configuration_, const Configuration & configuration_,
const StorageID & table_id_, const StorageID & table_id_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
@ -43,8 +48,7 @@ public:
ClusterPtr getCluster(ContextPtr context) const override; ClusterPtr getCluster(ContextPtr context) const override;
private: private:
StorageS3::S3Configuration s3_configuration; StorageS3::Configuration s3_configuration;
String filename;
String cluster_name; String cluster_name;
String format_name; String format_name;
String compression_method; String compression_method;

View File

@ -10,6 +10,7 @@
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h> #include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
namespace DB namespace DB
@ -181,15 +182,10 @@ public:
static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args); static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args);
struct Configuration struct Configuration : public StatelessTableEngineConfiguration
{ {
std::string url; std::string url;
std::string http_method; std::string http_method;
std::string format = "auto";
std::string compression_method = "auto";
std::string structure = "auto";
HTTPHeaderEntries headers; HTTPHeaderEntries headers;
}; };

View File

@ -28,12 +28,12 @@ namespace ErrorCodes
void TableFunctionDeltaLake::parseArgumentsImpl( void TableFunctionDeltaLake::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration) const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration)
{ {
if (args.empty() || args.size() > 6) if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers, context); auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context);
if (header_it != args.end()) if (header_it != args.end())
args.erase(header_it); args.erase(header_it);
@ -77,7 +77,7 @@ void TableFunctionDeltaLake::parseArgumentsImpl(
} }
/// This argument is always the first /// This argument is always the first
base_configuration.url = checkAndGetLiteralArgument<String>(args[0], "url"); base_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format")) if (args_to_idx.contains("format"))
base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format"); base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");

View File

@ -34,9 +34,9 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & configuration); static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
StorageS3Configuration configuration; mutable StorageS3::Configuration configuration;
}; };
} }

View File

@ -28,12 +28,12 @@ namespace ErrorCodes
void TableFunctionHudi::parseArgumentsImpl( void TableFunctionHudi::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & base_configuration) const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration)
{ {
if (args.empty() || args.size() > 6) if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers, context); auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context);
if (header_it != args.end()) if (header_it != args.end())
args.erase(header_it); args.erase(header_it);
@ -77,7 +77,7 @@ void TableFunctionHudi::parseArgumentsImpl(
} }
/// This argument is always the first /// This argument is always the first
base_configuration.url = checkAndGetLiteralArgument<String>(args[0], "url"); base_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format")) if (args_to_idx.contains("format"))
base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format"); base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");

View File

@ -34,9 +34,9 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & configuration); static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
StorageS3Configuration configuration; mutable StorageS3::Configuration configuration;
}; };
} }

View File

@ -29,7 +29,7 @@ namespace ErrorCodes
/// This is needed to avoid copy-paste, because s3Cluster arguments differ only in an additional (first) argument - the cluster name /// This is needed to avoid copy-paste, because s3Cluster arguments differ only in an additional (first) argument - the cluster name
void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & s3_configuration) void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration)
{ {
if (auto named_collection = tryGetNamedCollectionWithOverrides(args)) if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
{ {
@ -40,7 +40,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.empty() || args.size() > 6) if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, s3_configuration.headers, context); auto * header_it = StorageURL::collectHeaders(args, s3_configuration.headers_from_ast, context);
if (header_it != args.end()) if (header_it != args.end())
args.erase(header_it); args.erase(header_it);
@ -87,7 +87,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
} }
/// This argument is always the first /// This argument is always the first
s3_configuration.url = checkAndGetLiteralArgument<String>(args[0], "url"); s3_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format")) if (args_to_idx.contains("format"))
s3_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format"); s3_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
@ -106,7 +106,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
} }
if (s3_configuration.format == "auto") if (s3_configuration.format == "auto")
s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url, true); s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url.uri.getPath(), true);
} }
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context) void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context)

View File

@ -5,7 +5,7 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/StorageS3.h>
namespace DB namespace DB
@ -51,9 +51,9 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & configuration); static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
StorageS3Configuration configuration; mutable StorageS3::Configuration configuration;
ColumnsDescription structure_hint; ColumnsDescription structure_hint;
}; };

View File

@ -2,7 +2,6 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <Storages/StorageS3Cluster.h>
#include <Storages/StorageS3.h> #include <Storages/StorageS3.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/checkAndGetLiteralArgument.h>
@ -73,8 +72,8 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
clipped_args.reserve(args.size()); clipped_args.reserve(args.size());
std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args)); std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args));
/// StorageS3ClusterConfiguration inherits from StorageS3Configuration, so it is safe to upcast it. /// StorageS3Cluster::Configuration inherits from StorageS3::Configuration, so it is safe to upcast it.
TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3Configuration & >(configuration)); TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration & >(configuration));
} }

View File

@ -5,7 +5,7 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/StorageS3Cluster.h>
namespace DB namespace DB
@ -50,7 +50,7 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr) const override; ColumnsDescription getActualTableStructure(ContextPtr) const override;
void parseArguments(const ASTPtr &, ContextPtr) override; void parseArguments(const ASTPtr &, ContextPtr) override;
StorageS3ClusterConfiguration configuration; mutable StorageS3Cluster::Configuration configuration;
ColumnsDescription structure_hint; ColumnsDescription structure_hint;
}; };