Add function copyS3FileFromDisk().

Vitaly Baranov 2023-04-22 17:51:14 +02:00
parent 69114cb550
commit 101aa6eff0
4 changed files with 94 additions and 58 deletions


@@ -176,27 +176,9 @@ DataSourceDescription BackupWriterS3::getDataSourceDescription() const
 void BackupWriterS3::copyFileFromDisk(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
 {
-    /// copyS3File() can copy to another S3 bucket, but it requires the same S3 URI endpoint.
-    /// We don't check `has_throttling` here (compare with BackupWriterDisk::copyFileFromDisk()) because
-    /// copyS3File() almost doesn't use network so the throttling is not needed.
-    if (getDataSourceDescription() == src_disk->getDataSourceDescription())
-    {
-        /// getBlobPath() can return std::nullopt if the file is stored as multiple objects in S3 bucket.
-        /// In this case we can't use the native copy.
-        if (auto blob_path = src_disk->getBlobPath(src_file_name))
-        {
-            /// Use more optimal way.
-            LOG_TRACE(log, "Copying file {} using native copy", src_file_name);
-            const auto & [src_bucket, src_key] = *blob_path;
-            auto dest_key = fs::path(s3_uri.key) / dest_file_name;
-            copyS3File(client, src_bucket, src_key, src_offset, src_size, s3_uri.bucket, dest_key, request_settings, {},
-                threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"));
-            return;
-        }
-    }
-    /// Fallback to BackupWriterS3::copyDataToFile().
-    IBackupWriter::copyFileFromDisk(src_disk, src_file_name, src_offset, src_size, dest_file_name);
+    copyS3FileFromDisk(src_disk, src_file_name, src_offset, src_size,
+                       client, s3_uri.bucket, fs::path(s3_uri.key) / dest_file_name, read_settings, request_settings,
+                       threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"));
 }
 void BackupWriterS3::copyDataToFile(
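
The rewritten copyFileFromDisk() above now simply forwards to the new helper, which owns the whole "use native copy vs. copy through buffers" decision. Below is a minimal, self-contained sketch of that dispatch under assumed names (FakeDisk, FakeS3Client, serverSideCopy and bufferedCopy are illustrative stand-ins, not ClickHouse classes); the real helper is shown in the next file.

// A sketch of the dispatch performed by copyS3FileFromDisk(): use a server-side S3
// copy when the source disk stores the file as a single object behind the same S3
// endpoint as the destination, otherwise stream the bytes through buffers.
#include <iostream>
#include <optional>
#include <string>
#include <utility>

struct FakeDisk
{
    std::string endpoint;                                      // where the disk's objects live ("" for local disks)
    std::optional<std::pair<std::string, std::string>> blob;   // {bucket, key} if the file is a single S3 object
};

struct FakeS3Client
{
    std::string endpoint;
};

void serverSideCopy(const std::string & src_bucket, const std::string & src_key,
                    const std::string & dest_bucket, const std::string & dest_key)
{
    std::cout << "native copy " << src_bucket << '/' << src_key
              << " -> " << dest_bucket << '/' << dest_key << '\n';
}

void bufferedCopy(const std::string & src_path, const std::string & dest_bucket, const std::string & dest_key)
{
    std::cout << "buffered copy " << src_path << " -> " << dest_bucket << '/' << dest_key << '\n';
}

void copyFileFromDiskSketch(const FakeDisk & src_disk, const std::string & src_path,
                            const FakeS3Client & dest_client,
                            const std::string & dest_bucket, const std::string & dest_key)
{
    /// Native copy is possible only if the source lives in S3 behind the same endpoint
    /// and the file maps to a single object (the getBlobPath() analogue has a value).
    if (src_disk.endpoint == dest_client.endpoint && src_disk.blob)
    {
        const auto & [src_bucket, src_key] = *src_disk.blob;
        serverSideCopy(src_bucket, src_key, dest_bucket, dest_key);
        return;
    }

    /// Fallback: read the file from the disk and upload it by portions.
    bufferedCopy(src_path, dest_bucket, dest_key);
}

int main()
{
    FakeS3Client backup_client{"http://minio1:9001"};
    FakeDisk s3_disk{"http://minio1:9001", std::pair<std::string, std::string>{"root", "data/part_0.bin"}};
    FakeDisk local_disk{"", std::nullopt};

    copyFileFromDiskSketch(s3_disk, "part_0.bin", backup_client, "root", "backups/part_0.bin");    // native copy
    copyFileFromDiskSketch(local_disk, "part_0.bin", backup_client, "root", "backups/part_0.bin"); // buffered copy
}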


@@ -13,7 +13,7 @@ namespace DB
 {
 void copyS3FileToDisk(
-    const std::shared_ptr<const S3::Client> & s3_client,
+    const std::shared_ptr<const S3::Client> & src_s3_client,
     const String & src_bucket,
     const String & src_key,
     const std::optional<String> & version_id,
@@ -31,47 +31,85 @@ void copyS3FileToDisk(
         src_offset = 0;
     if (!src_size)
-        src_size = S3::getObjectSize(*s3_client, src_bucket, src_key, version_id.value_or(""), request_settings) - *src_offset;
+        src_size = S3::getObjectSize(*src_s3_client, src_bucket, src_key, version_id.value_or(""), request_settings) - *src_offset;
     auto destination_data_source_description = destination_disk->getDataSourceDescription();
-    if (destination_data_source_description != DataSourceDescription{DataSourceType::S3, s3_client->getInitialEndpoint(), false, false})
+    if (destination_data_source_description == DataSourceDescription{DataSourceType::S3, src_s3_client->getInitialEndpoint(), false, false})
     {
-        LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());
-        ReadBufferFromS3 read_buffer{s3_client, src_bucket, src_key, {}, request_settings, read_settings};
-        if (*src_offset)
-            read_buffer.seek(*src_offset, SEEK_SET);
-        auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
-        copyData(read_buffer, *write_buffer, *src_size);
-        write_buffer->finalize();
+        /// Use native copy, the more optimal way.
+        LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());
+        auto write_blob_function = [&](const std::pair<String, String> & blob_path_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
+        {
+            /// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
+            chassert(write_mode_ == WriteMode::Rewrite);
+            copyS3File(
+                src_s3_client,
+                src_bucket,
+                src_key,
+                *src_offset,
+                *src_size,
+                /* dest_bucket= */ blob_path_.first,
+                /* dest_key= */ blob_path_.second,
+                request_settings,
+                object_attributes_,
+                scheduler,
+                /* for_disk_s3= */ true);
+            return *src_size;
+        };
+        destination_disk->writeFileUsingBlobWritingFunction(destination_path, write_mode, write_blob_function);
         return;
     }
-    LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());
+    /// Fallback to copy through buffers.
+    LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());
+    ReadBufferFromS3 read_buffer{src_s3_client, src_bucket, src_key, {}, request_settings, read_settings};
+    if (*src_offset)
+        read_buffer.seek(*src_offset, SEEK_SET);
+    auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
+    copyData(read_buffer, *write_buffer, *src_size);
+    write_buffer->finalize();
+}
-    String dest_bucket = destination_disk->getObjectStorage()->getObjectsNamespace();
+void copyS3FileFromDisk(
+    DiskPtr src_disk,
+    const String & src_path,
+    std::optional<size_t> src_offset,
+    std::optional<size_t> src_size,
+    const std::shared_ptr<const S3::Client> & dest_s3_client,
+    const String & dest_bucket,
+    const String & dest_key,
+    const ReadSettings & read_settings,
+    const S3Settings::RequestSettings & request_settings,
+    ThreadPoolCallbackRunner<void> scheduler)
+{
+    if (!src_offset)
+        src_offset = 0;
-    auto write_blob_function = [&](const std::pair<String, String> & blob_path_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
+    if (!src_size)
+        src_size = src_disk->getFileSize(src_path) - *src_offset;
+    auto source_data_source_description = src_disk->getDataSourceDescription();
+    if (source_data_source_description == DataSourceDescription{DataSourceType::S3, dest_s3_client->getInitialEndpoint(), false, false})
     {
-        /// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
-        chassert(write_mode_ == WriteMode::Rewrite);
+        /// getBlobPath() can return std::nullopt if the file is stored as multiple objects in S3 bucket.
+        /// In this case we can't use native copy.
+        if (auto blob_path = src_disk->getBlobPath(src_path))
+        {
+            /// Use native copy, the more optimal way.
+            LOG_TRACE(&Poco::Logger::get("copyS3FileFromDisk"), "Copying file {} to S3 using native copy", src_path);
+            const auto & [src_bucket, src_key] = *blob_path;
+            copyS3File(dest_s3_client, src_bucket, src_key, *src_offset, *src_size, dest_bucket, dest_key, request_settings, {}, scheduler);
+            return;
+        }
+    }
-        copyS3File(
-            s3_client,
-            src_bucket,
-            src_key,
-            *src_offset,
-            *src_size,
-            /* dest_bucket= */ blob_path_.first,
-            /* dest_key= */ blob_path_.second,
-            request_settings,
-            object_attributes_,
-            scheduler,
-            /* for_disk_s3= */ true);
-        return *src_size;
-    };
-    destination_disk->writeFileUsingBlobWritingFunction(destination_path, write_mode, write_blob_function);
+    /// Fallback to copy through buffers.
+    LOG_TRACE(&Poco::Logger::get("copyS3FileFromDisk"), "Copying {} to S3 through buffers", src_path);
+    auto create_read_buffer = [src_disk, &src_path, &read_settings] { return src_disk->readFile(src_path, read_settings); };
+    copyDataToS3File(create_read_buffer, *src_offset, *src_size, dest_s3_client, dest_bucket, dest_key, request_settings, {}, scheduler);
 }
 }
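
In the native-copy branch of copyS3FileToDisk() above, the destination disk is asked to write the file through a blob-writing function: the disk chooses the destination blob and keeps its own metadata, while the callback fills that blob (in the real code via copyS3File()) and returns the number of bytes written. The following is a minimal sketch of that pattern with hypothetical names (FakeObjectDisk and its writeFileUsingBlobWritingFunction are stand-ins, not the ClickHouse IDisk interface).

// A sketch of the "write a file using a blob-writing function" pattern.
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <utility>

using BlobPath = std::pair<std::string, std::string>;               // {bucket, key}
using WriteBlobFunction = std::function<size_t(const BlobPath &)>;

struct FakeObjectDisk
{
    std::string bucket = "disk-bucket";
    std::map<std::string, BlobPath> metadata;   // logical path -> blob that stores it

    /// The disk picks a fresh blob name, lets the callback produce its contents,
    /// and only then records the logical-path to blob mapping.
    void writeFileUsingBlobWritingFunction(const std::string & path, const WriteBlobFunction & write_blob)
    {
        BlobPath blob{bucket, "blobs/" + std::to_string(metadata.size())};
        size_t written = write_blob(blob);
        metadata[path] = blob;
        std::cout << path << " -> " << blob.first << '/' << blob.second << " (" << written << " bytes)\n";
    }
};

int main()
{
    FakeObjectDisk disk;
    size_t src_size = 12345;

    /// The callback plays the role of the write_blob_function lambda above: in the
    /// real code it calls copyS3File() with blob.first / blob.second as the destination.
    disk.writeFileUsingBlobWritingFunction("store/part_0.bin", [&](const BlobPath & blob)
    {
        std::cout << "server-side copy into " << blob.first << '/' << blob.second << '\n';
        return src_size;
    });
}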


@@ -13,11 +13,11 @@ namespace DB
 {
 /// Copies an object from S3 bucket to a disk of any type.
-/// Depending on the disk the function can either do copying though buffers
+/// Depending on the disk the function can either do copying through buffers
 /// (i.e. download the object by portions and then write those portions to the specified disk),
 /// or perform a server-side copy.
 void copyS3FileToDisk(
-    const std::shared_ptr<const S3::Client> & s3_client,
+    const std::shared_ptr<const S3::Client> & src_s3_client,
     const String & src_bucket,
     const String & src_key,
     const std::optional<String> & version_id,
@@ -31,6 +31,22 @@ void copyS3FileToDisk(
     const S3Settings::RequestSettings & request_settings = {},
     ThreadPoolCallbackRunner<void> scheduler = {});
+/// Copies an object from a disk of any type to S3 bucket.
+/// Depending on the disk the function can either do copying through buffers
+/// (i.e. read the object by portions and then upload those portions to the specified S3 bucket),
+/// or perform a server-side copy.
+void copyS3FileFromDisk(
+    DiskPtr src_disk,
+    const String & src_path,
+    std::optional<size_t> src_offset,
+    std::optional<size_t> src_size,
+    const std::shared_ptr<const S3::Client> & dest_s3_client,
+    const String & dest_bucket,
+    const String & dest_key,
+    const ReadSettings & read_settings = {},
+    const S3Settings::RequestSettings & request_settings = {},
+    ThreadPoolCallbackRunner<void> scheduler = {});
 }
 #endif
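
Both declarations describe a fallback that copies "through buffers", i.e. the data is moved in portions of at most a fixed buffer size, starting at src_offset and stopping after src_size bytes. A minimal sketch of that loop, using standard iostreams in place of ReadBufferFromS3 and WriteBuffer (kBufferSize is a stand-in for DBMS_DEFAULT_BUFFER_SIZE):

// A sketch of the buffered fallback: seek to the offset, then move at most
// buffer-size bytes at a time until src_size bytes have been copied.
#include <algorithm>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

constexpr size_t kBufferSize = 8;   // tiny on purpose, so the demo does several iterations

void copyThroughBuffers(std::istream & in, std::ostream & out, size_t src_offset, size_t src_size)
{
    if (src_offset)
        in.seekg(static_cast<std::streamoff>(src_offset));   // like read_buffer.seek(*src_offset, SEEK_SET)

    std::vector<char> buffer(std::min(src_size, kBufferSize));
    size_t remaining = src_size;
    while (remaining > 0 && in)
    {
        size_t to_read = std::min(remaining, buffer.size());
        in.read(buffer.data(), static_cast<std::streamsize>(to_read));
        std::streamsize got = in.gcount();
        if (got <= 0)
            break;
        out.write(buffer.data(), got);
        remaining -= static_cast<size_t>(got);
    }
}

int main()
{
    std::istringstream source("0123456789abcdefghij");
    std::ostringstream destination;
    copyThroughBuffers(source, destination, /*src_offset=*/5, /*src_size=*/10);
    std::cout << destination.str() << '\n';   // prints "56789abcde"
}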


@@ -141,7 +141,7 @@ def test_backup_to_s3_native_copy():
         f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
     )
     check_backup_and_restore(storage_policy, backup_destination)
-    assert node.contains_in_log("BackupWriterS3.*using native copy")
+    assert node.contains_in_log("copyS3FileFromDisk.*using native copy")
     assert node.contains_in_log("copyS3FileToDisk.*using native copy")
     assert node.contains_in_log(
         f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
@@ -155,7 +155,7 @@ def test_backup_to_s3_native_copy_other_bucket():
         f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
     )
     check_backup_and_restore(storage_policy, backup_destination)
-    assert node.contains_in_log("BackupWriterS3.*using native copy")
+    assert node.contains_in_log("copyS3FileFromDisk.*using native copy")
     assert node.contains_in_log("copyS3FileToDisk.*using native copy")
     assert node.contains_in_log(
         f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
@@ -167,7 +167,7 @@ def test_backup_to_s3_native_copy_multipart():
     backup_name = new_backup_name()
     backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
     check_backup_and_restore(storage_policy, backup_destination, size=1000000)
-    assert node.contains_in_log("BackupWriterS3.*using native copy")
+    assert node.contains_in_log("copyS3FileFromDisk.*using native copy")
     assert node.contains_in_log("copyS3FileToDisk.*using native copy")
     assert node.contains_in_log(
         f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"