Merge pull request #44065 from vitlibar/fix-race-in-s3-multipart-upload

Fix race in s3 multipart upload
Vitaly Baranov 2022-12-12 15:28:05 +01:00 committed by GitHub
commit 7d701e9b32
10 changed files with 92 additions and 31 deletions


@@ -23,6 +23,7 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
@@ -222,8 +223,21 @@ void BackupWriterS3::copyObjectMultipartImpl(
for (size_t part_number = 1; position < size; ++part_number)
{
/// Check that part number is not too big.
if (part_number > request_settings.max_part_number)
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_operation_copy_size = {}",
request_settings.max_part_number, size, request_settings.min_upload_part_size, request_settings.max_upload_part_size,
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
request_settings.max_single_operation_copy_size);
}
size_t next_position = std::min(position + upload_part_size, size);
/// Make a copy request to copy a part.
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
@@ -250,6 +264,7 @@ void BackupWriterS3::copyObjectMultipartImpl(
position = next_position;
/// Maybe increase `upload_part_size` (we sometimes need to increase it to keep `part_number` less than or equal to `max_part_number`).
if (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= request_settings.upload_part_size_multiply_factor;
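
For context, this is the loop the new check protects: the copy advances in chunks of `upload_part_size` bytes, and every `upload_part_size_multiply_parts_count_threshold` parts the part size is multiplied so that even very large objects fit into at most `max_part_number` parts. Below is a standalone sketch of that sizing behaviour, using the default values introduced later in this PR (factor 2, threshold 500, limit 10000) purely for illustration; it is not the BackupWriterS3 code itself.

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Illustration only: mirrors the sizing logic of the copy loop above with the
// defaults added by this PR; not the actual BackupWriterS3 implementation.
int main()
{
    const uint64_t min_upload_part_size = 16ULL * 1024 * 1024;        // DEFAULT_MIN_UPLOAD_PART_SIZE
    const uint64_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;  // DEFAULT_MAX_UPLOAD_PART_SIZE
    const uint64_t multiply_factor = 2;                               // DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR
    const uint64_t multiply_threshold = 500;                          // DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD
    const uint64_t max_part_number = 10000;                           // DEFAULT_MAX_PART_NUMBER

    const uint64_t size = 20ULL * 1024 * 1024 * 1024;  // pretend we copy a 20 GiB object
    uint64_t upload_part_size = min_upload_part_size;
    uint64_t position = 0;

    for (uint64_t part_number = 1; position < size; ++part_number)
    {
        if (part_number > max_part_number)
        {
            std::printf("would throw INVALID_CONFIG_PARAMETER at part %llu\n",
                        static_cast<unsigned long long>(part_number));
            return 1;
        }

        uint64_t next_position = std::min(position + upload_part_size, size);
        // ... an UploadPartCopy request for [position, next_position) would be sent here ...
        position = next_position;

        // Grow the part size every `multiply_threshold` parts so that `part_number`
        // stays within `max_part_number` even for very large objects.
        if (part_number % multiply_threshold == 0)
            upload_part_size = std::min(upload_part_size * multiply_factor, max_upload_part_size);
    }

    std::printf("copied %llu bytes, final part size %llu bytes\n",
                static_cast<unsigned long long>(size),
                static_cast<unsigned long long>(upload_part_size));
    return 0;
}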


@@ -136,7 +136,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
return;
S3Settings::RequestSettings request_settings_1;
request_settings_1.upload_part_size_multiply_parts_count_threshold = 10000;
request_settings_1.setEmptyFieldsByDefault();
const auto create_writer = [&](const auto & key)
{


@@ -37,8 +37,10 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", settings.s3_max_single_read_retries);
request_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", settings.s3_min_upload_part_size);
request_settings.max_upload_part_size = config.getUInt64(config_prefix + ".s3_max_upload_part_size", S3Settings::RequestSettings::DEFAULT_MAX_UPLOAD_PART_SIZE);
request_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", settings.s3_upload_part_size_multiply_factor);
request_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", settings.s3_upload_part_size_multiply_parts_count_threshold);
request_settings.max_part_number = config.getUInt64(config_prefix + ".s3_max_part_number", S3Settings::RequestSettings::DEFAULT_MAX_PART_NUMBER);
request_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", settings.s3_max_single_part_upload_size);
request_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", settings.s3_check_objects_after_upload);
request_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", settings.s3_max_unexpected_write_error_retries);


@@ -50,6 +50,7 @@ const int S3_WARN_MAX_PARTS = 10000;
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
}
struct WriteBufferFromS3::UploadPartTask
@@ -122,12 +123,6 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::allocateBuffer()
{
if (total_parts_uploaded != 0 && total_parts_uploaded % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= request_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
}
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
temporary_buffer->exceptions(std::ios::badbit);
last_part_size = 0;
@@ -257,13 +252,10 @@ void WriteBufferFromS3::writePart()
{
UploadPartTask * task = nullptr;
int part_number;
{
std::lock_guard lock(bg_tasks_mutex);
task = &upload_object_tasks.emplace_back();
++num_added_bg_tasks;
part_number = num_added_bg_tasks;
}
/// Notify waiting thread when task finished
@@ -281,7 +273,7 @@ void WriteBufferFromS3::writePart()
try
{
fillUploadRequest(task->req, part_number);
fillUploadRequest(task->req);
schedule([this, task, task_finish_notify]()
{
@@ -308,23 +300,44 @@ void WriteBufferFromS3::writePart()
UploadPartTask task;
auto & tags = TSA_SUPPRESS_WARNING_FOR_WRITE(part_tags); /// Suppress warning because schedule == false.
fillUploadRequest(task.req, static_cast<int>(tags.size() + 1));
fillUploadRequest(task.req);
processUploadRequest(task);
tags.push_back(task.tag);
}
}
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number)
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req)
{
/// Increase part number.
++part_number;
if (!multipart_upload_id.empty() && (part_number > request_settings.max_part_number))
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
request_settings.max_part_number, count(), request_settings.min_upload_part_size, request_settings.max_upload_part_size,
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
request_settings.max_single_part_upload_size);
}
/// Setup request.
req.SetBucket(bucket);
req.SetKey(key);
req.SetPartNumber(part_number);
req.SetPartNumber(static_cast<int>(part_number));
req.SetUploadId(multipart_upload_id);
req.SetContentLength(temporary_buffer->tellp());
req.SetBody(temporary_buffer);
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
req.SetContentType("binary/octet-stream");
/// Maybe increase `upload_part_size` (we sometimes need to increase it to keep `part_number` less than or equal to `max_part_number`).
if (!multipart_upload_id.empty() && (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0))
{
upload_part_size *= request_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
}
}
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
@@ -343,8 +356,6 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
}
else
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
total_parts_uploaded++;
}
void WriteBufferFromS3::completeMultipartUpload()
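
Taken together, the WriteBufferFromS3 hunks above are the actual race fix: previously allocateBuffer() grew upload_part_size based on the atomic total_parts_uploaded counter (incremented from processUploadRequest(), which runs inside the background upload tasks), while writePart() derived the part number from num_added_bg_tasks under bg_tasks_mutex, so part sizing and part numbering could interleave with in-flight parts. After this change both are driven by a plain part_number member inside fillUploadRequest(), which only runs on the thread that writes parts. A condensed sketch of the new flow (class scaffolding and the AWS request setup omitted; not the literal implementation):

#include <algorithm>
#include <cstddef>
#include <stdexcept>

// Condensed sketch of the post-fix logic: the part counter and the part-size
// growth are now updated only on the single thread that fills upload requests,
// so they no longer need to be synchronised with the background upload tasks.
struct MultipartUploadState
{
    size_t part_number = 0;   // replaces the old std::atomic<size_t> total_parts_uploaded
    size_t upload_part_size = 16 * 1024 * 1024;
    size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
    size_t upload_part_size_multiply_factor = 2;
    size_t upload_part_size_multiply_parts_count_threshold = 500;
    size_t max_part_number = 10000;

    // Mirrors the new fillUploadRequest(): called once per part, writer thread only.
    size_t nextPart()
    {
        ++part_number;

        if (part_number > max_part_number)
            throw std::runtime_error("Part number exceeded the maximum; check the upload_part_size settings");

        // The UploadPartRequest for this part would be filled here with the current part_number.

        // Grow the part size every N parts so part_number stays <= max_part_number.
        if (part_number % upload_part_size_multiply_parts_count_threshold == 0)
            upload_part_size = std::min(upload_part_size * upload_part_size_multiply_factor, max_upload_part_size);

        return part_number;
    }
};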


@@ -75,7 +75,7 @@ private:
void finalizeImpl() override;
struct UploadPartTask;
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number);
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req);
void processUploadRequest(UploadPartTask & task);
struct PutObjectTask;
@@ -95,7 +95,7 @@ private:
size_t upload_part_size = 0;
std::shared_ptr<Aws::StringStream> temporary_buffer; /// Buffer to accumulate data.
size_t last_part_size = 0;
std::atomic<size_t> total_parts_uploaded = 0;
size_t part_number = 0;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.


@@ -4525,6 +4525,7 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(temp_part_dir / filename);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
reservation->update(reservation->getSize() - backup_entry->getSize());
}
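
The storage-layer hunks (this one and the StorageLog / StorageStripeLog ones below) all make the same change: the destination write buffer is finalized explicitly after copyData(), so that for an S3-backed disk the multipart upload is completed, and any error surfaces, at this point of the restore rather than in the buffer's destructor. A minimal sketch of the pattern, with `in` and `out` standing in for the backup entry's read buffer and the disk's write buffer:

#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/copyData.h>

// Sketch of the restore pattern used in the hunks above and below.
void restoreOneFile(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
    DB::copyData(in, out);
    // For a WriteBufferFromS3 this completes the multipart upload and surfaces
    // failures here instead of in the destructor.
    out.finalize();
}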


@@ -1032,6 +1032,7 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
}
if (use_marks_file)


@@ -6,23 +6,12 @@
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <base/unit.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace
{
/// An object up to 5 GB can be copied in a single atomic operation.
constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5_GiB;
/// The maximum size of an uploaded part.
constexpr UInt64 DEFAULT_MAX_UPLOAD_PART_SIZE = 5_GiB;
}
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
std::lock_guard lock(mutex);
@@ -61,11 +50,12 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
request_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, S3Settings::RequestSettings::DEFAULT_MAX_UPLOAD_PART_SIZE);
request_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
request_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
request_settings.max_part_number = get_uint_for_key(key, "max_part_number", true, S3Settings::RequestSettings::DEFAULT_MAX_PART_NUMBER);
request_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, S3Settings::RequestSettings::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
request_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
request_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
@@ -128,6 +118,8 @@ void S3Settings::RequestSettings::updateFromSettingsIfEmpty(const Settings & set
upload_part_size_multiply_factor = settings.s3_upload_part_size_multiply_factor;
if (!upload_part_size_multiply_parts_count_threshold)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!max_part_number)
max_part_number = DEFAULT_MAX_PART_NUMBER;
if (!max_single_part_upload_size)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
if (!max_single_operation_copy_size)


@@ -31,6 +31,7 @@ struct S3Settings
size_t max_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 0;
size_t upload_part_size_multiply_parts_count_threshold = 0;
size_t max_part_number = 0;
size_t max_single_part_upload_size = 0;
size_t max_single_operation_copy_size = 0;
size_t max_connections = 0;
@@ -49,6 +50,7 @@ struct S3Settings
&& max_upload_part_size == other.max_upload_part_size
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
&& max_part_number == other.max_part_number
&& max_single_part_upload_size == other.max_single_part_upload_size
&& max_single_operation_copy_size == other.max_single_operation_copy_size
&& max_connections == other.max_connections
@@ -58,6 +60,18 @@ struct S3Settings
&& put_request_throttler == other.put_request_throttler;
}
static const constexpr UInt64 DEFAULT_SINGLE_READ_RETRIES = 4;
static const constexpr UInt64 DEFAULT_MIN_UPLOAD_PART_SIZE = 16 * 1024 * 1024;
static const constexpr UInt64 DEFAULT_MAX_UPLOAD_PART_SIZE = 5ULL * 1024 * 1024 * 1024;
static const constexpr UInt64 DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR = 2;
static const constexpr UInt64 DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD = 500;
static const constexpr UInt64 DEFAULT_MAX_PART_NUMBER = 10000;
static const constexpr UInt64 DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE = 32 * 1024 * 1024;
static const constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5ULL * 1024 * 1024 * 1024;
static const constexpr UInt64 DEFAULT_MAX_CONNECTIONS = 1024;
static const constexpr UInt64 DEFAULT_MAX_UNEXPECTED_WRITE_ERRORS_RETRIES = 4;
void setEmptyFieldsByDefault();
void updateFromSettingsIfEmpty(const Settings & settings);
};
@@ -83,4 +97,28 @@ private:
std::map<const String, const S3Settings> s3_settings;
};
inline void S3Settings::RequestSettings::setEmptyFieldsByDefault()
{
if (!max_single_read_retries)
max_single_read_retries = DEFAULT_SINGLE_READ_RETRIES;
if (!min_upload_part_size)
min_upload_part_size = DEFAULT_MIN_UPLOAD_PART_SIZE;
if (!max_upload_part_size)
max_upload_part_size = DEFAULT_MAX_UPLOAD_PART_SIZE;
if (!upload_part_size_multiply_factor)
upload_part_size_multiply_factor = DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR;
if (!upload_part_size_multiply_parts_count_threshold)
upload_part_size_multiply_parts_count_threshold = DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD;
if (!max_part_number)
max_part_number = DEFAULT_MAX_PART_NUMBER;
if (!max_single_part_upload_size)
max_single_part_upload_size = DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE;
if (!max_single_operation_copy_size)
max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE;
if (!max_connections)
max_connections = DEFAULT_MAX_CONNECTIONS;
if (!max_unexpected_write_error_retries)
max_unexpected_write_error_retries = DEFAULT_MAX_UNEXPECTED_WRITE_ERRORS_RETRIES;
}
}
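
The KeeperSnapshotManagerS3 hunk near the top of this PR shows how the new helper is meant to be used: set only the fields that need an explicit value, then call setEmptyFieldsByDefault() so every field that is still zero falls back to one of the DEFAULT_* constants above. Roughly:

#include <Storages/StorageS3Settings.h>

// Sketch of the intended call pattern (mirrors the KeeperSnapshotManagerS3 hunk above).
DB::S3Settings::RequestSettings makeRequestSettings()
{
    DB::S3Settings::RequestSettings request_settings;
    request_settings.upload_part_size_multiply_parts_count_threshold = 10000;  // explicit override
    request_settings.setEmptyFieldsByDefault();  // everything still zero gets a DEFAULT_* value
    return request_settings;
}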


@@ -626,6 +626,7 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
}
/// Append the index.