Better S3 read retries. Renamed s3_max_single_read_retries -> s3_single_read_retry_attempts and added sleeps before subsequent attempts.
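In short: ReadBufferFromS3 no longer counts attempts itself; it delegates the retry decision and the back-off sleep to an Aws::Client::RetryStrategy (a DefaultRetryStrategy built from the renamed setting). A minimal, self-contained sketch of that pattern follows; readWithRetries and read_once are hypothetical names used only for illustration and are not part of this commit.

```cpp
#include <aws/core/client/AWSError.h>
#include <aws/core/client/CoreErrors.h>
#include <aws/core/client/DefaultRetryStrategy.h>

#include <chrono>
#include <memory>
#include <stdexcept>
#include <thread>

/// Sketch of the retry pattern ReadBufferFromS3::nextImpl uses after this commit:
/// the strategy decides whether to retry and how long to sleep before the next attempt.
template <typename ReadFunc>
void readWithRetries(ReadFunc && read_once, const std::shared_ptr<Aws::Client::RetryStrategy> & retry_strategy)
{
    for (int attempt = 1;; ++attempt)
    {
        try
        {
            read_once();
            return;
        }
        catch (const std::exception & e)
        {
            /// Wrap the failure as a retryable network error so the strategy can evaluate it.
            Aws::Client::AWSError<Aws::Client::CoreErrors> error(
                Aws::Client::CoreErrors::NETWORK_CONNECTION, "ReadError", e.what(), /* isRetryable = */ true);

            if (!retry_strategy->ShouldRetry(error, attempt))
                throw;

            /// DefaultRetryStrategy backs off roughly as scale_factor_ms * 2^attempt.
            std::this_thread::sleep_for(
                std::chrono::milliseconds(retry_strategy->CalculateDelayBeforeNextRetry(error, attempt)));
        }
    }
}

int main()
{
    /// 4 mirrors the default of the renamed setting s3_single_read_retry_attempts.
    auto strategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(4);

    int failures_left = 2;
    readWithRetries([&]
    {
        if (failures_left-- > 0)
            throw std::runtime_error("transient S3 read error");
    }, strategy);
    return 0;
}
```

With the SDK's stock DefaultRetryStrategy, ShouldRetry gives up once the configured number of attempts is exhausted, and CalculateDelayBeforeNextRetry grows the sleep roughly exponentially from the scale factor, which is what supplies the sleeps between attempts mentioned above.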

Vladimir Chebotarev 2021-04-22 04:25:40 +03:00
parent 6936274398
commit bdf03387ab
11 changed files with 56 additions and 40 deletions

View File

@@ -70,7 +70,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
- M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
+ M(UInt64, s3_single_read_retry_attempts, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \

View File

@@ -277,12 +277,12 @@ public:
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
DiskS3::Metadata metadata_,
- size_t s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
size_t buf_size_)
: client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
- , s3_max_single_read_retries(s3_max_single_read_retries_)
+ , single_read_retry_strategy(std::move(single_read_retry_strategy_))
, buf_size(buf_size_)
{
}
@@ -339,7 +339,7 @@ private:
const auto & [path, size] = metadata.s3_objects[i];
if (size > offset)
{
- auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
+ auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, single_read_retry_strategy, buf_size);
buf->seek(offset, SEEK_SET);
return buf;
}
@@ -368,7 +368,7 @@ private:
++current_buf_idx;
const auto & path = metadata.s3_objects[current_buf_idx].first;
- current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
+ current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, single_read_retry_strategy, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
@@ -379,7 +379,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String & bucket;
DiskS3::Metadata metadata;
- size_t s3_max_single_read_retries;
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy;
size_t buf_size;
size_t absolute_position = 0;
@@ -686,7 +686,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
- auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
+ auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->single_read_retry_strategy, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
@@ -1000,7 +1000,7 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
- settings->s3_max_single_read_retries);
+ settings->single_read_retry_strategy);
readIntText(version, buffer);
@@ -1559,7 +1559,7 @@ void DiskS3::applyNewSettings(const Poco::Util::AbstractConfiguration & config,
DiskS3Settings::DiskS3Settings(
const std::shared_ptr<Aws::S3::S3Client> & client_,
- size_t s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
size_t s3_min_upload_part_size_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
@@ -1567,7 +1567,7 @@ DiskS3Settings::DiskS3Settings(
int thread_pool_size_,
int list_object_keys_size_)
: client(client_)
- , s3_max_single_read_retries(s3_max_single_read_retries_)
+ , single_read_retry_strategy(single_read_retry_strategy_)
, s3_min_upload_part_size(s3_min_upload_part_size_)
, s3_max_single_part_upload_size(s3_max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)

View File

@@ -22,7 +22,7 @@ struct DiskS3Settings
{
DiskS3Settings(
const std::shared_ptr<Aws::S3::S3Client> & client_,
- size_t s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
size_t s3_min_upload_part_size_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
@@ -31,7 +31,7 @@ struct DiskS3Settings
int list_object_keys_size_);
std::shared_ptr<Aws::S3::S3Client> client;
- size_t s3_max_single_read_retries;
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy;
size_t s3_min_upload_part_size;
size_t s3_max_single_part_upload_size;
size_t min_bytes_for_seek;

View File

@@ -150,7 +150,7 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
{
return std::make_unique<DiskS3Settings>(
getClient(config, config_prefix, context),
- config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries),
+ std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".single_read_retry_attempts", context->getSettingsRef().s3_single_read_retry_attempts)),
config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size),
config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),

View File

@@ -6,6 +6,7 @@
# include <IO/ReadBufferFromS3.h>
# include <Common/Stopwatch.h>
+ # include <aws/core/client/RetryStrategy.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <common/logger_useful.h>
@@ -31,12 +32,12 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
- std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, UInt64 s3_max_single_read_retries_, size_t buffer_size_)
+ std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
- , s3_max_single_read_retries(s3_max_single_read_retries_)
+ , retry_strategy(std::move(retry_strategy_))
, buffer_size(buffer_size_)
{
}
@@ -52,11 +53,10 @@ bool ReadBufferFromS3::nextImpl()
Stopwatch watch;
bool next_result = false;
- for (Int64 attempt = static_cast<Int64>(s3_max_single_read_retries); attempt >= 0; --attempt)
- {
- if (!impl)
- impl = initialize();
+ Aws::Client::AWSError<Aws::Client::CoreErrors> network_error;
+ for (int attempt = 1;; ++attempt)
+ {
try
{
next_result = impl->next();
@@ -68,6 +68,8 @@ bool ReadBufferFromS3::nextImpl()
}
catch (const Exception & e)
{
+ network_error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, e.name(), e.message(), true);
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Remaining attempts: {}, Message: {}",
@@ -75,9 +77,14 @@ bool ReadBufferFromS3::nextImpl()
impl.reset();
- if (!attempt)
+ if (!retry_strategy->ShouldRetry(network_error, attempt))
throw;
}
+ std::this_thread::sleep_for(std::chrono::milliseconds(retry_strategy->CalculateDelayBeforeNextRetry(network_error, attempt)));
+ if (!impl)
+ impl = initialize();
}
watch.stop();

View File

@@ -15,6 +15,10 @@ namespace Aws::S3
{
class S3Client;
}
+ namespace Aws::Client
+ {
+ class RetryStrategy;
+ }
namespace DB
{
@@ -27,7 +31,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
- UInt64 s3_max_single_read_retries;
+ std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy;
size_t buffer_size;
off_t offset = 0;
Aws::S3::Model::GetObjectResult read_result;
@@ -40,7 +44,7 @@ public:
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
- UInt64 s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;

View File

@@ -428,8 +428,6 @@ public:
/// EC2MetadataService throttles by delaying the response so the service client should set a large read timeout.
/// EC2MetadataService delay is in order of seconds so it only make sense to retry after a couple of seconds.
aws_client_configuration.connectTimeoutMs = 1000;
- /// FIXME. Somehow this timeout does not work in docker without --net=host.
- aws_client_configuration.requestTimeoutMs = 1000;
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);

View File

@@ -25,6 +25,7 @@
#include <DataTypes/DataTypeString.h>
#include <aws/core/auth/AWSCredentials.h>
+ #include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
@@ -167,7 +168,7 @@ StorageS3Source::StorageS3Source(
ContextPtr context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
- UInt64 s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
@@ -179,7 +180,7 @@ StorageS3Source::StorageS3Source(
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
- , s3_max_single_read_retries(s3_max_single_read_retries_)
+ , single_read_retry_strategy(std::move(single_read_retry_strategy_))
, compression_hint(compression_hint_)
, client(client_)
, sample_block(sample_block_)
@@ -200,7 +201,7 @@ bool StorageS3Source::initialize()
file_path = bucket + "/" + current_key;
read_buf = wrapReadBufferWithCompressionMethod(
- std::make_unique<ReadBufferFromS3>(client, bucket, current_key, s3_max_single_read_retries), chooseCompressionMethod(current_key, compression_hint));
+ std::make_unique<ReadBufferFromS3>(client, bucket, current_key, single_read_retry_strategy), chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(input_format);
@@ -324,7 +325,7 @@ StorageS3::StorageS3(
const String & secret_access_key_,
const StorageID & table_id_,
const String & format_name_,
- UInt64 s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@@ -337,7 +338,7 @@ StorageS3::StorageS3(
: IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, format_name(format_name_)
- , s3_max_single_read_retries(s3_max_single_read_retries_)
+ , single_read_retry_strategy(std::move(single_read_retry_strategy_))
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
@@ -405,7 +406,7 @@ Pipe StorageS3::read(
local_context,
metadata_snapshot->getColumns(),
max_block_size,
- s3_max_single_read_retries,
+ single_read_retry_strategy,
compression_method,
client_auth.client,
client_auth.uri.bucket,
@@ -492,7 +493,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
- UInt64 s3_max_single_read_retries = args.getLocalContext()->getSettingsRef().s3_max_single_read_retries;
+ auto single_read_retry_strategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(args.getLocalContext()->getSettingsRef().s3_single_read_retry_attempts);
UInt64 min_upload_part_size = args.getLocalContext()->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
@@ -516,7 +517,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key,
args.table_id,
format_name,
- s3_max_single_read_retries,
+ single_read_retry_strategy,
min_upload_part_size,
max_single_part_upload_size,
max_connections,

View File

@@ -55,7 +55,7 @@ public:
ContextPtr context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
- UInt64 s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket,
@@ -72,7 +72,7 @@ private:
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
- UInt64 s3_max_single_read_retries;
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy;
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
@@ -103,7 +103,7 @@ public:
const String & secret_access_key,
const StorageID & table_id_,
const String & format_name_,
- UInt64 s3_max_single_read_retries_,
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@@ -150,7 +150,7 @@ private:
ClientAuthentificaiton client_auth;
String format_name;
- UInt64 s3_max_single_read_retries;
+ std::shared_ptr<Aws::Client::RetryStrategy> single_read_retry_strategy;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;

View File

@@ -12,6 +12,9 @@
#include <Parsers/ASTLiteral.h>
#include "registerTableFunctions.h"
+ #include <aws/core/client/DefaultRetryStrategy.h>
namespace DB
{
@@ -83,7 +86,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
{
Poco::URI uri (filename);
S3::URI s3_uri (uri);
- UInt64 s3_max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
+ auto single_read_retry_strategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(context->getSettingsRef().s3_single_read_retry_attempts);
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
@@ -94,7 +97,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
- s3_max_single_read_retries,
+ single_read_retry_strategy,
min_upload_part_size,
max_single_part_upload_size,
max_connections,

View File

@@ -28,6 +28,9 @@
#include <memory>
#include <thread>
+ #include <aws/core/client/DefaultRetryStrategy.h>
namespace DB
{
@@ -109,7 +112,7 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
Poco::URI uri (filename);
S3::URI s3_uri (uri);
/// Actually this parameters are not used
- UInt64 s3_max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
+ auto s3_max_single_read_retry_strategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(context->getSettingsRef().s3_single_read_retry_attempts);
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
@@ -119,7 +122,7 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
- s3_max_single_read_retries,
+ s3_max_single_read_retry_strategy,
min_upload_part_size,
max_single_part_upload_size,
max_connections,