Revert "Revert "s3 adaptive timeouts""

Sema Checherinda 2023-11-20 14:53:22 +01:00 committed by GitHub
parent 535a75b1bc
commit f999337dae
37 changed files with 430 additions and 202 deletions


@ -26,7 +26,6 @@ HTTPServerSession::HTTPServerSession(const StreamSocket& socket, HTTPServerParam
_maxKeepAliveRequests(pParams->getMaxKeepAliveRequests())
{
setTimeout(pParams->getTimeout());
this->socket().setReceiveTimeout(pParams->getTimeout());
}


@ -93,9 +93,34 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout)
void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout)
{
_connectionTimeout = connectionTimeout;
_sendTimeout = sendTimeout;
_receiveTimeout = receiveTimeout;
try
{
_connectionTimeout = connectionTimeout;
if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) {
_sendTimeout = sendTimeout;
if (connected())
_socket.setSendTimeout(_sendTimeout);
}
if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) {
_receiveTimeout = receiveTimeout;
if (connected())
_socket.setReceiveTimeout(_receiveTimeout);
}
}
catch (NetException &)
{
#ifndef NDEBUG
throw;
#else
// Mute exceptions in release builds,
// just in case changing settings on the socket is not allowed.
// It should be OK for timeouts, though.
#endif
}
}


@ -4826,3 +4826,10 @@ When set to `true` the metadata files are written with `VERSION_FULL_OBJECT_KEY`
When set to `false` the metadata files are written with the previous format version, `VERSION_INLINE_DATA`. With that format only the suffixes of object storage key names are written to the metadata files. The prefix for all object storage key names is set in configuration files in the `storage_configuration.disks` section.
Default value: `false`.
## s3_use_adaptive_timeouts {#s3_use_adaptive_timeouts}
When set to `true`, the first two attempts of each S3 request are made with low send and receive timeouts.
When set to `false`, all attempts are made with identical timeouts.
Default value: `true`.
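
A minimal sketch of the behaviour described above, assuming a hypothetical helper `pickSendReceiveTimeoutMs` (the helper name and the millisecond values are illustrative only; the actual logic lives in `ConnectionTimeouts::getAdaptiveTimeouts` and `PocoHTTPClient::getTimeouts` further down in this commit):

#include <algorithm>
#include <cstdint>

struct SendReceiveTimeouts { uint64_t send_ms; uint64_t receive_ms; };

/// Hypothetical helper: the first two attempts get short timeouts so that a stuck
/// connection is detected quickly; later attempts fall back to the configured
/// s3_request_timeout_ms. The 1000 ms value is illustrative, not the real default.
SendReceiveTimeouts pickSendReceiveTimeoutMs(uint64_t attempt, uint64_t request_timeout_ms, bool use_adaptive)
{
    const uint64_t low_ms = 1000;
    if (use_adaptive && attempt <= 2)
        return {std::min(low_ms, request_timeout_ms), std::min(low_ms, request_timeout_ms)};
    return {request_timeout_ms, request_timeout_ms};
}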


@ -55,7 +55,9 @@ namespace
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler,
s3_uri.uri.getScheme());
client_configuration.endpointOverride = s3_uri.endpoint;
@ -167,7 +169,6 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
blob_path.size(), mode);
copyS3File(
client,
client,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
@ -229,7 +230,6 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
{
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
copyS3File(
client,
client,
/* src_bucket */ blob_path[1],
/* src_key= */ blob_path[0],
@ -268,7 +268,7 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
@ -298,7 +298,6 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
{
return std::make_unique<WriteBufferFromS3>(
client,
client, // already has long timeout
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,


@ -148,7 +148,6 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
return WriteBufferFromS3(
s3_client->client,
s3_client->client,
s3_client->uri.bucket,
key,


@ -94,6 +94,7 @@ class IColumn;
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_use_adaptive_timeouts, true, "When adaptive timeouts are enabled, the first two attempts are made with low receive and send timeouts", 0) \
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
@ -104,7 +105,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \


@ -155,7 +155,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -174,7 +174,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
clients.get()->client,
client.get(),
bucket,
path,
version_id,
@ -224,7 +224,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
{
auto settings_ptr = s3_settings.get();
return std::make_unique<ReadBufferFromS3>(
clients.get()->client,
client.get(),
bucket,
object.remote_path,
version_id,
@ -249,10 +249,8 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto clients_ = clients.get();
return std::make_unique<WriteBufferFromS3>(
clients_->client,
clients_->client_with_long_timeout,
client.get(),
bucket,
object.remote_path,
buf_size,
@ -266,15 +264,12 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = clients.get()->client;
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
}
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = clients.get()->client;
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
@ -289,7 +284,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
outcome = client_ptr->ListObjectsV2(request);
outcome = client.get()->ListObjectsV2(request);
throwIfError(outcome);
auto result = outcome.GetResult();
@ -320,14 +315,12 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
auto client_ptr = clients.get()->client;
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(object.remote_path);
auto outcome = client_ptr->DeleteObject(request);
auto outcome = client.get()->DeleteObject(request);
throwIfUnexpectedError(outcome, if_exists);
@ -346,7 +339,6 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
}
else
{
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
@ -375,7 +367,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
S3::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
auto outcome = client.get()->DeleteObjects(request);
throwIfUnexpectedError(outcome, if_exists);
@ -407,7 +399,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -423,7 +415,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
ObjectMetadata result;
result.size_bytes = object_info.size;
@ -444,12 +436,12 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto clients_ = clients.get();
auto client_ = client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(clients_->client,
clients_->client_with_long_timeout,
copyS3File(
client.get(),
bucket,
object_from.remote_path,
0,
@ -473,12 +465,11 @@ void S3ObjectStorage::copyObject( // NOLINT
const WriteSettings &,
std::optional<ObjectAttributes> object_to_attributes)
{
auto clients_ = clients.get();
auto client_ = client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(clients_->client,
clients_->client_with_long_timeout,
copyS3File(client_,
bucket,
object_from.remote_path,
0,
@ -499,31 +490,25 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
void S3ObjectStorage::shutdown()
{
auto clients_ptr = clients.get();
/// This call stops any further retry attempts for ongoing S3 requests.
/// If an S3 request has failed and the method below is executed, the S3 client immediately returns the outcome of the last failed request.
/// If S3 is healthy, nothing bad will happen and S3 requests will be processed in the regular way without errors.
/// This should significantly speed up the shutdown process if S3 is unhealthy.
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
const_cast<S3::Client &>(*client.get()).DisableRequestProcessing();
}
void S3ObjectStorage::startup()
{
auto clients_ptr = clients.get();
/// Need to be enabled if it was disabled during shutdown() call.
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
const_cast<S3::Client &>(*client.get()).EnableRequestProcessing();
}
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
s3_settings.set(std::move(new_s3_settings));
clients.set(std::move(new_clients));
client.set(std::move(new_client));
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
@ -538,9 +523,6 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
endpoint, object_key_prefix);
}
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const
{
/// Path to store the new S3 object.


@ -39,16 +39,6 @@ struct S3ObjectStorageSettings
class S3ObjectStorage : public IObjectStorage
{
public:
struct Clients
{
std::shared_ptr<S3::Client> client;
std::shared_ptr<S3::Client> client_with_long_timeout;
Clients() = default;
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
};
private:
friend class S3PlainObjectStorage;
@ -63,7 +53,7 @@ private:
String object_key_prefix_)
: bucket(std::move(bucket_))
, object_key_prefix(std::move(object_key_prefix_))
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
@ -184,7 +174,8 @@ private:
std::string bucket;
String object_key_prefix;
MultiVersion<Clients> clients;
MultiVersion<S3::Client> client;
MultiVersion<S3ObjectStorageSettings> s3_settings;
S3Capabilities s3_capabilities;


@ -60,13 +60,15 @@ std::unique_ptr<S3::Client> getClient(
uri.uri.getScheme());
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;
client_configuration.http_keep_alive_timeout_ms
= config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
client_configuration.http_keep_alive_timeout_ms = config.getUInt(
config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
client_configuration.wait_on_pool_size_limit = false;
client_configuration.s3_use_adaptive_timeouts = config.getBool(
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
/*
* Override proxy configuration for backwards compatibility with old configuration format.


@ -133,4 +133,86 @@ ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings
settings.http_receive_timeout);
}
class SendReceiveTimeoutsForFirstAttempt
{
private:
static constexpr size_t known_methods_count = 6;
using KnownMethodsArray = std::array<String, known_methods_count>;
static const KnownMethodsArray known_methods;
/// HTTP_POST is used for CompleteMultipartUpload requests. Its latency could be high.
/// These requests need a longer timeout, especially when MinIO is used.
/// The same assumption is made for HTTP_DELETE and HTTP_PATCH:
/// those requests are heavier than HTTP_GET, HTTP_HEAD and HTTP_PUT.
static constexpr Poco::Timestamp::TimeDiff first_byte_ms[known_methods_count][2] =
{
/* GET */ {200, 200},
/* POST */ {200, 200},
/* DELETE */ {200, 200},
/* PUT */ {200, 200},
/* HEAD */ {200, 200},
/* PATCH */ {200, 200},
};
static constexpr Poco::Timestamp::TimeDiff rest_bytes_ms[known_methods_count][2] =
{
/* GET */ {500, 500},
/* POST */ {1000, 30000},
/* DELETE */ {1000, 10000},
/* PUT */ {1000, 3000},
/* HEAD */ {500, 500},
/* PATCH */ {1000, 10000},
};
static_assert(sizeof(first_byte_ms) == sizeof(rest_bytes_ms));
static_assert(sizeof(first_byte_ms) == known_methods_count * sizeof(Poco::Timestamp::TimeDiff) * 2);
static size_t getMethodIndex(const String & method)
{
KnownMethodsArray::const_iterator it = std::find(known_methods.begin(), known_methods.end(), method);
chassert(it != known_methods.end());
if (it == known_methods.end())
return 0;
return std::distance(known_methods.begin(), it);
}
public:
static std::pair<Poco::Timespan, Poco::Timespan> getSendReceiveTimeout(const String & method, bool first_byte)
{
auto idx = getMethodIndex(method);
if (first_byte)
return std::make_pair(
Poco::Timespan(first_byte_ms[idx][0] * 1000),
Poco::Timespan(first_byte_ms[idx][1] * 1000)
);
return std::make_pair(
Poco::Timespan(rest_bytes_ms[idx][0] * 1000),
Poco::Timespan(rest_bytes_ms[idx][1] * 1000)
);
}
};
const SendReceiveTimeoutsForFirstAttempt::KnownMethodsArray SendReceiveTimeoutsForFirstAttempt::known_methods =
{
"GET", "POST", "DELETE", "PUT", "HEAD", "PATCH"
};
ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const
{
if (!first_attempt)
return *this;
auto [send, recv] = SendReceiveTimeoutsForFirstAttempt::getSendReceiveTimeout(method, first_byte);
auto aggressive = *this;
aggressive.send_timeout = saturate(send, send_timeout);
aggressive.receive_timeout = saturate(recv, receive_timeout);
return aggressive;
}
}
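
A hedged usage sketch of the new method; the wrapper function is illustrative and the include path is assumed, not taken from the commit:

#include <IO/ConnectionTimeouts.h>

/// Sketch: pick the timeouts for one attempt of an S3 PUT.
/// For anything but the first attempt the configured timeouts are returned unchanged;
/// for the first attempt the send/receive timeouts are clamped via saturate() to the
/// short per-method values from the tables above.
DB::ConnectionTimeouts chooseTimeoutsForPut(const DB::ConnectionTimeouts & base, bool first_attempt, bool first_byte)
{
    return base.getAdaptiveTimeouts("PUT", first_attempt, first_byte);
}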


@ -67,6 +67,8 @@ struct ConnectionTimeouts
/// Timeouts for the case when we will try many addresses in a loop.
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings);
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout);
ConnectionTimeouts getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const;
};
}


@ -50,12 +50,6 @@ namespace ErrorCodes
namespace
{
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
Poco::Net::HTTPClientSession::ProxyConfig proxyConfigurationToPocoProxyConfig(const ProxyConfiguration & proxy_configuration)
{
Poco::Net::HTTPClientSession::ProxyConfig poco_proxy_config;
@ -359,6 +353,12 @@ namespace
};
}
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout)
{
if (!response.getKeepAlive())


@ -113,4 +113,6 @@ std::istream * receiveResponse(
void assertResponseIsOk(
const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, bool allow_redirects = false);
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts);
}


@ -167,9 +167,9 @@ bool ReadBufferFromS3::nextImpl()
}
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; !next_result; ++attempt)
for (size_t attempt = 1; !next_result; ++attempt)
{
bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;
bool last_attempt = attempt >= request_settings.max_single_read_retries;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
@ -177,7 +177,7 @@ bool ReadBufferFromS3::nextImpl()
{
if (!impl)
{
impl = initialize();
impl = initialize(attempt);
if (use_external_buffer)
{
@ -232,9 +232,9 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
{
size_t initial_n = n;
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; n > 0; ++attempt)
for (size_t attempt = 1; n > 0; ++attempt)
{
bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;
bool last_attempt = attempt >= request_settings.max_single_read_retries;
size_t bytes_copied = 0;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
@ -266,7 +266,7 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
try
{
result = sendRequest(range_begin, range_begin + n - 1);
result = sendRequest(attempt, range_begin, range_begin + n - 1);
std::istream & istr = result->GetBody();
copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied);
@ -304,8 +304,8 @@ bool ReadBufferFromS3::processException(Poco::Exception & e, size_t read_offset,
LOG_DEBUG(
log,
"Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, "
"Attempt: {}, Message: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message());
"Attempt: {}/{}, Message: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, request_settings.max_single_read_retries, e.message());
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
@ -463,7 +463,7 @@ ReadBufferFromS3::~ReadBufferFromS3()
}
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize(size_t attempt)
{
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
read_all_range_successfully = false;
@ -475,13 +475,13 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (read_until_position && offset >= read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
read_result = sendRequest(attempt, offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
return std::make_unique<ReadBufferFromIStream>(read_result->GetBody(), buffer_size);
}
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, size_t range_begin, std::optional<size_t> range_end_incl) const
{
S3::GetObjectRequest req;
req.SetBucket(bucket);
@ -489,6 +489,8 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin
if (!version_id.empty())
req.SetVersionId(version_id);
req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt));
if (range_end_incl)
{
req.SetRange(fmt::format("bytes={}-{}", range_begin, *range_end_incl));


@ -79,7 +79,7 @@ public:
bool supportsReadAt() override { return true; }
private:
std::unique_ptr<ReadBuffer> initialize();
std::unique_ptr<ReadBuffer> initialize(size_t attempt);
/// If true, if we destroy impl now, no work was wasted. Just for metrics.
bool atEndOfRequestedRangeGuess();
@ -88,7 +88,7 @@ private:
/// Returns true if the error looks retriable.
bool processException(Poco::Exception & e, size_t read_offset, size_t attempt) const;
Aws::S3::Model::GetObjectResult sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const;
Aws::S3::Model::GetObjectResult sendRequest(size_t attempt, size_t range_begin, std::optional<size_t> range_end_incl) const;
bool readAllRangeSuccessfully() const;


@ -118,16 +118,9 @@ std::unique_ptr<Client> Client::create(
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
}
std::unique_ptr<Client> Client::clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy,
std::optional<Int64> override_request_timeout_ms) const
std::unique_ptr<Client> Client::clone() const
{
PocoHTTPClientConfiguration new_configuration = client_configuration;
if (override_retry_strategy.has_value())
new_configuration.retryStrategy = *override_retry_strategy;
if (override_request_timeout_ms.has_value())
new_configuration.requestTimeoutMs = *override_request_timeout_ms;
return std::unique_ptr<Client>(new Client(*this, new_configuration));
return std::unique_ptr<Client>(new Client(*this, client_configuration));
}
namespace
@ -905,6 +898,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
s3_retry_attempts,
enable_s3_requests_logging,
for_disk_s3,
context->getGlobalContext()->getSettingsRef().s3_use_adaptive_timeouts,
get_request_throttler,
put_request_throttler,
error_report);


@ -118,15 +118,7 @@ public:
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
/// Create a client with adjusted settings:
/// * override_retry_strategy can be used to disable retries to avoid nested retries when we have
/// a retry loop outside of S3 client. Specifically, for read and write buffers. Currently not
/// actually used.
/// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest
/// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321
std::unique_ptr<Client> clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy = std::nullopt,
std::optional<Int64> override_request_timeout_ms = std::nullopt) const;
std::unique_ptr<Client> clone() const;
Client & operator=(const Client &) = delete;


@ -99,6 +99,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
unsigned int s3_retry_attempts_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
bool s3_use_adaptive_timeouts_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_)
@ -111,6 +112,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
, for_disk_s3(for_disk_s3_)
, get_request_throttler(get_request_throttler_)
, put_request_throttler(put_request_throttler_)
, s3_use_adaptive_timeouts(s3_use_adaptive_timeouts_)
, error_report(error_report_)
{
}
@ -157,6 +159,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// flag indicating whether keep-alive is enabled is set to each session upon creation
, remote_host_filter(client_configuration.remote_host_filter)
, s3_max_redirects(client_configuration.s3_max_redirects)
, s3_use_adaptive_timeouts(client_configuration.s3_use_adaptive_timeouts)
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
, for_disk_s3(client_configuration.for_disk_s3)
, get_request_throttler(client_configuration.get_request_throttler)
@ -268,6 +271,38 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT
ProfileEvents::increment(disk_s3_events_map[static_cast<unsigned int>(type)][static_cast<unsigned int>(kind)], amount);
}
String extractAttemptFromInfo(const Aws::String & request_info)
{
static auto key = Aws::String("attempt=");
auto key_begin = request_info.find(key, 0);
if (key_begin == Aws::String::npos)
return "1";
auto val_begin = key_begin + key.size();
auto val_end = request_info.find(';', val_begin);
if (val_end == Aws::String::npos)
val_end = request_info.size();
return request_info.substr(val_begin, val_end-val_begin);
}
String getOrEmpty(const Aws::Http::HeaderValueCollection & map, const String & key)
{
auto it = map.find(key);
if (it == map.end())
return {};
return it->second;
}
ConnectionTimeouts PocoHTTPClient::getTimeouts(const String & method, bool first_attempt, bool first_byte) const
{
if (!s3_use_adaptive_timeouts)
return timeouts;
return timeouts.getAdaptiveTimeouts(method, first_attempt, first_byte);
}
void PocoHTTPClient::makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<PocoHTTPResponse> & response,
@ -282,6 +317,25 @@ void PocoHTTPClient::makeRequestInternal(
makeRequestInternalImpl<false>(request, request_configuration, response, readLimiter, writeLimiter);
}
String getMethod(const Aws::Http::HttpRequest & request)
{
switch (request.GetMethod())
{
case Aws::Http::HttpMethod::HTTP_GET:
return Poco::Net::HTTPRequest::HTTP_GET;
case Aws::Http::HttpMethod::HTTP_POST:
return Poco::Net::HTTPRequest::HTTP_POST;
case Aws::Http::HttpMethod::HTTP_DELETE:
return Poco::Net::HTTPRequest::HTTP_DELETE;
case Aws::Http::HttpMethod::HTTP_PUT:
return Poco::Net::HTTPRequest::HTTP_PUT;
case Aws::Http::HttpMethod::HTTP_HEAD:
return Poco::Net::HTTPRequest::HTTP_HEAD;
case Aws::Http::HttpMethod::HTTP_PATCH:
return Poco::Net::HTTPRequest::HTTP_PATCH;
}
}
template <bool pooled>
void PocoHTTPClient::makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
@ -295,9 +349,14 @@ void PocoHTTPClient::makeRequestInternalImpl(
Poco::Logger * log = &Poco::Logger::get("AWSClient");
auto uri = request.GetUri().GetURIString();
auto method = getMethod(request);
auto sdk_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), Aws::Http::SDK_REQUEST_HEADER));
auto ch_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), "clickhouse-request"));
bool first_attempt = ch_attempt == "1" && sdk_attempt == "1";
if (enable_s3_requests_logging)
LOG_TEST(log, "Make request to: {}", uri);
LOG_TEST(log, "Make request to: {}, aws sdk attempt: {}, clickhouse attempt: {}", uri, sdk_attempt, ch_attempt);
switch (request.GetMethod())
{
@ -348,17 +407,29 @@ void PocoHTTPClient::makeRequestInternalImpl(
/// This can lead to request signature difference on S3 side.
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration);
target_uri,
getTimeouts(method, first_attempt, /*first_byte*/ true),
http_connection_pool_size,
wait_on_pool_size_limit,
proxy_configuration);
else
session = makeHTTPSession(target_uri, timeouts, proxy_configuration);
session = makeHTTPSession(
target_uri,
getTimeouts(method, first_attempt, /*first_byte*/ true),
proxy_configuration);
}
else
{
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
target_uri,
getTimeouts(method, first_attempt, /*first_byte*/ true),
http_connection_pool_size,
wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts);
session = makeHTTPSession(
target_uri,
getTimeouts(method, first_attempt, /*first_byte*/ true));
}
/// In case of error this address will be written to logs
@ -392,28 +463,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
path_and_query = "/";
poco_request.setURI(path_and_query);
switch (request.GetMethod())
{
case Aws::Http::HttpMethod::HTTP_GET:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_GET);
break;
case Aws::Http::HttpMethod::HTTP_POST:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_POST);
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_DELETE);
break;
case Aws::Http::HttpMethod::HTTP_PUT:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PUT);
break;
case Aws::Http::HttpMethod::HTTP_HEAD:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_HEAD);
break;
case Aws::Http::HttpMethod::HTTP_PATCH:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PATCH);
break;
}
poco_request.setMethod(method);
/// Headers coming from SDK are lower-cased.
for (const auto & [header_name, header_value] : request.GetHeaders())
@ -438,6 +488,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
request.GetContentBody()->clear();
request.GetContentBody()->seekg(0);
setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false));
auto size = Poco::StreamCopier::copyStream(*request.GetContentBody(), request_body_stream);
if (enable_s3_requests_logging)
LOG_TEST(log, "Written {} bytes to request body", size);
@ -447,6 +498,8 @@ void PocoHTTPClient::makeRequestInternalImpl(
LOG_TEST(log, "Receiving response...");
auto & response_body_stream = session->receiveResponse(poco_response);
setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false));
watch.stop();
addMetric(request, S3MetricType::Microseconds, watch.elapsedMicroseconds());
@ -498,6 +551,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
/// Request is successful but for some special requests we can have actual error message in body
if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX && checkRequestCanReturn2xxAndErrorInBody(request))
{
/// reading the full response
std::string response_string((std::istreambuf_iterator<char>(response_body_stream)),
std::istreambuf_iterator<char>());
@ -512,7 +566,6 @@ void PocoHTTPClient::makeRequestInternalImpl(
addMetric(request, S3MetricType::Errors);
if (error_report)
error_report(proxy_configuration);
}
/// Set response from string
@ -531,6 +584,8 @@ void PocoHTTPClient::makeRequestInternalImpl(
if (status_code >= 500 && error_report)
error_report(proxy_configuration);
}
/// Expose the stream; after that the client reads data from it without built-in retries
response->SetResponseBody(response_body_stream, session);
}
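
A hedged sketch of how the attempt counters above are expected to combine; the sample header values and the wrapper function are illustrative, and the snippet would have to live in the same translation unit as the helpers:

/// Illustrative only: only when both the AWS SDK counter ("amz-sdk-request") and the
/// ClickHouse counter ("clickhouse-request") report attempt 1 do the aggressive
/// first-attempt timeouts apply.
static bool isFirstAttemptExample()
{
    String sdk_attempt = extractAttemptFromInfo("ttl=2023-11-20T14:53:22Z; attempt=2; max=3"); /// -> "2"
    String ch_attempt = extractAttemptFromInfo("");                                            /// -> "1" (header absent)
    /// Here the SDK has already retried once, so getTimeouts() keeps the configured timeouts.
    return sdk_attempt == "1" && ch_attempt == "1";
}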


@ -55,6 +55,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
size_t http_connection_pool_size = 0;
/// See PoolBase::BehaviourOnLimit
bool wait_on_pool_size_limit = true;
bool s3_use_adaptive_timeouts = true;
std::function<void(const DB::ProxyConfiguration &)> error_report;
@ -69,6 +70,7 @@ private:
unsigned int s3_retry_attempts,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
bool s3_use_adaptive_timeouts_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_
@ -169,6 +171,8 @@ private:
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
ConnectionTimeouts getTimeouts(const String & method, bool first_attempt, bool first_byte) const;
protected:
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;
@ -178,6 +182,7 @@ protected:
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
bool s3_use_adaptive_timeouts = true;
bool enable_s3_requests_logging;
bool for_disk_s3;


@ -53,7 +53,6 @@ namespace
public:
UploadHelper(
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
@ -62,7 +61,6 @@ namespace
bool for_disk_s3_,
const Poco::Logger * log_)
: client_ptr(client_ptr_)
, client_with_long_timeout_ptr(client_with_long_timeout_ptr_)
, dest_bucket(dest_bucket_)
, dest_key(dest_key_)
, request_settings(request_settings_)
@ -78,7 +76,6 @@ namespace
protected:
std::shared_ptr<const S3::Client> client_ptr;
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
const String & dest_bucket;
const String & dest_key;
const S3Settings::RequestSettings & request_settings;
@ -179,7 +176,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(request);
auto outcome = client_ptr->CompleteMultipartUpload(request);
if (outcome.IsSuccess())
{
@ -433,14 +430,13 @@ namespace
size_t offset_,
size_t size_,
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
, create_read_buffer(create_read_buffer_)
, offset(offset_)
, size(size_)
@ -602,7 +598,6 @@ namespace
public:
CopyFileHelper(
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & src_bucket_,
const String & src_key_,
size_t src_offset_,
@ -614,7 +609,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
@ -677,7 +672,7 @@ namespace
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
request.SetContentType("binary/octet-stream");
client_with_long_timeout_ptr->setKMSHeaders(request);
client_ptr->setKMSHeaders(request);
}
void processCopyRequest(const S3::CopyObjectRequest & request)
@ -689,7 +684,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);
auto outcome = client_with_long_timeout_ptr->CopyObject(request);
auto outcome = client_ptr->CopyObject(request);
if (outcome.IsSuccess())
{
LOG_TRACE(
@ -714,7 +709,6 @@ namespace
offset,
size,
client_ptr,
client_with_long_timeout_ptr,
dest_bucket,
dest_key,
request_settings,
@ -788,7 +782,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
auto outcome = client_with_long_timeout_ptr->UploadPartCopy(req);
auto outcome = client_ptr->UploadPartCopy(req);
if (!outcome.IsSuccess())
{
abortMultipartUpload();
@ -806,7 +800,6 @@ void copyDataToS3File(
size_t offset,
size_t size,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -814,14 +807,13 @@ void copyDataToS3File(
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
helper.performCopy();
}
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
const String & src_bucket,
const String & src_key,
size_t src_offset,
@ -836,7 +828,7 @@ void copyS3File(
{
if (settings.allow_native_copy)
{
CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
helper.performCopy();
}
else
@ -845,7 +837,7 @@ void copyS3File(
{
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
};
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
}
}


@ -27,15 +27,9 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// because it is a known issue, it falls back to read-write copy
/// (copyDataToS3File()).
///
/// s3_client_with_long_timeout (may be equal to s3_client) is used for native copy and
/// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often
/// block on them for multiple seconds without sending or receiving data from us (maybe the servers
/// are copying data internally, or maybe throttling, idk).
///
/// read_settings - used for throttling in case native copy is not possible
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
const String & src_bucket,
const String & src_key,
size_t src_offset,
@ -58,7 +52,6 @@ void copyDataToS3File(
size_t offset,
size_t size,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,


@ -91,7 +91,6 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
DB::S3Settings::RequestSettings request_settings;
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
DB::WriteBufferFromS3 write_buffer(
client,
client,
uri.bucket,
uri.key,
@ -171,6 +170,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeadersRead)
"authorization: ... SignedHeaders="
"amz-sdk-invocation-id;"
"amz-sdk-request;"
"clickhouse-request;"
"content-type;"
"host;"
"x-amz-api-version;"
@ -216,6 +216,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSEKMSHeadersRead)
"authorization: ... SignedHeaders="
"amz-sdk-invocation-id;"
"amz-sdk-request;"
"clickhouse-request;"
"content-type;"
"host;"
"x-amz-api-version;"


@ -77,7 +77,6 @@ struct WriteBufferFromS3::PartData
WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
@ -92,7 +91,6 @@ WriteBufferFromS3::WriteBufferFromS3(
, upload_settings(request_settings.getUploadSettings())
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, client_with_long_timeout_ptr(std::move(client_with_long_timeout_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
, task_tracker(
@ -566,7 +564,7 @@ void WriteBufferFromS3::completeMultipartUpload()
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
Stopwatch watch;
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(req);
auto outcome = client_ptr->CompleteMultipartUpload(req);
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());


@ -30,8 +30,6 @@ class WriteBufferFromS3 final : public WriteBufferFromFileBase
public:
WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
/// for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
@ -90,7 +88,6 @@ private:
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
const std::optional<std::map<String, String>> object_metadata;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);


@ -549,7 +549,6 @@ public:
getAsyncPolicy().setAutoExecute(false);
return std::make_unique<WriteBufferFromS3>(
client,
client,
bucket,
file_name,


@ -825,7 +825,6 @@ public:
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(
configuration_.client,
configuration_.client_with_long_timeout,
bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
@ -1330,8 +1329,6 @@ void StorageS3::Configuration::connect(ContextPtr context)
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
});
client_with_long_timeout = client->clone(std::nullopt, request_settings.long_request_timeout_ms);
}
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)


@ -311,7 +311,6 @@ public:
HTTPHeaderEntries headers_from_ast;
std::shared_ptr<const S3::Client> client;
std::shared_ptr<const S3::Client> client_with_long_timeout;
std::vector<String> keys;
};


@ -69,8 +69,7 @@ struct S3Settings
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
size_t retry_attempts = 10;
size_t request_timeout_ms = 3000;
size_t long_request_timeout_ms = 30000; // TODO: Take this from config like request_timeout_ms
size_t request_timeout_ms = 30000;
bool allow_native_copy = true;
bool throw_on_zero_files_match = false;


@ -4,6 +4,7 @@
<profiles>
<default>
<s3_retry_attempts>1000000</s3_retry_attempts>
<s3_use_adaptive_timeouts>1</s3_use_adaptive_timeouts>
</default>
</profiles>
</clickhouse>


@ -4,6 +4,7 @@
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
</default>
</profiles>
</clickhouse>


@ -7,11 +7,18 @@
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<broken_s3>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>1</skip_access_check>
</broken_s3>
</disks>
@ -23,9 +30,16 @@
</main>
</volumes>
</broken_s3>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>broken_s3</storage_policy>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>


@ -64,6 +64,8 @@ def test_upload_after_check_works(cluster, broken_s3):
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS
storage_policy='broken_s3'
"""
)
@ -78,7 +80,7 @@ def test_upload_after_check_works(cluster, broken_s3):
assert "suddenly disappeared" in error, error
def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"):
node.query("SYSTEM FLUSH LOGS")
return [
int(x)
@ -87,7 +89,25 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
SELECT
ProfileEvents['S3CreateMultipartUpload'],
ProfileEvents['S3UploadPart'],
ProfileEvents['S3WriteRequestsErrors']
ProfileEvents['S3WriteRequestsErrors'],
FROM system.query_log
WHERE query_id='{query_id}'
AND type='{log_type}'
"""
).split()
if x
]
def get_put_counters(node, query_id, log_type="ExceptionWhileProcessing"):
node.query("SYSTEM FLUSH LOGS")
return [
int(x)
for x in node.query(
f"""
SELECT
ProfileEvents['S3PutObject'],
ProfileEvents['S3WriteRequestsErrors'],
FROM system.query_log
WHERE query_id='{query_id}'
AND type='{log_type}'
@ -129,12 +149,12 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 0
assert count_s3_errors == 1
assert create_multipart == 1
assert upload_parts == 0
assert s3_errors == 1
# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed
@ -172,12 +192,12 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts >= 2
assert count_s3_errors >= 2
assert create_multipart == 1
assert upload_parts >= 2
assert s3_errors >= 2
def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
@ -207,12 +227,12 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3
broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused")
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1"
@ -279,13 +299,13 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
@ -361,13 +381,13 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3
broken_s3.setup_at_create_multi_part_upload(
count=1000,
@ -438,13 +458,13 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
query_id=insert_query_id,
)
count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 7
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 7
assert s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
@ -533,3 +553,60 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
retry_count=120,
sleep_time=1,
)
@pytest.mark.parametrize("node_name", ["node", "node_with_inf_s3_retries"])
def test_adaptive_timeouts(cluster, broken_s3, node_name):
node = cluster.instances[node_name]
broken_s3.setup_fake_puts(part_length=1)
broken_s3.setup_slow_answers(
timeout=5,
count=1000000,
)
insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/adaptive_timeouts',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1
SETTINGS
s3_request_timeout_ms=30000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
broken_s3.reset()
put_objects, s3_errors = get_put_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert put_objects == 1
s3_use_adaptive_timeouts = node.query(
f"""
SELECT
value
FROM system.settings
WHERE
name='s3_use_adaptive_timeouts'
"""
).strip()
if node_name == "node_with_inf_s3_retries":
# first 2 attempts failed
assert s3_use_adaptive_timeouts == "1"
assert s3_errors == 1
else:
assert s3_use_adaptive_timeouts == "0"
assert s3_errors == 0


@ -11,6 +11,7 @@
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3>
<s3_retryable>
@ -33,6 +34,7 @@
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>


@ -1,9 +1,4 @@
<clickhouse>
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
</default>
</profiles>
<s3>
<s3_mock>
<endpoint>http://resolver:8080</endpoint>


@ -1,7 +1,9 @@
<clickhouse>
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
<s3_retry_attempts>10</s3_retry_attempts>
<s3_max_inflight_parts_for_one_file>5</s3_max_inflight_parts_for_one_file>
</default>
</profiles>
</clickhouse>


@ -4,6 +4,7 @@ import re
import socket
import struct
import sys
import time
def gen_n_digit_number(n):
@ -39,14 +40,14 @@ random.seed("Unstable server/1.0")
# Generating some "random" data and append a line which contains sum of numbers in column 4.
lines = (
b"".join((gen_line() for _ in range(500000)))
b"".join([gen_line() for _ in range(500000)])
+ f"0,0,0,{-sum_in_4_column}\n".encode()
)
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path == "/root/test.csv":
if self.path == "/root/test.csv" or self.path == "/root/slow_send_test.csv":
self.from_bytes = 0
self.end_bytes = len(lines)
self.size = self.end_bytes
@ -101,6 +102,18 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
print("Dropping connection")
break
if self.path == "/root/slow_send_test.csv":
self.send_block_size = 81920
for c, i in enumerate(
range(self.from_bytes, self.end_bytes, self.send_block_size)
):
self.wfile.write(
lines[i : min(i + self.send_block_size, self.end_bytes)]
)
self.wfile.flush()
time.sleep(1)
elif self.path == "/":
self.wfile.write(b"OK")


@ -818,6 +818,15 @@ def test_storage_s3_get_unstable(started_cluster):
assert result.splitlines() == ["500001,500000,0"]
def test_storage_s3_get_slow(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/slow_send_test.csv', 'CSV', '{table_format}') FORMAT CSV"
result = run_query(instance, get_query)
assert result.splitlines() == ["500001,500000,0"]
def test_storage_s3_put_uncompressed(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]