mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 12:32:04 +00:00
Use BaseSettings for auth settings
This commit is contained in:
parent
c35ee6db20
commit
56c7301d46
@ -88,14 +88,10 @@ namespace
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration
|
||||
{
|
||||
settings.auth_settings.use_environment_credentials.value_or(
|
||||
context->getConfigRef().getBool("s3.use_environment_credentials", true)),
|
||||
settings.auth_settings.use_insecure_imds_request.value_or(
|
||||
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
|
||||
settings.auth_settings.expiration_window_seconds.value_or(
|
||||
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
|
||||
settings.auth_settings.no_sign_request.value_or(
|
||||
context->getConfigRef().getBool("s3.no_sign_request", false)),
|
||||
settings.auth_settings.use_environment_credentials,
|
||||
settings.auth_settings.use_insecure_imds_request,
|
||||
settings.auth_settings.expiration_window_seconds,
|
||||
settings.auth_settings.no_sign_request
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
return;
|
||||
}
|
||||
|
||||
auto auth_settings = S3::AuthSettings::loadFromConfig(config, config_prefix, Context::getGlobalContextInstance()->getSettingsRef());
|
||||
auto auth_settings = S3::AuthSettings(config, config_prefix, Context::getGlobalContextInstance()->getSettingsRef());
|
||||
|
||||
String endpoint = macros->expand(config.getString(config_prefix + ".endpoint"));
|
||||
auto new_uri = S3::URI{endpoint};
|
||||
@ -119,10 +119,10 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration
|
||||
{
|
||||
auth_settings.use_environment_credentials.value_or(true),
|
||||
auth_settings.use_insecure_imds_request.value_or(false),
|
||||
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
|
||||
auth_settings.no_sign_request.value_or(false),
|
||||
auth_settings.use_environment_credentials,
|
||||
auth_settings.use_insecure_imds_request,
|
||||
auth_settings.expiration_window_seconds,
|
||||
auth_settings.no_sign_request,
|
||||
},
|
||||
credentials.GetSessionToken());
|
||||
|
||||
|
@ -190,8 +190,8 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
|
||||
{
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto settings = getSettings(config, config_prefix, context, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
@ -227,8 +227,8 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
|
||||
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto settings = getSettings(config, config_prefix, context, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
@ -262,8 +262,8 @@ void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
|
||||
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto settings = getSettings(config, config_prefix, context, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
|
@ -574,13 +574,13 @@ void S3ObjectStorage::applyNewSettings(
|
||||
ContextPtr context,
|
||||
const ApplyNewSettingsOptions & options)
|
||||
{
|
||||
auto settings_from_config = getSettings(config, config_prefix, context, for_disk_s3, context->getSettingsRef().s3_validate_request_settings);
|
||||
auto settings_from_config = getSettings(config, config_prefix, context, uri.endpoint, for_disk_s3, context->getSettingsRef().s3_validate_request_settings);
|
||||
auto modified_settings = std::make_unique<S3ObjectStorageSettings>(*s3_settings.get());
|
||||
modified_settings->auth_settings.updateFrom(settings_from_config->auth_settings);
|
||||
modified_settings->auth_settings.updateIfChanged(settings_from_config->auth_settings);
|
||||
modified_settings->request_settings.updateIfChanged(settings_from_config->request_settings);
|
||||
|
||||
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(uri.uri.toString(), context->getUserName()))
|
||||
modified_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
|
||||
modified_settings->auth_settings.updateIfChanged(endpoint_settings->auth_settings);
|
||||
|
||||
auto current_settings = s3_settings.get();
|
||||
if (options.allow_client_change
|
||||
@ -598,7 +598,7 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context, for_disk_s3, true);
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context, uri.endpoint, for_disk_s3, true);
|
||||
auto new_client = getClient(uri, *new_s3_settings, context, for_disk_s3);
|
||||
|
||||
auto new_uri{uri};
|
||||
|
@ -34,14 +34,18 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
ContextPtr context,
|
||||
const std::string & endpoint,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings)
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const std::string setting_name_prefix = for_disk_s3 ? "s3_" : "";
|
||||
|
||||
auto auth_settings = S3::AuthSettings::loadFromConfig(config, config_prefix, settings);
|
||||
auto request_settings = S3::RequestSettings::loadFromConfig(config, config_prefix, settings, validate_settings, setting_name_prefix);
|
||||
auto auth_settings = S3::AuthSettings(config, config_prefix, settings);
|
||||
auto request_settings = S3::RequestSettings(config, config_prefix, settings, validate_settings, setting_name_prefix);
|
||||
|
||||
request_settings.proxy_resolver = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
|
||||
ProxyConfiguration::protocolFromString(S3::URI(endpoint).uri.getScheme()), config_prefix, config);
|
||||
|
||||
return std::make_unique<S3ObjectStorageSettings>(
|
||||
request_settings,
|
||||
@ -75,7 +79,7 @@ std::unique_ptr<S3::Client> getClient(
|
||||
const auto & request_settings = settings.request_settings;
|
||||
|
||||
const bool is_s3_express_bucket = S3::isS3ExpressEndpoint(url.endpoint);
|
||||
if (is_s3_express_bucket && auth_settings.region.empty())
|
||||
if (is_s3_express_bucket && auth_settings.region.value.empty())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NO_ELEMENTS_IN_CONFIG,
|
||||
@ -93,43 +97,36 @@ std::unique_ptr<S3::Client> getClient(
|
||||
request_settings.put_request_throttler,
|
||||
url.uri.getScheme());
|
||||
|
||||
client_configuration.connectTimeoutMs = auth_settings.connect_timeout_ms.value_or(S3::DEFAULT_CONNECT_TIMEOUT_MS);
|
||||
client_configuration.requestTimeoutMs = auth_settings.request_timeout_ms.value_or(S3::DEFAULT_REQUEST_TIMEOUT_MS);
|
||||
client_configuration.maxConnections = static_cast<uint32_t>(auth_settings.max_connections.value_or(S3::DEFAULT_MAX_CONNECTIONS));
|
||||
client_configuration.http_keep_alive_timeout = auth_settings.http_keep_alive_timeout.value_or(S3::DEFAULT_KEEP_ALIVE_TIMEOUT);
|
||||
client_configuration.http_keep_alive_max_requests = auth_settings.http_keep_alive_max_requests.value_or(S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS);
|
||||
client_configuration.connectTimeoutMs = auth_settings.connect_timeout_ms;
|
||||
client_configuration.requestTimeoutMs = auth_settings.request_timeout_ms;
|
||||
client_configuration.maxConnections = static_cast<uint32_t>(auth_settings.max_connections);
|
||||
client_configuration.http_keep_alive_timeout = auth_settings.http_keep_alive_timeout;
|
||||
client_configuration.http_keep_alive_max_requests = auth_settings.http_keep_alive_max_requests;
|
||||
|
||||
client_configuration.endpointOverride = url.endpoint;
|
||||
client_configuration.s3_use_adaptive_timeouts = auth_settings.use_adaptive_timeouts.value_or(S3::DEFAULT_USE_ADAPTIVE_TIMEOUTS);
|
||||
client_configuration.s3_use_adaptive_timeouts = auth_settings.use_adaptive_timeouts;
|
||||
|
||||
if (for_disk_s3)
|
||||
if (request_settings.proxy_resolver)
|
||||
{
|
||||
/// TODO: move to S3Common auth settings parsing
|
||||
/*
|
||||
* Override proxy configuration for backwards compatibility with old configuration format.
|
||||
* */
|
||||
// if (auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
|
||||
// ProxyConfiguration::protocolFromString(url.uri.getScheme()), config_prefix, config))
|
||||
// {
|
||||
// client_configuration.per_request_configuration
|
||||
// = [proxy_config]() { return proxy_config->resolve(); };
|
||||
// client_configuration.error_report
|
||||
// = [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
|
||||
// }
|
||||
client_configuration.per_request_configuration = [=]() { return request_settings.proxy_resolver->resolve(); };
|
||||
client_configuration.error_report = [=](const auto & request_config) { request_settings.proxy_resolver->errorReport(request_config); };
|
||||
}
|
||||
|
||||
S3::ClientSettings client_settings{
|
||||
.use_virtual_addressing = url.is_virtual_hosted_style,
|
||||
.disable_checksum = auth_settings.disable_checksum.value_or(S3::DEFAULT_DISABLE_CHECKSUM),
|
||||
.gcs_issue_compose_request = auth_settings.gcs_issue_compose_request.value_or(false),
|
||||
.disable_checksum = auth_settings.disable_checksum,
|
||||
.gcs_issue_compose_request = auth_settings.gcs_issue_compose_request,
|
||||
};
|
||||
|
||||
auto credentials_configuration = S3::CredentialsConfiguration
|
||||
{
|
||||
auth_settings.use_environment_credentials.value_or(S3::DEFAULT_USE_ENVIRONMENT_CREDENTIALS),
|
||||
auth_settings.use_insecure_imds_request.value_or(false),
|
||||
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
|
||||
auth_settings.no_sign_request.value_or(S3::DEFAULT_NO_SIGN_REQUEST),
|
||||
auth_settings.use_environment_credentials,
|
||||
auth_settings.use_insecure_imds_request,
|
||||
auth_settings.expiration_window_seconds,
|
||||
auth_settings.no_sign_request,
|
||||
};
|
||||
|
||||
return S3::ClientFactory::instance().create(
|
||||
|
@ -18,6 +18,7 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
ContextPtr context,
|
||||
const std::string & endpoint,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings);
|
||||
|
||||
|
@ -61,6 +61,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace S3
|
||||
@ -101,146 +102,14 @@ ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, c
|
||||
return sse_kms_config;
|
||||
}
|
||||
|
||||
AuthSettings AuthSettings::loadFromConfig(
|
||||
AuthSettings::AuthSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
const DB::Settings &, /// TODO: use settings
|
||||
const std::string & setting_name_prefix)
|
||||
{
|
||||
auto auth_settings = AuthSettings::loadFromSettings(settings);
|
||||
|
||||
const std::string prefix = config_prefix + "." + setting_name_prefix;
|
||||
auto has = [&](const std::string & key) -> bool { return config.has(prefix + key); };
|
||||
auto get_uint = [&](const std::string & key) -> size_t { return config.getUInt64(prefix + key); };
|
||||
auto get_bool = [&](const std::string & key) -> bool { return config.getBool(prefix + key); };
|
||||
auto get_string = [&](const std::string & key) -> std::string { return config.getString(prefix + key); };
|
||||
|
||||
if (has("access_key_id"))
|
||||
auth_settings.access_key_id = get_string("access_key_id");
|
||||
if (has("secret_access_key"))
|
||||
auth_settings.secret_access_key = get_string("secret_access_key");
|
||||
if (has("session_token"))
|
||||
auth_settings.secret_access_key = get_string("session_token");
|
||||
|
||||
if (has("region"))
|
||||
auth_settings.region = get_string("region");
|
||||
if (has("server_side_encryption_customer_key_base64"))
|
||||
auth_settings.region = get_string("server_side_encryption_customer_key_base64");
|
||||
|
||||
if (has("connect_timeout_ms"))
|
||||
auth_settings.connect_timeout_ms = get_uint("connect_timeout_ms");
|
||||
if (has("request_timeout_ms"))
|
||||
auth_settings.request_timeout_ms = get_uint("request_timeout_ms");
|
||||
if (has("max_connections"))
|
||||
auth_settings.max_connections = get_uint("max_connections");
|
||||
|
||||
if (has("http_keep_alive_timeout"))
|
||||
auth_settings.http_keep_alive_timeout = get_uint("http_keep_alive_timeout");
|
||||
if (has("http_keep_alive_max_requests"))
|
||||
auth_settings.http_keep_alive_max_requests = get_uint("http_keep_alive_max_requests");
|
||||
|
||||
if (has("use_environment_credentials"))
|
||||
auth_settings.use_environment_credentials = get_bool("use_environment_credentials");
|
||||
if (has("use_adaptive_timeouts"))
|
||||
auth_settings.use_adaptive_timeouts = get_bool("use_adaptive_timeouts");
|
||||
if (has("no_sing_request"))
|
||||
auth_settings.no_sign_request = get_bool("no_sing_request");
|
||||
if (has("expiration_window_seconds"))
|
||||
auth_settings.expiration_window_seconds = get_uint("expiration_window_seconds");
|
||||
if (has("gcs_issue_compose_request"))
|
||||
auth_settings.gcs_issue_compose_request = get_bool("gcs_issue_compose_request");
|
||||
if (has("use_insecure_imds_request"))
|
||||
auth_settings.use_insecure_imds_request = get_bool("use_insecure_imds_request");
|
||||
|
||||
auth_settings.headers = getHTTPHeaders(config_prefix, config);
|
||||
auth_settings.server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_prefix, keys);
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (startsWith(key, "user"))
|
||||
auth_settings.users.insert(config.getString(config_prefix + "." + key));
|
||||
}
|
||||
|
||||
return auth_settings;
|
||||
}
|
||||
|
||||
AuthSettings AuthSettings::loadFromSettings(const DB::Settings & settings)
|
||||
{
|
||||
AuthSettings auth_settings{};
|
||||
auth_settings.updateFromSettings(settings, /* if_changed */false);
|
||||
return auth_settings;
|
||||
}
|
||||
|
||||
void AuthSettings::updateFromSettings(const DB::Settings & settings, bool if_changed)
|
||||
{
|
||||
if (!if_changed || settings.s3_connect_timeout_ms.changed)
|
||||
connect_timeout_ms = settings.s3_connect_timeout_ms;
|
||||
if (!if_changed || settings.s3_request_timeout_ms.changed)
|
||||
request_timeout_ms = settings.s3_request_timeout_ms;
|
||||
if (!if_changed || settings.s3_max_connections.changed)
|
||||
max_connections = settings.s3_max_connections;
|
||||
if (!if_changed || settings.s3_use_adaptive_timeouts.changed)
|
||||
use_adaptive_timeouts = settings.s3_use_adaptive_timeouts;
|
||||
if (!if_changed || settings.s3_disable_checksum.changed)
|
||||
disable_checksum = settings.s3_disable_checksum;
|
||||
}
|
||||
|
||||
bool AuthSettings::hasUpdates(const AuthSettings & other) const
|
||||
{
|
||||
AuthSettings copy = *this;
|
||||
copy.updateFrom(other);
|
||||
return *this != copy;
|
||||
}
|
||||
|
||||
void AuthSettings::updateFrom(const AuthSettings & from)
|
||||
{
|
||||
/// Update with check for emptyness only parameters which
|
||||
/// can be passed not only from config, but via ast.
|
||||
|
||||
if (!from.access_key_id.empty())
|
||||
access_key_id = from.access_key_id;
|
||||
if (!from.secret_access_key.empty())
|
||||
secret_access_key = from.secret_access_key;
|
||||
if (!from.session_token.empty())
|
||||
session_token = from.session_token;
|
||||
|
||||
if (!from.headers.empty())
|
||||
headers = from.headers;
|
||||
if (!from.region.empty())
|
||||
region = from.region;
|
||||
|
||||
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
|
||||
server_side_encryption_kms_config = from.server_side_encryption_kms_config;
|
||||
|
||||
if (from.use_environment_credentials.has_value())
|
||||
use_environment_credentials = from.use_environment_credentials;
|
||||
|
||||
if (from.use_insecure_imds_request.has_value())
|
||||
use_insecure_imds_request = from.use_insecure_imds_request;
|
||||
|
||||
if (from.expiration_window_seconds.has_value())
|
||||
expiration_window_seconds = from.expiration_window_seconds;
|
||||
|
||||
if (from.no_sign_request.has_value())
|
||||
no_sign_request = from.no_sign_request;
|
||||
|
||||
users.insert(from.users.begin(), from.users.end());
|
||||
}
|
||||
|
||||
RequestSettings RequestSettings::loadFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
bool validate_settings,
|
||||
const std::string & setting_name_prefix)
|
||||
{
|
||||
auto request_settings = RequestSettings::loadFromSettings(settings, validate_settings);
|
||||
String prefix = config_prefix + "." + setting_name_prefix;
|
||||
|
||||
auto values = request_settings.allMutable();
|
||||
for (auto & field : values)
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
const auto path = prefix + field.getName();
|
||||
if (config.has(path))
|
||||
@ -257,22 +126,92 @@ RequestSettings RequestSettings::loadFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
if (!request_settings.storage_class_name.value.empty())
|
||||
request_settings.storage_class_name = Poco::toUpperInPlace(request_settings.storage_class_name.value);
|
||||
headers = getHTTPHeaders(config_prefix, config);
|
||||
server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);
|
||||
|
||||
if (validate_settings)
|
||||
request_settings.validateUploadSettings();
|
||||
|
||||
request_settings.initializeThrottler(settings);
|
||||
|
||||
return request_settings;
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_prefix, keys);
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (startsWith(key, "user"))
|
||||
users.insert(config.getString(config_prefix + "." + key));
|
||||
}
|
||||
}
|
||||
|
||||
RequestSettings RequestSettings::loadFromNamedCollection(const NamedCollection & collection, bool validate_settings)
|
||||
AuthSettings::AuthSettings(const DB::Settings & settings)
|
||||
{
|
||||
RequestSettings request_settings{};
|
||||
updateFromSettings(settings, /* if_changed */false);
|
||||
}
|
||||
|
||||
auto values = request_settings.allMutable();
|
||||
void AuthSettings::updateFromSettings(const DB::Settings & settings, bool if_changed)
|
||||
{
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
const auto setting_name = "s3_" + field.getName();
|
||||
if (settings.has(setting_name) && (!if_changed || settings.isChanged(setting_name)))
|
||||
{
|
||||
set(field.getName(), settings.get(setting_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool AuthSettings::hasUpdates(const AuthSettings & other) const
|
||||
{
|
||||
AuthSettings copy = *this;
|
||||
copy.updateIfChanged(other);
|
||||
return *this != copy;
|
||||
}
|
||||
|
||||
void AuthSettings::updateIfChanged(const AuthSettings & settings)
|
||||
{
|
||||
/// Update with check for emptyness only parameters which
|
||||
/// can be passed not only from config, but via ast.
|
||||
|
||||
for (auto & setting : settings.all())
|
||||
{
|
||||
if (setting.isValueChanged())
|
||||
set(setting.getName(), setting.getValue());
|
||||
}
|
||||
|
||||
if (!settings.headers.empty())
|
||||
headers = settings.headers;
|
||||
server_side_encryption_kms_config = settings.server_side_encryption_kms_config;
|
||||
users.insert(settings.users.begin(), settings.users.end());
|
||||
}
|
||||
|
||||
RequestSettings::RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
bool validate_settings,
|
||||
const std::string & setting_name_prefix)
|
||||
{
|
||||
String prefix = config_prefix + "." + setting_name_prefix;
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
const auto path = prefix + field.getName();
|
||||
if (config.has(path))
|
||||
{
|
||||
auto which = field.getValue().getType();
|
||||
if (isInt64OrUInt64FieldType(which))
|
||||
field.setValue(config.getUInt64(path));
|
||||
else if (which == Field::Types::String)
|
||||
field.setValue(config.getString(path));
|
||||
else if (which == Field::Types::Bool)
|
||||
field.setValue(config.getBool(path));
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
|
||||
}
|
||||
}
|
||||
finishInit(settings, validate_settings);
|
||||
}
|
||||
|
||||
RequestSettings::RequestSettings(
|
||||
const NamedCollection & collection,
|
||||
const DB::Settings & settings,
|
||||
bool validate_settings)
|
||||
{
|
||||
auto values = allMutable();
|
||||
for (auto & field : values)
|
||||
{
|
||||
const auto path = field.getName();
|
||||
@ -289,26 +228,17 @@ RequestSettings RequestSettings::loadFromNamedCollection(const NamedCollection &
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
|
||||
}
|
||||
}
|
||||
|
||||
if (!request_settings.storage_class_name.value.empty())
|
||||
request_settings.storage_class_name = Poco::toUpperInPlace(request_settings.storage_class_name.value);
|
||||
|
||||
if (validate_settings)
|
||||
request_settings.validateUploadSettings();
|
||||
|
||||
// request_settings.initializeThrottler(settings);
|
||||
|
||||
return request_settings;
|
||||
finishInit(settings, validate_settings);
|
||||
}
|
||||
|
||||
RequestSettings RequestSettings::loadFromSettings(const DB::Settings & settings, bool validate_settings)
|
||||
RequestSettings::RequestSettings(const DB::Settings & settings, bool validate_settings)
|
||||
{
|
||||
RequestSettings request_settings{};
|
||||
request_settings.updateFromSettings(settings, /* if_changed */false, validate_settings);
|
||||
return request_settings;
|
||||
updateFromSettings(settings, /* if_changed */false, validate_settings);
|
||||
finishInit(settings, validate_settings);
|
||||
}
|
||||
|
||||
void RequestSettings::updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings)
|
||||
void RequestSettings::updateFromSettings(
|
||||
const DB::Settings & settings, bool if_changed, bool /* validate_settings */) /// TODO: process validate_settings
|
||||
{
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
@ -318,12 +248,6 @@ void RequestSettings::updateFromSettings(const DB::Settings & settings, bool if_
|
||||
set(field.getName(), settings.get(setting_name));
|
||||
}
|
||||
}
|
||||
|
||||
if (!storage_class_name.value.empty())
|
||||
storage_class_name = Poco::toUpperInPlace(storage_class_name.value);
|
||||
|
||||
if (validate_settings)
|
||||
validateUploadSettings();
|
||||
}
|
||||
|
||||
void RequestSettings::updateIfChanged(const RequestSettings & settings)
|
||||
@ -335,8 +259,14 @@ void RequestSettings::updateIfChanged(const RequestSettings & settings)
|
||||
}
|
||||
}
|
||||
|
||||
void RequestSettings::initializeThrottler(const DB::Settings & settings)
|
||||
void RequestSettings::finishInit(const DB::Settings & settings, bool validate_settings)
|
||||
{
|
||||
if (!storage_class_name.value.empty() && storage_class_name.changed)
|
||||
storage_class_name = Poco::toUpperInPlace(storage_class_name.value);
|
||||
|
||||
if (validate_settings)
|
||||
validateUploadSettings();
|
||||
|
||||
/// NOTE: it would be better to reuse old throttlers
|
||||
/// to avoid losing token bucket state on every config reload,
|
||||
/// which could lead to exceeding limit for short time.
|
||||
@ -443,6 +373,9 @@ void RequestSettings::validateUploadSettings()
|
||||
|
||||
}
|
||||
|
||||
/// TODO: sometimes disk settings have fallback to "s3" section settings from config, support this.
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(S3::AuthSettingsTraits, CLIENT_SETTINGS_LIST)
|
||||
IMPLEMENT_SETTINGS_TRAITS(S3::RequestSettingsTraits, REQUEST_SETTINGS_LIST)
|
||||
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ namespace ErrorCodes
|
||||
|
||||
class RemoteHostFilter;
|
||||
class NamedCollection;
|
||||
struct ProxyConfigurationResolver;
|
||||
|
||||
class S3Exception : public Exception
|
||||
{
|
||||
@ -72,64 +73,34 @@ namespace Poco::Util
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
#define AUTH_SETTINGS(M, ALIAS) \
|
||||
M(String, access_key_id, "", "", 0) \
|
||||
M(String, secret_access_key, "", "", 0) \
|
||||
M(String, session_token, "", "", 0) \
|
||||
M(String, region, "", "", 0) \
|
||||
M(String, server_side_encryption_customer_key_base64, "", "", 0) \
|
||||
|
||||
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
struct AuthSettings
|
||||
{
|
||||
std::string access_key_id;
|
||||
std::string secret_access_key;
|
||||
std::string session_token;
|
||||
std::string region;
|
||||
std::string server_side_encryption_customer_key_base64;
|
||||
|
||||
HTTPHeaderEntries headers;
|
||||
std::unordered_set<std::string> users;
|
||||
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
|
||||
|
||||
std::optional<size_t> connect_timeout_ms;
|
||||
std::optional<size_t> request_timeout_ms;
|
||||
std::optional<size_t> max_connections;
|
||||
std::optional<size_t> http_keep_alive_timeout;
|
||||
std::optional<size_t> http_keep_alive_max_requests;
|
||||
std::optional<size_t> expiration_window_seconds;
|
||||
|
||||
std::optional<bool> use_environment_credentials;
|
||||
std::optional<bool> no_sign_request;
|
||||
std::optional<bool> use_adaptive_timeouts;
|
||||
std::optional<bool> use_insecure_imds_request;
|
||||
std::optional<bool> is_virtual_hosted_style;
|
||||
std::optional<bool> disable_checksum;
|
||||
std::optional<bool> gcs_issue_compose_request;
|
||||
|
||||
bool hasUpdates(const AuthSettings & other) const;
|
||||
void updateFrom(const AuthSettings & from);
|
||||
|
||||
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
|
||||
|
||||
static AuthSettings loadFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
const std::string & setting_name_prefix = "");
|
||||
|
||||
static AuthSettings loadFromSettings(const DB::Settings & settings);
|
||||
|
||||
static AuthSettings loadFromNamedCollection(const NamedCollection & collection);
|
||||
|
||||
void updateFromSettings(const DB::Settings & settings, bool if_changed);
|
||||
|
||||
private:
|
||||
bool operator==(const AuthSettings & other) const = default;
|
||||
};
|
||||
#define CLIENT_SETTINGS(M, ALIAS) \
|
||||
M(UInt64, connect_timeout_ms, DEFAULT_CONNECT_TIMEOUT_MS, "", 0) \
|
||||
M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
|
||||
M(UInt64, max_connections, DEFAULT_MAX_CONNECTIONS, "", 0) \
|
||||
M(UInt64, http_keep_alive_timeout, DEFAULT_KEEP_ALIVE_TIMEOUT, "", 0) \
|
||||
M(UInt64, http_keep_alive_max_requests, DEFAULT_KEEP_ALIVE_MAX_REQUESTS, "", 0) \
|
||||
M(UInt64, expiration_window_seconds, DEFAULT_EXPIRATION_WINDOW_SECONDS, "", 0) \
|
||||
M(Bool, use_environment_credentials, DEFAULT_USE_ENVIRONMENT_CREDENTIALS, "", 0) \
|
||||
M(Bool, no_sign_request, DEFAULT_NO_SIGN_REQUEST, "", 0) \
|
||||
M(Bool, use_insecure_imds_request, false, "", 0) \
|
||||
M(Bool, use_adaptive_timeouts, DEFAULT_USE_ADAPTIVE_TIMEOUTS, "", 0) \
|
||||
M(Bool, is_virtual_hosted_style, false, "", 0) \
|
||||
M(Bool, disable_checksum, DEFAULT_DISABLE_CHECKSUM, "", 0) \
|
||||
M(Bool, gcs_issue_compose_request, false, "", 0) \
|
||||
|
||||
#define REQUEST_SETTINGS(M, ALIAS) \
|
||||
M(UInt64, max_single_read_retries, 4, "", 0) \
|
||||
M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
|
||||
M(UInt64, list_object_keys_size, 1000, "", 0) \
|
||||
M(Bool, allow_native_copy, true, "", 0) \
|
||||
M(Bool, check_objects_after_upload, false, "", 0) \
|
||||
M(UInt64, list_object_keys_size, DEFAULT_LIST_OBJECT_KEYS_SIZE, "", 0) \
|
||||
M(Bool, allow_native_copy, DEFAULT_ALLOW_NATIVE_COPY, "", 0) \
|
||||
M(Bool, check_objects_after_upload, DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
|
||||
M(Bool, throw_on_zero_files_match, false, "", 0) \
|
||||
M(UInt64, max_single_operation_copy_size, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
|
||||
M(String, storage_class_name, "", "", 0) \
|
||||
@ -145,23 +116,56 @@ private:
|
||||
M(UInt64, max_single_part_upload_size, DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE, "", 0) \
|
||||
M(UInt64, max_unexpected_write_error_retries, 4, "", 0) \
|
||||
|
||||
#define CLIENT_SETTINGS_LIST(M, ALIAS) \
|
||||
CLIENT_SETTINGS(M, ALIAS) \
|
||||
AUTH_SETTINGS(M, ALIAS)
|
||||
|
||||
#define REQUEST_SETTINGS_LIST(M, ALIAS) \
|
||||
REQUEST_SETTINGS(M, ALIAS) \
|
||||
PART_UPLOAD_SETTINGS(M, ALIAS)
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(AuthSettingsTraits, CLIENT_SETTINGS_LIST)
|
||||
DECLARE_SETTINGS_TRAITS(RequestSettingsTraits, REQUEST_SETTINGS_LIST)
|
||||
|
||||
struct AuthSettings : public BaseSettings<AuthSettingsTraits>
|
||||
{
|
||||
AuthSettings() = default;
|
||||
|
||||
AuthSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
const std::string & setting_name_prefix = "");
|
||||
|
||||
AuthSettings(const DB::Settings & settings);
|
||||
|
||||
AuthSettings(const NamedCollection & collection);
|
||||
|
||||
void updateFromSettings(const DB::Settings & settings, bool if_changed);
|
||||
bool hasUpdates(const AuthSettings & other) const;
|
||||
void updateIfChanged(const AuthSettings & settings);
|
||||
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
|
||||
|
||||
HTTPHeaderEntries headers;
|
||||
std::unordered_set<std::string> users;
|
||||
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
|
||||
};
|
||||
|
||||
struct RequestSettings : public BaseSettings<RequestSettingsTraits>
|
||||
{
|
||||
void validateUploadSettings();
|
||||
RequestSettings() = default;
|
||||
|
||||
ThrottlerPtr get_request_throttler;
|
||||
ThrottlerPtr put_request_throttler;
|
||||
/// Create request settings from DB::Settings.
|
||||
explicit RequestSettings(const DB::Settings & settings, bool validate_settings = true);
|
||||
|
||||
static RequestSettings loadFromSettings(const DB::Settings & settings, bool validate_settings = true);
|
||||
static RequestSettings loadFromNamedCollection(const NamedCollection & collection, bool validate_settings = true);
|
||||
static RequestSettings loadFromConfig(
|
||||
/// Create request settings from NamedCollection.
|
||||
RequestSettings(
|
||||
const NamedCollection & collection,
|
||||
const DB::Settings & settings,
|
||||
bool validate_settings = true);
|
||||
|
||||
/// Create request settings from Config.
|
||||
RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_prefix,
|
||||
const DB::Settings & settings,
|
||||
@ -170,9 +174,18 @@ struct RequestSettings : public BaseSettings<RequestSettingsTraits>
|
||||
|
||||
void updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings = true);
|
||||
void updateIfChanged(const RequestSettings & settings);
|
||||
void validateUploadSettings();
|
||||
|
||||
ThrottlerPtr get_request_throttler;
|
||||
ThrottlerPtr put_request_throttler;
|
||||
std::shared_ptr<ProxyConfigurationResolver> proxy_resolver;
|
||||
|
||||
private:
|
||||
void initializeThrottler(const DB::Settings & settings);
|
||||
void finishInit(const DB::Settings & settings, bool validate_settings);
|
||||
};
|
||||
|
||||
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
}
|
||||
|
@ -28,5 +28,8 @@ inline static constexpr uint64_t DEFAULT_MAX_PART_NUMBER = 10000;
|
||||
/// Other settings.
|
||||
inline static constexpr uint64_t DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 32 * 1024 * 1024;
|
||||
inline static constexpr uint64_t DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE = 20;
|
||||
inline static constexpr uint64_t DEFAULT_LIST_OBJECT_KEYS_SIZE = 1000;
|
||||
inline static constexpr uint64_t DEFAULT_ALLOW_NATIVE_COPY = true;
|
||||
inline static constexpr uint64_t DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD = false;
|
||||
|
||||
}
|
||||
|
@ -32,8 +32,8 @@ void S3SettingsByEndpoint::loadFromConfig(
|
||||
if (config.has(endpoint_path))
|
||||
{
|
||||
auto endpoint = config.getString(endpoint_path);
|
||||
auto auth_settings = S3::AuthSettings::loadFromConfig(config, key_path, settings);
|
||||
auto request_settings = S3::RequestSettings::loadFromConfig(config, key_path, settings);
|
||||
auto auth_settings = S3::AuthSettings(config, key_path, settings);
|
||||
auto request_settings = S3::RequestSettings(config, key_path, settings);
|
||||
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(request_settings)});
|
||||
}
|
||||
}
|
||||
|
@ -249,7 +249,7 @@ AzureClientPtr StorageAzureConfiguration::createClient(bool is_read_only, bool a
|
||||
return result;
|
||||
}
|
||||
|
||||
void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & collection)
|
||||
void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
|
||||
{
|
||||
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
|
||||
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
ContextPtr context) override;
|
||||
|
||||
protected:
|
||||
void fromNamedCollection(const NamedCollection & collection) override;
|
||||
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
|
||||
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
|
||||
|
||||
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
|
||||
|
@ -119,7 +119,7 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
|
||||
setURL(url_str);
|
||||
}
|
||||
|
||||
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & collection)
|
||||
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
|
||||
{
|
||||
std::string url_str;
|
||||
|
||||
|
@ -46,7 +46,7 @@ public:
|
||||
ContextPtr context) override;
|
||||
|
||||
private:
|
||||
void fromNamedCollection(const NamedCollection &) override;
|
||||
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
|
||||
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
|
||||
void setURL(const std::string & url_);
|
||||
|
||||
|
@ -108,10 +108,12 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const std::string config_prefix = "s3.";
|
||||
|
||||
auto s3_settings = getSettings(config, config_prefix, context, /* for_disk_s3 */false, settings.s3_validate_request_settings);
|
||||
auto s3_settings = getSettings(
|
||||
config, config_prefix, context, url.endpoint, /* for_disk_s3 */false,
|
||||
settings.s3_validate_request_settings);
|
||||
|
||||
request_settings.updateFromSettings(settings, /* if_changed */true);
|
||||
auth_settings.updateFrom(s3_settings->auth_settings);
|
||||
auth_settings.updateIfChanged(s3_settings->auth_settings);
|
||||
|
||||
s3_settings->auth_settings = auth_settings;
|
||||
s3_settings->request_settings = request_settings;
|
||||
@ -124,7 +126,7 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
|
||||
}
|
||||
|
||||
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(url.uri.toString(), context->getUserName()))
|
||||
s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
|
||||
s3_settings->auth_settings.updateIfChanged(endpoint_settings->auth_settings);
|
||||
|
||||
auto client = getClient(url, *s3_settings, context, /* for_disk_s3 */false);
|
||||
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(url.key);
|
||||
@ -139,8 +141,9 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
|
||||
key_generator, "StorageS3", false);
|
||||
}
|
||||
|
||||
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection)
|
||||
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection, ContextPtr context)
|
||||
{
|
||||
const auto settings = context->getSettingsRef();
|
||||
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
|
||||
|
||||
auto filename = collection.getOrDefault<String>("filename", "");
|
||||
@ -159,9 +162,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
|
||||
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
|
||||
structure = collection.getOrDefault<String>("structure", "auto");
|
||||
|
||||
request_settings = S3::RequestSettings::loadFromNamedCollection(collection, /* validate_settings */true);
|
||||
request_settings = S3::RequestSettings(collection, settings, /* validate_settings */true);
|
||||
|
||||
static_configuration = !auth_settings.access_key_id.empty() || auth_settings.no_sign_request.has_value();
|
||||
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
|
||||
|
||||
keys = {url.key};
|
||||
}
|
||||
@ -357,7 +360,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
|
||||
if (no_sign_request)
|
||||
auth_settings.no_sign_request = no_sign_request;
|
||||
|
||||
static_configuration = !auth_settings.access_key_id.empty() || auth_settings.no_sign_request.has_value();
|
||||
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
|
||||
auth_settings.no_sign_request = no_sign_request;
|
||||
|
||||
keys = {url.key};
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
ContextPtr context) override;
|
||||
|
||||
private:
|
||||
void fromNamedCollection(const NamedCollection & collection) override;
|
||||
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
|
||||
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
|
||||
|
||||
S3::URI url;
|
||||
|
@ -424,7 +424,7 @@ void StorageObjectStorage::Configuration::initialize(
|
||||
bool with_table_structure)
|
||||
{
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
|
||||
configuration.fromNamedCollection(*named_collection);
|
||||
configuration.fromNamedCollection(*named_collection, local_context);
|
||||
else
|
||||
configuration.fromAST(engine_args, local_context, with_table_structure);
|
||||
|
||||
|
@ -193,7 +193,7 @@ public:
|
||||
String structure = "auto";
|
||||
|
||||
protected:
|
||||
virtual void fromNamedCollection(const NamedCollection & collection) = 0;
|
||||
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
|
||||
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
|
||||
|
||||
void assertInitialized() const;
|
||||
|
Loading…
Reference in New Issue
Block a user