Merge pull request #64412 from ClickHouse/unite-storages3-and-disks3-settings

Refactor s3 settings (move settings parsing into single place)
This commit is contained in:
Kseniia Sumarokova 2024-06-14 12:35:47 +00:00 committed by GitHub
commit 321f62d257
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 902 additions and 817 deletions

View File

@ -154,8 +154,6 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ThreadPoolRemoteFSReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ThreadPoolReader.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Storages/StorageS3Settings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Daemon/BaseDaemon.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Daemon/SentryWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Daemon/GraphiteWriter.cpp

View File

@ -88,14 +88,10 @@ namespace
std::move(headers),
S3::CredentialsConfiguration
{
settings.auth_settings.use_environment_credentials.value_or(
context->getConfigRef().getBool("s3.use_environment_credentials", true)),
settings.auth_settings.use_insecure_imds_request.value_or(
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
settings.auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
settings.auth_settings.no_sign_request.value_or(
context->getConfigRef().getBool("s3.no_sign_request", false)),
settings.auth_settings.use_environment_credentials,
settings.auth_settings.use_insecure_imds_request,
settings.auth_settings.expiration_window_seconds,
settings.auth_settings.no_sign_request
});
}
@ -131,12 +127,18 @@ BackupReaderS3::BackupReaderS3(
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString(), context_->getUserName(), /*ignore_user=*/is_internal_backup).value_or(S3Settings{}))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettingsIfChanged(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());
if (auto endpoint_settings = context_->getStorageS3Settings().getSettings(
s3_uri.uri.toString(), context_->getUserName(), /*ignore_user=*/is_internal_backup))
{
s3_settings.updateIfChanged(*endpoint_settings);
}
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
s3_settings.request_settings.allow_native_copy = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
@ -223,13 +225,19 @@ BackupWriterS3::BackupWriterS3(
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString(), context_->getUserName(), /*ignore_user=*/is_internal_backup).value_or(S3Settings{}))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettingsIfChanged(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
request_settings.setStorageClassName(storage_class_name);
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());
if (auto endpoint_settings = context_->getStorageS3Settings().getSettings(
s3_uri.uri.toString(), context_->getUserName(), /*ignore_user=*/is_internal_backup))
{
s3_settings.updateIfChanged(*endpoint_settings);
}
s3_settings.request_settings.updateFromSettings(context_->getSettingsRef(), /* if_changed */true);
s3_settings.request_settings.allow_native_copy = allow_s3_native_copy;
s3_settings.request_settings.storage_class_name = storage_class_name;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
{

View File

@ -7,7 +7,7 @@
#include <Common/Logger.h>
#include <Disks/DiskType.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/S3/BlobStorageLogWriter.h>

View File

@ -18,6 +18,7 @@
#include <IO/S3/Client.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
#include <aws/core/auth/AWSCredentials.h>
@ -64,7 +65,8 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
return;
}
auto auth_settings = S3::AuthSettings::loadFromConfig(config_prefix, config);
const auto & settings = Context::getGlobalContextInstance()->getSettingsRef();
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
String endpoint = macros->expand(config.getString(config_prefix + ".endpoint"));
auto new_uri = S3::URI{endpoint};
@ -118,10 +120,10 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
std::move(headers),
S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(true),
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
auth_settings.no_sign_request.value_or(false),
auth_settings.use_environment_credentials,
auth_settings.use_insecure_imds_request,
auth_settings.expiration_window_seconds,
auth_settings.no_sign_request,
},
credentials.GetSessionToken());
@ -154,7 +156,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
if (s3_client == nullptr)
return;
S3Settings::RequestSettings request_settings_1;
S3::RequestSettings request_settings_1;
const auto create_writer = [&](const auto & key)
{
@ -197,7 +199,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
lock_writer.finalize();
// We read back the written UUID, if it's the same we can upload the file
S3Settings::RequestSettings request_settings_2;
S3::RequestSettings request_settings_2;
request_settings_2.max_single_read_retries = 1;
ReadBufferFromS3 lock_reader
{

View File

@ -5,7 +5,7 @@
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>
#include <Disks/IO/IOUringReader.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Disks/IO/getIOUringReader.h>
#include <Core/ServerSettings.h>
@ -146,7 +146,7 @@ struct ContextSharedPart : boost::noncopyable
mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads
mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes
std::optional<StorageS3Settings> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
std::optional<S3SettingsByEndpoint> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
@ -455,14 +455,14 @@ std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot connect to ZooKeeper from Keeper");
}
const StorageS3Settings & Context::getStorageS3Settings() const
const S3SettingsByEndpoint & Context::getStorageS3Settings() const
{
std::lock_guard lock(shared->mutex);
if (!shared->storage_s3_settings)
{
const auto & config = shared->config ? *shared->config : Poco::Util::Application::instance().config();
shared->storage_s3_settings.emplace().loadFromConfig("s3", config, getSettingsRef());
shared->storage_s3_settings.emplace().loadFromConfig(config, "s3", getSettingsRef());
}
return *shared->storage_s3_settings;

View File

@ -37,7 +37,7 @@ class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class BlobStorageLog;
class IOUringReader;
class StorageS3Settings;
class S3SettingsByEndpoint;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
@ -164,7 +164,7 @@ public:
zkutil::ZooKeeperPtr getZooKeeper() const;
const StorageS3Settings & getStorageS3Settings() const;
const S3SettingsByEndpoint & getStorageS3Settings() const;
const String & getUserName() const { static std::string user; return user; }

View File

@ -108,6 +108,7 @@ public:
public:
const String & getName() const;
Field getValue() const;
void setValue(const Field & value);
Field getDefaultValue() const;
String getValueString() const;
String getDefaultValueString() const;
@ -122,10 +123,10 @@ public:
private:
friend class BaseSettings;
const BaseSettings * settings;
BaseSettings * settings;
const typename Traits::Accessor * accessor;
size_t index;
std::conditional_t<Traits::allow_custom_settings, const CustomSettingMap::mapped_type*, boost::blank> custom_setting;
std::conditional_t<Traits::allow_custom_settings, CustomSettingMap::mapped_type*, boost::blank> custom_setting;
};
enum SkipFlags
@ -144,35 +145,50 @@ public:
Iterator & operator++();
Iterator operator++(int); /// NOLINT
const SettingFieldRef & operator *() const { return field_ref; }
SettingFieldRef & operator *() { return field_ref; }
bool operator ==(const Iterator & other) const;
bool operator !=(const Iterator & other) const { return !(*this == other); }
private:
friend class BaseSettings;
Iterator(const BaseSettings & settings_, const typename Traits::Accessor & accessor_, SkipFlags skip_flags_);
Iterator(BaseSettings & settings_, const typename Traits::Accessor & accessor_, SkipFlags skip_flags_);
void doSkip();
void setPointerToCustomSetting();
SettingFieldRef field_ref;
std::conditional_t<Traits::allow_custom_settings, CustomSettingMap::const_iterator, boost::blank> custom_settings_iterator;
std::conditional_t<Traits::allow_custom_settings, CustomSettingMap::iterator, boost::blank> custom_settings_iterator;
SkipFlags skip_flags;
};
class Range
{
public:
Range(const BaseSettings & settings_, SkipFlags skip_flags_) : settings(settings_), accessor(Traits::Accessor::instance()), skip_flags(skip_flags_) {}
Range(BaseSettings & settings_, SkipFlags skip_flags_) : settings(settings_), accessor(Traits::Accessor::instance()), skip_flags(skip_flags_) {}
Iterator begin() const { return Iterator(settings, accessor, skip_flags); }
Iterator end() const { return Iterator(settings, accessor, SKIP_ALL); }
private:
const BaseSettings & settings;
BaseSettings & settings;
const typename Traits::Accessor & accessor;
SkipFlags skip_flags;
};
Range all(SkipFlags skip_flags = SKIP_NONE) const { return Range{*this, skip_flags}; }
/// Non-const counterpart of Range: iterates over settings through mutable
/// Iterators, allowing in-place modification of each setting during traversal
/// (e.g. via SettingFieldRef::setValue). Holds a non-const reference to the
/// settings object; the range must not outlive it.
class MutableRange
{
public:
/// Wraps the given settings object; skip_flags_ selects which settings the
/// iteration visits (changed/unchanged/builtin/custom subsets).
MutableRange(BaseSettings & settings_, SkipFlags skip_flags_) : settings(settings_), accessor(Traits::Accessor::instance()), skip_flags(skip_flags_) {}
Iterator begin() { return Iterator(settings, accessor, skip_flags); }
/// End sentinel: an iterator constructed with SKIP_ALL visits nothing.
Iterator end() { return Iterator(settings, accessor, SKIP_ALL); }
private:
BaseSettings & settings;
const typename Traits::Accessor & accessor;
SkipFlags skip_flags;
};
Range all(SkipFlags skip_flags = SKIP_NONE) const { return Range{const_cast<BaseSettings<Traits> &>(*this), skip_flags}; }
MutableRange allMutable(SkipFlags skip_flags = SKIP_NONE) { return MutableRange{*this, skip_flags}; }
Range allChanged() const { return all(SKIP_UNCHANGED); }
Range allUnchanged() const { return all(SKIP_CHANGED); }
Range allBuiltin() const { return all(SKIP_CUSTOM); }
@ -608,7 +624,7 @@ const SettingFieldCustom * BaseSettings<TTraits>::tryGetCustomSetting(std::strin
}
template <typename TTraits>
BaseSettings<TTraits>::Iterator::Iterator(const BaseSettings & settings_, const typename Traits::Accessor & accessor_, SkipFlags skip_flags_)
BaseSettings<TTraits>::Iterator::Iterator(BaseSettings & settings_, const typename Traits::Accessor & accessor_, SkipFlags skip_flags_)
: skip_flags(skip_flags_)
{
field_ref.settings = &settings_;
@ -741,6 +757,18 @@ Field BaseSettings<TTraits>::SettingFieldRef::getValue() const
return accessor->getValue(*settings, index);
}
template <typename TTraits>
/// Assigns a new value to the setting this reference points at.
/// For traits that allow custom settings, a non-null custom_setting pointer
/// means the ref denotes a custom (dynamically added) setting, stored as the
/// mapped value of the custom-settings map; otherwise the value is written
/// through the accessor into the built-in setting at `index`.
void BaseSettings<TTraits>::SettingFieldRef::setValue(const Field & value)
{
if constexpr (Traits::allow_custom_settings)
{
if (custom_setting)
custom_setting->second = value;
}
else
accessor->setValue(*settings, index, value);
}
template <typename TTraits>
Field BaseSettings<TTraits>::SettingFieldRef::getDefaultValue() const
{

View File

@ -5,6 +5,7 @@
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
#include <IO/ReadSettings.h>
#include <IO/S3Defines.h>
#include <base/unit.h>
@ -78,34 +79,36 @@ class IColumn;
M(UInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, s3_strict_upload_part_size, S3::DEFAULT_STRICT_UPLOAD_PART_SIZE, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_min_upload_part_size, S3::DEFAULT_MIN_UPLOAD_PART_SIZE, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, S3::DEFAULT_MAX_UPLOAD_PART_SIZE, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, S3::DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, S3::DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_part_number, S3::DEFAULT_MAX_PART_NUMBER, "Maximum part number number for s3 upload part.", 0) \
M(UInt64, s3_max_single_operation_copy_size, S3::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "Maximum size for a single copy operation in s3", 0) \
M(UInt64, azure_upload_part_size_multiply_factor, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage.", 0) \
M(UInt64, azure_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, S3::DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, azure_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, S3::DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_single_part_copy_size, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_single_read_retries, S3::DEFAULT_MAX_SINGLE_READ_TRIES, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, azure_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, S3::DEFAULT_MAX_UNEXPECTED_WRITE_ERROR_RETRIES, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, S3::DEFAULT_MAX_REDIRECTS, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, S3::DEFAULT_MAX_CONNECTIONS, "The maximum number of connections per server.", 0) \
M(UInt64, s3_max_get_rps, 0, "Limit on S3 GET request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_get_rps`", 0) \
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_use_adaptive_timeouts, true, "When adaptive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \
M(UInt64, s3_list_object_keys_size, S3::DEFAULT_LIST_OBJECT_KEYS_SIZE, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_use_adaptive_timeouts, S3::DEFAULT_USE_ADAPTIVE_TIMEOUTS, "When adaptive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
@ -122,10 +125,10 @@ class IColumn;
M(Bool, hdfs_ignore_file_doesnt_exist, false, "Return 0 rows when the requested files don't exist, instead of throwing an exception in HDFS table engine", 0) \
M(Bool, azure_ignore_file_doesnt_exist, false, "Return 0 rows when the requested files don't exist, instead of throwing an exception in AzureBlobStorage table engine", 0) \
M(Bool, s3_validate_request_settings, true, "Validate S3 request settings", 0) \
M(Bool, s3_disable_checksum, false, "Do not calculate a checksum when sending a file to S3. This speeds up writes by avoiding excessive processing passes on a file. It is mostly safe as the data of MergeTree tables is checksummed by ClickHouse anyway, and when S3 is accessed with HTTPS, the TLS layer already provides integrity while transferring through the network. While additional checksums on S3 give defense in depth.", 0) \
M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(UInt64, s3_connect_timeout_ms, 1000, "Connection timeout for host from s3 disks.", 0) \
M(Bool, s3_disable_checksum, S3::DEFAULT_DISABLE_CHECKSUM, "Do not calculate a checksum when sending a file to S3. This speeds up writes by avoiding excessive processing passes on a file. It is mostly safe as the data of MergeTree tables is checksummed by ClickHouse anyway, and when S3 is accessed with HTTPS, the TLS layer already provides integrity while transferring through the network. While additional checksums on S3 give defense in depth.", 0) \
M(UInt64, s3_retry_attempts, S3::DEFAULT_RETRY_ATTEMPTS, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, S3::DEFAULT_REQUEST_TIMEOUT_MS, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(UInt64, s3_connect_timeout_ms, S3::DEFAULT_CONNECT_TIMEOUT_MS, "Connection timeout for host from s3 disks.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(Bool, s3queue_enable_logging_to_s3queue_log, false, "Enable writing to system.s3queue_log. The value can be overwritten per table with table settings", 0) \

View File

@ -96,6 +96,8 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"s3_max_part_number", 10000, 10000, "Maximum part number number for s3 upload part"},
{"s3_max_single_operation_copy_size", 32 * 1024 * 1024, 32 * 1024 * 1024, "Maximum size for a single copy operation in s3"},
{"input_format_parquet_max_block_size", 8192, DEFAULT_BLOCK_SIZE, "Increase block size for parquet reader."},
{"input_format_parquet_prefer_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Average block bytes output by parquet reader."},
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},

View File

@ -172,6 +172,14 @@ void checkS3Capabilities(
}
}
/// Reads the S3 endpoint string from `<config_prefix>.endpoint` in the given
/// configuration and expands any macros in it via the context's Macros
/// expander. Throws (from Poco's getString) if the key is absent.
/// NOTE(review): assumes the endpoint key is always present for S3 disks —
/// confirm against the disk registration code paths that call this.
static std::string getEndpoint(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context)
{
return context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
}
void registerS3ObjectStorage(ObjectStorageFactory & factory)
{
static constexpr auto disk_type = "s3";
@ -185,8 +193,9 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
{
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings, true);
auto endpoint = getEndpoint(config, config_prefix, context);
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
auto key_generator = getKeyGenerator(uri, config, config_prefix);
auto object_storage = createObjectStorage<S3ObjectStorage>(
@ -221,8 +230,9 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings, true);
auto endpoint = getEndpoint(config, config_prefix, context);
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
auto key_generator = getKeyGenerator(uri, config, config_prefix);
auto object_storage = std::make_shared<PlainObjectStorage<S3ObjectStorage>>(
@ -255,8 +265,9 @@ void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings, true);
auto endpoint = getEndpoint(config, config_prefix, context);
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
auto key_generator = getKeyGenerator(uri, config, config_prefix);
auto metadata_storage_metrics = DB::MetadataStorageMetrics::create<S3ObjectStorage, MetadataStorageType::PlainRewritable>();

View File

@ -168,7 +168,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings);
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {});
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -258,13 +258,15 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
S3Settings::RequestSettings request_settings = s3_settings.get()->request_settings;
S3::RequestSettings request_settings = s3_settings.get()->request_settings;
/// NOTE: For background operations settings are not propagated from session or query. They are taken from
/// default user's .xml config. It's obscure and unclear behavior. For them it's always better
/// to rely on settings from disk.
if (auto query_context = CurrentThread::getQueryContext(); query_context && !query_context->isBackgroundOperationContext())
if (auto query_context = CurrentThread::getQueryContext();
query_context && !query_context->isBackgroundOperationContext())
{
request_settings.updateFromSettingsIfChanged(query_context->getSettingsRef());
const auto & settings = query_context->getSettingsRef();
request_settings.updateFromSettings(settings, /* if_changed */true, settings.s3_validate_request_settings);
}
ThreadPoolCallbackRunnerUnsafe<void> scheduler;
@ -444,8 +446,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(
*client.get(), uri.bucket, path, {}, settings_ptr->request_settings,
/* with_metadata= */ true, /* throw_on_error= */ false);
*client.get(), uri.bucket, path, {}, /* with_metadata= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -464,7 +465,7 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
S3::ObjectInfo object_info;
try
{
object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true);
object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, /* with_metadata= */ true);
}
catch (DB::Exception & e)
{
@ -493,7 +494,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
{
auto current_client = dest_s3->client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {});
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
try
@ -537,7 +538,7 @@ void S3ObjectStorage::copyObject( // NOLINT
{
auto current_client = client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {});
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(
@ -582,19 +583,22 @@ void S3ObjectStorage::applyNewSettings(
ContextPtr context,
const ApplyNewSettingsOptions & options)
{
auto settings_from_config = getSettings(config, config_prefix, context, context->getSettingsRef().s3_validate_request_settings);
auto settings_from_config = getSettings(config, config_prefix, context, uri.uri_str, context->getSettingsRef().s3_validate_request_settings);
auto modified_settings = std::make_unique<S3ObjectStorageSettings>(*s3_settings.get());
modified_settings->auth_settings.updateFrom(settings_from_config->auth_settings);
modified_settings->request_settings = settings_from_config->request_settings;
modified_settings->auth_settings.updateIfChanged(settings_from_config->auth_settings);
modified_settings->request_settings.updateIfChanged(settings_from_config->request_settings);
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(uri.uri.toString(), context->getUserName()))
modified_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
{
modified_settings->auth_settings.updateIfChanged(endpoint_settings->auth_settings);
modified_settings->request_settings.updateIfChanged(endpoint_settings->request_settings);
}
auto current_settings = s3_settings.get();
if (options.allow_client_change
&& (current_settings->auth_settings.hasUpdates(modified_settings->auth_settings) || for_disk_s3))
{
auto new_client = getClient(config, config_prefix, context, *modified_settings, for_disk_s3, &uri);
auto new_client = getClient(uri, *modified_settings, context, for_disk_s3);
client.set(std::move(new_client));
}
s3_settings.set(std::move(modified_settings));
@ -606,8 +610,9 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
const std::string & config_prefix,
ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings, true);
const auto & settings = context->getSettingsRef();
auto new_s3_settings = getSettings(config, config_prefix, context, uri.uri_str, settings.s3_validate_request_settings);
auto new_client = getClient(uri, *new_s3_settings, context, for_disk_s3);
auto new_uri{uri};
new_uri.bucket = new_namespace;

View File

@ -7,7 +7,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
#include <memory>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Common/MultiVersion.h>
#include <Common/ObjectStorageKeyGenerator.h>
@ -20,7 +20,7 @@ struct S3ObjectStorageSettings
S3ObjectStorageSettings() = default;
S3ObjectStorageSettings(
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const S3::AuthSettings & auth_settings_,
uint64_t min_bytes_for_seek_,
int32_t list_object_keys_size_,
@ -34,7 +34,7 @@ struct S3ObjectStorageSettings
, read_only(read_only_)
{}
S3Settings::RequestSettings request_settings;
S3::RequestSettings request_settings;
S3::AuthSettings auth_settings;
uint64_t min_bytes_for_seek;

View File

@ -6,6 +6,7 @@
#include <Common/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/Throttler.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <IO/ReadHelpers.h>
@ -18,18 +19,12 @@
#include <IO/S3Common.h>
#include <IO/S3/Credentials.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/DiskLocal.h>
#include <Common/Macros.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
@ -39,11 +34,16 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const std::string & endpoint,
bool validate_settings)
{
const Settings & settings = context->getSettingsRef();
auto request_settings = S3Settings::RequestSettings(config, config_prefix, settings, "s3_", validate_settings);
auto auth_settings = S3::AuthSettings::loadFromConfig(config_prefix, config);
const auto & settings = context->getSettingsRef();
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
auto request_settings = S3::RequestSettings(config, settings, config_prefix, "s3_", validate_settings);
request_settings.proxy_resolver = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
ProxyConfiguration::protocolFromString(S3::URI(endpoint).uri.getScheme()), config_prefix, config);
return std::make_unique<S3ObjectStorageSettings>(
request_settings,
@ -55,38 +55,33 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
}
std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const std::string & endpoint,
const S3ObjectStorageSettings & settings,
bool for_disk_s3,
const S3::URI * url_)
ContextPtr context,
bool for_disk_s3)
{
auto url = S3::URI(endpoint);
if (!url.key.ends_with('/'))
url.key.push_back('/');
return getClient(url, settings, context, for_disk_s3);
}
std::unique_ptr<S3::Client> getClient(
const S3::URI & url,
const S3ObjectStorageSettings & settings,
ContextPtr context,
bool for_disk_s3)
{
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const Settings & local_settings = context->getSettingsRef();
const auto & auth_settings = settings.auth_settings;
const auto & request_settings = settings.request_settings;
S3::URI url;
if (for_disk_s3)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
url = S3::URI(endpoint);
if (!url.key.ends_with('/'))
url.key.push_back('/');
}
else
{
if (!url_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "URL not passed");
url = *url_;
}
const bool is_s3_express_bucket = S3::isS3ExpressEndpoint(url.endpoint);
if (is_s3_express_bucket && !config.has(config_prefix + ".region"))
if (is_s3_express_bucket && auth_settings.region.value.empty())
{
throw Exception(
ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Region should be explicitly specified for directory buckets ({})", config_prefix);
ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Region should be explicitly specified for directory buckets");
}
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
@ -96,49 +91,40 @@ std::unique_ptr<S3::Client> getClient(
static_cast<int>(global_settings.s3_retry_attempts),
global_settings.enable_s3_requests_logging,
for_disk_s3,
settings.request_settings.get_request_throttler,
settings.request_settings.put_request_throttler,
request_settings.get_request_throttler,
request_settings.put_request_throttler,
url.uri.getScheme());
client_configuration.connectTimeoutMs = config.getUInt64(config_prefix + ".connect_timeout_ms", local_settings.s3_connect_timeout_ms.value);
client_configuration.requestTimeoutMs = config.getUInt64(config_prefix + ".request_timeout_ms", local_settings.s3_request_timeout_ms.value);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", static_cast<unsigned>(request_settings.max_connections));
client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", S3::DEFAULT_KEEP_ALIVE_TIMEOUT);
client_configuration.http_keep_alive_max_requests = config.getUInt(config_prefix + ".http_keep_alive_max_requests", S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS);
client_configuration.connectTimeoutMs = auth_settings.connect_timeout_ms;
client_configuration.requestTimeoutMs = auth_settings.request_timeout_ms;
client_configuration.maxConnections = static_cast<uint32_t>(auth_settings.max_connections);
client_configuration.http_keep_alive_timeout = auth_settings.http_keep_alive_timeout;
client_configuration.http_keep_alive_max_requests = auth_settings.http_keep_alive_max_requests;
client_configuration.endpointOverride = url.endpoint;
client_configuration.s3_use_adaptive_timeouts = config.getBool(
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
client_configuration.s3_use_adaptive_timeouts = auth_settings.use_adaptive_timeouts;
if (for_disk_s3)
if (request_settings.proxy_resolver)
{
/*
* Override proxy configuration for backwards compatibility with old configuration format.
* */
if (auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
ProxyConfiguration::protocolFromString(url.uri.getScheme()), config_prefix, config))
{
client_configuration.per_request_configuration
= [proxy_config]() { return proxy_config->resolve(); };
client_configuration.error_report
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
}
client_configuration.per_request_configuration = [=]() { return request_settings.proxy_resolver->resolve(); };
client_configuration.error_report = [=](const auto & request_config) { request_settings.proxy_resolver->errorReport(request_config); };
}
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
S3::ClientSettings client_settings{
.use_virtual_addressing = url.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = config.getBool("s3.gcs_issue_compose_request", false),
.is_s3express_bucket = is_s3_express_bucket,
.disable_checksum = auth_settings.disable_checksum,
.gcs_issue_compose_request = auth_settings.gcs_issue_compose_request,
};
auto credentials_configuration = S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
auth_settings.use_environment_credentials,
auth_settings.use_insecure_imds_request,
auth_settings.expiration_window_seconds,
auth_settings.no_sign_request,
};
return S3::ClientFactory::instance().create(
@ -147,7 +133,7 @@ std::unique_ptr<S3::Client> getClient(
auth_settings.access_key_id,
auth_settings.secret_access_key,
auth_settings.server_side_encryption_customer_key_base64,
std::move(sse_kms_config),
auth_settings.server_side_encryption_kms_config,
auth_settings.headers,
credentials_configuration,
auth_settings.session_token);

View File

@ -18,15 +18,20 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
bool validate_settings = true);
const std::string & endpoint,
bool validate_settings);
std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const std::string & endpoint,
const S3ObjectStorageSettings & settings,
bool for_disk_s3,
const S3::URI * url_ = nullptr);
ContextPtr context,
bool for_disk_s3);
std::unique_ptr<S3::Client> getClient(
const S3::URI & url_,
const S3ObjectStorageSettings & settings,
ContextPtr context,
bool for_disk_s3);
}

View File

@ -47,7 +47,7 @@ namespace
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
auto disk = DiskFactory::instance().create(
disk_name, *config, "", context, disks_map, /* attach */attach, /* custom_disk */true);
disk_name, *config, /* config_path */"", context, disks_map, /* attach */attach, /* custom_disk */true);
/// Mark that disk can be used without storage policy.
disk->markDiskAsCustom();
return disk;

View File

@ -51,7 +51,7 @@ ReadBufferFromS3::ReadBufferFromS3(
const String & bucket_,
const String & key_,
const String & version_id_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t offset_,
@ -318,7 +318,7 @@ size_t ReadBufferFromS3::getFileSize()
if (file_size)
return *file_size;
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings);
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id);
file_size = object_size;
return *file_size;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include "config.h"
#if USE_AWS_S3
@ -28,7 +28,7 @@ private:
String bucket;
String key;
String version_id;
const S3Settings::RequestSettings request_settings;
const S3::RequestSettings request_settings;
/// These variables are atomic because they can be used for `logging only`
/// (where it is not important to get consistent result)
@ -47,7 +47,7 @@ public:
const String & bucket_,
const String & key_,
const String & version_id_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer = false,
size_t offset_ = 0,

View File

@ -13,18 +13,12 @@
# include <aws/core/auth/bearer-token-provider/SSOBearerTokenProvider.h>
# include <IO/S3/PocoHTTPClient.h>
# include <IO/S3Defines.h>
namespace DB::S3
{
inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100;
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";

View File

@ -56,7 +56,7 @@ namespace
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
bool for_disk_s3_,
@ -66,7 +66,6 @@ namespace
, dest_bucket(dest_bucket_)
, dest_key(dest_key_)
, request_settings(request_settings_)
, upload_settings(request_settings.getUploadSettings())
, object_metadata(object_metadata_)
, schedule(schedule_)
, for_disk_s3(for_disk_s3_)
@ -81,8 +80,7 @@ namespace
std::shared_ptr<const S3::Client> client_ptr;
const String & dest_bucket;
const String & dest_key;
const S3Settings::RequestSettings & request_settings;
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const S3::RequestSettings & request_settings;
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunnerUnsafe<void> schedule;
bool for_disk_s3;
@ -127,8 +125,8 @@ namespace
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
const auto & storage_class_name = upload_settings.storage_class_name;
if (!storage_class_name.empty())
const auto & storage_class_name = request_settings.storage_class_name;
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
client_ptr->setKMSHeaders(request);
@ -187,7 +185,7 @@ namespace
request.SetMultipartUpload(multipart_upload);
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@ -241,7 +239,7 @@ namespace
void checkObjectAfterUpload()
{
LOG_TRACE(log, "Checking object {} exists after upload", dest_key);
S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, "Immediately after upload");
S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, "Immediately after upload");
LOG_TRACE(log, "Object {} exists after upload", dest_key);
}
@ -292,9 +290,9 @@ namespace
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
auto max_part_number = upload_settings.max_part_number;
auto min_upload_part_size = upload_settings.min_upload_part_size;
auto max_upload_part_size = upload_settings.max_upload_part_size;
auto max_part_number = request_settings.max_part_number;
auto min_upload_part_size = request_settings.min_upload_part_size;
auto max_upload_part_size = request_settings.max_upload_part_size;
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_part_number must not be 0");
@ -467,7 +465,7 @@ namespace
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
bool for_disk_s3_,
@ -481,7 +479,7 @@ namespace
void performCopy()
{
if (size <= upload_settings.max_single_part_upload_size)
if (size <= request_settings.max_single_part_upload_size)
performSinglepartUpload();
else
performMultipartUpload();
@ -514,8 +512,8 @@ namespace
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
const auto & storage_class_name = upload_settings.storage_class_name;
if (!storage_class_name.empty())
const auto & storage_class_name = request_settings.storage_class_name;
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
@ -526,7 +524,7 @@ namespace
void processPutRequest(S3::PutObjectRequest & request)
{
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);
@ -649,7 +647,7 @@ namespace
size_t src_size_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
const ReadSettings & read_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
@ -679,7 +677,7 @@ namespace
void performCopy()
{
LOG_TEST(log, "Copy object {} to {} using native copy", src_key, dest_key);
if (!supports_multipart_copy || size <= upload_settings.max_single_operation_copy_size)
if (!supports_multipart_copy || size <= request_settings.max_single_operation_copy_size)
performSingleOperationCopy();
else
performMultipartUploadCopy();
@ -716,8 +714,8 @@ namespace
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
const auto & storage_class_name = upload_settings.storage_class_name;
if (!storage_class_name.empty())
const auto & storage_class_name = request_settings.storage_class_name;
if (!storage_class_name.value.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
@ -728,7 +726,7 @@ namespace
void processCopyRequest(S3::CopyObjectRequest & request)
{
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retries = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CopyObject);
@ -852,7 +850,7 @@ void copyDataToS3File(
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const S3::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunnerUnsafe<void> schedule,
@ -883,7 +881,7 @@ void copyS3File(
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const S3::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,

View File

@ -4,7 +4,7 @@
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Common/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <base/types.h>
@ -39,7 +39,7 @@ void copyS3File(
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const S3::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
@ -58,7 +58,7 @@ void copyDataToS3File(
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const S3::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},

View File

@ -44,7 +44,7 @@ namespace
/// Performs a request to get the size and last modification time of an object.
std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
const S3::Client & client, const String & bucket, const String & key, const String & version_id,
const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata)
bool with_metadata)
{
auto outcome = headObject(client, bucket, key, version_id);
if (!outcome.IsSuccess())
@ -73,11 +73,10 @@ ObjectInfo getObjectInfo(
const String & bucket,
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings,
bool with_metadata,
bool throw_on_error)
{
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata);
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, with_metadata);
if (object_info)
{
return *object_info;
@ -96,20 +95,18 @@ size_t getObjectSize(
const String & bucket,
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings,
bool throw_on_error)
{
return getObjectInfo(client, bucket, key, version_id, request_settings, {}, throw_on_error).size;
return getObjectInfo(client, bucket, key, version_id, {}, throw_on_error).size;
}
bool objectExists(
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings)
const String & version_id)
{
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, {});
if (object_info)
return true;
@ -126,10 +123,9 @@ void checkObjectExists(
const String & bucket,
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings,
std::string_view description)
{
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, {});
if (object_info)
return;
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",

View File

@ -3,7 +3,7 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <base/types.h>
#include <IO/S3/Client.h>
@ -24,7 +24,6 @@ ObjectInfo getObjectInfo(
const String & bucket,
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
bool with_metadata = false,
bool throw_on_error = true);
@ -33,15 +32,13 @@ size_t getObjectSize(
const String & bucket,
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
bool throw_on_error = true);
bool objectExists(
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {});
const String & version_id = {});
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
void checkObjectExists(
@ -49,7 +46,6 @@ void checkObjectExists(
const String & bucket,
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
std::string_view description = {});
bool isNotFoundError(Aws::S3::S3Errors error);

View File

@ -25,7 +25,7 @@
#include <IO/S3Common.h>
#include <IO/S3/Client.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Poco/Util/ServerApplication.h>
#include "TestPocoHTTPServer.h"
@ -69,7 +69,7 @@ void doReadRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::U
UInt64 max_single_read_retries = 1;
DB::ReadSettings read_settings;
DB::S3Settings::RequestSettings request_settings;
DB::S3::RequestSettings request_settings;
request_settings.max_single_read_retries = max_single_read_retries;
DB::ReadBufferFromS3 read_buffer(
client,
@ -88,7 +88,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
{
UInt64 max_unexpected_write_error_retries = 1;
DB::S3Settings::RequestSettings request_settings;
DB::S3::RequestSettings request_settings;
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
DB::WriteBufferFromS3 write_buffer(
client,

View File

@ -2,17 +2,19 @@
#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Common/formatReadable.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "config.h"
#if USE_AWS_S3
# include <IO/HTTPHeaderEntries.h>
# include <IO/S3/Client.h>
# include <IO/S3/Requests.h>
# include <Common/quoteString.h>
# include <Common/logger_useful.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/Client.h>
#include <IO/S3/Requests.h>
namespace ProfileEvents
@ -58,6 +60,8 @@ namespace DB
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
extern const int INVALID_SETTING_VALUE;
}
namespace S3
@ -98,104 +102,320 @@ ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, c
return sse_kms_config;
}
AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
template <typename Settings>
static bool setValueFromConfig(
const Poco::Util::AbstractConfiguration & config,
const std::string & path,
typename Settings::SettingFieldRef & field)
{
auto access_key_id = config.getString(config_elem + ".access_key_id", "");
auto secret_access_key = config.getString(config_elem + ".secret_access_key", "");
auto session_token = config.getString(config_elem + ".session_token", "");
if (!config.has(path))
return false;
auto region = config.getString(config_elem + ".region", "");
auto server_side_encryption_customer_key_base64 = config.getString(config_elem + ".server_side_encryption_customer_key_base64", "");
auto which = field.getValue().getType();
if (isInt64OrUInt64FieldType(which))
field.setValue(config.getUInt64(path));
else if (which == Field::Types::String)
field.setValue(config.getString(path));
else if (which == Field::Types::Bool)
field.setValue(config.getBool(path));
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
std::optional<bool> use_environment_credentials;
if (config.has(config_elem + ".use_environment_credentials"))
use_environment_credentials = config.getBool(config_elem + ".use_environment_credentials");
return true;
}
std::optional<bool> use_insecure_imds_request;
if (config.has(config_elem + ".use_insecure_imds_request"))
use_insecure_imds_request = config.getBool(config_elem + ".use_insecure_imds_request");
AuthSettings::AuthSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix)
{
for (auto & field : allMutable())
{
auto path = fmt::format("{}.{}", config_prefix, field.getName());
std::optional<uint64_t> expiration_window_seconds;
if (config.has(config_elem + ".expiration_window_seconds"))
expiration_window_seconds = config.getUInt64(config_elem + ".expiration_window_seconds");
bool updated = setValueFromConfig<AuthSettings>(config, path, field);
if (!updated)
{
auto setting_name = "s3_" + field.getName();
if (settings.has(setting_name) && settings.isChanged(setting_name))
field.setValue(settings.get(setting_name));
}
}
std::optional<bool> no_sign_request;
if (config.has(config_elem + ".no_sign_request"))
no_sign_request = config.getBool(config_elem + ".no_sign_request");
headers = getHTTPHeaders(config_prefix, config);
server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);
HTTPHeaderEntries headers = getHTTPHeaders(config_elem, config);
ServerSideEncryptionKMSConfig sse_kms_config = getSSEKMSConfig(config_elem, config);
std::unordered_set<std::string> users;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_elem, keys);
config.keys(config_prefix, keys);
for (const auto & key : keys)
{
if (startsWith(key, "user"))
users.insert(config.getString(config_elem + "." + key));
users.insert(config.getString(config_prefix + "." + key));
}
return AuthSettings
{
std::move(access_key_id), std::move(secret_access_key), std::move(session_token),
std::move(region),
std::move(server_side_encryption_customer_key_base64),
std::move(sse_kms_config),
std::move(headers),
use_environment_credentials,
use_insecure_imds_request,
expiration_window_seconds,
no_sign_request,
std::move(users)
};
}
bool AuthSettings::canBeUsedByUser(const String & user) const
AuthSettings::AuthSettings(const DB::Settings & settings)
{
return users.empty() || users.contains(user);
updateFromSettings(settings, /* if_changed */false);
}
void AuthSettings::updateFromSettings(const DB::Settings & settings, bool if_changed)
{
for (auto & field : allMutable())
{
const auto setting_name = "s3_" + field.getName();
if (settings.has(setting_name) && (!if_changed || settings.isChanged(setting_name)))
{
field.setValue(settings.get(setting_name));
}
}
}
bool AuthSettings::hasUpdates(const AuthSettings & other) const
{
AuthSettings copy = *this;
copy.updateFrom(other);
copy.updateIfChanged(other);
return *this != copy;
}
void AuthSettings::updateFrom(const AuthSettings & from)
void AuthSettings::updateIfChanged(const AuthSettings & settings)
{
/// Update with check for emptyness only parameters which
/// can be passed not only from config, but via ast.
for (auto & setting : settings.all())
{
if (setting.isValueChanged())
set(setting.getName(), setting.getValue());
}
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
if (!from.session_token.empty())
session_token = from.session_token;
if (!settings.headers.empty())
headers = settings.headers;
if (!from.headers.empty())
headers = from.headers;
if (!from.region.empty())
region = from.region;
if (!settings.users.empty())
users.insert(settings.users.begin(), settings.users.end());
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
server_side_encryption_kms_config = from.server_side_encryption_kms_config;
if (from.use_environment_credentials.has_value())
use_environment_credentials = from.use_environment_credentials;
if (from.use_insecure_imds_request.has_value())
use_insecure_imds_request = from.use_insecure_imds_request;
if (from.expiration_window_seconds.has_value())
expiration_window_seconds = from.expiration_window_seconds;
if (from.no_sign_request.has_value())
no_sign_request = from.no_sign_request;
users.insert(from.users.begin(), from.users.end());
if (settings.server_side_encryption_kms_config.key_id.has_value()
|| settings.server_side_encryption_kms_config.encryption_context.has_value()
|| settings.server_side_encryption_kms_config.key_id.has_value())
server_side_encryption_kms_config = settings.server_side_encryption_kms_config;
}
RequestSettings::RequestSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix,
const std::string & setting_name_prefix,
bool validate_settings)
{
for (auto & field : allMutable())
{
auto path = fmt::format("{}.{}{}", config_prefix, setting_name_prefix, field.getName());
bool updated = setValueFromConfig<RequestSettings>(config, path, field);
if (!updated)
{
auto setting_name = "s3_" + field.getName();
if (settings.has(setting_name) && settings.isChanged(setting_name))
field.setValue(settings.get(setting_name));
}
}
finishInit(settings, validate_settings);
}
/// Build request settings from a named collection.
/// Each field present in the collection is read with the accessor matching the
/// field's own type; any other field type is rejected as a user error.
RequestSettings::RequestSettings(
    const NamedCollection & collection,
    const DB::Settings & settings,
    bool validate_settings)
{
    auto mutable_fields = allMutable();
    for (auto & field : mutable_fields)
    {
        const auto name = field.getName();
        if (!collection.has(name))
            continue;

        const auto field_type = field.getValue().getType();
        if (isInt64OrUInt64FieldType(field_type))
            field.setValue(collection.get<UInt64>(name));
        else if (field_type == Field::Types::String)
            field.setValue(collection.get<String>(name));
        else if (field_type == Field::Types::Bool)
            field.setValue(collection.get<bool>(name));
        else
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
    }
    finishInit(settings, validate_settings);
}
/// Build request settings from the user/session settings only (no config file involved).
/// With if_changed=false every "s3_"-prefixed setting is applied, changed or not.
RequestSettings::RequestSettings(const DB::Settings & settings, bool validate_settings)
{
    updateFromSettings(settings, /* if_changed */false, validate_settings);
    finishInit(settings, validate_settings);
}
/// Pull values from the user/session settings (names prefixed with "s3_").
/// With if_changed=true, only settings explicitly changed by the user are applied.
/// Afterwards the values are normalized and (optionally) validated.
void RequestSettings::updateFromSettings(
    const DB::Settings & settings, bool if_changed, bool validate_settings)
{
    for (auto & field : allMutable())
    {
        const auto session_setting_name = "s3_" + field.getName();
        if (!settings.has(session_setting_name))
            continue;
        if (if_changed && !settings.isChanged(session_setting_name))
            continue;

        set(field.getName(), settings.get(session_setting_name));
    }

    normalizeSettings();
    if (validate_settings)
        validateUploadSettings();
}
/// Overlay another RequestSettings object onto this one:
/// only fields that were explicitly changed in `settings` are copied over.
void RequestSettings::updateIfChanged(const RequestSettings & settings)
{
    for (auto & other_field : settings.all())
    {
        if (!other_field.isValueChanged())
            continue;
        set(other_field.getName(), other_field.getValue());
    }
}
/// Canonicalize values before use: storage class names are case-insensitive
/// for the user but must be upper-case for comparison/validation.
void RequestSettings::normalizeSettings()
{
    const bool has_custom_storage_class = storage_class_name.changed && !storage_class_name.value.empty();
    if (has_custom_storage_class)
        storage_class_name = Poco::toUpperInPlace(storage_class_name.value);
}
/// Final initialization step shared by all constructors: normalize values,
/// optionally validate them, and build the GET/PUT request throttlers from
/// either the explicitly changed request settings or the session defaults.
void RequestSettings::finishInit(const DB::Settings & settings, bool validate_settings)
{
    normalizeSettings();
    if (validate_settings)
        validateUploadSettings();

    /// NOTE: it would be better to reuse old throttlers
    /// to avoid losing token bucket state on every config reload,
    /// which could lead to exceeding limit for short time.
    /// But it is good enough unless very high `burst` values are used.
    if (UInt64 max_get_rps = isChanged("max_get_rps") ? get("max_get_rps").get<UInt64>() : settings.s3_max_get_rps)
    {
        size_t default_max_get_burst = settings.s3_max_get_burst
            ? settings.s3_max_get_burst
            : (Throttler::default_burst_seconds * max_get_rps);

        /// Bug fix: previously this checked isChanged("max_get_burts") (typo),
        /// which never matched, so an explicitly set max_get_burst was silently ignored.
        size_t max_get_burst = isChanged("max_get_burst") ? get("max_get_burst").get<UInt64>() : default_max_get_burst;

        get_request_throttler = std::make_shared<Throttler>(max_get_rps, max_get_burst);
    }
    if (UInt64 max_put_rps = isChanged("max_put_rps") ? get("max_put_rps").get<UInt64>() : settings.s3_max_put_rps)
    {
        size_t default_max_put_burst = settings.s3_max_put_burst
            ? settings.s3_max_put_burst
            : (Throttler::default_burst_seconds * max_put_rps);

        /// Same typo fix as above ("max_put_burts" -> "max_put_burst").
        size_t max_put_burst = isChanged("max_put_burst") ? get("max_put_burst").get<UInt64>() : default_max_put_burst;

        put_request_throttler = std::make_shared<Throttler>(max_put_rps, max_put_burst);
    }
}
/// Validate multipart-upload settings against the S3 API hard limits
/// (5 MiB minimum part size, 5 GiB maximum part size, 10000 parts max).
/// Throws Exception(INVALID_SETTING_VALUE) on the first violated constraint.
void RequestSettings::validateUploadSettings()
{
    static constexpr size_t min_upload_part_size_limit = 5 * 1024 * 1024;
    if (strict_upload_part_size && strict_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting strict_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(strict_upload_part_size), ReadableSize(min_upload_part_size_limit));

    if (min_upload_part_size < min_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting min_upload_part_size has invalid value {} which is less than the s3 API limit {}",
            ReadableSize(min_upload_part_size), ReadableSize(min_upload_part_size_limit));

    static constexpr size_t max_upload_part_size_limit = 5ull * 1024 * 1024 * 1024;
    if (max_upload_part_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_upload_part_size), ReadableSize(max_upload_part_size_limit));

    /// "grater" -> "greater" typo fixed in the messages below.
    if (max_single_part_upload_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_part_upload_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_single_part_upload_size), ReadableSize(max_upload_part_size_limit));

    if (max_single_operation_copy_size > max_upload_part_size_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_single_operation_copy_size has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_single_operation_copy_size), ReadableSize(max_upload_part_size_limit));

    if (max_upload_part_size < min_upload_part_size)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_upload_part_size ({}) can't be less than setting min_upload_part_size {}",
            ReadableSize(max_upload_part_size), ReadableSize(min_upload_part_size));

    if (!upload_part_size_multiply_factor)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor cannot be zero");

    if (!upload_part_size_multiply_parts_count_threshold)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_parts_count_threshold cannot be zero");

    if (!max_part_number)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number cannot be zero");

    static constexpr size_t max_part_number_limit = 10000;
    if (max_part_number > max_part_number_limit)
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting max_part_number has invalid value {} which is greater than the s3 API limit {}",
            ReadableSize(max_part_number), ReadableSize(max_part_number_limit));

    size_t maybe_overflow;
    if (common::mulOverflow(max_upload_part_size.value, upload_part_size_multiply_factor.value, maybe_overflow))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting upload_part_size_multiply_factor is too big ({}). "
            "Multiplication to max_upload_part_size ({}) will cause integer overflow",
            /// Bug fix: the message previously formatted max_part_number and
            /// max_part_number_limit here, which had nothing to do with the error.
            upload_part_size_multiply_factor.value, ReadableSize(max_upload_part_size));

    std::unordered_set<String> storage_class_names {"STANDARD", "INTELLIGENT_TIERING"};
    if (!storage_class_name.value.empty() && !storage_class_names.contains(storage_class_name))
        throw Exception(
            ErrorCodes::INVALID_SETTING_VALUE,
            "Setting storage_class has invalid value {} which only supports STANDARD and INTELLIGENT_TIERING",
            storage_class_name.value);

    /// TODO: it's possible to set too small limits.
    /// We can check that max possible object size is not too small.
}
/// Full equality for AuthSettings: the extra (non-BaseSettings) members must match,
/// and the two underlying setting-field sequences must be element-wise equal
/// and of the same length.
bool operator==(const AuthSettings & left, const AuthSettings & right)
{
    if (left.headers != right.headers
        || left.users != right.users
        || left.server_side_encryption_kms_config != right.server_side_encryption_kms_config)
        return false;

    auto lhs = left.begin();
    auto rhs = right.begin();
    while (lhs != left.end() && rhs != right.end())
    {
        if (*lhs != *rhs)
            return false;
        ++lhs;
        ++rhs;
    }
    /// Both sequences must be exhausted, otherwise the lengths differ.
    return lhs == left.end() && rhs == right.end();
}
}
IMPLEMENT_SETTINGS_TRAITS(S3::AuthSettingsTraits, CLIENT_SETTINGS_LIST)
IMPLEMENT_SETTINGS_TRAITS(S3::RequestSettingsTraits, REQUEST_SETTINGS_LIST)
}

View File

@ -3,22 +3,22 @@
#include <IO/S3/Client.h>
#include <IO/S3/PocoHTTPClient.h>
#include <IO/HTTPHeaderEntries.h>
#include <string>
#include <optional>
#include <IO/S3Defines.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <Common/Throttler.h>
#include <Core/Settings.h>
#include <Core/BaseSettings.h>
#include <Interpreters/Context.h>
#include <unordered_set>
#include "config.h"
#if USE_AWS_S3
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <IO/S3/URI.h>
#include <IO/S3/Credentials.h>
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>
@ -30,8 +30,6 @@ namespace ErrorCodes
extern const int S3_ERROR;
}
class RemoteHostFilter;
class S3Exception : public Exception
{
public:
@ -68,40 +66,140 @@ namespace Poco::Util
class AbstractConfiguration;
};
namespace DB::S3
namespace DB
{
class NamedCollection;
struct ProxyConfigurationResolver;
namespace S3
{
/// We use s3 settings for DiskS3, StorageS3 (StorageS3Cluster, S3Queue, etc), BackupIO_S3, etc.
/// 1. For DiskS3 we usually have configuration in disk section in configuration file.
/// REQUEST_SETTINGS, PART_UPLOAD_SETTINGS start with "s3_" prefix there, while AUTH_SETTINGS and CLIENT_SETTINGS do not
/// (does not make sense, but it happened this way).
/// If some setting is absent from disk configuration, we look up for it in the "s3." server config section,
/// where s3 settings no longer have "s3_" prefix like in disk configuration section.
If the setting is absent there as well, we look it up in the Users config (where query/session settings are also updated).
/// 2. For StorageS3 and similar - we look up to "s3." config section (again - settings there do not have "s3_" prefix).
If some setting is absent from there, we look it up in the Users config.
#define AUTH_SETTINGS(M, ALIAS) \
M(String, access_key_id, "", "", 0) \
M(String, secret_access_key, "", "", 0) \
M(String, session_token, "", "", 0) \
M(String, region, "", "", 0) \
M(String, server_side_encryption_customer_key_base64, "", "", 0) \
#define CLIENT_SETTINGS(M, ALIAS) \
M(UInt64, connect_timeout_ms, DEFAULT_CONNECT_TIMEOUT_MS, "", 0) \
M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
M(UInt64, max_connections, DEFAULT_MAX_CONNECTIONS, "", 0) \
M(UInt64, http_keep_alive_timeout, DEFAULT_KEEP_ALIVE_TIMEOUT, "", 0) \
M(UInt64, http_keep_alive_max_requests, DEFAULT_KEEP_ALIVE_MAX_REQUESTS, "", 0) \
M(UInt64, expiration_window_seconds, DEFAULT_EXPIRATION_WINDOW_SECONDS, "", 0) \
M(Bool, use_environment_credentials, DEFAULT_USE_ENVIRONMENT_CREDENTIALS, "", 0) \
M(Bool, no_sign_request, DEFAULT_NO_SIGN_REQUEST, "", 0) \
M(Bool, use_insecure_imds_request, false, "", 0) \
M(Bool, use_adaptive_timeouts, DEFAULT_USE_ADAPTIVE_TIMEOUTS, "", 0) \
M(Bool, is_virtual_hosted_style, false, "", 0) \
M(Bool, disable_checksum, DEFAULT_DISABLE_CHECKSUM, "", 0) \
M(Bool, gcs_issue_compose_request, false, "", 0) \
#define REQUEST_SETTINGS(M, ALIAS) \
M(UInt64, max_single_read_retries, 4, "", 0) \
M(UInt64, request_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS, "", 0) \
M(UInt64, list_object_keys_size, DEFAULT_LIST_OBJECT_KEYS_SIZE, "", 0) \
M(Bool, allow_native_copy, DEFAULT_ALLOW_NATIVE_COPY, "", 0) \
M(Bool, check_objects_after_upload, DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD, "", 0) \
M(Bool, throw_on_zero_files_match, false, "", 0) \
M(UInt64, max_single_operation_copy_size, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE, "", 0) \
M(String, storage_class_name, "", "", 0) \
#define PART_UPLOAD_SETTINGS(M, ALIAS) \
M(UInt64, strict_upload_part_size, 0, "", 0) \
M(UInt64, min_upload_part_size, DEFAULT_MIN_UPLOAD_PART_SIZE, "", 0) \
M(UInt64, max_upload_part_size, DEFAULT_MAX_UPLOAD_PART_SIZE, "", 0) \
M(UInt64, upload_part_size_multiply_factor, DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR, "", 0) \
M(UInt64, upload_part_size_multiply_parts_count_threshold, DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD, "", 0) \
M(UInt64, max_inflight_parts_for_one_file, DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE, "", 0) \
M(UInt64, max_part_number, DEFAULT_MAX_PART_NUMBER, "", 0) \
M(UInt64, max_single_part_upload_size, DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE, "", 0) \
M(UInt64, max_unexpected_write_error_retries, 4, "", 0) \
#define CLIENT_SETTINGS_LIST(M, ALIAS) \
CLIENT_SETTINGS(M, ALIAS) \
AUTH_SETTINGS(M, ALIAS)
#define REQUEST_SETTINGS_LIST(M, ALIAS) \
REQUEST_SETTINGS(M, ALIAS) \
PART_UPLOAD_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(AuthSettingsTraits, CLIENT_SETTINGS_LIST)
DECLARE_SETTINGS_TRAITS(RequestSettingsTraits, REQUEST_SETTINGS_LIST)
struct AuthSettings : public BaseSettings<AuthSettingsTraits>
{
AuthSettings() = default;
AuthSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix);
explicit AuthSettings(const DB::Settings & settings);
explicit AuthSettings(const DB::NamedCollection & collection);
void updateFromSettings(const DB::Settings & settings, bool if_changed);
bool hasUpdates(const AuthSettings & other) const;
void updateIfChanged(const AuthSettings & settings);
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
HTTPHeaderEntries headers;
std::unordered_set<std::string> users;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
/// Note: if you add any field, do not forget to update operator ==.
};
bool operator==(const AuthSettings & left, const AuthSettings & right);
struct RequestSettings : public BaseSettings<RequestSettingsTraits>
{
RequestSettings() = default;
/// Create request settings from Config.
RequestSettings(
const Poco::Util::AbstractConfiguration & config,
const DB::Settings & settings,
const std::string & config_prefix,
const std::string & setting_name_prefix = "",
bool validate_settings = true);
/// Create request settings from DB::Settings.
explicit RequestSettings(const DB::Settings & settings, bool validate_settings = true);
/// Create request settings from NamedCollection.
RequestSettings(
const NamedCollection & collection,
const DB::Settings & settings,
bool validate_settings = true);
void updateFromSettings(const DB::Settings & settings, bool if_changed, bool validate_settings = true);
void updateIfChanged(const RequestSettings & settings);
void validateUploadSettings();
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
std::shared_ptr<ProxyConfigurationResolver> proxy_resolver;
private:
void finishInit(const DB::Settings & settings, bool validate_settings);
void normalizeSettings();
};
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
struct AuthSettings
{
static AuthSettings loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
std::string access_key_id;
std::string secret_access_key;
std::string session_token;
std::string region;
std::string server_side_encryption_customer_key_base64;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
HTTPHeaderEntries headers;
std::optional<bool> use_environment_credentials;
std::optional<bool> use_insecure_imds_request;
std::optional<uint64_t> expiration_window_seconds;
std::optional<bool> no_sign_request;
std::unordered_set<std::string> users;
bool hasUpdates(const AuthSettings & other) const;
void updateFrom(const AuthSettings & from);
bool canBeUsedByUser(const String & user) const;
private:
bool operator==(const AuthSettings & other) const = default;
};
}
}

41
src/IO/S3Defines.h Normal file
View File

@ -0,0 +1,41 @@
#pragma once
#include <Core/Types.h>
/// Central place for the default values of all S3-related settings
/// (shared by DiskS3, StorageS3, backups, etc.), so that config parsing,
/// session settings and named collections agree on a single default.
namespace DB::S3
{
/// Client settings.
inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 1024;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100;
inline static constexpr bool DEFAULT_USE_ENVIRONMENT_CREDENTIALS = true;
inline static constexpr bool DEFAULT_NO_SIGN_REQUEST = false;
inline static constexpr bool DEFAULT_DISABLE_CHECKSUM = false;
inline static constexpr bool DEFAULT_USE_ADAPTIVE_TIMEOUTS = true;
/// Upload settings.
/// Part sizes are bounded by the S3 API: 5 MiB minimum, 5 GiB maximum per part.
inline static constexpr uint64_t DEFAULT_MIN_UPLOAD_PART_SIZE = 16 * 1024 * 1024;
inline static constexpr uint64_t DEFAULT_MAX_UPLOAD_PART_SIZE = 5ull * 1024 * 1024 * 1024;
inline static constexpr uint64_t DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE = 32 * 1024 * 1024;
inline static constexpr uint64_t DEFAULT_STRICT_UPLOAD_PART_SIZE = 0;
inline static constexpr uint64_t DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR = 2;
inline static constexpr uint64_t DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD = 500;
/// 10000 is the S3 API hard limit on the number of parts in a multipart upload.
inline static constexpr uint64_t DEFAULT_MAX_PART_NUMBER = 10000;
/// Other settings.
inline static constexpr uint64_t DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 32 * 1024 * 1024;
inline static constexpr uint64_t DEFAULT_MAX_INFLIGHT_PARTS_FOR_ONE_FILE = 20;
inline static constexpr uint64_t DEFAULT_LIST_OBJECT_KEYS_SIZE = 1000;
inline static constexpr uint64_t DEFAULT_MAX_SINGLE_READ_TRIES = 4;
inline static constexpr uint64_t DEFAULT_MAX_UNEXPECTED_WRITE_ERROR_RETRIES = 4;
inline static constexpr uint64_t DEFAULT_MAX_REDIRECTS = 10;
inline static constexpr uint64_t DEFAULT_RETRY_ATTEMPTS = 100;
inline static constexpr bool DEFAULT_ALLOW_NATIVE_COPY = true;
inline static constexpr bool DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD = false;
}

80
src/IO/S3Settings.cpp Normal file
View File

@ -0,0 +1,80 @@
#include <IO/S3Settings.h>
#include <IO/S3Common.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Load both halves of the S3 settings (auth + request) from a single
/// config section, falling back to user/session settings for absent keys.
void S3Settings::loadFromConfig(
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix,
    const DB::Settings & settings)
{
    auth_settings = S3::AuthSettings(config, settings, config_prefix);
    request_settings = S3::RequestSettings(config, settings, config_prefix);
}
/// Overlay another S3Settings object onto this one: only fields explicitly
/// changed in `settings` overwrite the current values (delegated per half).
void S3Settings::updateIfChanged(const S3Settings & settings)
{
    auth_settings.updateIfChanged(settings.auth_settings);
    request_settings.updateIfChanged(settings.request_settings);
}
/// Rebuild the per-endpoint settings map from the config section.
/// Settings defined directly under `config_prefix` act as defaults;
/// each child subsection that declares an `endpoint` key produces one map
/// entry with the defaults overlaid by that subsection's changed values.
void S3SettingsByEndpoint::loadFromConfig(
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix,
    const DB::Settings & settings)
{
    std::lock_guard lock(mutex);
    s3_settings.clear();

    if (!config.has(config_prefix))
        return;

    Poco::Util::AbstractConfiguration::Keys section_keys;
    config.keys(config_prefix, section_keys);

    /// Top-level values under config_prefix become the defaults for every endpoint.
    const auto base_auth = S3::AuthSettings(config, settings, config_prefix);
    const auto base_request = S3::RequestSettings(config, settings, config_prefix);

    for (const String & section_key : section_keys)
    {
        const auto subsection_path = config_prefix + "." + section_key;
        const auto endpoint_path = subsection_path + ".endpoint";
        if (!config.has(endpoint_path))
            continue;

        auto endpoint_auth = base_auth;
        endpoint_auth.updateIfChanged(S3::AuthSettings(config, settings, subsection_path));

        auto endpoint_request = base_request;
        endpoint_request.updateIfChanged(
            S3::RequestSettings(config, settings, subsection_path, "", settings.s3_validate_request_settings));

        s3_settings.emplace(
            config.getString(endpoint_path),
            S3Settings{std::move(endpoint_auth), std::move(endpoint_request)});
    }
}
/// Find the settings whose configured endpoint is a prefix of `endpoint`,
/// preferring the longest (lexicographically closest) match, and — unless
/// ignore_user is set — only entries the given user is allowed to use.
/// Returns std::nullopt when no entry matches.
std::optional<S3Settings> S3SettingsByEndpoint::getSettings(
    const String & endpoint,
    const String & user,
    bool ignore_user) const
{
    std::lock_guard lock(mutex);

    /// Linear time algorithm may be replaced with logarithmic with prefix tree map.
    /// Walk backwards from the first key greater than `endpoint`:
    /// candidates closest to `endpoint` (longest shared prefixes) come first.
    auto it = s3_settings.upper_bound(endpoint);
    while (it != s3_settings.begin())
    {
        --it;
        const auto & [endpoint_prefix, candidate] = *it;
        if (endpoint.starts_with(endpoint_prefix)
            && (ignore_user || candidate.auth_settings.canBeUsedByUser(user)))
            return candidate;
    }
    return {};
}
}

52
src/IO/S3Settings.h Normal file
View File

@ -0,0 +1,52 @@
#pragma once
#include <map>
#include <mutex>
#include <optional>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <IO/S3Common.h>
#include <IO/S3Defines.h>
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
struct Settings;
/// Bundle of the two S3 setting groups (authentication/client + request/upload)
/// that together describe how to talk to one S3 endpoint.
struct S3Settings
{
    S3::AuthSettings auth_settings;
    S3::RequestSettings request_settings;

    /// Fill both halves from the given config section,
    /// falling back to user/session settings for absent keys.
    void loadFromConfig(
        const Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        const DB::Settings & settings);

    /// Overlay only the explicitly changed fields of `settings` onto this object.
    void updateIfChanged(const S3Settings & settings);
};
/// Thread-safe registry of S3Settings keyed by endpoint URL.
/// Lookup matches by endpoint prefix (longest match wins); an optional
/// per-entry user allow-list restricts which users may use an entry.
class S3SettingsByEndpoint
{
public:
    /// Rebuild the whole map from the config section (replaces previous content).
    void loadFromConfig(
        const Poco::Util::AbstractConfiguration & config,
        const std::string & config_prefix,
        const DB::Settings & settings);

    /// Return the settings for the longest endpoint prefix matching `endpoint`
    /// that `user` may use (unless ignore_user), or std::nullopt if none match.
    std::optional<S3Settings> getSettings(
        const std::string & endpoint,
        const std::string & user,
        bool ignore_user = false) const;

private:
    /// Guards s3_settings: loadFromConfig may run concurrently with getSettings.
    mutable std::mutex mutex;
    std::map<const String, const S3Settings> s3_settings;
};
}

View File

@ -72,7 +72,7 @@ struct WriteBufferFromS3::PartData
}
};
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings)
BufferAllocationPolicyPtr createBufferAllocationPolicy(const S3::RequestSettings & settings)
{
BufferAllocationPolicy::Settings allocation_settings;
allocation_settings.strict_size = settings.strict_upload_part_size;
@ -91,7 +91,7 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
@ -100,15 +100,14 @@ WriteBufferFromS3::WriteBufferFromS3(
, bucket(bucket_)
, key(key_)
, request_settings(request_settings_)
, upload_settings(request_settings.getUploadSettings())
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(createBufferAllocationPolicy(upload_settings))
, buffer_allocation_policy(createBufferAllocationPolicy(request_settings))
, task_tracker(
std::make_unique<TaskTracker>(
std::move(schedule_),
upload_settings.max_inflight_parts_for_one_file,
request_settings.max_inflight_parts_for_one_file,
limitedLog))
, blob_log(std::move(blob_log_))
{
@ -165,7 +164,7 @@ void WriteBufferFromS3::preFinalize()
if (multipart_upload_id.empty() && detached_part_data.size() <= 1)
{
if (detached_part_data.empty() || detached_part_data.front().data_size <= upload_settings.max_single_part_upload_size)
if (detached_part_data.empty() || detached_part_data.front().data_size <= request_settings.max_single_part_upload_size)
do_single_part_upload = true;
}
@ -214,9 +213,9 @@ void WriteBufferFromS3::finalizeImpl()
if (request_settings.check_objects_after_upload)
{
S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, "Immediately after upload");
S3::checkObjectExists(*client_ptr, bucket, key, {}, "Immediately after upload");
size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings);
size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {});
if (actual_size != total_size)
throw Exception(
ErrorCodes::S3_ERROR,
@ -505,18 +504,18 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
"Unable to write a part without multipart_upload_id, details: WriteBufferFromS3 created for bucket {}, key {}",
bucket, key);
if (part_number > upload_settings.max_part_number)
if (part_number > request_settings.max_part_number)
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
upload_settings.max_part_number, count(), upload_settings.min_upload_part_size, upload_settings.max_upload_part_size,
upload_settings.upload_part_size_multiply_factor, upload_settings.upload_part_size_multiply_parts_count_threshold,
upload_settings.max_single_part_upload_size);
request_settings.max_part_number, count(), request_settings.min_upload_part_size, request_settings.max_upload_part_size,
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
request_settings.max_single_part_upload_size);
}
if (data.data_size > upload_settings.max_upload_part_size)
if (data.data_size > request_settings.max_upload_part_size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -524,7 +523,7 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
getShortLogDetails(),
part_number,
data.data_size,
upload_settings.max_upload_part_size
request_settings.max_upload_part_size
);
}
@ -611,7 +610,7 @@ void WriteBufferFromS3::completeMultipartUpload()
req.SetMultipartUpload(multipart_upload);
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@ -669,8 +668,8 @@ S3::PutObjectRequest WriteBufferFromS3::getPutRequest(PartData & data)
req.SetBody(data.createAwsBuffer());
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
if (!upload_settings.storage_class_name.empty())
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(upload_settings.storage_class_name));
if (!request_settings.storage_class_name.value.empty())
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(request_settings.storage_class_name));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
req.SetContentType("binary/octet-stream");
@ -694,7 +693,7 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
auto & request = std::get<0>(*worker_data);
size_t content_length = request.GetContentLength();
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max<UInt64>(request_settings.max_unexpected_write_error_retries.value, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);

View File

@ -9,7 +9,7 @@
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Common/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/BufferAllocationPolicy.h>
@ -38,7 +38,7 @@ public:
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
const S3::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
@ -78,8 +78,7 @@ private:
const String bucket;
const String key;
const S3Settings::RequestSettings request_settings;
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const S3::RequestSettings request_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;

View File

@ -546,8 +546,8 @@ public:
std::unique_ptr<WriteBufferFromS3> getWriteBuffer(String file_name = "file")
{
S3Settings::RequestSettings request_settings;
request_settings.updateFromSettingsIfChanged(settings);
S3::RequestSettings request_settings;
request_settings.updateFromSettings(settings, /* if_changed */true, /* validate_settings */false);
client->resetCounters();

View File

@ -32,7 +32,7 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
@ -371,7 +371,7 @@ struct ContextSharedPart : boost::noncopyable
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
OnceFlag system_logs_initialized;
std::unique_ptr<SystemLogs> system_logs TSA_GUARDED_BY(mutex); /// Used to log queries and operations on parts
std::optional<StorageS3Settings> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
std::optional<S3SettingsByEndpoint> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
std::vector<String> warnings TSA_GUARDED_BY(mutex); /// Store warning messages about server configuration.
/// Background executors for *MergeTree tables
@ -4296,7 +4296,7 @@ void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration
{
std::lock_guard lock(shared->mutex);
if (shared->storage_s3_settings)
shared->storage_s3_settings->loadFromConfig("s3", config, getSettingsRef());
shared->storage_s3_settings->loadFromConfig(config, /* config_prefix */"s3", getSettingsRef());
}
}
@ -4348,14 +4348,14 @@ const DistributedSettings & Context::getDistributedSettings() const
return *shared->distributed_settings;
}
const StorageS3Settings & Context::getStorageS3Settings() const
const S3SettingsByEndpoint & Context::getStorageS3Settings() const
{
std::lock_guard lock(shared->mutex);
if (!shared->storage_s3_settings)
{
const auto & config = shared->getConfigRefWithLock(lock);
shared->storage_s3_settings.emplace().loadFromConfig("s3", config, getSettingsRef());
shared->storage_s3_settings.emplace().loadFromConfig(config, "s3", getSettingsRef());
}
return *shared->storage_s3_settings;

View File

@ -117,7 +117,7 @@ struct DistributedSettings;
struct InitialAllRangesAnnouncement;
struct ParallelReadRequest;
struct ParallelReadResponse;
class StorageS3Settings;
class S3SettingsByEndpoint;
class IDatabase;
class DDLWorker;
class ITableFunction;
@ -1115,7 +1115,7 @@ public:
const MergeTreeSettings & getMergeTreeSettings() const;
const MergeTreeSettings & getReplicatedMergeTreeSettings() const;
const DistributedSettings & getDistributedSettings() const;
const StorageS3Settings & getStorageS3Settings() const;
const S3SettingsByEndpoint & getStorageS3Settings() const;
/// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)
void setMaxTableSizeToDrop(size_t max_size);

View File

@ -2,7 +2,7 @@
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <IO/HTTPHeaderEntries.h>

View File

@ -249,7 +249,7 @@ AzureClientPtr StorageAzureConfiguration::createClient(bool is_read_only, bool a
return result;
}
void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & collection)
void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
{
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);

View File

@ -51,7 +51,7 @@ public:
ContextPtr context) override;
protected:
void fromNamedCollection(const NamedCollection & collection) override;
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;

View File

@ -119,7 +119,7 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
setURL(url_str);
}
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & collection)
void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
{
std::string url_str;

View File

@ -46,7 +46,7 @@ public:
ContextPtr context) override;
private:
void fromNamedCollection(const NamedCollection &) override;
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
void setURL(const std::string & url_);

View File

@ -106,15 +106,18 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
const auto & config = context->getConfigRef();
const auto & settings = context->getSettingsRef();
const std::string config_prefix = "s3.";
auto s3_settings = getSettings(config, config_prefix, context, settings.s3_validate_request_settings);
auto s3_settings = getSettings(
config, "s3"/* config_prefix */, context, url.uri_str, settings.s3_validate_request_settings);
request_settings.updateFromSettingsIfChanged(settings);
auth_settings.updateFrom(s3_settings->auth_settings);
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(url.uri.toString(), context->getUserName()))
{
s3_settings->auth_settings.updateIfChanged(endpoint_settings->auth_settings);
s3_settings->request_settings.updateIfChanged(endpoint_settings->request_settings);
}
s3_settings->auth_settings = auth_settings;
s3_settings->request_settings = request_settings;
s3_settings->auth_settings.updateIfChanged(auth_settings);
s3_settings->request_settings.updateIfChanged(request_settings);
if (!headers_from_ast.empty())
{
@ -123,10 +126,7 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
headers_from_ast.begin(), headers_from_ast.end());
}
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(url.uri.toString(), context->getUserName()))
s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
auto client = getClient(config, config_prefix, context, *s3_settings, false, &url);
auto client = getClient(url, *s3_settings, context, /* for_disk_s3 */false);
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(url.key);
auto s3_capabilities = S3Capabilities
{
@ -139,8 +139,9 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
key_generator, "StorageS3", false);
}
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection)
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection, ContextPtr context)
{
const auto settings = context->getSettingsRef();
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
auto filename = collection.getOrDefault<String>("filename", "");
@ -159,9 +160,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
request_settings = S3Settings::RequestSettings(collection);
request_settings = S3::RequestSettings(collection, settings, /* validate_settings */true);
static_configuration = !auth_settings.access_key_id.empty() || auth_settings.no_sign_request.has_value();
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
keys = {url.key};
}
@ -357,7 +358,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
if (no_sign_request)
auth_settings.no_sign_request = no_sign_request;
static_configuration = !auth_settings.access_key_id.empty() || auth_settings.no_sign_request.has_value();
static_configuration = !auth_settings.access_key_id.value.empty() || auth_settings.no_sign_request.changed;
auth_settings.no_sign_request = no_sign_request;
keys = {url.key};

View File

@ -3,7 +3,7 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB
@ -51,14 +51,14 @@ public:
ContextPtr context) override;
private:
void fromNamedCollection(const NamedCollection & collection) override;
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
S3::URI url;
std::vector<String> keys;
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
S3::RequestSettings request_settings;
HTTPHeaderEntries headers_from_ast; /// Headers from ast is a part of static configuration.
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.

View File

@ -424,7 +424,7 @@ void StorageObjectStorage::Configuration::initialize(
bool with_table_structure)
{
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
configuration.fromNamedCollection(*named_collection);
configuration.fromNamedCollection(*named_collection, local_context);
else
configuration.fromAST(engine_args, local_context, with_table_structure);

View File

@ -193,7 +193,7 @@ public:
String structure = "auto";
protected:
virtual void fromNamedCollection(const NamedCollection & collection) = 0;
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
void assertInitialized() const;

View File

@ -10,7 +10,7 @@
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueUnorderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/StorageS3Settings.h>
#include <IO/S3Settings.h>
#include <Storages/StorageSnapshot.h>
#include <base/sleep.h>
#include <Common/CurrentThread.h>

View File

@ -1,315 +0,0 @@
#include <Storages/StorageS3Settings.h>
#include <IO/S3Common.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <Common/formatReadable.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SETTING_VALUE;
}
/// Build multipart-upload settings entirely from query-level settings.
/// `validate_settings` lets callers skip the s3 API limit checks (e.g. when the
/// values will be overridden right after construction).
S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const Settings & settings, bool validate_settings)
{
    updateFromSettings(settings, false); /// false -> copy every setting, not only changed ones.
    if (validate_settings)
        validate();
}
/// Build multipart-upload settings from a config section, using the query-level
/// settings as defaults (via the delegated constructor) for any key the config
/// does not override.
/// `setting_name_prefix` exists because s3-disk configs prefix keys with "s3_"
/// while s3-storage configs do not (see the comment in the header).
S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(
    const Poco::Util::AbstractConfiguration & config,
    const String & config_prefix,
    const Settings & settings,
    String setting_name_prefix,
    bool validate_settings)
    : PartUploadSettings(settings, validate_settings)
{
    String key = config_prefix + "." + setting_name_prefix;
    strict_upload_part_size = config.getUInt64(key + "strict_upload_part_size", strict_upload_part_size);
    min_upload_part_size = config.getUInt64(key + "min_upload_part_size", min_upload_part_size);
    max_upload_part_size = config.getUInt64(key + "max_upload_part_size", max_upload_part_size);
    upload_part_size_multiply_factor = config.getUInt64(key + "upload_part_size_multiply_factor", upload_part_size_multiply_factor);
    upload_part_size_multiply_parts_count_threshold = config.getUInt64(key + "upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
    max_inflight_parts_for_one_file = config.getUInt64(key + "max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);
    max_part_number = config.getUInt64(key + "max_part_number", max_part_number);
    max_single_part_upload_size = config.getUInt64(key + "max_single_part_upload_size", max_single_part_upload_size);
    max_single_operation_copy_size = config.getUInt64(key + "max_single_operation_copy_size", max_single_operation_copy_size);

    /// This configuration is only applicable to s3. Other types of object storage are not applicable or have different meanings.
    /// NOTE: read with `config_prefix` directly, i.e. deliberately NOT affected by `setting_name_prefix`.
    storage_class_name = config.getString(config_prefix + ".s3_storage_class", storage_class_name);
    storage_class_name = Poco::toUpperInPlace(storage_class_name); /// AWS expects upper-case class names.

    if (validate_settings)
        validate();
}
/// Build multipart-upload settings from a named collection (e.g. the engine
/// arguments of a table). Keys absent from the collection keep the struct's
/// in-class defaults. Validation is unconditional here, unlike the other ctors.
S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const NamedCollection & collection)
{
    strict_upload_part_size = collection.getOrDefault<UInt64>("strict_upload_part_size", strict_upload_part_size);
    min_upload_part_size = collection.getOrDefault<UInt64>("min_upload_part_size", min_upload_part_size);
    max_single_part_upload_size = collection.getOrDefault<UInt64>("max_single_part_upload_size", max_single_part_upload_size);
    upload_part_size_multiply_factor = collection.getOrDefault<UInt64>("upload_part_size_multiply_factor", upload_part_size_multiply_factor);
    upload_part_size_multiply_parts_count_threshold = collection.getOrDefault<UInt64>("upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
    max_inflight_parts_for_one_file = collection.getOrDefault<UInt64>("max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);

    /// This configuration is only applicable to s3. Other types of object storage are not applicable or have different meanings.
    storage_class_name = collection.getOrDefault<String>("s3_storage_class", storage_class_name);
    storage_class_name = Poco::toUpperInPlace(storage_class_name); /// AWS expects upper-case class names.

    validate();
}
/// Copy query-level upload settings into this struct.
/// With if_changed == true only settings explicitly changed by the user are
/// copied; with if_changed == false every listed setting is copied.
void S3Settings::RequestSettings::PartUploadSettings::updateFromSettings(const Settings & settings, bool if_changed)
{
    /// Assign `source` to `destination` unless we were asked to copy only
    /// user-changed settings and this one was left at its default.
    auto copy_setting = [&](const auto & source, auto & destination)
    {
        if (!if_changed || source.changed)
            destination = source;
    };

    copy_setting(settings.s3_strict_upload_part_size, strict_upload_part_size);
    copy_setting(settings.s3_min_upload_part_size, min_upload_part_size);
    copy_setting(settings.s3_max_upload_part_size, max_upload_part_size);
    copy_setting(settings.s3_upload_part_size_multiply_factor, upload_part_size_multiply_factor);
    copy_setting(settings.s3_upload_part_size_multiply_parts_count_threshold, upload_part_size_multiply_parts_count_threshold);
    copy_setting(settings.s3_max_inflight_parts_for_one_file, max_inflight_parts_for_one_file);
    copy_setting(settings.s3_max_single_part_upload_size, max_single_part_upload_size);
}
void S3Settings::RequestSettings::PartUploadSettings::validate()
{
static constexpr size_t min_upload_part_size_limit = 5 * 1024 * 1024;
if (strict_upload_part_size && strict_upload_part_size < min_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting strict_upload_part_size has invalid value {} which is less than the s3 API limit {}",
ReadableSize(strict_upload_part_size), ReadableSize(min_upload_part_size_limit));
if (min_upload_part_size < min_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting min_upload_part_size has invalid value {} which is less than the s3 API limit {}",
ReadableSize(min_upload_part_size), ReadableSize(min_upload_part_size_limit));
static constexpr size_t max_upload_part_size_limit = 5ull * 1024 * 1024 * 1024;
if (max_upload_part_size > max_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_upload_part_size has invalid value {} which is greater than the s3 API limit {}",
ReadableSize(max_upload_part_size), ReadableSize(max_upload_part_size_limit));
if (max_single_part_upload_size > max_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_single_part_upload_size has invalid value {} which is grater than the s3 API limit {}",
ReadableSize(max_single_part_upload_size), ReadableSize(max_upload_part_size_limit));
if (max_single_operation_copy_size > max_upload_part_size_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_single_operation_copy_size has invalid value {} which is grater than the s3 API limit {}",
ReadableSize(max_single_operation_copy_size), ReadableSize(max_upload_part_size_limit));
if (max_upload_part_size < min_upload_part_size)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_upload_part_size ({}) can't be less than setting min_upload_part_size {}",
ReadableSize(max_upload_part_size), ReadableSize(min_upload_part_size));
if (!upload_part_size_multiply_factor)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting upload_part_size_multiply_factor cannot be zero");
if (!upload_part_size_multiply_parts_count_threshold)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting upload_part_size_multiply_parts_count_threshold cannot be zero");
if (!max_part_number)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_part_number cannot be zero");
static constexpr size_t max_part_number_limit = 10000;
if (max_part_number > max_part_number_limit)
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting max_part_number has invalid value {} which is grater than the s3 API limit {}",
ReadableSize(max_part_number), ReadableSize(max_part_number_limit));
size_t maybe_overflow;
if (common::mulOverflow(max_upload_part_size, upload_part_size_multiply_factor, maybe_overflow))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting upload_part_size_multiply_factor is too big ({}). "
"Multiplication to max_upload_part_size ({}) will cause integer overflow",
ReadableSize(max_part_number), ReadableSize(max_part_number_limit));
std::unordered_set<String> storage_class_names {"STANDARD", "INTELLIGENT_TIERING"};
if (!storage_class_name.empty() && !storage_class_names.contains(storage_class_name))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Setting storage_class has invalid value {} which only supports STANDARD and INTELLIGENT_TIERING",
storage_class_name);
/// TODO: it's possible to set too small limits. We can check that max possible object size is not too small.
}
/// Build request settings (including nested upload settings) entirely from
/// query-level settings.
S3Settings::RequestSettings::RequestSettings(const Settings & settings, bool validate_settings)
    : upload_settings(settings, validate_settings)
{
    updateFromSettingsImpl(settings, false); /// false -> copy every setting, not only changed ones.
}
/// Build request settings from a named collection; only the keys listed below
/// can be overridden this way, everything else keeps the in-class defaults.
S3Settings::RequestSettings::RequestSettings(const NamedCollection & collection)
    : upload_settings(collection)
{
    max_single_read_retries = collection.getOrDefault<UInt64>("max_single_read_retries", max_single_read_retries);
    max_connections = collection.getOrDefault<UInt64>("max_connections", max_connections);
    list_object_keys_size = collection.getOrDefault<UInt64>("list_object_keys_size", list_object_keys_size);
    allow_native_copy = collection.getOrDefault<bool>("allow_native_copy", allow_native_copy);
    throw_on_zero_files_match = collection.getOrDefault<bool>("throw_on_zero_files_match", throw_on_zero_files_match);
}
/// Build request settings from a config section. For most keys the fallback
/// default is the corresponding query-level setting (not the in-class default).
/// NOTE(review): max_unexpected_write_error_retries is not read from config
/// here, only via updateFromSettingsImpl — presumably intentional, but worth
/// confirming.
S3Settings::RequestSettings::RequestSettings(
    const Poco::Util::AbstractConfiguration & config,
    const String & config_prefix,
    const Settings & settings,
    String setting_name_prefix,
    bool validate_settings)
    : upload_settings(config, config_prefix, settings, setting_name_prefix, validate_settings)
{
    String key = config_prefix + "." + setting_name_prefix;
    max_single_read_retries = config.getUInt64(key + "max_single_read_retries", settings.s3_max_single_read_retries);
    max_connections = config.getUInt64(key + "max_connections", settings.s3_max_connections);
    check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
    list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
    allow_native_copy = config.getBool(key + "allow_native_copy", allow_native_copy);
    throw_on_zero_files_match = config.getBool(key + "throw_on_zero_files_match", settings.s3_throw_on_zero_files_match);
    retry_attempts = config.getUInt64(key + "retry_attempts", settings.s3_retry_attempts);
    request_timeout_ms = config.getUInt64(key + "request_timeout_ms", settings.s3_request_timeout_ms);

    /// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
    /// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
    if (UInt64 max_get_rps = config.getUInt64(key + "max_get_rps", settings.s3_max_get_rps))
    {
        /// Burst defaults to `default_burst_seconds` worth of requests at the configured rate.
        size_t default_max_get_burst = settings.s3_max_get_burst
            ? settings.s3_max_get_burst
            : (Throttler::default_burst_seconds * max_get_rps);

        size_t max_get_burst = config.getUInt64(key + "max_get_burst", default_max_get_burst);

        get_request_throttler = std::make_shared<Throttler>(max_get_rps, max_get_burst);
    }
    if (UInt64 max_put_rps = config.getUInt64(key + "max_put_rps", settings.s3_max_put_rps))
    {
        size_t default_max_put_burst = settings.s3_max_put_burst
            ? settings.s3_max_put_burst
            : (Throttler::default_burst_seconds * max_put_rps);

        size_t max_put_burst = config.getUInt64(key + "max_put_burst", default_max_put_burst);

        put_request_throttler = std::make_shared<Throttler>(max_put_rps, max_put_burst);
    }
}
void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settings, bool if_changed)
{
if (!if_changed || settings.s3_max_single_read_retries.changed)
max_single_read_retries = settings.s3_max_single_read_retries;
if (!if_changed || settings.s3_max_connections.changed)
max_connections = settings.s3_max_connections;
if (!if_changed || settings.s3_check_objects_after_upload.changed)
check_objects_after_upload = settings.s3_check_objects_after_upload;
if (!if_changed || settings.s3_max_unexpected_write_error_retries.changed)
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
if (!if_changed || settings.s3_list_object_keys_size.changed)
list_object_keys_size = settings.s3_list_object_keys_size;
if ((!if_changed || settings.s3_max_get_rps.changed || settings.s3_max_get_burst.changed) && settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
if ((!if_changed || settings.s3_max_put_rps.changed || settings.s3_max_put_burst.changed) && settings.s3_max_put_rps)
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
if (!if_changed || settings.s3_throw_on_zero_files_match.changed)
throw_on_zero_files_match = settings.s3_throw_on_zero_files_match;
if (!if_changed || settings.s3_retry_attempts.changed)
retry_attempts = settings.s3_retry_attempts;
if (!if_changed || settings.s3_request_timeout_ms.changed)
request_timeout_ms = settings.s3_request_timeout_ms;
}
/// Overlay only the settings the user explicitly changed (query/session level)
/// on top of the current values, for both request and upload settings.
void S3Settings::RequestSettings::updateFromSettingsIfChanged(const Settings & settings)
{
    updateFromSettingsImpl(settings, true);
    upload_settings.updateFromSettings(settings, true);
}
/// Rebuild the endpoint -> settings map from the given config section.
/// Every child of `config_elem` that has an `endpoint` sub-key contributes one
/// entry; the whole map is replaced atomically under the mutex.
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
    std::lock_guard lock(mutex);
    s3_settings.clear();

    if (!config.has(config_elem))
        return;

    Poco::Util::AbstractConfiguration::Keys config_keys;
    config.keys(config_elem, config_keys);

    for (const String & name : config_keys)
    {
        const String prefix = config_elem + "." + name;
        if (!config.has(prefix + ".endpoint"))
            continue; /// Entries without an endpoint are ignored.

        auto endpoint = config.getString(prefix + ".endpoint");
        auto auth_settings = S3::AuthSettings::loadFromConfig(prefix, config);
        S3Settings::RequestSettings request_settings(config, prefix, settings);

        s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(request_settings)});
    }
}
/// Find the settings whose configured endpoint is the longest prefix of
/// `endpoint` that the given user may use. Returns nullopt when nothing matches.
std::optional<S3Settings> StorageS3Settings::getSettings(const String & endpoint, const String & user, bool ignore_user) const
{
    std::lock_guard lock(mutex);

    /// Walk the candidates in [begin, upper_bound(endpoint)) from the closest
    /// (lexicographically largest) prefix downwards.
    /// Linear time algorithm may be replaced with logarithmic with prefix tree map.
    const auto candidates_rbegin = std::make_reverse_iterator(s3_settings.upper_bound(endpoint));
    for (auto it = candidates_rbegin; it != s3_settings.rend(); ++it)
    {
        const auto & [endpoint_prefix, settings] = *it;
        if (endpoint.starts_with(endpoint_prefix) && (ignore_user || settings.auth_settings.canBeUsedByUser(user)))
            return settings;
    }

    return {};
}
}

View File

@ -1,122 +0,0 @@
#pragma once
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <IO/S3Common.h>
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
struct Settings;
class NamedCollection;
/// Per-endpoint s3 settings: credentials/auth plus request tuning.
struct S3Settings
{
    struct RequestSettings
    {
        /// Settings that govern multipart uploads and server-side copies.
        struct PartUploadSettings
        {
            size_t strict_upload_part_size = 0; /// 0 means "not strict" — part size may grow.
            size_t min_upload_part_size = 16 * 1024 * 1024;
            size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024; /// s3 API hard limit per part.
            size_t upload_part_size_multiply_factor = 2;
            size_t upload_part_size_multiply_parts_count_threshold = 500;
            size_t max_inflight_parts_for_one_file = 20;
            size_t max_part_number = 10000; /// s3 API hard limit on parts per upload.
            size_t max_single_part_upload_size = 32 * 1024 * 1024;
            size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;
            String storage_class_name; /// e.g. STANDARD or INTELLIGENT_TIERING; empty means default.

            /// Copy query-level settings; if_changed limits this to user-changed ones.
            void updateFromSettings(const Settings & settings, bool if_changed);
            /// Throws INVALID_SETTING_VALUE if values violate s3 API limits.
            void validate();

        private:
            PartUploadSettings() = default;
            explicit PartUploadSettings(const Settings & settings, bool validate_settings = true);
            explicit PartUploadSettings(const NamedCollection & collection);
            PartUploadSettings(
                const Poco::Util::AbstractConfiguration & config,
                const String & config_prefix,
                const Settings & settings,
                String setting_name_prefix = {},
                bool validate_settings = true);

            /// Constructors are private: only RequestSettings builds these.
            friend struct RequestSettings;
        };

    private:
        PartUploadSettings upload_settings = {};

    public:
        size_t max_single_read_retries = 4;
        size_t max_connections = 1024;
        bool check_objects_after_upload = false;
        size_t max_unexpected_write_error_retries = 4;
        size_t list_object_keys_size = 1000;
        ThrottlerPtr get_request_throttler; /// Rate-limits GET requests; null means unlimited.
        ThrottlerPtr put_request_throttler; /// Rate-limits PUT requests; null means unlimited.
        size_t retry_attempts = 10;
        size_t request_timeout_ms = 30000;
        bool allow_native_copy = true;
        bool throw_on_zero_files_match = false;

        const PartUploadSettings & getUploadSettings() const { return upload_settings; }
        PartUploadSettings & getUploadSettings() { return upload_settings; }

        void setStorageClassName(const String & storage_class_name) { upload_settings.storage_class_name = storage_class_name; }

        RequestSettings() = default;
        explicit RequestSettings(const Settings & settings, bool validate_settings = true);
        explicit RequestSettings(const NamedCollection & collection);

        /// What's the setting_name_prefix, and why do we need it?
        /// There are (at least) two config sections where s3 settings can be specified:
        /// * settings for s3 disk (clickhouse/storage_configuration/disks)
        /// * settings for s3 storage (clickhouse/s3), which are also used for backups
        /// Even though settings are the same, in case of s3 disk they are prefixed with "s3_"
        /// ("s3_max_single_part_upload_size"), but in case of s3 storage they are not
        /// ( "max_single_part_upload_size"). Why this happened is a complete mystery to me.
        RequestSettings(
            const Poco::Util::AbstractConfiguration & config,
            const String & config_prefix,
            const Settings & settings,
            String setting_name_prefix = {},
            bool validate_settings = true);

        /// Overlay only user-changed query-level settings on top of current values.
        void updateFromSettingsIfChanged(const Settings & settings);

    private:
        void updateFromSettingsImpl(const Settings & settings, bool if_changed);
    };

    S3::AuthSettings auth_settings;
    RequestSettings request_settings;
};
/// Settings for the StorageS3.
/// Thread-safe registry of per-endpoint S3Settings, keyed by endpoint prefix.
class StorageS3Settings
{
public:
    /// Replace the whole map from the given config section (called on config reload).
    void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);

    /// Longest-prefix match of `endpoint` among configured endpoints, filtered
    /// by user access unless ignore_user is set; nullopt if nothing matches.
    std::optional<S3Settings> getSettings(const String & endpoint, const String & user, bool ignore_user = false) const;

private:
    mutable std::mutex mutex; /// Guards s3_settings against concurrent reload/lookup.
    std::map<const String, const S3Settings> s3_settings;
};
}