mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #51171 from ClickHouse/retry
Decrease default timeouts for S3 and HTTP requests
This commit is contained in:
commit
ab93967fb4
@ -3537,7 +3537,7 @@ Possible values:
|
||||
- Any positive integer.
|
||||
- 0 - Disabled (infinite timeout).
|
||||
|
||||
Default value: 180.
|
||||
Default value: 30.
|
||||
|
||||
## http_receive_timeout {#http_receive_timeout}
|
||||
|
||||
@ -3548,7 +3548,7 @@ Possible values:
|
||||
- Any positive integer.
|
||||
- 0 - Disabled (infinite timeout).
|
||||
|
||||
Default value: 180.
|
||||
Default value: 30.
|
||||
|
||||
## check_query_single_value_result {#check_query_single_value_result}
|
||||
|
||||
|
@ -253,6 +253,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
|
||||
{
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
client,
|
||||
client, // already has long timeout
|
||||
s3_uri.bucket,
|
||||
fs::path(s3_uri.key) / file_name,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
|
@ -145,14 +145,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
|
||||
const auto create_writer = [&](const auto & key)
|
||||
{
|
||||
return WriteBufferFromS3
|
||||
{
|
||||
return WriteBufferFromS3(
|
||||
s3_client->client,
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
key,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
request_settings_1
|
||||
};
|
||||
);
|
||||
};
|
||||
|
||||
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);
|
||||
|
@ -41,7 +41,7 @@
|
||||
/// The boundary on which the blocks for asynchronous file operations should be aligned.
|
||||
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
|
||||
|
||||
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 180
|
||||
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 30
|
||||
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
|
||||
/// Maximum number of http-connections between two endpoints
|
||||
/// the number is unmotivated
|
||||
|
@ -102,6 +102,7 @@ class IColumn;
|
||||
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
|
||||
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
|
||||
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
|
||||
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
|
||||
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
|
||||
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
|
||||
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
|
||||
|
@ -80,6 +80,8 @@ namespace SettingsChangesHistory
|
||||
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
|
||||
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
|
||||
{
|
||||
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
|
||||
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
|
||||
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
|
||||
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
|
||||
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},
|
||||
|
@ -149,7 +149,7 @@ private:
|
||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
@ -168,7 +168,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
client.get(),
|
||||
clients.get()->client,
|
||||
bucket,
|
||||
path,
|
||||
version_id,
|
||||
@ -218,7 +218,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
client.get(),
|
||||
clients.get()->client,
|
||||
bucket,
|
||||
object.remote_path,
|
||||
version_id,
|
||||
@ -243,8 +243,10 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
if (write_settings.s3_allow_parallel_part_upload)
|
||||
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
|
||||
|
||||
auto clients_ = clients.get();
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
client.get(),
|
||||
clients_->client,
|
||||
clients_->client_with_long_timeout,
|
||||
bucket,
|
||||
object.remote_path,
|
||||
buf_size,
|
||||
@ -258,7 +260,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
|
||||
}
|
||||
@ -266,7 +268,7 @@ ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefi
|
||||
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
S3::ListObjectsV2Request request;
|
||||
request.SetBucket(bucket);
|
||||
@ -307,7 +309,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
|
||||
|
||||
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
|
||||
@ -333,7 +335,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
}
|
||||
else
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
auto settings_ptr = s3_settings.get();
|
||||
|
||||
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
|
||||
@ -394,7 +396,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
|
||||
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
|
||||
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
|
||||
|
||||
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
|
||||
return {};
|
||||
@ -410,7 +412,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
|
||||
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
|
||||
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
|
||||
|
||||
ObjectMetadata result;
|
||||
result.size_bytes = object_info.size;
|
||||
@ -429,7 +431,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
/// Shortcut for S3
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
@ -445,7 +447,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
void S3ObjectStorage::copyObject( // NOLINT
|
||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
@ -458,35 +460,33 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
|
||||
s3_settings.set(std::move(s3_settings_));
|
||||
}
|
||||
|
||||
void S3ObjectStorage::setNewClient(std::unique_ptr<S3::Client> && client_)
|
||||
{
|
||||
client.set(std::move(client_));
|
||||
}
|
||||
|
||||
void S3ObjectStorage::shutdown()
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto clients_ptr = clients.get();
|
||||
/// This call stops any next retry attempts for ongoing S3 requests.
|
||||
/// If S3 request is failed and the method below is executed S3 client immediately returns the last failed S3 request outcome.
|
||||
/// If S3 is healthy nothing wrong will be happened and S3 requests will be processed in a regular way without errors.
|
||||
/// This should significantly speed up shutdown process if S3 is unhealthy.
|
||||
const_cast<S3::Client &>(*client_ptr).DisableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
|
||||
}
|
||||
|
||||
void S3ObjectStorage::startup()
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto clients_ptr = clients.get();
|
||||
|
||||
/// Need to be enabled if it was disabled during shutdown() call.
|
||||
const_cast<S3::Client &>(*client_ptr).EnableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
|
||||
}
|
||||
|
||||
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context);
|
||||
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
|
||||
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
|
||||
s3_settings.set(std::move(new_s3_settings));
|
||||
client.set(std::move(new_client));
|
||||
clients.set(std::move(new_clients));
|
||||
}
|
||||
|
||||
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
@ -501,7 +501,9 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
endpoint);
|
||||
}
|
||||
|
||||
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
|
||||
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
|
||||
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -39,6 +39,16 @@ struct S3ObjectStorageSettings
|
||||
|
||||
class S3ObjectStorage : public IObjectStorage
|
||||
{
|
||||
public:
|
||||
struct Clients
|
||||
{
|
||||
std::shared_ptr<S3::Client> client;
|
||||
std::shared_ptr<S3::Client> client_with_long_timeout;
|
||||
|
||||
Clients() = default;
|
||||
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
|
||||
};
|
||||
|
||||
private:
|
||||
friend class S3PlainObjectStorage;
|
||||
|
||||
@ -51,7 +61,7 @@ private:
|
||||
String bucket_,
|
||||
String connection_string)
|
||||
: bucket(bucket_)
|
||||
, client(std::move(client_))
|
||||
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
|
||||
, s3_settings(std::move(s3_settings_))
|
||||
, s3_capabilities(s3_capabilities_)
|
||||
, version_id(std::move(version_id_))
|
||||
@ -159,14 +169,12 @@ public:
|
||||
private:
|
||||
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
|
||||
|
||||
void setNewClient(std::unique_ptr<S3::Client> && client_);
|
||||
|
||||
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
||||
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
||||
|
||||
std::string bucket;
|
||||
|
||||
MultiVersion<S3::Client> client;
|
||||
MultiVersion<Clients> clients;
|
||||
MultiVersion<S3ObjectStorageSettings> s3_settings;
|
||||
S3Capabilities s3_capabilities;
|
||||
|
||||
|
@ -129,7 +129,7 @@ std::unique_ptr<S3::Client> getClient(
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key);
|
||||
|
||||
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
|
||||
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
|
||||
|
@ -100,7 +100,7 @@ std::unique_ptr<Client> Client::create(
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
const PocoHTTPClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing)
|
||||
{
|
||||
@ -109,9 +109,16 @@ std::unique_ptr<Client> Client::create(
|
||||
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
|
||||
}
|
||||
|
||||
std::unique_ptr<Client> Client::create(const Client & other)
|
||||
std::unique_ptr<Client> Client::clone(
|
||||
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy,
|
||||
std::optional<Int64> override_request_timeout_ms) const
|
||||
{
|
||||
return std::unique_ptr<Client>(new Client(other));
|
||||
PocoHTTPClientConfiguration new_configuration = client_configuration;
|
||||
if (override_retry_strategy.has_value())
|
||||
new_configuration.retryStrategy = *override_retry_strategy;
|
||||
if (override_request_timeout_ms.has_value())
|
||||
new_configuration.requestTimeoutMs = *override_request_timeout_ms;
|
||||
return std::unique_ptr<Client>(new Client(*this, new_configuration));
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -134,11 +141,14 @@ Client::Client(
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing)
|
||||
: Aws::S3::S3Client(credentials_provider_, client_configuration, std::move(sign_payloads), use_virtual_addressing)
|
||||
const PocoHTTPClientConfiguration & client_configuration_,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads_,
|
||||
bool use_virtual_addressing_)
|
||||
: Aws::S3::S3Client(credentials_provider_, client_configuration_, sign_payloads_, use_virtual_addressing_)
|
||||
, credentials_provider(credentials_provider_)
|
||||
, client_configuration(client_configuration_)
|
||||
, sign_payloads(sign_payloads_)
|
||||
, use_virtual_addressing(use_virtual_addressing_)
|
||||
, max_redirects(max_redirects_)
|
||||
, sse_kms_config(std::move(sse_kms_config_))
|
||||
, log(&Poco::Logger::get("S3Client"))
|
||||
@ -175,10 +185,15 @@ Client::Client(
|
||||
ClientCacheRegistry::instance().registerClient(cache);
|
||||
}
|
||||
|
||||
Client::Client(const Client & other)
|
||||
: Aws::S3::S3Client(other)
|
||||
Client::Client(
|
||||
const Client & other, const PocoHTTPClientConfiguration & client_configuration_)
|
||||
: Aws::S3::S3Client(other.credentials_provider, client_configuration_, other.sign_payloads,
|
||||
other.use_virtual_addressing)
|
||||
, initial_endpoint(other.initial_endpoint)
|
||||
, credentials_provider(other.credentials_provider)
|
||||
, client_configuration(client_configuration_)
|
||||
, sign_payloads(other.sign_payloads)
|
||||
, use_virtual_addressing(other.use_virtual_addressing)
|
||||
, explicit_region(other.explicit_region)
|
||||
, detect_region(other.detect_region)
|
||||
, provider_type(other.provider_type)
|
||||
|
@ -105,6 +105,8 @@ private:
|
||||
class Client : private Aws::S3::S3Client
|
||||
{
|
||||
public:
|
||||
class RetryStrategy;
|
||||
|
||||
/// we use a factory method to verify arguments before creating a client because
|
||||
/// there are certain requirements on arguments for it to work correctly
|
||||
/// e.g. Client::RetryStrategy should be used
|
||||
@ -112,11 +114,19 @@ public:
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
const PocoHTTPClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing);
|
||||
|
||||
static std::unique_ptr<Client> create(const Client & other);
|
||||
/// Create a client with adjusted settings:
|
||||
/// * override_retry_strategy can be used to disable retries to avoid nested retries when we have
|
||||
/// a retry loop outside of S3 client. Specifically, for read and write buffers. Currently not
|
||||
/// actually used.
|
||||
/// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest
|
||||
/// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321
|
||||
std::unique_ptr<Client> clone(
|
||||
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy = std::nullopt,
|
||||
std::optional<Int64> override_request_timeout_ms = std::nullopt) const;
|
||||
|
||||
Client & operator=(const Client &) = delete;
|
||||
|
||||
@ -211,11 +221,12 @@ private:
|
||||
Client(size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
|
||||
const Aws::Client::ClientConfiguration& client_configuration,
|
||||
const PocoHTTPClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing);
|
||||
|
||||
Client(const Client & other);
|
||||
Client(
|
||||
const Client & other, const PocoHTTPClientConfiguration & client_configuration);
|
||||
|
||||
/// Leave regular functions private so we don't accidentally use them
|
||||
/// otherwise region and endpoint redirection won't work
|
||||
@ -251,6 +262,9 @@ private:
|
||||
|
||||
String initial_endpoint;
|
||||
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
|
||||
PocoHTTPClientConfiguration client_configuration;
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads;
|
||||
bool use_virtual_addressing;
|
||||
|
||||
std::string explicit_region;
|
||||
mutable bool detect_region = true;
|
||||
|
@ -89,6 +89,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
|
||||
DB::S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
|
||||
DB::WriteBufferFromS3 write_buffer(
|
||||
client,
|
||||
client,
|
||||
uri.bucket,
|
||||
uri.key,
|
||||
|
@ -77,6 +77,7 @@ struct WriteBufferFromS3::PartData
|
||||
|
||||
WriteBufferFromS3::WriteBufferFromS3(
|
||||
std::shared_ptr<const S3::Client> client_ptr_,
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
size_t buf_size_,
|
||||
@ -91,6 +92,7 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
, upload_settings(request_settings.getUploadSettings())
|
||||
, write_settings(write_settings_)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, client_with_long_timeout_ptr(std::move(client_with_long_timeout_ptr_))
|
||||
, object_metadata(std::move(object_metadata_))
|
||||
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
|
||||
, task_tracker(
|
||||
@ -552,7 +554,7 @@ void WriteBufferFromS3::completeMultipartUpload()
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
|
||||
|
||||
Stopwatch watch;
|
||||
auto outcome = client_ptr->CompleteMultipartUpload(req);
|
||||
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(req);
|
||||
watch.stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
||||
|
@ -29,6 +29,8 @@ class WriteBufferFromS3 final : public WriteBufferFromFileBase
|
||||
public:
|
||||
WriteBufferFromS3(
|
||||
std::shared_ptr<const S3::Client> client_ptr_,
|
||||
/// for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
size_t buf_size_,
|
||||
@ -86,6 +88,7 @@ private:
|
||||
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
|
||||
const WriteSettings write_settings;
|
||||
const std::shared_ptr<const S3::Client> client_ptr;
|
||||
const std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
|
||||
const std::optional<std::map<String, String>> object_metadata;
|
||||
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
|
||||
|
||||
|
@ -526,6 +526,7 @@ public:
|
||||
getAsyncPolicy().setAutoExecute(false);
|
||||
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
client,
|
||||
client,
|
||||
bucket,
|
||||
file_name,
|
||||
|
@ -150,7 +150,7 @@ public:
|
||||
KeysWithInfo * read_keys_,
|
||||
const S3Settings::RequestSettings & request_settings_)
|
||||
: WithContext(context_)
|
||||
, client(S3::Client::create(client_))
|
||||
, client(client_.clone())
|
||||
, globbed_uri(globbed_uri_)
|
||||
, query(query_)
|
||||
, virtual_header(virtual_header_)
|
||||
@ -783,6 +783,7 @@ public:
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromS3>(
|
||||
configuration_.client,
|
||||
configuration_.client_with_long_timeout,
|
||||
bucket,
|
||||
key,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
@ -1309,6 +1310,8 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
|
||||
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
|
||||
});
|
||||
|
||||
client_with_long_timeout = client->clone(std::nullopt, request_settings.long_request_timeout_ms);
|
||||
}
|
||||
|
||||
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
|
||||
|
@ -274,6 +274,7 @@ public:
|
||||
HTTPHeaderEntries headers_from_ast;
|
||||
|
||||
std::shared_ptr<const S3::Client> client;
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout;
|
||||
std::vector<String> keys;
|
||||
};
|
||||
|
||||
|
@ -199,7 +199,7 @@ S3Settings::RequestSettings::RequestSettings(
|
||||
list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
|
||||
throw_on_zero_files_match = config.getBool(key + "throw_on_zero_files_match", settings.s3_throw_on_zero_files_match);
|
||||
retry_attempts = config.getUInt64(key + "retry_attempts", settings.s3_retry_attempts);
|
||||
request_timeout_ms = config.getUInt64(key + "request_timeout_ms", request_timeout_ms);
|
||||
request_timeout_ms = config.getUInt64(key + "request_timeout_ms", settings.s3_request_timeout_ms);
|
||||
|
||||
/// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
|
||||
/// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
|
||||
@ -255,6 +255,9 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settin
|
||||
|
||||
if (!if_changed || settings.s3_retry_attempts.changed)
|
||||
retry_attempts = settings.s3_retry_attempts;
|
||||
|
||||
if (!if_changed || settings.s3_request_timeout_ms.changed)
|
||||
request_timeout_ms = settings.s3_request_timeout_ms;
|
||||
}
|
||||
|
||||
void S3Settings::RequestSettings::updateFromSettings(const Settings & settings)
|
||||
|
@ -69,7 +69,8 @@ struct S3Settings
|
||||
ThrottlerPtr get_request_throttler;
|
||||
ThrottlerPtr put_request_throttler;
|
||||
size_t retry_attempts = 10;
|
||||
size_t request_timeout_ms = 30000;
|
||||
size_t request_timeout_ms = 3000;
|
||||
size_t long_request_timeout_ms = 30000; // TODO: Take this from config like request_timeout_ms
|
||||
|
||||
bool throw_on_zero_files_match = false;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user