Merge pull request #71019 from Algunenano/settings6

Move remaining settings objects to pImpl and start simplification
This commit is contained in:
Raúl Marín 2024-10-25 20:23:12 +00:00 committed by GitHub
commit 9006eec35d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 1032 additions and 732 deletions

View File

@ -52,26 +52,6 @@ find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' 2>/dev/n
# Broken symlinks
find -L $ROOT_PATH -type l 2>/dev/null | grep -v contrib && echo "^ Broken symlinks found"
# Duplicated or incorrect setting declarations
SETTINGS_FILE=$(mktemp)
ALL_DECLARATION_FILES="
$ROOT_PATH/src/Core/Settings.cpp
$ROOT_PATH/src/Storages/MergeTree/MergeTreeSettings.cpp
$ROOT_PATH/src/Core/FormatFactorySettingsDeclaration.h"
cat $ROOT_PATH/src/Core/Settings.cpp $ROOT_PATH/src/Core/FormatFactorySettingsDeclaration.h | grep "M(" | awk '{print substr($2, 0, length($2) - 1) " Settings" substr($1, 3, length($1) - 3) " SettingsDeclaration" }' | sort | uniq > ${SETTINGS_FILE}
cat $ROOT_PATH/src/Storages/MergeTree/MergeTreeSettings.cpp | grep "M(" | awk '{print substr($2, 0, length($2) - 1) " MergeTreeSettings" substr($1, 3, length($1) - 3) " SettingsDeclaration" }' | sort | uniq >> ${SETTINGS_FILE}
# Check that if there are duplicated settings (declared in different objects) they all have the same type (it's simpler to validate style with that assert)
for setting in $(awk '{print $1 " " $2}' ${SETTINGS_FILE} | sed -e 's/MergeTreeSettings//g' -e 's/Settings//g' | sort | uniq | awk '{ print $1 }' | uniq -d);
do
echo "# Found multiple definitions of setting ${setting} with different types: "
grep --line-number " ${setting}," ${ALL_DECLARATION_FILES} | awk '{print " > " $0 }'
done
# We append all uses of extern found in implementation files to validate them in a single pass and avoid reading the same files over and over
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' | xargs grep -e "^\s*extern const Settings" -e "^\s**extern const MergeTreeSettings" -T | awk '{print substr($5, 0, length($5) -1) " " $4 " " substr($1, 0, length($1) - 1)}' >> ${SETTINGS_FILE}
# Duplicated or incorrect setting declarations
bash $ROOT_PATH/utils/check-style/check-settings-style

View File

@ -4,6 +4,7 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteBufferFromFile.h>

View File

@ -36,6 +36,24 @@ namespace Setting
extern const SettingsUInt64 s3_max_redirects;
}
/// Keys of the S3 auth settings that this file reads via `auth_settings[S3AuthSetting::...]`.
/// They are only declared here (`extern`); the objects themselves are defined elsewhere.
namespace S3AuthSetting
{
extern const S3AuthSettingsString access_key_id;
extern const S3AuthSettingsUInt64 expiration_window_seconds;
extern const S3AuthSettingsBool no_sign_request;
extern const S3AuthSettingsString region;
extern const S3AuthSettingsString secret_access_key;
extern const S3AuthSettingsString server_side_encryption_customer_key_base64;
extern const S3AuthSettingsBool use_environment_credentials;
extern const S3AuthSettingsBool use_insecure_imds_request;
}
/// Keys of the S3 request settings that this file overrides via `request_settings[S3RequestSetting::...]`.
/// They are only declared here (`extern`); the objects themselves are defined elsewhere.
namespace S3RequestSetting
{
extern const S3RequestSettingsBool allow_native_copy;
extern const S3RequestSettingsString storage_class_name;
}
namespace ErrorCodes
{
extern const int S3_ERROR;
@ -55,7 +73,7 @@ namespace
HTTPHeaderEntries headers;
if (access_key_id.empty())
{
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
credentials = Aws::Auth::AWSCredentials(settings.auth_settings[S3AuthSetting::access_key_id], settings.auth_settings[S3AuthSetting::secret_access_key]);
headers = settings.auth_settings.headers;
}
@ -64,7 +82,7 @@ namespace
const Settings & local_settings = context->getSettingsRef();
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
settings.auth_settings[S3AuthSetting::region],
context->getRemoteHostFilter(),
static_cast<unsigned>(local_settings[Setting::s3_max_redirects]),
static_cast<unsigned>(local_settings[Setting::backup_restore_s3_retry_attempts]),
@ -95,15 +113,15 @@ namespace
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64,
settings.auth_settings[S3AuthSetting::server_side_encryption_customer_key_base64],
settings.auth_settings.server_side_encryption_kms_config,
std::move(headers),
S3::CredentialsConfiguration
{
settings.auth_settings.use_environment_credentials,
settings.auth_settings.use_insecure_imds_request,
settings.auth_settings.expiration_window_seconds,
settings.auth_settings.no_sign_request
settings.auth_settings[S3AuthSetting::use_environment_credentials],
settings.auth_settings[S3AuthSetting::use_insecure_imds_request],
settings.auth_settings[S3AuthSetting::expiration_window_seconds],
settings.auth_settings[S3AuthSetting::no_sign_request]
});
}
@ -143,7 +161,7 @@ BackupReaderS3::BackupReaderS3(
}
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
s3_settings.request_settings.allow_native_copy = allow_s3_native_copy;
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
@ -242,8 +260,8 @@ BackupWriterS3::BackupWriterS3(
}
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
s3_settings.request_settings.allow_native_copy = allow_s3_native_copy;
s3_settings.request_settings.storage_class_name = storage_class_name;
s3_settings.request_settings[S3RequestSetting::allow_native_copy] = allow_s3_native_copy;
s3_settings.request_settings[S3RequestSetting::storage_class_name] = storage_class_name;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())

View File

@ -1,10 +1,9 @@
#pragma once
#include <memory>
#include <time.h>
#include <Compression/CompressedReadBufferBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <time.h>
#include <memory>
namespace DB

View File

@ -31,16 +31,34 @@ namespace fs = std::filesystem;
namespace DB
{
/// Keys of the S3 auth settings that this file reads via `auth_settings[S3AuthSetting::...]`.
/// They are only declared here (`extern`); the objects themselves are defined elsewhere.
namespace S3AuthSetting
{
extern const S3AuthSettingsString access_key_id;
extern const S3AuthSettingsUInt64 expiration_window_seconds;
extern const S3AuthSettingsBool no_sign_request;
extern const S3AuthSettingsString region;
extern const S3AuthSettingsString secret_access_key;
extern const S3AuthSettingsString server_side_encryption_customer_key_base64;
extern const S3AuthSettingsString session_token;
extern const S3AuthSettingsBool use_environment_credentials;
extern const S3AuthSettingsBool use_insecure_imds_request;
}
/// Key of the S3 request setting that this file sets via `request_settings_2[S3RequestSetting::...]`.
/// It is only declared here (`extern`); the object itself is defined elsewhere.
namespace S3RequestSetting
{
extern const S3RequestSettingsUInt64 max_single_read_retries;
}
struct KeeperSnapshotManagerS3::S3Configuration
{
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const S3::Client> client_)
S3Configuration(S3::URI uri_, S3::S3AuthSettings auth_settings_, std::shared_ptr<const S3::Client> client_)
: uri(std::move(uri_))
, auth_settings(std::move(auth_settings_))
, client(std::move(client_))
{}
S3::URI uri;
S3::AuthSettings auth_settings;
S3::S3AuthSettings auth_settings;
std::shared_ptr<const S3::Client> client;
};
@ -66,7 +84,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
}
const auto & settings = Context::getGlobalContextInstance()->getSettingsRef();
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
auto auth_settings = S3::S3AuthSettings(config, settings, config_prefix);
String endpoint = macros->expand(config.getString(config_prefix + ".endpoint"));
auto new_uri = S3::URI{endpoint};
@ -81,7 +99,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
LOG_INFO(log, "S3 configuration was updated");
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token);
auto credentials = Aws::Auth::AWSCredentials(auth_settings[S3AuthSetting::access_key_id], auth_settings[S3AuthSetting::secret_access_key], auth_settings[S3AuthSetting::session_token]);
auto headers = auth_settings.headers;
static constexpr size_t s3_max_redirects = 10;
@ -95,7 +113,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
}
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
auth_settings[S3AuthSetting::region],
RemoteHostFilter(), s3_max_redirects, s3_retry_attempts,
enable_s3_requests_logging,
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {},
@ -115,15 +133,15 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,
auth_settings[S3AuthSetting::server_side_encryption_customer_key_base64],
auth_settings.server_side_encryption_kms_config,
std::move(headers),
S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials,
auth_settings.use_insecure_imds_request,
auth_settings.expiration_window_seconds,
auth_settings.no_sign_request,
auth_settings[S3AuthSetting::use_environment_credentials],
auth_settings[S3AuthSetting::use_insecure_imds_request],
auth_settings[S3AuthSetting::expiration_window_seconds],
auth_settings[S3AuthSetting::no_sign_request],
},
credentials.GetSessionToken());
@ -156,7 +174,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
if (s3_client == nullptr)
return;
S3::RequestSettings request_settings_1;
S3::S3RequestSettings request_settings_1;
const auto create_writer = [&](const auto & key)
{
@ -199,8 +217,8 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
lock_writer.finalize();
// We read back the written UUID, if it's the same we can upload the file
S3::RequestSettings request_settings_2;
request_settings_2.max_single_read_retries = 1;
S3::S3RequestSettings request_settings_2;
request_settings_2[S3RequestSetting::max_single_read_retries] = 1;
ReadBufferFromS3 lock_reader
{
s3_client->client,

View File

@ -91,17 +91,12 @@ public:
virtual void set(std::string_view name, const Field & value);
Field get(std::string_view name) const;
void setString(std::string_view name, const String & value);
String getString(std::string_view name) const;
bool tryGet(std::string_view name, Field & value) const;
bool tryGetString(std::string_view name, String & value) const;
bool isChanged(std::string_view name) const;
SettingsChanges changes() const;
void applyChange(const SettingChange & change);
void applyChanges(const SettingsChanges & changes);
void applyChanges(const BaseSettings & changes); /// NOLINT
/// Resets all the settings to their default values.
void resetToDefault();
@ -118,15 +113,12 @@ public:
/// Checks if it's possible to assign a field to a specified value and throws an exception if not.
/// This function doesn't change the fields, it performs check only.
static void checkCanSet(std::string_view name, const Field & value);
static void checkCanSetString(std::string_view name, const String & str);
/// Conversions without changing the fields.
static Field castValueUtil(std::string_view name, const Field & value);
static String valueToStringUtil(std::string_view name, const Field & value);
static Field stringToValueUtil(std::string_view name, const String & str);
static std::string_view resolveName(std::string_view name);
void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const;
void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT);
@ -140,7 +132,6 @@ public:
const String & getName() const;
Field getValue() const;
void setValue(const Field & value);
Field getDefaultValue() const;
String getValueString() const;
String getDefaultValueString() const;
bool isValueChanged() const;
@ -273,27 +264,6 @@ Field BaseSettings<TTraits>::get(std::string_view name) const
return static_cast<Field>(getCustomSetting(name));
}
/// Assigns a setting from its textual representation.
/// The name is resolved through `TTraits::resolveName` first; names unknown to the accessor
/// are forwarded to the custom-setting storage returned by `getCustomSetting`.
template <typename TTraits>
void BaseSettings<TTraits>::setString(std::string_view name, const String & value)
{
    name = TTraits::resolveName(name);
    const auto & accessor = Traits::Accessor::instance();

    const size_t index = accessor.find(name);
    if (index == static_cast<size_t>(-1))
    {
        /// Not a declared setting: parse the text into the custom setting.
        getCustomSetting(name).parseFromString(value);
        return;
    }

    accessor.setValueString(*this, index, value);
}
/// Returns the textual representation of a setting's current value.
/// The name is resolved through `TTraits::resolveName` first; names unknown to the accessor
/// are looked up through `getCustomSetting`.
template <typename TTraits>
String BaseSettings<TTraits>::getString(std::string_view name) const
{
    name = TTraits::resolveName(name);
    const auto & accessor = Traits::Accessor::instance();

    const size_t index = accessor.find(name);
    if (index == static_cast<size_t>(-1))
        return getCustomSetting(name).toString();

    return accessor.getValueString(*this, index);
}
template <typename TTraits>
bool BaseSettings<TTraits>::tryGet(std::string_view name, Field & value) const
{
@ -312,24 +282,6 @@ bool BaseSettings<TTraits>::tryGet(std::string_view name, Field & value) const
return false;
}
/// Non-throwing lookup of a setting's textual value.
/// Writes the text into `value` and returns true when the (resolved) name is either a declared
/// setting or an existing custom setting; otherwise leaves `value` untouched and returns false.
template <typename TTraits>
bool BaseSettings<TTraits>::tryGetString(std::string_view name, String & value) const
{
    name = TTraits::resolveName(name);
    const auto & accessor = Traits::Accessor::instance();

    const size_t index = accessor.find(name);
    if (index != static_cast<size_t>(-1))
    {
        value = accessor.getValueString(*this, index);
        return true;
    }

    const auto * custom_setting = tryGetCustomSetting(name);
    if (!custom_setting)
        return false;

    value = custom_setting->toString();
    return true;
}
template <typename TTraits>
bool BaseSettings<TTraits>::isChanged(std::string_view name) const
{
@ -362,13 +314,6 @@ void BaseSettings<TTraits>::applyChanges(const SettingsChanges & changes)
applyChange(change);
}
/// Copies every field that iterating `other_settings` yields into this object,
/// one `set(name, value)` call per field.
template <typename TTraits>
void BaseSettings<TTraits>::applyChanges(const BaseSettings & other_settings)
{
    for (const auto & source_field : other_settings)
    {
        const auto & field_name = source_field.getName();
        set(field_name, source_field.getValue());
    }
}
template <typename TTraits>
void BaseSettings<TTraits>::resetToDefault()
{
@ -438,13 +383,6 @@ void BaseSettings<TTraits>::checkCanSet(std::string_view name, const Field & val
castValueUtil(name, value);
}
/// Validates that `str` can be converted to a value of the named setting.
/// The converted value is discarded; the call exists only for whatever error
/// `stringToValueUtil` raises on bad input.
template <typename TTraits>
void BaseSettings<TTraits>::checkCanSetString(std::string_view name, const String & str)
{
    const std::string_view resolved_name = TTraits::resolveName(name);
    stringToValueUtil(resolved_name, str);
}
template <typename TTraits>
Field BaseSettings<TTraits>::castValueUtil(std::string_view name, const Field & value)
{
@ -794,17 +732,6 @@ void BaseSettings<TTraits>::SettingFieldRef::setValue(const Field & value)
accessor->setValue(*settings, index, value);
}
/// Returns the default value of the referenced setting.
/// When custom settings are enabled and this reference points at one, the value stored
/// in the custom entry is returned; otherwise the accessor supplies the declared default.
template <typename TTraits>
Field BaseSettings<TTraits>::SettingFieldRef::getDefaultValue() const
{
    if constexpr (!Traits::allow_custom_settings)
    {
        return accessor->getDefaultValue(index);
    }
    else
    {
        if (custom_setting)
            return static_cast<Field>(custom_setting->second);
        return accessor->getDefaultValue(index);
    }
}
template <typename TTraits>
String BaseSettings<TTraits>::SettingFieldRef::getValueString() const
{
@ -921,7 +848,6 @@ using AliasMap = std::unordered_map<std::string_view, std::string_view>;
void resetValueToDefault(Data & data, size_t index) const { return field_infos[index].reset_value_to_default_function(data); } \
void writeBinary(const Data & data, size_t index, WriteBuffer & out) const { return field_infos[index].write_binary_function(data, out); } \
void readBinary(Data & data, size_t index, ReadBuffer & in) const { return field_infos[index].read_binary_function(data, in); } \
Field getDefaultValue(size_t index) const { return field_infos[index].get_default_value_function(); } \
String getDefaultValueString(size_t index) const { return field_infos[index].get_default_value_string_function(); } \
private: \
Accessor(); \
@ -943,7 +869,6 @@ using AliasMap = std::unordered_map<std::string_view, std::string_view>;
void (*reset_value_to_default_function)(Data &) ; \
void (*write_binary_function)(const Data &, WriteBuffer &) ; \
void (*read_binary_function)(Data &, ReadBuffer &) ; \
Field (*get_default_value_function)() ; \
String (*get_default_value_string_function)() ; \
}; \
std::vector<FieldInfo> field_infos; \
@ -1056,7 +981,6 @@ struct DefineAliases
[](Data & data) { data.NAME = SettingField##TYPE{DEFAULT}; }, \
[](const Data & data, WriteBuffer & out) { data.NAME.writeBinary(out); }, \
[](Data & data, ReadBuffer & in) { data.NAME.readBinary(in); }, \
[]() -> Field { return static_cast<Field>(SettingField##TYPE{DEFAULT}); }, \
[]() -> String { return SettingField##TYPE{DEFAULT}.toString(); } \
});
}

View File

@ -4,6 +4,7 @@
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Core/BaseSettingsProgramOptions.h>
#include <Core/DistributedCacheProtocol.h>
#include <Core/FormatFactorySettings.h>
#include <Core/Settings.h>
#include <Core/SettingsChangesHistory.h>

View File

@ -5,9 +5,7 @@
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
#include <Core/SettingsWriteFormat.h>
#include <Core/ParallelReplicasMode.h>
#include <base/types.h>
#include <Common/SettingConstraintWritability.h>
#include <Common/SettingsChanges.h>
#include <string_view>

View File

@ -12,7 +12,9 @@
#include <Core/ShortCircuitFunctionEvaluation.h>
#include <Core/StreamingHandleErrorMode.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadSettings.h>
#include <IO/DistributedCacheLogMode.h>
#include <IO/DistributedCachePoolBehaviourOnLimit.h>
#include <IO/ReadMethod.h>
#include <Parsers/IdentifierQuotingStyle.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/ShellCommandSettings.h>

View File

@ -1,13 +1,13 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <string>
#include <memory>
#include <string>
#include <IO/ReadBufferFromFileBase.h>
namespace DB
{
struct ReadSettings;
/** Create an object to read data from a file.
*

View File

@ -195,7 +195,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
S3::RequestSettings request_settings = s3_settings.get()->request_settings;
S3::S3RequestSettings request_settings = s3_settings.get()->request_settings;
/// NOTE: For background operations settings are not propagated from session or query. They are taken from
/// default user's .xml config. It's obscure and unclear behavior. For them it's always better
/// to rely on settings from disk.

View File

@ -20,8 +20,8 @@ struct S3ObjectStorageSettings
S3ObjectStorageSettings() = default;
S3ObjectStorageSettings(
const S3::RequestSettings & request_settings_,
const S3::AuthSettings & auth_settings_,
const S3::S3RequestSettings & request_settings_,
const S3::S3AuthSettings & auth_settings_,
uint64_t min_bytes_for_seek_,
int32_t list_object_keys_size_,
int32_t objects_chunk_size_to_delete_,
@ -34,8 +34,8 @@ struct S3ObjectStorageSettings
, read_only(read_only_)
{}
S3::RequestSettings request_settings;
S3::AuthSettings auth_settings;
S3::S3RequestSettings request_settings;
S3::S3AuthSettings auth_settings;
uint64_t min_bytes_for_seek;
int32_t list_object_keys_size;

View File

@ -33,6 +33,27 @@ namespace Setting
extern const SettingsUInt64 s3_retry_attempts;
}
/// Keys of the S3 auth settings that this file reads via `auth_settings[S3AuthSetting::...]`
/// when building the S3 client. They are only declared here (`extern`); the objects
/// themselves are defined elsewhere.
namespace S3AuthSetting
{
extern const S3AuthSettingsString access_key_id;
extern const S3AuthSettingsUInt64 connect_timeout_ms;
extern const S3AuthSettingsBool disable_checksum;
extern const S3AuthSettingsUInt64 expiration_window_seconds;
extern const S3AuthSettingsBool gcs_issue_compose_request;
extern const S3AuthSettingsUInt64 http_keep_alive_max_requests;
extern const S3AuthSettingsUInt64 http_keep_alive_timeout;
extern const S3AuthSettingsUInt64 max_connections;
extern const S3AuthSettingsBool no_sign_request;
extern const S3AuthSettingsString region;
extern const S3AuthSettingsUInt64 request_timeout_ms;
extern const S3AuthSettingsString secret_access_key;
extern const S3AuthSettingsString server_side_encryption_customer_key_base64;
extern const S3AuthSettingsString session_token;
extern const S3AuthSettingsBool use_adaptive_timeouts;
extern const S3AuthSettingsBool use_environment_credentials;
extern const S3AuthSettingsBool use_insecure_imds_request;
}
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
@ -47,8 +68,8 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
{
const auto & settings = context->getSettingsRef();
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
auto request_settings = S3::RequestSettings(config, settings, config_prefix, "s3_", validate_settings);
auto auth_settings = S3::S3AuthSettings(config, settings, config_prefix);
auto request_settings = S3::S3RequestSettings(config, settings, config_prefix, "s3_", validate_settings);
request_settings.proxy_resolver = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
ProxyConfiguration::protocolFromString(S3::URI(endpoint).uri.getScheme()), config_prefix, config);
@ -85,7 +106,7 @@ std::unique_ptr<S3::Client> getClient(
const auto & request_settings = settings.request_settings;
const bool is_s3_express_bucket = S3::isS3ExpressEndpoint(url.endpoint);
if (is_s3_express_bucket && auth_settings.region.value.empty())
if (is_s3_express_bucket && auth_settings[S3AuthSetting::region].value.empty())
{
throw Exception(
ErrorCodes::NO_ELEMENTS_IN_CONFIG,
@ -107,7 +128,7 @@ std::unique_ptr<S3::Client> getClient(
enable_s3_requests_logging = local_settings[Setting::enable_s3_requests_logging];
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
auth_settings[S3AuthSetting::region],
context->getRemoteHostFilter(),
s3_max_redirects,
s3_retry_attempts,
@ -117,14 +138,14 @@ std::unique_ptr<S3::Client> getClient(
request_settings.put_request_throttler,
url.uri.getScheme());
client_configuration.connectTimeoutMs = auth_settings.connect_timeout_ms;
client_configuration.requestTimeoutMs = auth_settings.request_timeout_ms;
client_configuration.maxConnections = static_cast<uint32_t>(auth_settings.max_connections);
client_configuration.http_keep_alive_timeout = auth_settings.http_keep_alive_timeout;
client_configuration.http_keep_alive_max_requests = auth_settings.http_keep_alive_max_requests;
client_configuration.connectTimeoutMs = auth_settings[S3AuthSetting::connect_timeout_ms];
client_configuration.requestTimeoutMs = auth_settings[S3AuthSetting::request_timeout_ms];
client_configuration.maxConnections = static_cast<uint32_t>(auth_settings[S3AuthSetting::max_connections]);
client_configuration.http_keep_alive_timeout = auth_settings[S3AuthSetting::http_keep_alive_timeout];
client_configuration.http_keep_alive_max_requests = auth_settings[S3AuthSetting::http_keep_alive_max_requests];
client_configuration.endpointOverride = url.endpoint;
client_configuration.s3_use_adaptive_timeouts = auth_settings.use_adaptive_timeouts;
client_configuration.s3_use_adaptive_timeouts = auth_settings[S3AuthSetting::use_adaptive_timeouts];
if (request_settings.proxy_resolver)
{
@ -137,28 +158,28 @@ std::unique_ptr<S3::Client> getClient(
S3::ClientSettings client_settings{
.use_virtual_addressing = url.is_virtual_hosted_style,
.disable_checksum = auth_settings.disable_checksum,
.gcs_issue_compose_request = auth_settings.gcs_issue_compose_request,
.disable_checksum = auth_settings[S3AuthSetting::disable_checksum],
.gcs_issue_compose_request = auth_settings[S3AuthSetting::gcs_issue_compose_request],
};
auto credentials_configuration = S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials,
auth_settings.use_insecure_imds_request,
auth_settings.expiration_window_seconds,
auth_settings.no_sign_request,
auth_settings[S3AuthSetting::use_environment_credentials],
auth_settings[S3AuthSetting::use_insecure_imds_request],
auth_settings[S3AuthSetting::expiration_window_seconds],
auth_settings[S3AuthSetting::no_sign_request],
};
return S3::ClientFactory::instance().create(
client_configuration,
client_settings,
auth_settings.access_key_id,
auth_settings.secret_access_key,
auth_settings.server_side_encryption_customer_key_base64,
auth_settings[S3AuthSetting::access_key_id],
auth_settings[S3AuthSetting::secret_access_key],
auth_settings[S3AuthSetting::server_side_encryption_customer_key_base64],
auth_settings.server_side_encryption_kms_config,
auth_settings.headers,
credentials_configuration,
auth_settings.session_token);
auth_settings[S3AuthSetting::session_token]);
}
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <cstdint>
namespace DB
{

/// Log mode selector for the distributed cache.
/// NOTE(review): judging by the names only, the modes are "log nothing", "log on error", "log everything".
/// The numeric values are written out explicitly and equal the implicit numbering.
enum class DistributedCacheLogMode
{
    LOG_NOTHING = 0,
    LOG_ON_ERROR = 1,
    LOG_ALL = 2,
};

}

View File

@ -0,0 +1,14 @@
#pragma once
#include <cstdint>
namespace DB
{

/// Behaviour selector for the distributed cache pool once its limit is hit.
/// NOTE(review): judging by the names only, the choices are "wait" or "allocate outside the pool".
/// The numeric values are written out explicitly and equal the implicit numbering.
enum class DistributedCachePoolBehaviourOnLimit
{
    WAIT = 0,
    ALLOCATE_NEW_BYPASSING_POOL = 1,
};

}

View File

@ -1,25 +1,13 @@
#pragma once
#include <Core/Types.h>
#include <Core/DistributedCacheProtocol.h>
#include <Core/Types.h>
#include <IO/DistributedCacheLogMode.h>
#include <IO/DistributedCachePoolBehaviourOnLimit.h>
namespace DB
{
/// Behaviour selector for the distributed cache pool once its limit is hit.
/// NOTE(review): semantics inferred from the enumerator names only — confirm against the users of this enum.
enum class DistributedCachePoolBehaviourOnLimit
{
WAIT,
ALLOCATE_NEW_BYPASSING_POOL,
};
/// Log mode selector for the distributed cache.
/// NOTE(review): semantics inferred from the enumerator names only — confirm against the users of this enum.
enum class DistributedCacheLogMode
{
LOG_NOTHING,
LOG_ON_ERROR,
LOG_ALL,
};
struct DistributedCacheSettings
{
bool throw_on_error = false;

View File

@ -33,6 +33,12 @@ namespace ProfileEvents
namespace DB
{
/// Key of the S3 request setting that bounds the read retry loops in this file
/// (`request_settings[S3RequestSetting::max_single_read_retries]`).
/// It is only declared here (`extern`); the object itself is defined elsewhere.
namespace S3RequestSetting
{
extern const S3RequestSettingsUInt64 max_single_read_retries;
}
namespace ErrorCodes
{
extern const int S3_ERROR;
@ -48,7 +54,7 @@ ReadBufferFromS3::ReadBufferFromS3(
const String & bucket_,
const String & key_,
const String & version_id_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t offset_,
@ -111,7 +117,7 @@ bool ReadBufferFromS3::nextImpl()
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 1; !next_result; ++attempt)
{
bool last_attempt = attempt >= request_settings.max_single_read_retries;
bool last_attempt = attempt >= request_settings[S3RequestSetting::max_single_read_retries];
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
@ -176,7 +182,7 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 1; n > 0; ++attempt)
{
bool last_attempt = attempt >= request_settings.max_single_read_retries;
bool last_attempt = attempt >= request_settings[S3RequestSetting::max_single_read_retries];
size_t bytes_copied = 0;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
@ -227,7 +233,7 @@ bool ReadBufferFromS3::processException(size_t read_offset, size_t attempt) cons
log,
"Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, "
"Attempt: {}/{}, Message: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, request_settings.max_single_read_retries,
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, request_settings[S3RequestSetting::max_single_read_retries],
getCurrentExceptionMessage(/* with_stacktrace = */ false));

View File

@ -28,7 +28,7 @@ private:
String bucket;
String key;
String version_id;
const S3::RequestSettings request_settings;
const S3::S3RequestSettings request_settings;
/// These variables are atomic because they can be used for `logging only`
/// (where it is not important to get consistent result)
@ -47,7 +47,7 @@ public:
const String & bucket_,
const String & key_,
const String & version_id_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer = false,
size_t offset_ = 0,

58
src/IO/ReadMethod.h Normal file
View File

@ -0,0 +1,58 @@
#pragma once
#include <cstdint>
namespace DB
{

/// Ways of reading data from the local filesystem.
/// The numeric values are written out explicitly and equal the implicit numbering.
enum class LocalFSReadMethod : uint8_t
{
    /// Simple synchronous reads with 'read'.
    /// Can use direct IO after specified size.
    /// Can use prefetch by asking OS to perform readahead.
    read = 0,

    /// Simple synchronous reads with 'pread'.
    /// Unlike 'read', a single file descriptor is shared between multiple threads.
    /// Can use direct IO after specified size.
    /// Can use prefetch by asking OS to perform readahead.
    pread = 1,

    /// Use mmap after specified size, or simple synchronous reads with 'pread'.
    /// Can use prefetch by asking OS to perform readahead.
    mmap = 2,

    /// Use the io_uring Linux subsystem for asynchronous reads.
    /// Can use direct IO after specified size.
    /// Can do prefetch with double buffering.
    io_uring = 3,

    /// Checks whether data is in the page cache with 'preadv2' on modern Linux kernels.
    /// If it is, reads from the same thread; if not, offloads IO to a separate threadpool.
    /// Can do prefetch with double buffering.
    /// Can use specified priorities and limit the number of concurrent reads.
    pread_threadpool = 4,

    /// Asynchronous reader with a fake backend that is in fact synchronous.
    /// @attention Use only for testing purposes.
    pread_fake_async = 5,
};

/// Ways of reading data from a remote filesystem.
enum class RemoteFSReadMethod : uint8_t
{
    read = 0,
    threadpool = 1,
};

}

View File

@ -2,64 +2,16 @@
#include <cstddef>
#include <Core/Defines.h>
#include <IO/DistributedCacheSettings.h>
#include <IO/ReadMethod.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Interpreters/Cache/UserInfo.h>
#include <Common/Priority.h>
#include <Common/Scheduler/ResourceLink.h>
#include <IO/DistributedCacheSettings.h>
#include <Interpreters/Cache/UserInfo.h>
#include <Common/Throttler_fwd.h>
namespace DB
{
/// Ways of reading data from the local filesystem.
enum class LocalFSReadMethod : uint8_t
{
/**
* Simple synchronous reads with 'read'.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
read,
/**
* Simple synchronous reads with 'pread'.
* In contrast to 'read', shares single file descriptor from multiple threads.
* Can use direct IO after specified size.
* Can use prefetch by asking OS to perform readahead.
*/
pread,
/**
* Use mmap after specified size or simple synchronous reads with 'pread'.
* Can use prefetch by asking OS to perform readahead.
*/
mmap,
/**
* Use the io_uring Linux subsystem for asynchronous reads.
* Can use direct IO after specified size.
* Can do prefetch with double buffering.
*/
io_uring,
/**
* Checks if data is in page cache with 'preadv2' on modern Linux kernels.
* If data is in page cache, read from the same thread.
* If not, offload IO to separate threadpool.
* Can do prefetch with double buffering.
* Can use specified priorities and limit the number of concurrent reads.
*/
pread_threadpool,
/// Use asynchronous reader with a fake backend that is in fact synchronous.
/// @attention Use only for testing purposes.
pread_fake_async
};
/// Ways of reading data from a remote filesystem.
/// NOTE(review): enumerator semantics are not described anywhere in this view — confirm against the readers that switch on it.
enum class RemoteFSReadMethod : uint8_t
{
read,
threadpool,
};
class MMappedFileCache;
class PageCache;

View File

@ -46,6 +46,18 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Keys of the S3 request settings used by this file via `request_settings[S3RequestSetting::...]`
/// (upload part sizing, retry count, storage class; presumably the remaining ones are used
/// further down the file). They are only declared here (`extern`); the objects themselves
/// are defined elsewhere.
namespace S3RequestSetting
{
extern const S3RequestSettingsBool allow_native_copy;
extern const S3RequestSettingsBool check_objects_after_upload;
extern const S3RequestSettingsUInt64 max_part_number;
extern const S3RequestSettingsUInt64 max_single_operation_copy_size;
extern const S3RequestSettingsUInt64 max_single_part_upload_size;
extern const S3RequestSettingsUInt64 max_unexpected_write_error_retries;
extern const S3RequestSettingsUInt64 max_upload_part_size;
extern const S3RequestSettingsUInt64 min_upload_part_size;
extern const S3RequestSettingsString storage_class_name;
}
namespace
{
@ -56,7 +68,7 @@ namespace
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
bool for_disk_s3_,
@ -80,7 +92,7 @@ namespace
std::shared_ptr<const S3::Client> client_ptr;
const String & dest_bucket;
const String & dest_key;
const S3::RequestSettings & request_settings;
const S3::S3RequestSettings & request_settings;
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunnerUnsafe<void> schedule;
bool for_disk_s3;
@ -125,7 +137,7 @@ namespace
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
const auto & storage_class_name = request_settings.storage_class_name;
const auto & storage_class_name = request_settings[S3RequestSetting::storage_class_name];
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
@ -185,7 +197,7 @@ namespace
request.SetMultipartUpload(multipart_upload);
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
size_t max_retries = std::max<UInt64>(request_settings[S3RequestSetting::max_unexpected_write_error_retries].value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@ -290,9 +302,9 @@ namespace
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
UInt64 max_part_number = request_settings.max_part_number;
UInt64 min_upload_part_size = request_settings.min_upload_part_size;
UInt64 max_upload_part_size = request_settings.max_upload_part_size;
UInt64 max_part_number = request_settings[S3RequestSetting::max_part_number];
UInt64 min_upload_part_size = request_settings[S3RequestSetting::min_upload_part_size];
UInt64 max_upload_part_size = request_settings[S3RequestSetting::max_upload_part_size];
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_part_number must not be 0");
@ -469,7 +481,7 @@ namespace
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
bool for_disk_s3_,
@ -483,12 +495,12 @@ namespace
void performCopy()
{
if (size <= request_settings.max_single_part_upload_size)
if (size <= request_settings[S3RequestSetting::max_single_part_upload_size])
performSinglepartUpload();
else
performMultipartUpload();
if (request_settings.check_objects_after_upload)
if (request_settings[S3RequestSetting::check_objects_after_upload])
checkObjectAfterUpload();
}
@ -516,7 +528,7 @@ namespace
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
const auto & storage_class_name = request_settings.storage_class_name;
const auto & storage_class_name = request_settings[S3RequestSetting::storage_class_name];
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
@ -528,7 +540,7 @@ namespace
void processPutRequest(S3::PutObjectRequest & request)
{
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
size_t max_retries = std::max<UInt64>(request_settings[S3RequestSetting::max_unexpected_write_error_retries].value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);
@ -651,7 +663,7 @@ namespace
size_t src_size_,
const String & dest_bucket_,
const String & dest_key_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
const ReadSettings & read_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
@ -681,12 +693,12 @@ namespace
void performCopy()
{
LOG_TEST(log, "Copy object {} to {} using native copy", src_key, dest_key);
if (!supports_multipart_copy || size <= request_settings.max_single_operation_copy_size)
if (!supports_multipart_copy || size <= request_settings[S3RequestSetting::max_single_operation_copy_size])
performSingleOperationCopy();
else
performMultipartUploadCopy();
if (request_settings.check_objects_after_upload)
if (request_settings[S3RequestSetting::check_objects_after_upload])
checkObjectAfterUpload();
}
@ -718,7 +730,7 @@ namespace
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
const auto & storage_class_name = request_settings.storage_class_name;
const auto & storage_class_name = request_settings[S3RequestSetting::storage_class_name];
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
@ -730,7 +742,7 @@ namespace
void processCopyRequest(S3::CopyObjectRequest & request)
{
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
size_t max_retries = std::max<UInt64>(request_settings[S3RequestSetting::max_unexpected_write_error_retries].value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CopyObject);
@ -852,7 +864,7 @@ void copyDataToS3File(
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3::RequestSettings & settings,
const S3::S3RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunnerUnsafe<void> schedule,
@ -883,7 +895,7 @@ void copyS3File(
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3::RequestSettings & settings,
const S3::S3RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
@ -912,7 +924,7 @@ void copyS3File(
for_disk_s3);
};
if (!settings.allow_native_copy)
if (!settings[S3RequestSetting::allow_native_copy])
{
fallback_method();
return;

View File

@ -39,7 +39,7 @@ void copyS3File(
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3::RequestSettings & settings,
const S3::S3RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
@ -58,7 +58,7 @@ void copyDataToS3File(
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3::RequestSettings & settings,
const S3::S3RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},

View File

@ -30,6 +30,12 @@
#include "TestPocoHTTPServer.h"
/// Handles of the two S3 request settings that the read/write helpers of this test override
/// through `request_settings[...]`. Declared only; the definitions live elsewhere.
namespace DB::S3RequestSetting
{
extern const S3RequestSettingsUInt64 max_single_read_retries;
extern const S3RequestSettingsUInt64 max_unexpected_write_error_retries;
}
/*
* When all tests are executed together, `Context::getGlobalContextInstance()` is not null. Global context is used by
* ProxyResolvers to get proxy configuration (used by S3 clients). If global context does not have a valid ConfigRef, it relies on
@ -69,8 +75,8 @@ void doReadRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::U
UInt64 max_single_read_retries = 1;
DB::ReadSettings read_settings;
DB::S3::RequestSettings request_settings;
request_settings.max_single_read_retries = max_single_read_retries;
DB::S3::S3RequestSettings request_settings;
request_settings[DB::S3RequestSetting::max_single_read_retries] = max_single_read_retries;
DB::ReadBufferFromS3 read_buffer(
client,
uri.bucket,
@ -88,8 +94,8 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
{
UInt64 max_unexpected_write_error_retries = 1;
DB::S3::RequestSettings request_settings;
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
DB::S3::S3RequestSettings request_settings;
request_settings[DB::S3RequestSetting::max_unexpected_write_error_retries] = max_unexpected_write_error_retries;
DB::WriteBufferFromS3 write_buffer(
client,
uri.bucket,

210
src/IO/S3AuthSettings.cpp Normal file
View File

@ -0,0 +1,210 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Core/Settings.h>
#include <IO/S3AuthSettings.h>
#include <IO/S3Defines.h>
#include <IO/S3Common.h>
#include <Common/Exception.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// X-macro list of the S3 client settings.
/// Each entry is DECLARE(type, name, default value, description, flags); every description here is empty and every flag is 0.
/// ALIAS is part of the list signature, but no entry in this list uses it.
#define CLIENT_SETTINGS(DECLARE, ALIAS) \
DECLARE(UInt64, connect_timeout_ms, S3::DEFAULT_CONNECT_TIMEOUT_MS, "", 0) \
DECLARE(UInt64, request_timeout_ms, S3::DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
DECLARE(UInt64, max_connections, S3::DEFAULT_MAX_CONNECTIONS, "", 0) \
DECLARE(UInt64, http_keep_alive_timeout, S3::DEFAULT_KEEP_ALIVE_TIMEOUT, "", 0) \
DECLARE(UInt64, http_keep_alive_max_requests, S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS, "", 0) \
DECLARE(UInt64, expiration_window_seconds, S3::DEFAULT_EXPIRATION_WINDOW_SECONDS, "", 0) \
DECLARE(Bool, use_environment_credentials, S3::DEFAULT_USE_ENVIRONMENT_CREDENTIALS, "", 0) \
DECLARE(Bool, no_sign_request, S3::DEFAULT_NO_SIGN_REQUEST, "", 0) \
DECLARE(Bool, use_insecure_imds_request, false, "", 0) \
DECLARE(Bool, use_adaptive_timeouts, S3::DEFAULT_USE_ADAPTIVE_TIMEOUTS, "", 0) \
DECLARE(Bool, is_virtual_hosted_style, false, "", 0) \
DECLARE(Bool, disable_checksum, S3::DEFAULT_DISABLE_CHECKSUM, "", 0) \
DECLARE(Bool, gcs_issue_compose_request, false, "", 0)
/// X-macro list of the S3 credential settings; same entry layout as CLIENT_SETTINGS.
/// All of them are strings that default to an empty value.
#define AUTH_SETTINGS(DECLARE, ALIAS) \
DECLARE(String, access_key_id, "", "", 0) \
DECLARE(String, secret_access_key, "", "", 0) \
DECLARE(String, session_token, "", "", 0) \
DECLARE(String, region, "", "", 0) \
DECLARE(String, server_side_encryption_customer_key_base64, "", "", 0)
/// The complete list: client settings followed by credential settings.
/// This is the list the settings traits and the extern setting handles below are generated from.
#define CLIENT_SETTINGS_LIST(M, ALIAS) \
CLIENT_SETTINGS(M, ALIAS) \
AUTH_SETTINGS(M, ALIAS)
/// Generate the traits type for the combined settings list.
/// NOTE(review): the expansion of DECLARE_/IMPLEMENT_SETTINGS_TRAITS is not visible in this file — see Core/BaseSettings.h.
DECLARE_SETTINGS_TRAITS(S3AuthSettingsTraits, CLIENT_SETTINGS_LIST)
IMPLEMENT_SETTINGS_TRAITS(S3AuthSettingsTraits, CLIENT_SETTINGS_LIST)
/// Storage behind the S3AuthSettings pImpl: a BaseSettings instantiation over the traits above, with no members of its own.
struct S3AuthSettingsImpl : public BaseSettings<S3AuthSettingsTraits>
{
};
/// For every setting in the list, defines an `S3AuthSetting::<name>` object initialised with the address of
/// the S3AuthSettingsImpl member of the same name. The helper macro is undefined again right after use.
#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) S3AuthSettings##TYPE NAME = &S3AuthSettingsImpl ::NAME;
namespace S3AuthSetting
{
CLIENT_SETTINGS_LIST(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
}
#undef INITIALIZE_SETTING_EXTERN
namespace S3
{
namespace
{
/// Loads the value stored at `path` in `config` into `field`.
/// The config getter is chosen from the type of the field's current value (integer, string or bool).
/// Returns false and leaves `field` untouched when `path` is absent; returns true after a successful assignment.
/// Throws BAD_ARGUMENTS if the field holds a value of any other type.
bool setValueFromConfig(
    const Poco::Util::AbstractConfiguration & config, const std::string & path, typename S3AuthSettingsImpl::SettingFieldRef & field)
{
    if (!config.has(path))
        return false;

    const auto value_type = field.getValue().getType();

    if (isInt64OrUInt64FieldType(value_type))
    {
        field.setValue(config.getUInt64(path));
        return true;
    }

    if (value_type == Field::Types::String)
    {
        field.setValue(config.getString(path));
        return true;
    }

    if (value_type == Field::Types::Bool)
    {
        field.setValue(config.getBool(path));
        return true;
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
}
}
/// Creates the object with a freshly default-constructed implementation; headers, users and the KMS config start empty.
S3AuthSettings::S3AuthSettings() : impl(std::make_unique<S3AuthSettingsImpl>())
{
}
/// Builds the settings from the `config_prefix` section of `config`.
/// For every setting, the value at `<config_prefix>.<name>` wins; if that key is absent, the `s3_<name>` entry of
/// `settings` is used, but only when it exists and has been changed.
/// HTTP headers, the SSE-KMS config and the allowed users are read from the same config section.
S3AuthSettings::S3AuthSettings(
    const Poco::Util::AbstractConfiguration & config, const DB::Settings & settings, const std::string & config_prefix)
    : S3AuthSettings()
{
    for (auto & field : impl->allMutable())
    {
        const auto config_path = fmt::format("{}.{}", config_prefix, field.getName());
        if (setValueFromConfig(config, config_path, field))
            continue;

        /// Not in the config section: fall back to the prefixed setting if it was changed.
        const auto fallback_name = "s3_" + field.getName();
        if (settings.has(fallback_name) && settings.isChanged(fallback_name))
            field.setValue(settings.get(fallback_name));
    }

    headers = getHTTPHeaders(config_prefix, config);
    server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);

    /// Every key of the section whose name starts with "user" contributes one allowed user.
    Poco::Util::AbstractConfiguration::Keys section_keys;
    config.keys(config_prefix, section_keys);
    for (const auto & section_key : section_keys)
    {
        if (!startsWith(section_key, "user"))
            continue;
        users.insert(config.getString(config_prefix + "." + section_key));
    }
}
/// Copy constructor: copies headers, users and the KMS config, and deep-copies the implementation object
/// so that the two instances never share setting storage.
S3AuthSettings::S3AuthSettings(const S3AuthSettings & settings)
: headers(settings.headers)
, users(settings.users)
, server_side_encryption_kms_config(settings.server_side_encryption_kms_config)
, impl(std::make_unique<S3AuthSettingsImpl>(*settings.impl))
{
}
/// Move constructor: moves headers, users and the KMS config, and move-constructs a new implementation object
/// from the source's one. The source keeps its own (moved-from) impl, so its pointer is never null.
/// NOTE(review): declared noexcept although std::make_unique allocates; an allocation failure here would terminate.
S3AuthSettings::S3AuthSettings(S3AuthSettings && settings) noexcept
: headers(std::move(settings.headers))
, users(std::move(settings.users))
, server_side_encryption_kms_config(std::move(settings.server_side_encryption_kms_config))
, impl(std::make_unique<S3AuthSettingsImpl>(std::move(*settings.impl)))
{
}
/// Builds the settings purely from `settings`: every `s3_<name>` entry that exists there is taken,
/// whether or not it has been changed.
S3AuthSettings::S3AuthSettings(const DB::Settings & settings)
    : S3AuthSettings()
{
    constexpr bool only_if_changed = false;
    updateFromSettings(settings, only_if_changed);
}
S3AuthSettings::~S3AuthSettings() = default;
S3AUTH_SETTINGS_SUPPORTED_TYPES(S3AuthSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
/// Move assignment: takes over headers, users, the KMS config and all setting values of `settings`.
/// The values are moved into the existing implementation object, so both objects keep a non-null impl.
S3AuthSettings & S3AuthSettings::operator=(S3AuthSettings && settings) noexcept
{
    /// Self-move would move every member onto itself, which leaves the standard containers
    /// in a valid but unspecified (in practice often emptied) state.
    if (this == &settings)
        return *this;

    headers = std::move(settings.headers);
    users = std::move(settings.users);
    server_side_encryption_kms_config = std::move(settings.server_side_encryption_kms_config);
    *impl = std::move(*settings.impl);

    return *this;
}
/// Two objects are equal when their headers, users, KMS config and all setting values are equal.
/// The cheap member comparisons come first; the settings themselves are compared last.
bool S3AuthSettings::operator==(const S3AuthSettings & right)
{
    const bool same_extras = headers == right.headers
        && users == right.users
        && server_side_encryption_kms_config == right.server_side_encryption_kms_config;

    return same_extras && *impl == *right.impl;
}
/// Overwrites each setting with the `s3_<name>` entry of `settings`, if such an entry exists.
/// With `if_changed` set, only entries that have been changed are taken.
void S3AuthSettings::updateFromSettings(const DB::Settings & settings, bool if_changed)
{
    for (auto & field : impl->allMutable())
    {
        const auto source_name = "s3_" + field.getName();

        if (!settings.has(source_name))
            continue;

        if (if_changed && !settings.isChanged(source_name))
            continue;

        field.setValue(settings.get(source_name));
    }
}
bool S3AuthSettings::hasUpdates(const S3AuthSettings & other) const
{
S3AuthSettings copy{*this};
copy.updateIfChanged(other);
return *this != copy;
}
/// Merges `settings` into this object:
///  - every setting whose value is changed in `settings` overwrites the local value;
///  - non-empty headers replace the local headers;
///  - users are added to the local set (existing users are kept);
///  - the SSE-KMS config replaces the local one if any of its optional parts is set.
void S3AuthSettings::updateIfChanged(const S3AuthSettings & settings)
{
    for (auto & setting : settings.impl->all())
    {
        if (setting.isValueChanged())
            impl->set(setting.getName(), setting.getValue());
    }

    if (!settings.headers.empty())
        headers = settings.headers;

    if (!settings.users.empty())
        users.insert(settings.users.begin(), settings.users.end());

    /// All three optional parts are checked. The previous code tested `key_id` twice and never
    /// `bucket_key_enabled`, so a config that only set the bucket-key flag was ignored.
    const auto & other_kms_config = settings.server_side_encryption_kms_config;
    if (other_kms_config.key_id.has_value()
        || other_kms_config.encryption_context.has_value()
        || other_kms_config.bucket_key_enabled.has_value())
        server_side_encryption_kms_config = other_kms_config;
}
}
}

69
src/IO/S3AuthSettings.h Normal file
View File

@ -0,0 +1,69 @@
#pragma once
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/Client.h>
namespace Poco::Util
{
class AbstractConfiguration;
};
namespace DB
{
struct Settings;
struct S3AuthSettingsImpl;
/// List of setting value types supported by the S3AuthSettings object
#define S3AUTH_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, Bool) \
M(CLASS_NAME, UInt64) \
M(CLASS_NAME, String)
S3AUTH_SETTINGS_SUPPORTED_TYPES(S3AuthSettings, DECLARE_SETTING_TRAIT)
namespace S3
{
/// We use s3 settings for DiskS3, StorageS3 (StorageS3Cluster, S3Queue, etc), BackupIO_S3, etc.
/// 1. For DiskS3 we usually have configuration in disk section in configuration file.
/// REQUEST_SETTINGS, PART_UPLOAD_SETTINGS start with "s3_" prefix there, while AUTH_SETTINGS and CLIENT_SETTINGS do not
/// (does not make sense, but it happened this way).
/// If some setting is absent from disk configuration, we look up for it in the "s3." server config section,
/// where s3 settings no longer have "s3_" prefix like in disk configuration section.
/// If the settings is absent there as well, we look up for it in Users config (where query/session settings are also updated).
/// 2. For StorageS3 and similar - we look up to "s3." config section (again - settings there do not have "s3_" prefix).
/// If some setting is absent from there, we look up for it in Users config.
/// S3 client and credential settings. The setting values themselves live behind a pImpl (S3AuthSettingsImpl);
/// headers, allowed users and the SSE-KMS config are plain public members.
struct S3AuthSettings
{
/// Creates the object with a default-constructed implementation.
S3AuthSettings();
S3AuthSettings(const S3AuthSettings & settings);
S3AuthSettings(S3AuthSettings && settings) noexcept;
/// Reads every setting from `<config_prefix>.<name>` in `config`; a setting missing there is taken from
/// the `s3_<name>` entry of `settings` if that entry exists and has been changed.
S3AuthSettings(const Poco::Util::AbstractConfiguration & config, const DB::Settings & settings, const std::string & config_prefix);
/// Takes every existing `s3_<name>` entry of `settings`, changed or not.
explicit S3AuthSettings(const DB::Settings & settings);
~S3AuthSettings();
/// Only move assignment is declared; there is no copy assignment operator.
S3AuthSettings & operator=(S3AuthSettings && settings) noexcept;
/// NOTE(review): not const-qualified, although it does not look like it needs to modify the object.
bool operator==(const S3AuthSettings & right);
/// Subscript operators, one per supported setting value type.
S3AUTH_SETTINGS_SUPPORTED_TYPES(S3AuthSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
/// Overwrites settings from the `s3_<name>` entries of `settings`; with `if_changed` only changed entries are used.
void updateFromSettings(const DB::Settings & settings, bool if_changed);
/// True if updateIfChanged(other) would modify this object.
bool hasUpdates(const S3AuthSettings & other) const;
/// Merges the changed settings, headers, users and KMS config of `settings` into this object.
void updateIfChanged(const S3AuthSettings & settings);
/// An empty `users` set means that there is no restriction.
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
HTTPHeaderEntries headers;
std::unordered_set<std::string> users;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
private:
/// Owns the setting values; allocated by every constructor.
std::unique_ptr<S3AuthSettingsImpl> impl;
};
}
}

View File

@ -1,7 +1,6 @@
#include <IO/S3Common.h>
#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Common/formatReadable.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
@ -71,7 +70,6 @@ namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int INVALID_SETTING_VALUE;
}
namespace S3
@ -134,296 +132,6 @@ static bool setValueFromConfig(
return true;
}
/// Builds the settings from the `config_prefix` section of `config`.
/// For every setting, the value at `<config_prefix>.<name>` wins; if that key is absent, the `s3_<name>` entry of
/// `settings` is used, but only when it exists and has been changed.
/// HTTP headers, the SSE-KMS config and the allowed users are read from the same config section.
AuthSettings::AuthSettings(
    const Poco::Util::AbstractConfiguration & config,
    const DB::Settings & settings,
    const std::string & config_prefix)
{
    for (auto & field : allMutable())
    {
        const auto config_path = fmt::format("{}.{}", config_prefix, field.getName());
        if (setValueFromConfig<AuthSettings>(config, config_path, field))
            continue;

        /// Not in the config section: fall back to the prefixed setting if it was changed.
        const auto fallback_name = "s3_" + field.getName();
        if (settings.has(fallback_name) && settings.isChanged(fallback_name))
            field.setValue(settings.get(fallback_name));
    }

    headers = getHTTPHeaders(config_prefix, config);
    server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);

    /// Every key of the section whose name starts with "user" contributes one allowed user.
    Poco::Util::AbstractConfiguration::Keys section_keys;
    config.keys(config_prefix, section_keys);
    for (const auto & section_key : section_keys)
    {
        if (!startsWith(section_key, "user"))
            continue;
        users.insert(config.getString(config_prefix + "." + section_key));
    }
}
/// Builds the settings purely from `settings`: every existing `s3_<name>` entry is taken, changed or not.
AuthSettings::AuthSettings(const DB::Settings & settings)
{
updateFromSettings(settings, /* if_changed */false);
}
/// Overwrites each setting with the `s3_<name>` entry of `settings`, if such an entry exists.
/// With `if_changed` set, only entries that have been changed are taken.
void AuthSettings::updateFromSettings(const DB::Settings & settings, bool if_changed)
{
    for (auto & field : allMutable())
    {
        const auto source_name = "s3_" + field.getName();

        if (!settings.has(source_name))
            continue;

        if (if_changed && !settings.isChanged(source_name))
            continue;

        field.setValue(settings.get(source_name));
    }
}
bool AuthSettings::hasUpdates(const AuthSettings & other) const
{
AuthSettings copy = *this;
copy.updateIfChanged(other);
return *this != copy;
}
/// Merges `settings` into this object:
///  - every setting whose value is changed in `settings` overwrites the local value;
///  - non-empty headers replace the local headers;
///  - users are added to the local set (existing users are kept);
///  - the SSE-KMS config replaces the local one if any of its optional parts is set.
void AuthSettings::updateIfChanged(const AuthSettings & settings)
{
    for (auto & setting : settings.all())
    {
        if (setting.isValueChanged())
            set(setting.getName(), setting.getValue());
    }

    if (!settings.headers.empty())
        headers = settings.headers;

    if (!settings.users.empty())
        users.insert(settings.users.begin(), settings.users.end());

    /// All three optional parts are checked. The previous code tested `key_id` twice and never
    /// `bucket_key_enabled`, so a config that only set the bucket-key flag was ignored.
    const auto & other_kms_config = settings.server_side_encryption_kms_config;
    if (other_kms_config.key_id.has_value()
        || other_kms_config.encryption_context.has_value()
        || other_kms_config.bucket_key_enabled.has_value())
        server_side_encryption_kms_config = other_kms_config;
}
/// Builds request settings from the config.
/// Each setting is looked up at `<config_prefix>.<setting_name_prefix><name>`; if that key is absent, the
/// `s3_<name>` entry of `settings` is used, but only when it exists and has been changed.
/// Initialization is completed by finishInit().
RequestSettings::RequestSettings(
    const Poco::Util::AbstractConfiguration & config,
    const DB::Settings & settings,
    const std::string & config_prefix,
    const std::string & setting_name_prefix,
    bool validate_settings)
{
    for (auto & field : allMutable())
    {
        const auto config_path = fmt::format("{}.{}{}", config_prefix, setting_name_prefix, field.getName());
        if (setValueFromConfig<RequestSettings>(config, config_path, field))
            continue;

        /// Not in the config section: fall back to the prefixed setting if it was changed.
        const auto fallback_name = "s3_" + field.getName();
        if (settings.has(fallback_name) && settings.isChanged(fallback_name))
            field.setValue(settings.get(fallback_name));
    }

    finishInit(settings, validate_settings);
}
/// Builds request settings from a named collection.
/// A setting is taken from the collection when a key equal to the setting name exists; the collection getter is
/// chosen from the type of the field's current value. Any other value type raises BAD_ARGUMENTS.
/// Initialization is completed by finishInit().
RequestSettings::RequestSettings(
    const NamedCollection & collection,
    const DB::Settings & settings,
    bool validate_settings)
{
    auto fields = allMutable();
    for (auto & field : fields)
    {
        const auto key = field.getName();
        if (!collection.has(key))
            continue;

        const auto value_type = field.getValue().getType();
        if (isInt64OrUInt64FieldType(value_type))
        {
            field.setValue(collection.get<UInt64>(key));
            continue;
        }
        if (value_type == Field::Types::String)
        {
            field.setValue(collection.get<String>(key));
            continue;
        }
        if (value_type == Field::Types::Bool)
        {
            field.setValue(collection.get<bool>(key));
            continue;
        }

        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
    }

    finishInit(settings, validate_settings);
}
/// Builds request settings purely from `settings`: every existing `s3_<name>` entry is taken, changed or not.
/// NOTE(review): updateFromSettings() already normalizes and (optionally) validates; finishInit() repeats both
/// steps before it creates the throttlers.
RequestSettings::RequestSettings(const DB::Settings & settings, bool validate_settings)
{
updateFromSettings(settings, /* if_changed */false, validate_settings);
finishInit(settings, validate_settings);
}
/// Overwrites each setting with the `s3_<name>` entry of `settings`, if such an entry exists
/// (with `if_changed` set, only changed entries are taken). Afterwards the settings are normalized and,
/// if `validate_settings` is set, validated.
void RequestSettings::updateFromSettings(
    const DB::Settings & settings, bool if_changed, bool validate_settings)
{
    for (auto & field : allMutable())
    {
        const auto source_name = "s3_" + field.getName();

        if (!settings.has(source_name))
            continue;

        if (if_changed && !settings.isChanged(source_name))
            continue;

        set(field.getName(), settings.get(source_name));
    }

    normalizeSettings();

    if (validate_settings)
        validateUploadSettings();
}
/// Copies into this object every setting whose value is changed in `settings`; all other settings are left alone.
void RequestSettings::updateIfChanged(const RequestSettings & settings)
{
    for (auto & candidate : settings.all())
    {
        if (!candidate.isValueChanged())
            continue;

        set(candidate.getName(), candidate.getValue());
    }
}
/// Upper-cases `storage_class_name`, but only if it is non-empty and has been changed.
void RequestSettings::normalizeSettings()
{
    if (storage_class_name.value.empty() || !storage_class_name.changed)
        return;

    storage_class_name = Poco::toUpperInPlace(storage_class_name.value);
}
/// Final initialization step shared by the constructors: normalizes the settings, optionally validates them,
/// and creates the GET/PUT request throttlers.
/// For each direction the rate is the locally changed `max_*_rps` value if there is one, otherwise the
/// `s3_max_*_rps` setting; a rate of zero means that no throttler is created. The burst is the locally changed
/// `max_*_burst` value if there is one, otherwise the `s3_max_*_burst` setting, and if that is zero too,
/// `Throttler::default_burst_seconds * rate`.
void RequestSettings::finishInit(const DB::Settings & settings, bool validate_settings)
{
    normalizeSettings();
    if (validate_settings)
        validateUploadSettings();

    /// NOTE: it would be better to reuse old throttlers
    /// to avoid losing token bucket state on every config reload,
    /// which could lead to exceeding limit for short time.
    /// But it is good enough unless very high `burst` values are used.
    if (UInt64 max_get_rps = isChanged("max_get_rps") ? get("max_get_rps").safeGet<UInt64>() : settings[Setting::s3_max_get_rps])
    {
        size_t default_max_get_burst
            = settings[Setting::s3_max_get_burst] ? settings[Setting::s3_max_get_burst] : (Throttler::default_burst_seconds * max_get_rps);

        /// The name checked here must be the one that is read ("max_get_burst"); it used to be misspelled.
        size_t max_get_burst = isChanged("max_get_burst") ? get("max_get_burst").safeGet<UInt64>() : default_max_get_burst;
        get_request_throttler = std::make_shared<Throttler>(max_get_rps, max_get_burst);
    }
    if (UInt64 max_put_rps = isChanged("max_put_rps") ? get("max_put_rps").safeGet<UInt64>() : settings[Setting::s3_max_put_rps])
    {
        size_t default_max_put_burst
            = settings[Setting::s3_max_put_burst] ? settings[Setting::s3_max_put_burst] : (Throttler::default_burst_seconds * max_put_rps);

        /// Same as above for "max_put_burst".
        size_t max_put_burst = isChanged("max_put_burst") ? get("max_put_burst").safeGet<UInt64>() : default_max_put_burst;
        put_request_throttler = std::make_shared<Throttler>(max_put_rps, max_put_burst);
    }
}
/// Checks the upload-related settings against the limits hard-coded below and against each other.
/// Throws INVALID_SETTING_VALUE on the first violated rule:
///  - strict_upload_part_size (if non-zero) and min_upload_part_size must be at least 5 MiB;
///  - max_upload_part_size, max_single_part_upload_size and max_single_operation_copy_size must not exceed 5 GiB;
///  - max_upload_part_size must not be less than min_upload_part_size;
///  - upload_part_size_multiply_factor, upload_part_size_multiply_parts_count_threshold and max_part_number must be non-zero;
///  - max_part_number must not exceed 10000;
///  - max_upload_part_size * upload_part_size_multiply_factor must not overflow;
///  - storage_class_name, if set, must be STANDARD or INTELLIGENT_TIERING.
void RequestSettings::validateUploadSettings()
{
    static constexpr size_t min_upload_part_size_limit = 5 * 1024 * 1024;
    if (strict_upload_part_size && strict_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting strict_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(strict_upload_part_size), ReadableSize(min_upload_part_size_limit));

    if (min_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting min_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(min_upload_part_size), ReadableSize(min_upload_part_size_limit));

    static constexpr size_t max_upload_part_size_limit = 5ull * 1024 * 1024 * 1024;
    if (max_upload_part_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_upload_part_size), ReadableSize(max_upload_part_size_limit));

    if (max_single_part_upload_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_part_upload_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_single_part_upload_size), ReadableSize(max_upload_part_size_limit));

    if (max_single_operation_copy_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_operation_copy_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_single_operation_copy_size), ReadableSize(max_upload_part_size_limit));

    if (max_upload_part_size < min_upload_part_size)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size ({}) can't be less than setting min_upload_part_size {}",
            ReadableSize(max_upload_part_size), ReadableSize(min_upload_part_size));

    if (!upload_part_size_multiply_factor)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor cannot be zero");

    if (!upload_part_size_multiply_parts_count_threshold)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_parts_count_threshold cannot be zero");

    if (!max_part_number)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number cannot be zero");

    /// max_part_number is a count, not a byte size, so it is printed as a plain number.
    static constexpr size_t max_part_number_limit = 10000;
    if (max_part_number > max_part_number_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number has invalid value {} which is greater than the s3 API limit {}",
            max_part_number.value, max_part_number_limit);

    /// Only the overflow flag matters here; the product itself is not used.
    size_t maybe_overflow;
    if (common::mulOverflow(max_upload_part_size.value, upload_part_size_multiply_factor.value, maybe_overflow))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor is too big ({}). "
            "Multiplication to max_upload_part_size ({}) will cause integer overflow",
            upload_part_size_multiply_factor.value, ReadableSize(max_upload_part_size));

    std::unordered_set<String> storage_class_names {"STANDARD", "INTELLIGENT_TIERING"};
    if (!storage_class_name.value.empty() && !storage_class_names.contains(storage_class_name))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting storage_class has invalid value {} which only supports STANDARD and INTELLIGENT_TIERING",
            storage_class_name.value);

    /// TODO: it's possible to set too small limits.
    /// We can check that max possible object size is not too small.
}
/// Two AuthSettings are equal when their headers, users and KMS config are equal and both objects
/// iterate over the same sequence of setting fields (same length, pairwise equal).
bool operator==(const AuthSettings & left, const AuthSettings & right)
{
    if (left.headers != right.headers
        || left.users != right.users
        || left.server_side_encryption_kms_config != right.server_side_encryption_kms_config)
        return false;

    /// Walk both field sequences in lockstep.
    auto left_it = left.begin();
    for (const auto & right_field : right)
    {
        if (left_it == left.end())
            return false;
        if (*left_it != right_field)
            return false;
        ++left_it;
    }

    /// `left` must be exhausted as well, otherwise it has more fields than `right`.
    return left_it == left.end();
}
}
IMPLEMENT_SETTINGS_TRAITS(S3::AuthSettingsTraits, CLIENT_SETTINGS_LIST)
IMPLEMENT_SETTINGS_TRAITS(S3::RequestSettingsTraits, REQUEST_SETTINGS_LIST)
}

View File

@ -1,16 +1,10 @@
#pragma once
#include <IO/S3/Client.h>
#include <IO/S3/PocoHTTPClient.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3Defines.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <Common/Throttler.h>
#include <Core/SettingsEnums.h>
#include <Core/BaseSettings.h>
#include <Interpreters/Context.h>
#include <unordered_set>
#include "config.h"
@ -70,137 +64,12 @@ namespace Poco::Util
namespace DB
{
class NamedCollection;
struct ProxyConfigurationResolver;
namespace S3
{
/// We use s3 settings for DiskS3, StorageS3 (StorageS3Cluster, S3Queue, etc), BackupIO_S3, etc.
/// 1. For DiskS3 we usually have configuration in disk section in configuration file.
/// REQUEST_SETTINGS, PART_UPLOAD_SETTINGS start with "s3_" prefix there, while AUTH_SETTINGS and CLIENT_SETTINGS do not
/// (does not make sense, but it happened this way).
/// If some setting is absent from disk configuration, we look up for it in the "s3." server config section,
/// where s3 settings no longer have "s3_" prefix like in disk configuration section.
/// If the settings is absent there as well, we look up for it in Users config (where query/session settings are also updated).
/// 2. For StorageS3 and similar - we look up to "s3." config section (again - settings there do not have "s3_" prefix).
/// If some setting is absent from there, we look up for it in Users config.
/// X-macro list of the S3 credential settings: M(type, name, default value, description, flags).
/// The last entry has no trailing line continuation, so the macro body ends with the list and does not
/// depend on the line that follows the definition.
#define AUTH_SETTINGS(M, ALIAS) \
    M(String, access_key_id, "", "", 0) \
    M(String, secret_access_key, "", "", 0) \
    M(String, session_token, "", "", 0) \
    M(String, region, "", "", 0) \
    M(String, server_side_encryption_customer_key_base64, "", "", 0)
/// X-macro list of the S3 client settings: M(type, name, default value, description, flags).
/// The last entry has no trailing line continuation, so the macro body ends with the list and does not
/// depend on the line that follows the definition.
#define CLIENT_SETTINGS(M, ALIAS) \
    M(UInt64, connect_timeout_ms, DEFAULT_CONNECT_TIMEOUT_MS, "", 0) \
    M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
    M(UInt64, max_connections, DEFAULT_MAX_CONNECTIONS, "", 0) \
    M(UInt64, http_keep_alive_timeout, DEFAULT_KEEP_ALIVE_TIMEOUT, "", 0) \
    M(UInt64, http_keep_alive_max_requests, DEFAULT_KEEP_ALIVE_MAX_REQUESTS, "", 0) \
    M(UInt64, expiration_window_seconds, DEFAULT_EXPIRATION_WINDOW_SECONDS, "", 0) \
    M(Bool, use_environment_credentials, DEFAULT_USE_ENVIRONMENT_CREDENTIALS, "", 0) \
    M(Bool, no_sign_request, DEFAULT_NO_SIGN_REQUEST, "", 0) \
    M(Bool, use_insecure_imds_request, false, "", 0) \
    M(Bool, use_adaptive_timeouts, DEFAULT_USE_ADAPTIVE_TIMEOUTS, "", 0) \
    M(Bool, is_virtual_hosted_style, false, "", 0) \
    M(Bool, disable_checksum, DEFAULT_DISABLE_CHECKSUM, "", 0) \
    M(Bool, gcs_issue_compose_request, false, "", 0)
/// X-macro list of the general S3 request settings: M(type, name, default value, description, flags).
/// The last entry has no trailing line continuation, so the macro body ends with the list and does not
/// depend on the line that follows the definition.
#define REQUEST_SETTINGS(M, ALIAS) \
    M(UInt64, max_single_read_retries, 4, "", 0) \
    M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
    M(UInt64, list_object_keys_size, DEFAULT_LIST_OBJECT_KEYS_SIZE, "", 0) \
    M(Bool, allow_native_copy, DEFAULT_ALLOW_NATIVE_COPY, "", 0) \
    M(Bool, check_objects_after_upload, DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
    M(Bool, throw_on_zero_files_match, false, "", 0) \
    M(UInt64, max_single_operation_copy_size, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
    M(String, storage_class_name, "", "", 0)
/// X-macro list of the S3 multipart-upload settings: M(type, name, default value, description, flags).
/// The last entry has no trailing line continuation, so the macro body ends with the list and does not
/// depend on the line that follows the definition.
#define PART_UPLOAD_SETTINGS(M, ALIAS) \
    M(UInt64, strict_upload_part_size, 0, "", 0) \
    M(UInt64, min_upload_part_size, DEFAULT_MIN_UPLOAD_PART_SIZE, "", 0) \
    M(UInt64, max_upload_part_size, DEFAULT_MAX_UPLOAD_PART_SIZE, "", 0) \
    M(UInt64, upload_part_size_multiply_factor, DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR, "", 0) \
    M(UInt64, upload_part_size_multiply_parts_count_threshold, DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD, "", 0) \
    M(UInt64, max_inflight_parts_for_one_file, DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE, "", 0) \
    M(UInt64, max_part_number, DEFAULT_MAX_PART_NUMBER, "", 0) \
    M(UInt64, max_single_part_upload_size, DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE, "", 0) \
    M(UInt64, max_unexpected_write_error_retries, 4, "", 0)
/// Complete list behind AuthSettings: client settings followed by credential settings.
#define CLIENT_SETTINGS_LIST(M, ALIAS) \
CLIENT_SETTINGS(M, ALIAS) \
AUTH_SETTINGS(M, ALIAS)
/// Complete list behind RequestSettings: general request settings followed by multipart-upload settings.
#define REQUEST_SETTINGS_LIST(M, ALIAS) \
REQUEST_SETTINGS(M, ALIAS) \
PART_UPLOAD_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(AuthSettingsTraits, CLIENT_SETTINGS_LIST)
DECLARE_SETTINGS_TRAITS(RequestSettingsTraits, REQUEST_SETTINGS_LIST)
/// S3 client and credential settings (the CLIENT_SETTINGS_LIST values) plus HTTP headers, allowed users
/// and the SSE-KMS config.
struct AuthSettings : public BaseSettings<AuthSettingsTraits>
{
AuthSettings() = default;
/// Reads every setting from `<config_prefix>.<name>` in `config`; a setting missing there is taken from
/// the `s3_<name>` entry of `settings` if that entry exists and has been changed.
AuthSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix);
/// Takes every existing `s3_<name>` entry of `settings`, changed or not.
explicit AuthSettings(const DB::Settings & settings);
/// NOTE(review): no definition of this constructor is visible next to the other ones — confirm that it is implemented.
explicit AuthSettings(const DB::NamedCollection & collection);
/// Overwrites settings from the `s3_<name>` entries of `settings`; with `if_changed` only changed entries are used.
void updateFromSettings(const DB::Settings & settings, bool if_changed);
/// True if updateIfChanged(other) would modify this object.
bool hasUpdates(const AuthSettings & other) const;
/// Merges the changed settings, headers, users and KMS config of `settings` into this object.
void updateIfChanged(const AuthSettings & settings);
/// An empty `users` set means that there is no restriction.
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
HTTPHeaderEntries headers;
std::unordered_set<std::string> users;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
/// Note: if you add any field, do not forget to update operator ==.
};
bool operator==(const AuthSettings & left, const AuthSettings & right);
/// Request and part-upload settings. The setting fields are inherited from BaseSettings<RequestSettingsTraits>;
/// the throttlers and the proxy resolver are plain members on top of them.
struct RequestSettings : public BaseSettings<RequestSettingsTraits>
{
RequestSettings() = default;
/// Create request settings from Config.
/// `setting_name_prefix` is an optional prefix for the setting names inside the config section;
/// `validate_settings` controls whether the upload settings are validated after loading.
RequestSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix,
const std::string & setting_name_prefix = "",
bool validate_settings = true);
/// Create request settings from DB::Settings.
explicit RequestSettings(const DB::Settings & settings, bool validate_settings = true);
/// Create request settings from NamedCollection.
RequestSettings(
const NamedCollection & collection,
const DB::Settings & settings,
bool validate_settings = true);
/// Pull values from DB::Settings; presumably `if_changed` limits this to settings marked as changed - confirm in the implementation.
void updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings = true);
void updateIfChanged(const RequestSettings & settings);
void validateUploadSettings();
/// Optional throttlers for GET and PUT requests.
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
std::shared_ptr<ProxyConfigurationResolver> proxy_resolver;
private:
/// Common tail of the constructors (implementation not visible in this header).
void finishInit(const DB::Settings & settings, bool validate_settings);
void normalizeSettings();
};
/// Helpers that build HTTP header entries / the SSE-KMS config from the configuration element `config_elem`.
/// NOTE(review): the exact keys that are read are defined in the implementation, which is not visible here.
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
}

View File

@ -0,0 +1,319 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Core/Settings.h>
#include <IO/S3Common.h>
#include <IO/S3Defines.h>
#include <IO/S3RequestSettings.h>
#include <Common/Exception.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/Throttler.h>
#include <Common/formatReadable.h>
#include <Poco/String.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 s3_max_get_burst;
extern const SettingsUInt64 s3_max_get_rps;
extern const SettingsUInt64 s3_max_put_burst;
extern const SettingsUInt64 s3_max_put_rps;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int INVALID_SETTING_VALUE;
}
/// General per-request settings. Every entry is (type, name, default value, description, flags).
#define REQUEST_SETTINGS(DECLARE, ALIAS) \
DECLARE(UInt64, max_single_read_retries, 4, "", 0) \
DECLARE(UInt64, request_timeout_ms, S3::DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
DECLARE(UInt64, list_object_keys_size, S3::DEFAULT_LIST_OBJECT_KEYS_SIZE, "", 0) \
DECLARE(Bool, allow_native_copy, S3::DEFAULT_ALLOW_NATIVE_COPY, "", 0) \
DECLARE(Bool, check_objects_after_upload, S3::DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
DECLARE(Bool, throw_on_zero_files_match, false, "", 0) \
DECLARE(UInt64, max_single_operation_copy_size, S3::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
DECLARE(String, storage_class_name, "", "", 0)
/// Settings that control part uploads; they are checked together in validateUploadSettings().
#define PART_UPLOAD_SETTINGS(DECLARE, ALIAS) \
DECLARE(UInt64, strict_upload_part_size, 0, "", 0) \
DECLARE(UInt64, min_upload_part_size, S3::DEFAULT_MIN_UPLOAD_PART_SIZE, "", 0) \
DECLARE(UInt64, max_upload_part_size, S3::DEFAULT_MAX_UPLOAD_PART_SIZE, "", 0) \
DECLARE(UInt64, upload_part_size_multiply_factor, S3::DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR, "", 0) \
DECLARE(UInt64, upload_part_size_multiply_parts_count_threshold, S3::DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD, "", 0) \
DECLARE(UInt64, max_inflight_parts_for_one_file, S3::DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE, "", 0) \
DECLARE(UInt64, max_part_number, S3::DEFAULT_MAX_PART_NUMBER, "", 0) \
DECLARE(UInt64, max_single_part_upload_size, S3::DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE, "", 0) \
DECLARE(UInt64, max_unexpected_write_error_retries, 4, "", 0)
/// Complete list of settings stored in S3RequestSettingsImpl: request settings followed by part-upload settings.
#define REQUEST_SETTINGS_LIST(M, ALIAS) \
REQUEST_SETTINGS(M, ALIAS) \
PART_UPLOAD_SETTINGS(M, ALIAS)
/// Declare and implement the traits type used as the BaseSettings template argument of S3RequestSettingsImpl.
DECLARE_SETTINGS_TRAITS(S3RequestSettingsTraits, REQUEST_SETTINGS_LIST)
IMPLEMENT_SETTINGS_TRAITS(S3RequestSettingsTraits, REQUEST_SETTINGS_LIST)
/// Implementation object of S3RequestSettings: holds the actual setting fields and is only
/// reachable through the `impl` pointer of S3RequestSettings.
struct S3RequestSettingsImpl : public BaseSettings<S3RequestSettingsTraits>
{
};
/// For every setting in the list, define S3RequestSetting::NAME as a pointer to the matching member of
/// S3RequestSettingsImpl; those objects are what callers pass to S3RequestSettings::operator[].
#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) S3RequestSettings##TYPE NAME = &S3RequestSettingsImpl ::NAME;
namespace S3RequestSetting
{
REQUEST_SETTINGS_LIST(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
}
#undef INITIALIZE_SETTING_EXTERN
namespace S3
{
namespace
{
/// Loads the value stored under `path` in `config` into `field`, picking the config getter that
/// matches the field's current value type (integer, string or bool).
/// Returns false and leaves `field` untouched when `path` is absent; returns true once the field is set.
/// Throws BAD_ARGUMENTS if the field has any other value type.
bool setValueFromConfig(
    const Poco::Util::AbstractConfiguration & config, const std::string & path, typename S3RequestSettingsImpl::SettingFieldRef & field)
{
    const bool key_present = config.has(path);
    if (!key_present)
        return false;

    const auto field_type = field.getValue().getType();

    if (isInt64OrUInt64FieldType(field_type))
    {
        field.setValue(config.getUInt64(path));
        return true;
    }

    if (field_type == Field::Types::String)
    {
        field.setValue(config.getString(path));
        return true;
    }

    if (field_type == Field::Types::Bool)
    {
        field.setValue(config.getBool(path));
        return true;
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
}
}
/// Default constructor: creates a default-constructed implementation object; throttlers and proxy resolver stay empty.
S3RequestSettings::S3RequestSettings() : impl(std::make_unique<S3RequestSettingsImpl>())
{
}
/// Copy constructor: the throttlers and the proxy resolver are shared with the source (pointer copies),
/// while the setting values are copied into a new implementation object.
S3RequestSettings::S3RequestSettings(const S3RequestSettings & settings)
: get_request_throttler(settings.get_request_throttler)
, put_request_throttler(settings.put_request_throttler)
, proxy_resolver(settings.proxy_resolver)
, impl(std::make_unique<S3RequestSettingsImpl>(*settings.impl))
{
}
/// Move constructor: takes over the shared pointers and move-constructs a new implementation object from the
/// source's one, so `settings.impl` itself stays non-null afterwards.
/// NOTE(review): std::make_unique can throw std::bad_alloc, which inside a noexcept function terminates the program.
S3RequestSettings::S3RequestSettings(S3RequestSettings && settings) noexcept
: get_request_throttler(std::move(settings.get_request_throttler))
, put_request_throttler(std::move(settings.put_request_throttler))
, proxy_resolver(std::move(settings.proxy_resolver))
, impl(std::make_unique<S3RequestSettingsImpl>(std::move(*settings.impl)))
{
}
/// Create request settings from Config.
/// For every known setting the value is looked up at "<config_prefix>.<setting_name_prefix><name>".
/// When the config has no such key, the user-level setting "s3_<name>" is used instead, but only if it
/// exists and has been changed. Finally finishInit() normalizes/validates and sets up the throttlers.
S3RequestSettings::S3RequestSettings(
    const Poco::Util::AbstractConfiguration & config,
    const DB::Settings & settings,
    const std::string & config_prefix,
    const std::string & setting_name_prefix,
    bool validate_settings)
    : S3RequestSettings()
{
    for (auto & field : impl->allMutable())
    {
        const auto config_path = fmt::format("{}.{}{}", config_prefix, setting_name_prefix, field.getName());
        if (setValueFromConfig(config, config_path, field))
            continue;

        /// Not in the config: fall back to the "s3_"-prefixed setting if the user changed it.
        const auto fallback_name = "s3_" + field.getName();
        if (!settings.has(fallback_name) || !settings.isChanged(fallback_name))
            continue;

        field.setValue(settings.get(fallback_name));
    }

    finishInit(settings, validate_settings);
}
/// Create request settings from NamedCollection.
/// Every setting whose name is present as a key in `collection` is read from it, using the getter that
/// matches the field's current value type; BAD_ARGUMENTS is thrown for any other field type.
/// `settings` is only handed to finishInit().
S3RequestSettings::S3RequestSettings(const NamedCollection & collection, const DB::Settings & settings, bool validate_settings)
    : S3RequestSettings()
{
    for (auto & field : impl->allMutable())
    {
        const auto & key = field.getName();
        if (!collection.has(key))
            continue;

        const auto field_type = field.getValue().getType();
        if (isInt64OrUInt64FieldType(field_type))
            field.setValue(collection.get<UInt64>(key));
        else if (field_type == Field::Types::String)
            field.setValue(collection.get<String>(key));
        else if (field_type == Field::Types::Bool)
            field.setValue(collection.get<bool>(key));
        else
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
    }

    finishInit(settings, validate_settings);
}
/// Defaulted in the .cpp, where S3RequestSettingsImpl is a complete type (the header only forward-declares it).
S3RequestSettings::~S3RequestSettings() = default;
/// Implement the subscript operators for every supported setting type (Bool, UInt64, String).
S3REQUEST_SETTINGS_SUPPORTED_TYPES(S3RequestSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
/// Move assignment: takes over the throttlers and the proxy resolver and move-assigns the setting values
/// into the existing implementation object (the `impl` pointer itself is never reseated).
S3RequestSettings & S3RequestSettings::operator=(S3RequestSettings && settings) noexcept
{
    /// Self-move must be a no-op: `*impl = std::move(*impl)` would self-move every string-valued setting,
    /// which leaves them in a valid but unspecified state.
    if (this == &settings)
        return *this;

    get_request_throttler = std::move(settings.get_request_throttler);
    put_request_throttler = std::move(settings.put_request_throttler);
    proxy_resolver = std::move(settings.proxy_resolver);
    *impl = std::move(*settings.impl);
    return *this;
}
/// Copies values from the "s3_"-prefixed entries of `settings` into the matching request settings.
/// With `if_changed` set, only entries marked as changed in `settings` are taken over.
/// Afterwards the settings are normalized and, if requested, the upload settings are validated.
void S3RequestSettings::updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings)
{
    for (auto & field : impl->allMutable())
    {
        const auto source_name = "s3_" + field.getName();

        if (!settings.has(source_name))
            continue;
        if (if_changed && !settings.isChanged(source_name))
            continue;

        impl->set(field.getName(), settings.get(source_name));
    }

    normalizeSettings();

    if (validate_settings)
        validateUploadSettings();
}
/// Overrides our values with every setting that is marked as changed in `settings`.
/// Only the setting values are touched; throttlers and the proxy resolver are left as they are.
void S3RequestSettings::updateIfChanged(const S3RequestSettings & settings)
{
    for (auto & candidate : settings.impl->all())
    {
        if (!candidate.isValueChanged())
            continue;

        impl->set(candidate.getName(), candidate.getValue());
    }
}
/// Checks the upload-related settings against the S3 API limits and against each other.
/// Throws Exception(INVALID_SETTING_VALUE) for the first violated constraint; the checks run in a fixed order:
///   - strict_upload_part_size (when non-zero) and min_upload_part_size are not below 5 MiB;
///   - max_upload_part_size, max_single_part_upload_size and max_single_operation_copy_size are not above 5 GiB;
///   - max_upload_part_size >= min_upload_part_size;
///   - upload_part_size_multiply_factor, upload_part_size_multiply_parts_count_threshold and max_part_number are non-zero;
///   - max_part_number <= 10000;
///   - max_upload_part_size * upload_part_size_multiply_factor does not overflow;
///   - storage_class_name is empty, STANDARD or INTELLIGENT_TIERING.
void S3RequestSettings::validateUploadSettings()
{
    static constexpr size_t min_upload_part_size_limit = 5 * 1024 * 1024;
    if (impl->strict_upload_part_size && impl->strict_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting strict_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(impl->strict_upload_part_size), ReadableSize(min_upload_part_size_limit));

    if (impl->min_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting min_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(impl->min_upload_part_size), ReadableSize(min_upload_part_size_limit));

    static constexpr size_t max_upload_part_size_limit = 5ull * 1024 * 1024 * 1024;
    if (impl->max_upload_part_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(impl->max_upload_part_size), ReadableSize(max_upload_part_size_limit));

    if (impl->max_single_part_upload_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_part_upload_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(impl->max_single_part_upload_size), ReadableSize(max_upload_part_size_limit));

    if (impl->max_single_operation_copy_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_operation_copy_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(impl->max_single_operation_copy_size), ReadableSize(max_upload_part_size_limit));

    if (impl->max_upload_part_size < impl->min_upload_part_size)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size ({}) can't be less than setting min_upload_part_size {}",
            ReadableSize(impl->max_upload_part_size), ReadableSize(impl->min_upload_part_size));

    if (!impl->upload_part_size_multiply_factor)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor cannot be zero");

    if (!impl->upload_part_size_multiply_parts_count_threshold)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_parts_count_threshold cannot be zero");

    if (!impl->max_part_number)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number cannot be zero");

    /// max_part_number is a count, not a byte size, so it is printed as a plain number.
    static constexpr size_t max_part_number_limit = 10000;
    if (impl->max_part_number > max_part_number_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number has invalid value {} which is greater than the s3 API limit {}",
            impl->max_part_number.value, max_part_number_limit);

    /// Only the overflow flag matters here; the product itself is not used.
    size_t maybe_overflow;
    if (common::mulOverflow(impl->max_upload_part_size.value, impl->upload_part_size_multiply_factor.value, maybe_overflow))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor is too big ({}). "
            "Multiplication to max_upload_part_size ({}) will cause integer overflow",
            impl->upload_part_size_multiply_factor.value, ReadableSize(impl->max_upload_part_size));

    /// The allowed values never change, so build the set once instead of on every call.
    static const std::unordered_set<String> storage_class_names {"STANDARD", "INTELLIGENT_TIERING"};
    if (!impl->storage_class_name.value.empty() && !storage_class_names.contains(impl->storage_class_name))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting storage_class_name has invalid value {} which only supports STANDARD and INTELLIGENT_TIERING",
            impl->storage_class_name.value);

    /// TODO: it's possible to set too small limits.
    /// We can check that max possible object size is not too small.
}
/// Common tail of the constructors: normalizes the settings, optionally validates the upload settings and
/// creates the GET/PUT throttlers when a non-zero requests-per-second limit is configured.
/// For both rps and burst, a changed value in `impl` takes priority over the corresponding "s3_max_*" setting;
/// when neither provides a burst, it defaults to Throttler::default_burst_seconds * rps.
void S3RequestSettings::finishInit(const DB::Settings & settings, bool validate_settings)
{
    normalizeSettings();

    if (validate_settings)
        validateUploadSettings();

    /// NOTE: it would be better to reuse old throttlers
    /// to avoid losing token bucket state on every config reload,
    /// which could lead to exceeding limit for short time.
    /// But it is good enough unless very high `burst` values are used.

    const UInt64 max_get_rps
        = impl->isChanged("max_get_rps") ? impl->get("max_get_rps").safeGet<UInt64>() : settings[Setting::s3_max_get_rps];
    if (max_get_rps)
    {
        size_t max_get_burst;
        if (impl->isChanged("max_get_burst"))
            max_get_burst = impl->get("max_get_burst").safeGet<UInt64>();
        else if (settings[Setting::s3_max_get_burst])
            max_get_burst = settings[Setting::s3_max_get_burst];
        else
            max_get_burst = Throttler::default_burst_seconds * max_get_rps;

        get_request_throttler = std::make_shared<Throttler>(max_get_rps, max_get_burst);
    }

    const UInt64 max_put_rps
        = impl->isChanged("max_put_rps") ? impl->get("max_put_rps").safeGet<UInt64>() : settings[Setting::s3_max_put_rps];
    if (max_put_rps)
    {
        size_t max_put_burst;
        if (impl->isChanged("max_put_burst"))
            max_put_burst = impl->get("max_put_burst").safeGet<UInt64>();
        else if (settings[Setting::s3_max_put_burst])
            max_put_burst = settings[Setting::s3_max_put_burst];
        else
            max_put_burst = Throttler::default_burst_seconds * max_put_rps;

        put_request_throttler = std::make_shared<Throttler>(max_put_rps, max_put_burst);
    }
}
/// Brings the settings into canonical form: an explicitly set, non-empty storage class name is upper-cased.
void S3RequestSettings::normalizeSettings()
{
    auto & storage_class = impl->storage_class_name;

    if (storage_class.value.empty() || !storage_class.changed)
        return;

    storage_class = Poco::toUpperInPlace(storage_class.value);
}
}
}

View File

@ -0,0 +1,83 @@
#pragma once
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
namespace Poco::Util
{
class AbstractConfiguration;
};
namespace DB
{
class NamedCollection;
struct ProxyConfigurationResolver;
struct S3RequestSettingsImpl;
struct Settings;
/// List of available types supported in S3RequestSettings object
#define S3REQUEST_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, Bool) \
M(CLASS_NAME, UInt64) \
M(CLASS_NAME, String)
/// Declare the per-type setting handle types (presumably S3RequestSettingsBool, S3RequestSettingsUInt64
/// and S3RequestSettingsString, which is how they are named where the settings are referenced).
S3REQUEST_SETTINGS_SUPPORTED_TYPES(S3RequestSettings, DECLARE_SETTING_TRAIT)
namespace S3
{
/// We use s3 settings for DiskS3, StorageS3 (StorageS3Cluster, S3Queue, etc), BackupIO_S3, etc.
/// 1. For DiskS3 we usually have configuration in disk section in configuration file.
/// REQUEST_SETTINGS, PART_UPLOAD_SETTINGS start with "s3_" prefix there, while AUTH_SETTINGS and CLIENT_SETTINGS do not
/// (does not make sense, but it happened this way).
/// If some setting is absent from disk configuration, we look up for it in the "s3." server config section,
/// where s3 settings no longer have "s3_" prefix like in disk configuration section.
/// If the setting is absent there as well, we look it up in the Users config (where query/session settings are also updated).
/// 2. For StorageS3 and similar - we look up to "s3." config section (again - settings there do not have "s3_" prefix).
/// If some setting is absent from there, we look up for it in Users config.
/// Request and part-upload settings for S3 operations. The setting values live in a separate implementation
/// object (`impl`) that is only forward-declared here; individual settings are accessed through the
/// subscript operators declared below.
struct S3RequestSettings
{
S3RequestSettings();
S3RequestSettings(const S3RequestSettings & settings);
S3RequestSettings(S3RequestSettings && settings) noexcept;
/// Create request settings from Config.
/// `setting_name_prefix` is an optional prefix for the setting names inside the config section;
/// `validate_settings` controls whether the upload settings are validated after loading.
S3RequestSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix,
const std::string & setting_name_prefix = "",
bool validate_settings = true);
/// Create request settings from DB::Settings.
explicit S3RequestSettings(const DB::Settings & settings, bool validate_settings = true);
/// Create request settings from NamedCollection.
S3RequestSettings(const NamedCollection & collection, const DB::Settings & settings, bool validate_settings = true);
~S3RequestSettings();
/// Only move assignment is declared; there is no copy assignment operator.
S3RequestSettings & operator=(S3RequestSettings && settings) noexcept;
/// Subscript operators, one per supported setting type (Bool, UInt64, String).
S3REQUEST_SETTINGS_SUPPORTED_TYPES(S3RequestSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
/// Pull values from DB::Settings; presumably `if_changed` limits this to settings marked as changed - confirm in the implementation.
void updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings = true);
void updateIfChanged(const S3RequestSettings & settings);
void validateUploadSettings();
/// Optional throttlers for GET and PUT requests.
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
std::shared_ptr<ProxyConfigurationResolver> proxy_resolver;
private:
/// Common tail of the constructors (implementation not visible in this header).
void finishInit(const DB::Settings & settings, bool validate_settings);
void normalizeSettings();
/// Owns the actual setting values.
std::unique_ptr<S3RequestSettingsImpl> impl;
};
}
}

View File

@ -19,8 +19,8 @@ void S3Settings::loadFromConfig(
const std::string & config_prefix,
const DB::Settings & settings)
{
auth_settings = S3::AuthSettings(config, settings, config_prefix);
request_settings = S3::RequestSettings(config, settings, config_prefix);
auth_settings = S3::S3AuthSettings(config, settings, config_prefix);
request_settings = S3::S3RequestSettings(config, settings, config_prefix);
}
void S3Settings::updateIfChanged(const S3Settings & settings)
@ -41,8 +41,8 @@ void S3SettingsByEndpoint::loadFromConfig(
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
auto default_auth_settings = S3::AuthSettings(config, settings, config_prefix);
auto default_request_settings = S3::RequestSettings(config, settings, config_prefix);
auto default_auth_settings = S3::S3AuthSettings(config, settings, config_prefix);
auto default_request_settings = S3::S3RequestSettings(config, settings, config_prefix);
for (const String & key : config_keys)
{
@ -51,10 +51,10 @@ void S3SettingsByEndpoint::loadFromConfig(
if (config.has(endpoint_path))
{
auto auth_settings{default_auth_settings};
auth_settings.updateIfChanged(S3::AuthSettings(config, settings, key_path));
auth_settings.updateIfChanged(S3::S3AuthSettings(config, settings, key_path));
auto request_settings{default_request_settings};
request_settings.updateIfChanged(S3::RequestSettings(config, settings, key_path, "", settings[Setting::s3_validate_request_settings]));
request_settings.updateIfChanged(S3::S3RequestSettings(config, settings, key_path, "", settings[Setting::s3_validate_request_settings]));
s3_settings.emplace(
config.getString(endpoint_path),

View File

@ -8,7 +8,9 @@
#include <Common/Throttler_fwd.h>
#include <IO/S3Common.h>
#include <IO/S3AuthSettings.h>
#include <IO/S3Defines.h>
#include <IO/S3RequestSettings.h>
namespace Poco::Util { class AbstractConfiguration; }
@ -19,8 +21,8 @@ struct Settings;
struct S3Settings
{
S3::AuthSettings auth_settings;
S3::RequestSettings request_settings;
S3::S3AuthSettings auth_settings;
S3::S3RequestSettings request_settings;
void loadFromConfig(
const Poco::Util::AbstractConfiguration & config,

View File

@ -46,6 +46,21 @@ namespace ProfileEvents
namespace DB
{
/// Handles of the S3 request settings read in this file. They are defined elsewhere and are passed to
/// S3RequestSettings::operator[] to access the corresponding value.
namespace S3RequestSetting
{
extern const S3RequestSettingsBool check_objects_after_upload;
extern const S3RequestSettingsUInt64 max_inflight_parts_for_one_file;
extern const S3RequestSettingsUInt64 max_part_number;
extern const S3RequestSettingsUInt64 max_single_part_upload_size;
extern const S3RequestSettingsUInt64 max_unexpected_write_error_retries;
extern const S3RequestSettingsUInt64 max_upload_part_size;
extern const S3RequestSettingsUInt64 min_upload_part_size;
extern const S3RequestSettingsString storage_class_name;
extern const S3RequestSettingsUInt64 strict_upload_part_size;
extern const S3RequestSettingsUInt64 upload_part_size_multiply_factor;
extern const S3RequestSettingsUInt64 upload_part_size_multiply_parts_count_threshold;
}
namespace ErrorCodes
{
extern const int S3_ERROR;
@ -71,15 +86,15 @@ struct WriteBufferFromS3::PartData
}
};
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3::RequestSettings & settings)
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3::S3RequestSettings & settings)
{
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
allocation_settings.min_size = settings.min_upload_part_size;
allocation_settings.max_size = settings.max_upload_part_size;
allocation_settings.multiply_factor = settings.upload_part_size_multiply_factor;
allocation_settings.multiply_parts_count_threshold = settings.upload_part_size_multiply_parts_count_threshold;
allocation_settings.max_single_size = settings.max_single_part_upload_size;
allocation_settings.strict_size = settings[S3RequestSetting::strict_upload_part_size];
allocation_settings.min_size = settings[S3RequestSetting::min_upload_part_size];
allocation_settings.max_size = settings[S3RequestSetting::max_upload_part_size];
allocation_settings.multiply_factor = settings[S3RequestSetting::upload_part_size_multiply_factor];
allocation_settings.multiply_parts_count_threshold = settings[S3RequestSetting::upload_part_size_multiply_parts_count_threshold];
allocation_settings.max_single_size = settings[S3RequestSetting::max_single_part_upload_size];
return BufferAllocationPolicy::create(allocation_settings);
}
@ -90,7 +105,7 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
@ -106,7 +121,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, task_tracker(
std::make_unique<TaskTracker>(
std::move(schedule_),
request_settings.max_inflight_parts_for_one_file,
request_settings[S3RequestSetting::max_inflight_parts_for_one_file],
limited_log))
, blob_log(std::move(blob_log_))
{
@ -163,7 +178,7 @@ void WriteBufferFromS3::preFinalize()
if (multipart_upload_id.empty() && detached_part_data.size() <= 1)
{
if (detached_part_data.empty() || detached_part_data.front().data_size <= request_settings.max_single_part_upload_size)
if (detached_part_data.empty() || detached_part_data.front().data_size <= request_settings[S3RequestSetting::max_single_part_upload_size])
do_single_part_upload = true;
}
@ -210,7 +225,7 @@ void WriteBufferFromS3::finalizeImpl()
multipart_upload_finished = true;
}
if (request_settings.check_objects_after_upload)
if (request_settings[S3RequestSetting::check_objects_after_upload])
{
S3::checkObjectExists(*client_ptr, bucket, key, {}, "Immediately after upload");
@ -518,18 +533,18 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
"Unable to write a part without multipart_upload_id, details: WriteBufferFromS3 created for bucket {}, key {}",
bucket, key);
if (part_number > request_settings.max_part_number)
if (part_number > request_settings[S3RequestSetting::max_part_number])
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
request_settings.max_part_number, count(), request_settings.min_upload_part_size, request_settings.max_upload_part_size,
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
request_settings.max_single_part_upload_size);
request_settings[S3RequestSetting::max_part_number], count(), request_settings[S3RequestSetting::min_upload_part_size], request_settings[S3RequestSetting::max_upload_part_size],
request_settings[S3RequestSetting::upload_part_size_multiply_factor], request_settings[S3RequestSetting::upload_part_size_multiply_parts_count_threshold],
request_settings[S3RequestSetting::max_single_part_upload_size]);
}
if (data.data_size > request_settings.max_upload_part_size)
if (data.data_size > request_settings[S3RequestSetting::max_upload_part_size])
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -537,7 +552,7 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
getShortLogDetails(),
part_number,
data.data_size,
request_settings.max_upload_part_size
request_settings[S3RequestSetting::max_upload_part_size]
);
}
@ -622,7 +637,7 @@ void WriteBufferFromS3::completeMultipartUpload()
req.SetMultipartUpload(multipart_upload);
size_t max_retry = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
size_t max_retry = std::max<UInt64>(request_settings[S3RequestSetting::max_unexpected_write_error_retries].value, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@ -680,8 +695,8 @@ S3::PutObjectRequest WriteBufferFromS3::getPutRequest(PartData & data)
req.SetBody(data.createAwsBuffer());
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
if (!request_settings.storage_class_name.value.empty())
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(request_settings.storage_class_name));
if (!request_settings[S3RequestSetting::storage_class_name].value.empty())
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(request_settings[S3RequestSetting::storage_class_name]));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
req.SetContentType("binary/octet-stream");
@ -705,7 +720,7 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
auto & request = std::get<0>(*worker_data);
size_t content_length = request.GetContentLength();
size_t max_retry = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
size_t max_retry = std::max<UInt64>(request_settings[S3RequestSetting::max_unexpected_write_error_retries].value, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);

View File

@ -38,7 +38,7 @@ public:
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3::RequestSettings & request_settings_,
const S3::S3RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
@ -79,7 +79,7 @@ private:
const String bucket;
const String key;
const S3::RequestSettings request_settings;
const S3::S3RequestSettings request_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;

View File

@ -557,7 +557,7 @@ public:
std::unique_ptr<WriteBufferFromS3> getWriteBuffer(String file_name = "file")
{
S3::RequestSettings request_settings;
S3::S3RequestSettings request_settings;
request_settings.updateFromSettings(settings, /* if_changed */true, /* validate_settings */false);
client->resetCounters();

View File

@ -6,8 +6,6 @@
#include <unordered_map>
#include <boost/functional/hash.hpp>
#include <IO/ReadSettings.h>
#include <Common/callOnce.h>
#include <Common/ThreadPool.h>
#include <Common/StatusFile.h>
@ -25,6 +23,7 @@
namespace DB
{
struct ReadSettings;
/// Track acquired space in cache during reservation
/// to make error messages when no space left more informative.

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h>
#include <IO/ReadSettings.h>
#include <Common/CurrentThread.h>
namespace DB

View File

@ -7,14 +7,13 @@
#include <mutex>
#include <set>
#include <Core/BackgroundSchedulePool.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Storages/Cache/IRemoteFileMetadata.h>
#include <Storages/Cache/RemoteCacheController.h>

View File

@ -1,5 +1,4 @@
#pragma once
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/WriteBufferFromFileBase.h>
#include <base/types.h>
@ -16,7 +15,7 @@
namespace DB
{
struct ReadSettings;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;

View File

@ -2,6 +2,7 @@
#include <cstddef>
#include <Compression/CompressionFactory.h>
#include <Compression/ICompressionCodec.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>

View File

@ -1,9 +1,8 @@
#pragma once
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool_fwd.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <Common/ThreadPool_fwd.h>
namespace DB
@ -11,6 +10,7 @@ namespace DB
struct MergeTreeIndexGranularityInfo;
using MarksPtr = MarkCache::MappedPtr;
struct ReadSettings;
class Threadpool;
/// Class that helps to get marks by indexes.

View File

@ -37,6 +37,16 @@ namespace Setting
extern const SettingsBool schema_inference_use_cache_for_s3;
}
/// Handles of the S3 auth settings used in this file. They are defined elsewhere and are passed to the
/// subscript operator of the auth settings object to access the corresponding value.
namespace S3AuthSetting
{
extern const S3AuthSettingsString access_key_id;
extern const S3AuthSettingsUInt64 expiration_window_seconds;
extern const S3AuthSettingsBool no_sign_request;
extern const S3AuthSettingsString secret_access_key;
extern const S3AuthSettingsString session_token;
extern const S3AuthSettingsBool use_environment_credentials;
}
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -162,19 +172,19 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
else
url = S3::URI(collection.get<String>("url"), settings[Setting::allow_archive_path_syntax]);
auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 1);
auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
auth_settings[S3AuthSetting::access_key_id] = collection.getOrDefault<String>("access_key_id", "");
auth_settings[S3AuthSetting::secret_access_key] = collection.getOrDefault<String>("secret_access_key", "");
auth_settings[S3AuthSetting::use_environment_credentials] = collection.getOrDefault<UInt64>("use_environment_credentials", 1);
auth_settings[S3AuthSetting::no_sign_request] = collection.getOrDefault<bool>("no_sign_request", false);
auth_settings[S3AuthSetting::expiration_window_seconds] = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
format = collection.getOrDefault<String>("format", format);
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
request_settings = S3::RequestSettings(collection, settings, /* validate_settings */true);
request_settings = S3::S3RequestSettings(collection, settings, /* validate_settings */true);
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
static_configuration = !auth_settings[S3AuthSetting::access_key_id].value.empty() || auth_settings[S3AuthSetting::no_sign_request].changed;
keys = {url.key};
}
@ -367,19 +377,19 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
compression_method = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method");
if (engine_args_to_idx.contains("access_key_id"))
auth_settings.access_key_id = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["access_key_id"]], "access_key_id");
auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["access_key_id"]], "access_key_id");
if (engine_args_to_idx.contains("secret_access_key"))
auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
auth_settings[S3AuthSetting::secret_access_key] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
if (engine_args_to_idx.contains("session_token"))
auth_settings.session_token = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["session_token"]], "session_token");
auth_settings[S3AuthSetting::session_token] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["session_token"]], "session_token");
if (no_sign_request)
auth_settings.no_sign_request = no_sign_request;
auth_settings[S3AuthSetting::no_sign_request] = no_sign_request;
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
auth_settings.no_sign_request = no_sign_request;
static_configuration = !auth_settings[S3AuthSetting::access_key_id].value.empty() || auth_settings[S3AuthSetting::no_sign_request].changed;
auth_settings[S3AuthSetting::no_sign_request] = no_sign_request;
keys = {url.key};
}

View File

@ -99,8 +99,8 @@ private:
S3::URI url;
std::vector<String> keys;
S3::AuthSettings auth_settings;
S3::RequestSettings request_settings;
S3::S3AuthSettings auth_settings;
S3::S3RequestSettings request_settings;
HTTPHeaderEntries headers_from_ast; /// Headers from ast is a part of static configuration.
/// If the S3 configuration was passed from the AST, then it is static.
/// If it came from the config, it can be changed by a config reload.

View File

@ -9,6 +9,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Compression/CompressedReadBufferFromFile.h>

View File

@ -39,6 +39,8 @@ ALL_DECLARATION_FILES="
$ROOT_PATH/src/Storages/ExecutableSettings.cpp
$ROOT_PATH/src/Storages/MySQL/MySQLSettings.cpp
$ROOT_PATH/src/Databases/MySQL/MaterializedMySQLSettings.cpp
$ROOT_PATH/src/IO/S3AuthSettings.cpp
$ROOT_PATH/src/IO/S3RequestSettings.cpp
"
# We create an initial file with the shape {setting_name} {ClassName}{Type} SettingsDeclaration
@ -50,7 +52,7 @@ function add_setting_declaration_file()
fi
filename=$(basename -- "$1")
filename="${filename%.*}"
grep "DECLARE(" "$1" | awk -vfilename="${filename}" '{print substr($2, 0, length($2) - 1) " " filename substr($1, 9, length($1) - 9) " SettingsDeclaration" }' | sort | uniq >> "${SETTINGS_FILE}"
grep "DECLARE(" "$1" | awk -vfilename="${filename}" '{print substr($2, 0, length($2) - 1) " " filename substr($1, 9, length($1) - 9) " SettingsDeclaration" }' >> "${SETTINGS_FILE}"
}
for settings_file in ${ALL_DECLARATION_FILES};