Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 16:42:05 +00:00)

Merge pull request #56314 from CheSema/s3-aggressive-timeouts: s3 adaptive timeouts

Commit: a950595c24
@@ -26,7 +26,6 @@ HTTPServerSession::HTTPServerSession(const StreamSocket& socket, HTTPServerParam
    _maxKeepAliveRequests(pParams->getMaxKeepAliveRequests())
{
    setTimeout(pParams->getTimeout());
    this->socket().setReceiveTimeout(pParams->getTimeout());
}
@@ -93,9 +93,34 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout)

void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout)
{
    _connectionTimeout = connectionTimeout;
    _sendTimeout = sendTimeout;
    _receiveTimeout = receiveTimeout;
    try
    {
        _connectionTimeout = connectionTimeout;

        if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) {
            _sendTimeout = sendTimeout;

            if (connected())
                _socket.setSendTimeout(_sendTimeout);
        }

        if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) {
            _receiveTimeout = receiveTimeout;

            if (connected())
                _socket.setReceiveTimeout(_receiveTimeout);
        }
    }
    catch (NetException &)
    {
#ifndef NDEBUG
        throw;
#else
        // Mute exceptions in release builds,
        // just in case changing settings on the socket is not allowed;
        // however, it should be OK for timeouts.
#endif
    }
}
@@ -4826,3 +4826,10 @@ When set to `true` the metadata files are written with `VERSION_FULL_OBJECT_KEY`

When set to `false` the metadata files are written with the previous format version, `VERSION_INLINE_DATA`. With that format only suffixes of object storage key names are written to the metadata files. The prefix for all object storage key names is set in configuration files in the `storage_configuration.disks` section.

Default value: `false`.

## s3_use_adaptive_timeouts {#s3_use_adaptive_timeouts}

When set to `true`, the first two attempts of every S3 request are made with low send and receive timeouts.
When set to `false`, all attempts are made with identical timeouts.

Default value: `true`.
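As an illustration (not part of the documentation page itself), the setting can be disabled per profile in the server configuration; the snippet below mirrors the integration-test configs further down in this diff:

```xml
<clickhouse>
    <profiles>
        <default>
            <!-- Disable adaptive timeouts so every attempt uses the full configured timeouts. -->
            <s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
        </default>
    </profiles>
</clickhouse>
```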
@ -55,7 +55,9 @@ namespace
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler,
|
||||
/* for_disk_s3 = */ false,
|
||||
request_settings.get_request_throttler,
|
||||
request_settings.put_request_throttler,
|
||||
s3_uri.uri.getScheme());
|
||||
|
||||
client_configuration.endpointOverride = s3_uri.endpoint;
|
||||
@ -167,7 +169,6 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
|
||||
blob_path.size(), mode);
|
||||
|
||||
copyS3File(
|
||||
client,
|
||||
client,
|
||||
s3_uri.bucket,
|
||||
fs::path(s3_uri.key) / path_in_backup,
|
||||
@ -229,7 +230,6 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
|
||||
{
|
||||
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
|
||||
copyS3File(
|
||||
client,
|
||||
client,
|
||||
/* src_bucket */ blob_path[1],
|
||||
/* src_key= */ blob_path[0],
|
||||
@ -269,7 +269,7 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
|
||||
|
||||
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
|
||||
{
|
||||
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
|
||||
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
|
||||
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
|
||||
}
|
||||
|
||||
@ -299,7 +299,6 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
|
||||
{
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
client,
|
||||
client, // already has long timeout
|
||||
s3_uri.bucket,
|
||||
fs::path(s3_uri.key) / file_name,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
|
@ -148,7 +148,6 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
|
||||
const auto create_writer = [&](const auto & key)
|
||||
{
|
||||
return WriteBufferFromS3(
|
||||
s3_client->client,
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
key,
|
||||
|
@ -94,6 +94,7 @@ class IColumn;
|
||||
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
|
||||
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
|
||||
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
|
||||
M(Bool, s3_use_adaptive_timeouts, true, "When adaptive timeouts are enabled, the first two attempts are made with low receive and send timeouts", 0) \
|
||||
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
|
||||
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
|
||||
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
|
||||
@ -104,7 +105,7 @@ class IColumn;
|
||||
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
|
||||
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
|
||||
M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
|
||||
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
|
||||
M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
|
||||
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
|
||||
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
|
||||
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
|
||||
|
@ -155,7 +155,7 @@ private:
|
||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
@ -174,7 +174,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
clients.get()->client,
|
||||
client.get(),
|
||||
bucket,
|
||||
path,
|
||||
version_id,
|
||||
@ -224,7 +224,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
return std::make_unique<ReadBufferFromS3>(
|
||||
clients.get()->client,
|
||||
client.get(),
|
||||
bucket,
|
||||
object.remote_path,
|
||||
version_id,
|
||||
@ -249,10 +249,8 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
if (write_settings.s3_allow_parallel_part_upload)
|
||||
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
|
||||
|
||||
auto clients_ = clients.get();
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
clients_->client,
|
||||
clients_->client_with_long_timeout,
|
||||
client.get(),
|
||||
bucket,
|
||||
object.remote_path,
|
||||
buf_size,
|
||||
@ -266,15 +264,12 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
|
||||
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
|
||||
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
S3::ListObjectsV2Request request;
|
||||
request.SetBucket(bucket);
|
||||
@ -289,7 +284,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3ListObjects);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
|
||||
outcome = client_ptr->ListObjectsV2(request);
|
||||
outcome = client.get()->ListObjectsV2(request);
|
||||
throwIfError(outcome);
|
||||
|
||||
auto result = outcome.GetResult();
|
||||
@ -320,14 +315,12 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
|
||||
|
||||
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
|
||||
{
|
||||
auto client_ptr = clients.get()->client;
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
|
||||
S3::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(object.remote_path);
|
||||
auto outcome = client_ptr->DeleteObject(request);
|
||||
auto outcome = client.get()->DeleteObject(request);
|
||||
|
||||
throwIfUnexpectedError(outcome, if_exists);
|
||||
|
||||
@ -346,7 +339,6 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
}
|
||||
else
|
||||
{
|
||||
auto client_ptr = clients.get()->client;
|
||||
auto settings_ptr = s3_settings.get();
|
||||
|
||||
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
|
||||
@ -375,7 +367,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
S3::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
auto outcome = client_ptr->DeleteObjects(request);
|
||||
auto outcome = client.get()->DeleteObjects(request);
|
||||
|
||||
throwIfUnexpectedError(outcome, if_exists);
|
||||
|
||||
@ -407,7 +399,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
|
||||
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
|
||||
|
||||
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
|
||||
return {};
|
||||
@ -423,7 +415,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
|
||||
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
|
||||
|
||||
ObjectMetadata result;
|
||||
result.size_bytes = object_info.size;
|
||||
@ -444,12 +436,12 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
/// Shortcut for S3
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
auto clients_ = clients.get();
|
||||
auto client_ = client.get();
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(clients_->client,
|
||||
clients_->client_with_long_timeout,
|
||||
copyS3File(
|
||||
client.get(),
|
||||
bucket,
|
||||
object_from.remote_path,
|
||||
0,
|
||||
@ -473,12 +465,11 @@ void S3ObjectStorage::copyObject( // NOLINT
|
||||
const WriteSettings &,
|
||||
std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto clients_ = clients.get();
|
||||
auto client_ = client.get();
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(clients_->client,
|
||||
clients_->client_with_long_timeout,
|
||||
copyS3File(client_,
|
||||
bucket,
|
||||
object_from.remote_path,
|
||||
0,
|
||||
@ -499,31 +490,25 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
|
||||
|
||||
void S3ObjectStorage::shutdown()
|
||||
{
|
||||
auto clients_ptr = clients.get();
|
||||
/// This call stops any further retry attempts for ongoing S3 requests.
/// If an S3 request has failed and the method below has been executed, the S3 client immediately returns the last failed request outcome.
/// If S3 is healthy, nothing bad happens and S3 requests are processed in the regular way without errors.
/// This should significantly speed up the shutdown process if S3 is unhealthy.
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
|
||||
const_cast<S3::Client &>(*client.get()).DisableRequestProcessing();
|
||||
}
|
||||
|
||||
void S3ObjectStorage::startup()
|
||||
{
|
||||
auto clients_ptr = clients.get();
|
||||
|
||||
/// Need to be enabled if it was disabled during shutdown() call.
|
||||
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
|
||||
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
|
||||
const_cast<S3::Client &>(*client.get()).EnableRequestProcessing();
|
||||
}
|
||||
|
||||
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context);
|
||||
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
|
||||
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
|
||||
s3_settings.set(std::move(new_s3_settings));
|
||||
clients.set(std::move(new_clients));
|
||||
client.set(std::move(new_client));
|
||||
}
|
||||
|
||||
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
@ -538,9 +523,6 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
endpoint, object_key_prefix);
|
||||
}
|
||||
|
||||
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
|
||||
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
|
||||
|
||||
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const
|
||||
{
|
||||
/// Path to store the new S3 object.
|
||||
|
@ -39,16 +39,6 @@ struct S3ObjectStorageSettings
|
||||
|
||||
class S3ObjectStorage : public IObjectStorage
|
||||
{
|
||||
public:
|
||||
struct Clients
|
||||
{
|
||||
std::shared_ptr<S3::Client> client;
|
||||
std::shared_ptr<S3::Client> client_with_long_timeout;
|
||||
|
||||
Clients() = default;
|
||||
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
|
||||
};
|
||||
|
||||
private:
|
||||
friend class S3PlainObjectStorage;
|
||||
|
||||
@ -63,7 +53,7 @@ private:
|
||||
String object_key_prefix_)
|
||||
: bucket(std::move(bucket_))
|
||||
, object_key_prefix(std::move(object_key_prefix_))
|
||||
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
|
||||
, client(std::move(client_))
|
||||
, s3_settings(std::move(s3_settings_))
|
||||
, s3_capabilities(s3_capabilities_)
|
||||
, version_id(std::move(version_id_))
|
||||
@ -184,7 +174,8 @@ private:
|
||||
std::string bucket;
|
||||
String object_key_prefix;
|
||||
|
||||
MultiVersion<Clients> clients;
|
||||
|
||||
MultiVersion<S3::Client> client;
|
||||
MultiVersion<S3ObjectStorageSettings> s3_settings;
|
||||
S3Capabilities s3_capabilities;
|
||||
|
||||
|
@ -60,13 +60,15 @@ std::unique_ptr<S3::Client> getClient(
|
||||
uri.uri.getScheme());
|
||||
|
||||
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
|
||||
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.http_keep_alive_timeout_ms
|
||||
= config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
|
||||
client_configuration.http_keep_alive_timeout_ms = config.getUInt(
|
||||
config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
|
||||
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
|
||||
client_configuration.wait_on_pool_size_limit = false;
|
||||
client_configuration.s3_use_adaptive_timeouts = config.getBool(
|
||||
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
|
||||
|
||||
/*
|
||||
* Override proxy configuration for backwards compatibility with old configuration format.
|
||||
|
@ -133,4 +133,86 @@ ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings
|
||||
settings.http_receive_timeout);
|
||||
}
|
||||
|
||||
class SendReceiveTimeoutsForFirstAttempt
|
||||
{
|
||||
private:
|
||||
static constexpr size_t known_methods_count = 6;
|
||||
using KnownMethodsArray = std::array<String, known_methods_count>;
|
||||
static const KnownMethodsArray known_methods;
|
||||
|
||||
/// HTTP_POST is used for CompleteMultipartUpload requests. Its latency could be high.
/// These requests need a longer timeout, especially when MinIO is used.
/// The same assumption is made for HTTP_DELETE and HTTP_PATCH:
/// those requests are heavier than HTTP_GET, HTTP_HEAD and HTTP_PUT.
|
||||
static constexpr Poco::Timestamp::TimeDiff first_byte_ms[known_methods_count][2] =
|
||||
{
|
||||
/* GET */ {200, 200},
|
||||
/* POST */ {200, 200},
|
||||
/* DELETE */ {200, 200},
|
||||
/* PUT */ {200, 200},
|
||||
/* HEAD */ {200, 200},
|
||||
/* PATCH */ {200, 200},
|
||||
};
|
||||
|
||||
static constexpr Poco::Timestamp::TimeDiff rest_bytes_ms[known_methods_count][2] =
|
||||
{
|
||||
/* GET */ {500, 500},
|
||||
/* POST */ {1000, 30000},
|
||||
/* DELETE */ {1000, 10000},
|
||||
/* PUT */ {1000, 3000},
|
||||
/* HEAD */ {500, 500},
|
||||
/* PATCH */ {1000, 10000},
|
||||
};
|
||||
|
||||
static_assert(sizeof(first_byte_ms) == sizeof(rest_bytes_ms));
|
||||
static_assert(sizeof(first_byte_ms) == known_methods_count * sizeof(Poco::Timestamp::TimeDiff) * 2);
|
||||
|
||||
static size_t getMethodIndex(const String & method)
|
||||
{
|
||||
KnownMethodsArray::const_iterator it = std::find(known_methods.begin(), known_methods.end(), method);
|
||||
chassert(it != known_methods.end());
|
||||
if (it == known_methods.end())
|
||||
return 0;
|
||||
return std::distance(known_methods.begin(), it);
|
||||
}
|
||||
|
||||
public:
|
||||
static std::pair<Poco::Timespan, Poco::Timespan> getSendReceiveTimeout(const String & method, bool first_byte)
|
||||
{
|
||||
auto idx = getMethodIndex(method);
|
||||
|
||||
if (first_byte)
|
||||
return std::make_pair(
|
||||
Poco::Timespan(first_byte_ms[idx][0] * 1000),
|
||||
Poco::Timespan(first_byte_ms[idx][1] * 1000)
|
||||
);
|
||||
|
||||
return std::make_pair(
|
||||
Poco::Timespan(rest_bytes_ms[idx][0] * 1000),
|
||||
Poco::Timespan(rest_bytes_ms[idx][1] * 1000)
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const SendReceiveTimeoutsForFirstAttempt::KnownMethodsArray SendReceiveTimeoutsForFirstAttempt::known_methods =
|
||||
{
|
||||
"GET", "POST", "DELETE", "PUT", "HEAD", "PATCH"
|
||||
};
|
||||
|
||||
|
||||
ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const
|
||||
{
|
||||
if (!first_attempt)
|
||||
return *this;
|
||||
|
||||
auto [send, recv] = SendReceiveTimeoutsForFirstAttempt::getSendReceiveTimeout(method, first_byte);
|
||||
|
||||
auto aggressive = *this;
|
||||
aggressive.send_timeout = saturate(send, send_timeout);
|
||||
aggressive.receive_timeout = saturate(recv, receive_timeout);
|
||||
|
||||
return aggressive;
|
||||
}
|
||||
|
||||
}
|
||||
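To make the mechanism above concrete, here is a minimal, self-contained C++17 sketch of the first-attempt timeout selection. The method table and millisecond values mirror the arrays in this hunk, but the class and function names are illustrative only; the real API is `SendReceiveTimeoutsForFirstAttempt::getSendReceiveTimeout`, which returns `Poco::Timespan` pairs.

```cpp
#include <algorithm>
#include <array>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

/// Millisecond {send, receive} timeouts applied only on the first attempt;
/// later attempts fall back to the full configured timeouts (e.g. request_timeout_ms = 30000).
struct FirstAttemptTimeouts  /// illustrative name, not the ClickHouse class
{
    static constexpr std::array<const char *, 6> methods{"GET", "POST", "DELETE", "PUT", "HEAD", "PATCH"};

    /// Waiting for the first byte of the response: keep every method on a short leash.
    static constexpr int64_t first_byte_ms[6][2] = {
        {200, 200}, {200, 200}, {200, 200}, {200, 200}, {200, 200}, {200, 200}};

    /// Transferring the rest of the body: POST/DELETE/PATCH may legitimately take longer.
    static constexpr int64_t rest_bytes_ms[6][2] = {
        {500, 500}, {1000, 30000}, {1000, 10000}, {1000, 3000}, {500, 500}, {1000, 10000}};

    static std::pair<int64_t, int64_t> get(const std::string & method, bool first_byte)
    {
        auto it = std::find(methods.begin(), methods.end(), method);
        size_t idx = (it == methods.end()) ? 0 : static_cast<size_t>(std::distance(methods.begin(), it));
        const auto & row = first_byte ? first_byte_ms[idx] : rest_bytes_ms[idx];
        return {row[0], row[1]};
    }
};

int main()
{
    /// First attempt of a POST (e.g. CompleteMultipartUpload), already past the first byte:
    auto [send_ms, recv_ms] = FirstAttemptTimeouts::get("POST", /*first_byte=*/ false);
    std::cout << "send=" << send_ms << "ms recv=" << recv_ms << "ms\n";  // send=1000ms recv=30000ms
}
```

Note that in the hunk above the `saturate` calls cap these first-attempt values at the already-configured send/receive timeouts, so adaptive timeouts can only tighten, never extend, the limits.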
|
@ -67,6 +67,8 @@ struct ConnectionTimeouts
|
||||
/// Timeouts for the case when we will try many addresses in a loop.
|
||||
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings);
|
||||
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout);
|
||||
|
||||
ConnectionTimeouts getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -50,12 +50,6 @@ namespace ErrorCodes
|
||||
|
||||
namespace
|
||||
{
|
||||
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
|
||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||
}
|
||||
|
||||
Poco::Net::HTTPClientSession::ProxyConfig proxyConfigurationToPocoProxyConfig(const ProxyConfiguration & proxy_configuration)
|
||||
{
|
||||
Poco::Net::HTTPClientSession::ProxyConfig poco_proxy_config;
|
||||
@ -359,6 +353,12 @@ namespace
|
||||
};
|
||||
}
|
||||
|
||||
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
|
||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||
}
|
||||
|
||||
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout)
|
||||
{
|
||||
if (!response.getKeepAlive())
|
||||
|
@ -113,4 +113,6 @@ std::istream * receiveResponse(
|
||||
|
||||
void assertResponseIsOk(
|
||||
const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, bool allow_redirects = false);
|
||||
|
||||
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts);
|
||||
}
|
||||
|
@ -167,9 +167,9 @@ bool ReadBufferFromS3::nextImpl()
|
||||
}
|
||||
|
||||
size_t sleep_time_with_backoff_milliseconds = 100;
|
||||
for (size_t attempt = 0; !next_result; ++attempt)
|
||||
for (size_t attempt = 1; !next_result; ++attempt)
|
||||
{
|
||||
bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;
|
||||
bool last_attempt = attempt >= request_settings.max_single_read_retries;
|
||||
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
|
||||
|
||||
@ -177,7 +177,7 @@ bool ReadBufferFromS3::nextImpl()
|
||||
{
|
||||
if (!impl)
|
||||
{
|
||||
impl = initialize();
|
||||
impl = initialize(attempt);
|
||||
|
||||
if (use_external_buffer)
|
||||
{
|
||||
@ -232,9 +232,9 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
|
||||
{
|
||||
size_t initial_n = n;
|
||||
size_t sleep_time_with_backoff_milliseconds = 100;
|
||||
for (size_t attempt = 0; n > 0; ++attempt)
|
||||
for (size_t attempt = 1; n > 0; ++attempt)
|
||||
{
|
||||
bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;
|
||||
bool last_attempt = attempt >= request_settings.max_single_read_retries;
|
||||
size_t bytes_copied = 0;
|
||||
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
|
||||
@ -266,7 +266,7 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
|
||||
|
||||
try
|
||||
{
|
||||
result = sendRequest(range_begin, range_begin + n - 1);
|
||||
result = sendRequest(attempt, range_begin, range_begin + n - 1);
|
||||
std::istream & istr = result->GetBody();
|
||||
|
||||
copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied);
|
||||
@ -304,8 +304,8 @@ bool ReadBufferFromS3::processException(Poco::Exception & e, size_t read_offset,
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, "
|
||||
"Attempt: {}, Message: {}",
|
||||
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message());
|
||||
"Attempt: {}/{}, Message: {}",
|
||||
bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, request_settings.max_single_read_retries, e.message());
|
||||
|
||||
|
||||
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
|
||||
@ -463,7 +463,7 @@ ReadBufferFromS3::~ReadBufferFromS3()
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize(size_t attempt)
|
||||
{
|
||||
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
|
||||
read_all_range_successfully = false;
|
||||
@ -475,13 +475,13 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
if (read_until_position && offset >= read_until_position)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
|
||||
|
||||
read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
|
||||
read_result = sendRequest(attempt, offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
|
||||
|
||||
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
|
||||
return std::make_unique<ReadBufferFromIStream>(read_result->GetBody(), buffer_size);
|
||||
}
|
||||
|
||||
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const
|
||||
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, size_t range_begin, std::optional<size_t> range_end_incl) const
|
||||
{
|
||||
S3::GetObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
@ -489,6 +489,8 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt));
|
||||
|
||||
if (range_end_incl)
|
||||
{
|
||||
req.SetRange(fmt::format("bytes={}-{}", range_begin, *range_end_incl));
|
||||
|
@ -79,7 +79,7 @@ public:
|
||||
bool supportsReadAt() override { return true; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<ReadBuffer> initialize();
|
||||
std::unique_ptr<ReadBuffer> initialize(size_t attempt);
|
||||
|
||||
/// If true, if we destroy impl now, no work was wasted. Just for metrics.
|
||||
bool atEndOfRequestedRangeGuess();
|
||||
@ -88,7 +88,7 @@ private:
|
||||
/// Returns true if the error looks retriable.
|
||||
bool processException(Poco::Exception & e, size_t read_offset, size_t attempt) const;
|
||||
|
||||
Aws::S3::Model::GetObjectResult sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const;
|
||||
Aws::S3::Model::GetObjectResult sendRequest(size_t attempt, size_t range_begin, std::optional<size_t> range_end_incl) const;
|
||||
|
||||
bool readAllRangeSuccessfully() const;
|
||||
|
||||
|
@ -118,16 +118,9 @@ std::unique_ptr<Client> Client::create(
|
||||
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
|
||||
}
|
||||
|
||||
std::unique_ptr<Client> Client::clone(
|
||||
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy,
|
||||
std::optional<Int64> override_request_timeout_ms) const
|
||||
std::unique_ptr<Client> Client::clone() const
|
||||
{
|
||||
PocoHTTPClientConfiguration new_configuration = client_configuration;
|
||||
if (override_retry_strategy.has_value())
|
||||
new_configuration.retryStrategy = *override_retry_strategy;
|
||||
if (override_request_timeout_ms.has_value())
|
||||
new_configuration.requestTimeoutMs = *override_request_timeout_ms;
|
||||
return std::unique_ptr<Client>(new Client(*this, new_configuration));
|
||||
return std::unique_ptr<Client>(new Client(*this, client_configuration));
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -905,6 +898,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
|
||||
s3_retry_attempts,
|
||||
enable_s3_requests_logging,
|
||||
for_disk_s3,
|
||||
context->getGlobalContext()->getSettingsRef().s3_use_adaptive_timeouts,
|
||||
get_request_throttler,
|
||||
put_request_throttler,
|
||||
error_report);
|
||||
|
@ -118,15 +118,7 @@ public:
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing);
|
||||
|
||||
/// Create a client with adjusted settings:
|
||||
/// * override_retry_strategy can be used to disable retries to avoid nested retries when we have
|
||||
/// a retry loop outside of S3 client. Specifically, for read and write buffers. Currently not
|
||||
/// actually used.
|
||||
/// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest
|
||||
/// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321
|
||||
std::unique_ptr<Client> clone(
|
||||
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy = std::nullopt,
|
||||
std::optional<Int64> override_request_timeout_ms = std::nullopt) const;
|
||||
std::unique_ptr<Client> clone() const;
|
||||
|
||||
Client & operator=(const Client &) = delete;
|
||||
|
||||
|
@ -99,6 +99,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
||||
unsigned int s3_retry_attempts_,
|
||||
bool enable_s3_requests_logging_,
|
||||
bool for_disk_s3_,
|
||||
bool s3_use_adaptive_timeouts_,
|
||||
const ThrottlerPtr & get_request_throttler_,
|
||||
const ThrottlerPtr & put_request_throttler_,
|
||||
std::function<void(const DB::ProxyConfiguration &)> error_report_)
|
||||
@ -111,6 +112,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
||||
, for_disk_s3(for_disk_s3_)
|
||||
, get_request_throttler(get_request_throttler_)
|
||||
, put_request_throttler(put_request_throttler_)
|
||||
, s3_use_adaptive_timeouts(s3_use_adaptive_timeouts_)
|
||||
, error_report(error_report_)
|
||||
{
|
||||
}
|
||||
@ -157,6 +159,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
|
||||
Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// flag indicating whether keep-alive is enabled is set to each session upon creation
|
||||
, remote_host_filter(client_configuration.remote_host_filter)
|
||||
, s3_max_redirects(client_configuration.s3_max_redirects)
|
||||
, s3_use_adaptive_timeouts(client_configuration.s3_use_adaptive_timeouts)
|
||||
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
|
||||
, for_disk_s3(client_configuration.for_disk_s3)
|
||||
, get_request_throttler(client_configuration.get_request_throttler)
|
||||
@ -268,6 +271,38 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT
|
||||
ProfileEvents::increment(disk_s3_events_map[static_cast<unsigned int>(type)][static_cast<unsigned int>(kind)], amount);
|
||||
}
|
||||
|
||||
String extractAttemptFromInfo(const Aws::String & request_info)
|
||||
{
|
||||
static auto key = Aws::String("attempt=");
|
||||
|
||||
auto key_begin = request_info.find(key, 0);
|
||||
if (key_begin == Aws::String::npos)
|
||||
return "1";
|
||||
|
||||
auto val_begin = key_begin + key.size();
|
||||
auto val_end = request_info.find(';', val_begin);
|
||||
if (val_end == Aws::String::npos)
|
||||
val_end = request_info.size();
|
||||
|
||||
return request_info.substr(val_begin, val_end-val_begin);
|
||||
}
|
||||
|
||||
String getOrEmpty(const Aws::Http::HeaderValueCollection & map, const String & key)
|
||||
{
|
||||
auto it = map.find(key);
|
||||
if (it == map.end())
|
||||
return {};
|
||||
return it->second;
|
||||
}
|
||||
|
||||
ConnectionTimeouts PocoHTTPClient::getTimeouts(const String & method, bool first_attempt, bool first_byte) const
|
||||
{
|
||||
if (!s3_use_adaptive_timeouts)
|
||||
return timeouts;
|
||||
|
||||
return timeouts.getAdaptiveTimeouts(method, first_attempt, first_byte);
|
||||
}
|
||||
|
||||
void PocoHTTPClient::makeRequestInternal(
|
||||
Aws::Http::HttpRequest & request,
|
||||
std::shared_ptr<PocoHTTPResponse> & response,
|
||||
@ -282,6 +317,25 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
makeRequestInternalImpl<false>(request, request_configuration, response, readLimiter, writeLimiter);
|
||||
}
|
||||
|
||||
String getMethod(const Aws::Http::HttpRequest & request)
|
||||
{
|
||||
switch (request.GetMethod())
|
||||
{
|
||||
case Aws::Http::HttpMethod::HTTP_GET:
|
||||
return Poco::Net::HTTPRequest::HTTP_GET;
|
||||
case Aws::Http::HttpMethod::HTTP_POST:
|
||||
return Poco::Net::HTTPRequest::HTTP_POST;
|
||||
case Aws::Http::HttpMethod::HTTP_DELETE:
|
||||
return Poco::Net::HTTPRequest::HTTP_DELETE;
|
||||
case Aws::Http::HttpMethod::HTTP_PUT:
|
||||
return Poco::Net::HTTPRequest::HTTP_PUT;
|
||||
case Aws::Http::HttpMethod::HTTP_HEAD:
|
||||
return Poco::Net::HTTPRequest::HTTP_HEAD;
|
||||
case Aws::Http::HttpMethod::HTTP_PATCH:
|
||||
return Poco::Net::HTTPRequest::HTTP_PATCH;
|
||||
}
|
||||
}
|
||||
|
||||
template <bool pooled>
|
||||
void PocoHTTPClient::makeRequestInternalImpl(
|
||||
Aws::Http::HttpRequest & request,
|
||||
@ -295,9 +349,14 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
Poco::Logger * log = &Poco::Logger::get("AWSClient");
|
||||
|
||||
auto uri = request.GetUri().GetURIString();
|
||||
auto method = getMethod(request);
|
||||
|
||||
auto sdk_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), Aws::Http::SDK_REQUEST_HEADER));
|
||||
auto ch_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), "clickhouse-request"));
|
||||
bool first_attempt = ch_attempt == "1" && sdk_attempt == "1";
|
||||
|
||||
if (enable_s3_requests_logging)
|
||||
LOG_TEST(log, "Make request to: {}", uri);
|
||||
LOG_TEST(log, "Make request to: {}, aws sdk attempt: {}, clickhouse attempt: {}", uri, sdk_attempt, ch_attempt);
|
||||
|
||||
switch (request.GetMethod())
|
||||
{
|
||||
@ -348,17 +407,29 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
/// This can lead to request signature difference on S3 side.
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration);
|
||||
target_uri,
|
||||
getTimeouts(method, first_attempt, /*first_byte*/ true),
|
||||
http_connection_pool_size,
|
||||
wait_on_pool_size_limit,
|
||||
proxy_configuration);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts, proxy_configuration);
|
||||
session = makeHTTPSession(
|
||||
target_uri,
|
||||
getTimeouts(method, first_attempt, /*first_byte*/ true),
|
||||
proxy_configuration);
|
||||
}
|
||||
else
|
||||
{
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
|
||||
target_uri,
|
||||
getTimeouts(method, first_attempt, /*first_byte*/ true),
|
||||
http_connection_pool_size,
|
||||
wait_on_pool_size_limit);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts);
|
||||
session = makeHTTPSession(
|
||||
target_uri,
|
||||
getTimeouts(method, first_attempt, /*first_byte*/ true));
|
||||
}
|
||||
|
||||
/// In case of error this address will be written to logs
|
||||
@ -392,28 +463,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
path_and_query = "/";
|
||||
|
||||
poco_request.setURI(path_and_query);
|
||||
|
||||
switch (request.GetMethod())
|
||||
{
|
||||
case Aws::Http::HttpMethod::HTTP_GET:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_GET);
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_POST:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_POST);
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_DELETE:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_DELETE);
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_PUT:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PUT);
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_HEAD:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_HEAD);
|
||||
break;
|
||||
case Aws::Http::HttpMethod::HTTP_PATCH:
|
||||
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PATCH);
|
||||
break;
|
||||
}
|
||||
poco_request.setMethod(method);
|
||||
|
||||
/// Headers coming from SDK are lower-cased.
|
||||
for (const auto & [header_name, header_value] : request.GetHeaders())
|
||||
@ -438,6 +488,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
request.GetContentBody()->clear();
|
||||
request.GetContentBody()->seekg(0);
|
||||
|
||||
setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false));
|
||||
auto size = Poco::StreamCopier::copyStream(*request.GetContentBody(), request_body_stream);
|
||||
if (enable_s3_requests_logging)
|
||||
LOG_TEST(log, "Written {} bytes to request body", size);
|
||||
@ -447,6 +498,8 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
LOG_TEST(log, "Receiving response...");
|
||||
auto & response_body_stream = session->receiveResponse(poco_response);
|
||||
|
||||
setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false));
|
||||
|
||||
watch.stop();
|
||||
addMetric(request, S3MetricType::Microseconds, watch.elapsedMicroseconds());
|
||||
|
||||
@ -498,6 +551,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
/// Request is successful but for some special requests we can have actual error message in body
|
||||
if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX && checkRequestCanReturn2xxAndErrorInBody(request))
|
||||
{
|
||||
/// reading the full response
|
||||
std::string response_string((std::istreambuf_iterator<char>(response_body_stream)),
|
||||
std::istreambuf_iterator<char>());
|
||||
|
||||
@ -512,7 +566,6 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
addMetric(request, S3MetricType::Errors);
|
||||
if (error_report)
|
||||
error_report(proxy_configuration);
|
||||
|
||||
}
|
||||
|
||||
/// Set response from string
|
||||
@ -531,6 +584,8 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
if (status_code >= 500 && error_report)
|
||||
error_report(proxy_configuration);
|
||||
}
|
||||
|
||||
/// expose stream, after that client reads data from that stream without built-in retries
|
||||
response->SetResponseBody(response_body_stream, session);
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
||||
size_t http_connection_pool_size = 0;
|
||||
/// See PoolBase::BehaviourOnLimit
|
||||
bool wait_on_pool_size_limit = true;
|
||||
bool s3_use_adaptive_timeouts = true;
|
||||
|
||||
std::function<void(const DB::ProxyConfiguration &)> error_report;
|
||||
|
||||
@ -69,6 +70,7 @@ private:
|
||||
unsigned int s3_retry_attempts,
|
||||
bool enable_s3_requests_logging_,
|
||||
bool for_disk_s3_,
|
||||
bool s3_use_adaptive_timeouts_,
|
||||
const ThrottlerPtr & get_request_throttler_,
|
||||
const ThrottlerPtr & put_request_throttler_,
|
||||
std::function<void(const DB::ProxyConfiguration &)> error_report_
|
||||
@ -169,6 +171,8 @@ private:
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
||||
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
|
||||
|
||||
ConnectionTimeouts getTimeouts(const String & method, bool first_attempt, bool first_byte) const;
|
||||
|
||||
protected:
|
||||
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
|
||||
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;
|
||||
@ -178,6 +182,7 @@ protected:
|
||||
ConnectionTimeouts timeouts;
|
||||
const RemoteHostFilter & remote_host_filter;
|
||||
unsigned int s3_max_redirects;
|
||||
bool s3_use_adaptive_timeouts = true;
|
||||
bool enable_s3_requests_logging;
|
||||
bool for_disk_s3;
|
||||
|
||||
|
@ -53,7 +53,6 @@ namespace
|
||||
public:
|
||||
UploadHelper(
|
||||
const std::shared_ptr<const S3::Client> & client_ptr_,
|
||||
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
|
||||
const String & dest_bucket_,
|
||||
const String & dest_key_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
@ -62,7 +61,6 @@ namespace
|
||||
bool for_disk_s3_,
|
||||
const Poco::Logger * log_)
|
||||
: client_ptr(client_ptr_)
|
||||
, client_with_long_timeout_ptr(client_with_long_timeout_ptr_)
|
||||
, dest_bucket(dest_bucket_)
|
||||
, dest_key(dest_key_)
|
||||
, request_settings(request_settings_)
|
||||
@ -78,7 +76,6 @@ namespace
|
||||
|
||||
protected:
|
||||
std::shared_ptr<const S3::Client> client_ptr;
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
|
||||
const String & dest_bucket;
|
||||
const String & dest_key;
|
||||
const S3Settings::RequestSettings & request_settings;
|
||||
@ -179,7 +176,7 @@ namespace
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
|
||||
|
||||
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(request);
|
||||
auto outcome = client_ptr->CompleteMultipartUpload(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
@ -433,14 +430,13 @@ namespace
|
||||
size_t offset_,
|
||||
size_t size_,
|
||||
const std::shared_ptr<const S3::Client> & client_ptr_,
|
||||
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
|
||||
const String & dest_bucket_,
|
||||
const String & dest_key_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
const std::optional<std::map<String, String>> & object_metadata_,
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
bool for_disk_s3_)
|
||||
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
|
||||
, create_read_buffer(create_read_buffer_)
|
||||
, offset(offset_)
|
||||
, size(size_)
|
||||
@ -602,7 +598,6 @@ namespace
|
||||
public:
|
||||
CopyFileHelper(
|
||||
const std::shared_ptr<const S3::Client> & client_ptr_,
|
||||
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
|
||||
const String & src_bucket_,
|
||||
const String & src_key_,
|
||||
size_t src_offset_,
|
||||
@ -614,7 +609,7 @@ namespace
|
||||
const std::optional<std::map<String, String>> & object_metadata_,
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
bool for_disk_s3_)
|
||||
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
|
||||
, src_bucket(src_bucket_)
|
||||
, src_key(src_key_)
|
||||
, offset(src_offset_)
|
||||
@ -677,7 +672,7 @@ namespace
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
request.SetContentType("binary/octet-stream");
|
||||
|
||||
client_with_long_timeout_ptr->setKMSHeaders(request);
|
||||
client_ptr->setKMSHeaders(request);
|
||||
}
|
||||
|
||||
void processCopyRequest(const S3::CopyObjectRequest & request)
|
||||
@ -689,7 +684,7 @@ namespace
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);
|
||||
|
||||
auto outcome = client_with_long_timeout_ptr->CopyObject(request);
|
||||
auto outcome = client_ptr->CopyObject(request);
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
LOG_TRACE(
|
||||
@ -714,7 +709,6 @@ namespace
|
||||
offset,
|
||||
size,
|
||||
client_ptr,
|
||||
client_with_long_timeout_ptr,
|
||||
dest_bucket,
|
||||
dest_key,
|
||||
request_settings,
|
||||
@ -788,7 +782,7 @@ namespace
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
|
||||
|
||||
auto outcome = client_with_long_timeout_ptr->UploadPartCopy(req);
|
||||
auto outcome = client_ptr->UploadPartCopy(req);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
abortMultipartUpload();
|
||||
@ -806,7 +800,6 @@ void copyDataToS3File(
|
||||
size_t offset,
|
||||
size_t size,
|
||||
const std::shared_ptr<const S3::Client> & dest_s3_client,
|
||||
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
|
||||
const String & dest_bucket,
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
@ -814,14 +807,13 @@ void copyDataToS3File(
|
||||
ThreadPoolCallbackRunner<void> schedule,
|
||||
bool for_disk_s3)
|
||||
{
|
||||
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
|
||||
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
|
||||
helper.performCopy();
|
||||
}
|
||||
|
||||
|
||||
void copyS3File(
|
||||
const std::shared_ptr<const S3::Client> & s3_client,
|
||||
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
size_t src_offset,
|
||||
@ -836,7 +828,7 @@ void copyS3File(
|
||||
{
|
||||
if (settings.allow_native_copy)
|
||||
{
|
||||
CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
|
||||
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
|
||||
helper.performCopy();
|
||||
}
|
||||
else
|
||||
@ -845,7 +837,7 @@ void copyS3File(
|
||||
{
|
||||
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
|
||||
};
|
||||
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
|
||||
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,15 +27,9 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
|
||||
/// because it is a known issue, it falls back to read-write copy
|
||||
/// (copyDataToS3File()).
|
||||
///
|
||||
/// s3_client_with_long_timeout (may be equal to s3_client) is used for native copy and
|
||||
/// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often
|
||||
/// block on them for multiple seconds without sending or receiving data from us (maybe the servers
|
||||
/// are copying data internally, or maybe throttling, idk).
|
||||
///
|
||||
/// read_settings - is used for throttling when native copy is not possible
|
||||
void copyS3File(
|
||||
const std::shared_ptr<const S3::Client> & s3_client,
|
||||
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
size_t src_offset,
|
||||
@ -58,7 +52,6 @@ void copyDataToS3File(
|
||||
size_t offset,
|
||||
size_t size,
|
||||
const std::shared_ptr<const S3::Client> & dest_s3_client,
|
||||
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
|
||||
const String & dest_bucket,
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
|
@ -91,7 +91,6 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
|
||||
DB::S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
|
||||
DB::WriteBufferFromS3 write_buffer(
|
||||
client,
|
||||
client,
|
||||
uri.bucket,
|
||||
uri.key,
|
||||
@ -171,6 +170,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeadersRead)
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"clickhouse-request;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-api-version;"
|
||||
@ -216,6 +216,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSEKMSHeadersRead)
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"clickhouse-request;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-api-version;"
|
||||
|
@ -77,7 +77,6 @@ struct WriteBufferFromS3::PartData
|
||||
|
||||
WriteBufferFromS3::WriteBufferFromS3(
|
||||
std::shared_ptr<const S3::Client> client_ptr_,
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
size_t buf_size_,
|
||||
@ -92,7 +91,6 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
, upload_settings(request_settings.getUploadSettings())
|
||||
, write_settings(write_settings_)
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, client_with_long_timeout_ptr(std::move(client_with_long_timeout_ptr_))
|
||||
, object_metadata(std::move(object_metadata_))
|
||||
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
|
||||
, task_tracker(
|
||||
@ -566,7 +564,7 @@ void WriteBufferFromS3::completeMultipartUpload()
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
|
||||
|
||||
Stopwatch watch;
|
||||
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(req);
|
||||
auto outcome = client_ptr->CompleteMultipartUpload(req);
|
||||
watch.stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
||||
|
@ -30,8 +30,6 @@ class WriteBufferFromS3 final : public WriteBufferFromFileBase
|
||||
public:
|
||||
WriteBufferFromS3(
|
||||
std::shared_ptr<const S3::Client> client_ptr_,
|
||||
/// for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
size_t buf_size_,
|
||||
@ -90,7 +88,6 @@ private:
|
||||
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
|
||||
const WriteSettings write_settings;
|
||||
const std::shared_ptr<const S3::Client> client_ptr;
|
||||
const std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
|
||||
const std::optional<std::map<String, String>> object_metadata;
|
||||
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
|
||||
LogSeriesLimiterPtr limitedLog = std::make_shared<LogSeriesLimiter>(log, 1, 5);
|
||||
|
@ -549,7 +549,6 @@ public:
|
||||
getAsyncPolicy().setAutoExecute(false);
|
||||
|
||||
return std::make_unique<WriteBufferFromS3>(
|
||||
client,
|
||||
client,
|
||||
bucket,
|
||||
file_name,
|
||||
|
@ -825,7 +825,6 @@ public:
|
||||
write_buf = wrapWriteBufferWithCompressionMethod(
|
||||
std::make_unique<WriteBufferFromS3>(
|
||||
configuration_.client,
|
||||
configuration_.client_with_long_timeout,
|
||||
bucket,
|
||||
key,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
@ -1330,8 +1329,6 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
|
||||
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
|
||||
});
|
||||
|
||||
client_with_long_timeout = client->clone(std::nullopt, request_settings.long_request_timeout_ms);
|
||||
}
|
||||
|
||||
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
|
||||
|
@ -311,7 +311,6 @@ public:
|
||||
HTTPHeaderEntries headers_from_ast;
|
||||
|
||||
std::shared_ptr<const S3::Client> client;
|
||||
std::shared_ptr<const S3::Client> client_with_long_timeout;
|
||||
std::vector<String> keys;
|
||||
};
|
||||
|
||||
|
@ -69,8 +69,7 @@ struct S3Settings
|
||||
ThrottlerPtr get_request_throttler;
|
||||
ThrottlerPtr put_request_throttler;
|
||||
size_t retry_attempts = 10;
|
||||
size_t request_timeout_ms = 3000;
|
||||
size_t long_request_timeout_ms = 30000; // TODO: Take this from config like request_timeout_ms
|
||||
size_t request_timeout_ms = 30000;
|
||||
bool allow_native_copy = true;
|
||||
|
||||
bool throw_on_zero_files_match = false;
|
||||
|
@ -4,6 +4,7 @@
|
||||
<profiles>
|
||||
<default>
|
||||
<s3_retry_attempts>1000000</s3_retry_attempts>
|
||||
<s3_use_adaptive_timeouts>1</s3_use_adaptive_timeouts>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
||||
|
@ -4,6 +4,7 @@
|
||||
<profiles>
|
||||
<default>
|
||||
<s3_retry_attempts>5</s3_retry_attempts>
|
||||
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
||||
|
@ -7,11 +7,18 @@

<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<broken_s3>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>1</skip_access_check>
</broken_s3>
</disks>

@ -23,9 +30,16 @@
</main>
</volumes>
</broken_s3>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>broken_s3</storage_policy>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

@ -64,6 +64,8 @@ def test_upload_after_check_works(cluster, broken_s3):
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS
storage_policy='broken_s3'
"""
)

@ -78,7 +80,7 @@ def test_upload_after_check_works(cluster, broken_s3):
assert "suddenly disappeared" in error, error


def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"):
node.query("SYSTEM FLUSH LOGS")
return [
int(x)
@ -87,7 +89,25 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"):
SELECT
ProfileEvents['S3CreateMultipartUpload'],
ProfileEvents['S3UploadPart'],
ProfileEvents['S3WriteRequestsErrors']
ProfileEvents['S3WriteRequestsErrors'],
FROM system.query_log
WHERE query_id='{query_id}'
AND type='{log_type}'
"""
).split()
if x
]


def get_put_counters(node, query_id, log_type="ExceptionWhileProcessing"):
node.query("SYSTEM FLUSH LOGS")
return [
int(x)
for x in node.query(
f"""
SELECT
ProfileEvents['S3PutObject'],
ProfileEvents['S3WriteRequestsErrors'],
FROM system.query_log
WHERE query_id='{query_id}'
AND type='{log_type}'
@ -129,12 +149,12 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 0
assert count_s3_errors == 1
assert create_multipart == 1
assert upload_parts == 0
assert s3_errors == 1


# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed
@ -172,12 +192,12 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts >= 2
assert count_s3_errors >= 2
assert create_multipart == 1
assert upload_parts >= 2
assert s3_errors >= 2


def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
@ -207,12 +227,12 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
query_id=insert_query_id,
)

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3

broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused")
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1"
@ -279,13 +299,13 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
query_id=insert_query_id,
)

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)

assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3

broken_s3.setup_at_part_upload(
count=1000,

@ -361,13 +381,13 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
query_id=insert_query_id,
)

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)

assert count_create_multi_part_uploads == 1
assert count_upload_parts == 39
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 39
assert s3_errors == 3

broken_s3.setup_at_create_multi_part_upload(
count=1000,
@ -438,13 +458,13 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
query_id=insert_query_id,
)

count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters(
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)

assert count_create_multi_part_uploads == 1
assert count_upload_parts == 7
assert count_s3_errors == 3
assert create_multipart == 1
assert upload_parts == 7
assert s3_errors == 3

broken_s3.setup_at_part_upload(
count=1000,

@ -533,3 +553,60 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
retry_count=120,
sleep_time=1,
)


@pytest.mark.parametrize("node_name", ["node", "node_with_inf_s3_retries"])
def test_adaptive_timeouts(cluster, broken_s3, node_name):
node = cluster.instances[node_name]

broken_s3.setup_fake_puts(part_length=1)
broken_s3.setup_slow_answers(
timeout=5,
count=1000000,
)

insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/adaptive_timeouts',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1
SETTINGS
s3_request_timeout_ms=30000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)

broken_s3.reset()

put_objects, s3_errors = get_put_counters(
node, insert_query_id, log_type="QueryFinish"
)

assert put_objects == 1

s3_use_adaptive_timeouts = node.query(
f"""
SELECT
value
FROM system.settings
WHERE
name='s3_use_adaptive_timeouts'
"""
).strip()

if node_name == "node_with_inf_s3_retries":
# first 2 attempts failed
assert s3_use_adaptive_timeouts == "1"
assert s3_errors == 1
else:
assert s3_use_adaptive_timeouts == "0"
assert s3_errors == 0

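The test above drives s3_use_adaptive_timeouts through the node's settings profile; it is a query-level setting, so it can also be supplied per statement. A sketch modelled on the test's INSERT, with an illustrative object key that is not part of the original change:

```sql
-- Illustrative only: disable adaptive timeouts for a single INSERT so every
-- attempt uses the full s3_request_timeout_ms budget.
INSERT INTO TABLE FUNCTION s3(
    'http://resolver:8083/root/data/adaptive_timeouts_example',
    'minio', 'minio123',
    'CSV', auto, 'none'
)
SELECT *
FROM system.numbers
LIMIT 1
SETTINGS
    s3_use_adaptive_timeouts = 0,
    s3_request_timeout_ms = 30000,
    s3_check_objects_after_upload = 0;
```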
@ -11,6 +11,7 @@
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3>
<s3_retryable>
@ -33,6 +34,7 @@
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>

@ -1,9 +1,4 @@
<clickhouse>
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
</default>
</profiles>
<s3>
<s3_mock>
<endpoint>http://resolver:8080</endpoint>

@ -1,7 +1,9 @@
<clickhouse>
<profiles>
<default>
<s3_retry_attempts>5</s3_retry_attempts>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
<s3_retry_attempts>10</s3_retry_attempts>
<s3_max_inflight_parts_for_one_file>5</s3_max_inflight_parts_for_one_file>
</default>
</profiles>
</clickhouse>

@ -4,6 +4,7 @@ import re
import socket
import struct
import sys
import time


def gen_n_digit_number(n):
@ -39,14 +40,14 @@ random.seed("Unstable server/1.0")

# Generating some "random" data and append a line which contains sum of numbers in column 4.
lines = (
b"".join((gen_line() for _ in range(500000)))
b"".join([gen_line() for _ in range(500000)])
+ f"0,0,0,{-sum_in_4_column}\n".encode()
)


class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path == "/root/test.csv":
if self.path == "/root/test.csv" or self.path == "/root/slow_send_test.csv":
self.from_bytes = 0
self.end_bytes = len(lines)
self.size = self.end_bytes
@ -101,6 +102,18 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
print("Dropping connection")
break

if self.path == "/root/slow_send_test.csv":
self.send_block_size = 81920

for c, i in enumerate(
range(self.from_bytes, self.end_bytes, self.send_block_size)
):
self.wfile.write(
lines[i : min(i + self.send_block_size, self.end_bytes)]
)
self.wfile.flush()
time.sleep(1)

elif self.path == "/":
self.wfile.write(b"OK")

@ -818,6 +818,15 @@ def test_storage_s3_get_unstable(started_cluster):
assert result.splitlines() == ["500001,500000,0"]


def test_storage_s3_get_slow(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/slow_send_test.csv', 'CSV', '{table_format}') FORMAT CSV"
result = run_query(instance, get_query)
assert result.splitlines() == ["500001,500000,0"]


def test_storage_s3_put_uncompressed(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]