Revert s3 prefixes + add retries on write requests unexpected errors

commit d9c3549cc8 (parent 456da30c9f)
@@ -86,6 +86,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 1000, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
     M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
     M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
+    M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
     M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
     M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
     M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
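The M(...) rows above are entries in ClickHouse's settings X-macro list: each entry carries a type, name, default value, description, and flags, and the list is expanded in several places to generate the setting members and their metadata. A minimal standalone sketch of that pattern (invented names and a trimmed argument list, not the actual ClickHouse Settings machinery):

#include <cstdint>
#include <iostream>

// Simplified X-macro list; the real entries also carry a flags argument.
#define APPLY_FOR_DEMO_SETTINGS(M) \
    M(uint64_t, s3_max_single_read_retries, 4, "Max retries during a single S3 read") \
    M(uint64_t, s3_max_unexpected_write_error_retries, 4, "Max retries on unexpected S3 write errors")

struct DemoSettings
{
    /// Expansion 1: declare one member per entry, initialized to its default.
#define DECLARE_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    APPLY_FOR_DEMO_SETTINGS(DECLARE_SETTING)
#undef DECLARE_SETTING

    /// Expansion 2: reuse the same list to print name/value pairs.
    void dump() const
    {
#define PRINT_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) std::cout << #NAME << " = " << NAME << '\n';
        APPLY_FOR_DEMO_SETTINGS(PRINT_SETTING)
#undef PRINT_SETTING
    }
};

int main()
{
    DemoSettings settings;
    settings.s3_max_unexpected_write_error_retries = 10;
    settings.dump();
}

Adding a single M(...) line is therefore enough to give the new s3_max_unexpected_write_error_retries setting a typed member, a default of 4, and user-facing documentation in one place.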
@@ -39,6 +39,7 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
     rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
     rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
     rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
+    rw_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", context->getSettingsRef().s3_max_unexpected_write_error_retries);
 
     return std::make_unique<S3ObjectStorageSettings>(
         rw_settings,
@@ -339,17 +339,29 @@ void WriteBufferFromS3::completeMultipartUpload()
     }
 
     req.SetMultipartUpload(multipart_upload);
 
-    auto outcome = client_ptr->CompleteMultipartUpload(req);
-
-    if (outcome.IsSuccess())
-        LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
-    else
-    {
-        throw S3Exception(
-            outcome.GetError().GetErrorType(),
-            "Message: {}, Key: {}, Bucket: {}, Tags: {}",
-            outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
-    }
+    size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
+    for (size_t i = 0; i < max_retry; ++i)
+    {
+        auto outcome = client_ptr->CompleteMultipartUpload(req);
+        if (outcome.IsSuccess())
+        {
+            LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
+            break;
+        }
+        else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
+        {
+            /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
+            /// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it
+            LOG_WARNING(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", bucket, key, multipart_upload_id, tags.size());
+        }
+        else
+        {
+            throw S3Exception(
+                outcome.GetError().GetErrorType(),
+                "Message: {}, Key: {}, Bucket: {}, Tags: {}",
+                outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
+        }
+    }
 }
 
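Both write paths in this commit get the same retry shape: bound the number of attempts by the new setting, stop at the first success, retry only the one error class observed to be spurious, and rethrow anything else immediately. A self-contained sketch of that control flow (illustrative types and names, not the ClickHouse or AWS SDK API):

#include <algorithm>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>

/// Illustrative stand-in for the AWS SDK outcome type.
enum class ErrorKind { NoSuchKey, Other };

struct Outcome
{
    bool success = false;
    ErrorKind error = ErrorKind::Other;
    std::string message;
};

void writeWithRetries(size_t max_unexpected_write_error_retries,
                      const std::function<Outcome()> & send_request)
{
    /// Mirrors std::max(..., 1UL) above: a setting of 0 still gets one attempt.
    const size_t max_retry = std::max<size_t>(max_unexpected_write_error_retries, 1);
    for (size_t i = 0; i < max_retry; ++i)
    {
        Outcome outcome = send_request();
        if (outcome.success)
            break;                                      /// done, no further attempts
        if (outcome.error != ErrorKind::NoSuchKey)
            throw std::runtime_error(outcome.message);  /// non-retryable: fail fast
        /// NO_SUCH_KEY is known to be spurious (seen at least from MinIO), so loop
        /// and retry. Note that, as in the committed code, exhausting all attempts
        /// on this error falls out of the loop without throwing.
    }
}

The retry set is deliberately narrow: per the code comments, NO_SUCH_UPLOAD is an expected error and must not be retried, so only NO_SUCH_KEY is looped over, while every other failure surfaces as an S3Exception on its first occurrence.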
@@ -429,15 +441,27 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
 
 void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
 {
-    auto outcome = client_ptr->PutObject(task.req);
-    bool with_pool = static_cast<bool>(schedule);
-    if (outcome.IsSuccess())
-        LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
-    else
-        throw S3Exception(
-            outcome.GetError().GetErrorType(),
-            "Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
-            outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
+    size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
+    for (size_t i = 0; i < max_retry; ++i)
+    {
+        auto outcome = client_ptr->PutObject(task.req);
+        bool with_pool = static_cast<bool>(schedule);
+        if (outcome.IsSuccess())
+        {
+            LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
+            break;
+        }
+        else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
+        {
+            /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
+            LOG_WARNING(log, "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, WithPool: {}, will retry", bucket, key, task.req.GetContentLength(), with_pool);
+        }
+        else
+            throw S3Exception(
+                outcome.GetError().GetErrorType(),
+                "Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
+                outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
+    }
 }
 
 void WriteBufferFromS3::waitForReadyBackGroundTasks()
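The single-part path inlines the same loop rather than sharing a helper with the multipart path; it differs only in the request type, the log messages, and the absence of the NO_SUCH_UPLOAD caveat, which applies only to multipart uploads.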
@@ -121,6 +121,7 @@ S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
     max_single_part_upload_size = settings.s3_max_single_part_upload_size;
     max_connections = settings.s3_max_connections;
     check_objects_after_upload = settings.s3_check_objects_after_upload;
+    max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
 }
 
 void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
@@ -137,6 +138,8 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
         max_single_part_upload_size = settings.s3_max_single_part_upload_size;
     if (!max_connections)
        max_connections = settings.s3_max_connections;
+    if (!max_unexpected_write_error_retries)
+        max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
     check_objects_after_upload = settings.s3_check_objects_after_upload;
 }
 
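updateFromSettingsIfEmpty treats a zero value as "not configured": fields set explicitly (for example from a per-disk config) keep their value, and only the gaps are filled from the query-level settings. A toy sketch of that merge pattern (struct and names invented for illustration):

#include <cstdint>

struct DemoRWSettings
{
    uint64_t max_connections = 0;                     /// 0 means "unset"
    uint64_t max_unexpected_write_error_retries = 0;  /// 0 means "unset"

    /// Fill only the fields that are still at their "unset" zero value.
    void updateFromDefaultsIfEmpty(uint64_t default_connections, uint64_t default_retries)
    {
        if (!max_connections)
            max_connections = default_connections;
        if (!max_unexpected_write_error_retries)
            max_unexpected_write_error_retries = default_retries;
    }
};

One consequence is that an explicit 0 cannot be distinguished from "unset"; for the retry count this is harmless, because the write path clamps with std::max(max_unexpected_write_error_retries, 1UL) and always makes at least one attempt.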
@@ -61,6 +61,7 @@ struct S3Settings
         size_t max_single_part_upload_size = 0;
         size_t max_connections = 0;
         bool check_objects_after_upload = false;
+        size_t max_unexpected_write_error_retries = 0;
 
         ReadWriteSettings() = default;
         explicit ReadWriteSettings(const Settings & settings);
@@ -73,7 +74,8 @@ struct S3Settings
             && upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
             && max_single_part_upload_size == other.max_single_part_upload_size
             && max_connections == other.max_connections
-            && check_objects_after_upload == other.check_objects_after_upload;
+            && check_objects_after_upload == other.check_objects_after_upload
+            && max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;
         }
 
         void updateFromSettingsIfEmpty(const Settings & settings);