Simplify the implementation, create new utility function copyS3FileToDisk().

This commit is contained in:
Vitaly Baranov 2023-03-14 11:53:06 +01:00
parent 1cf1ce07fe
commit 25356786ea
29 changed files with 268 additions and 214 deletions

View File

@ -3,7 +3,6 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
#include <Common/logger_useful.h>
namespace DB
@ -14,21 +13,13 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode)
void IBackupReader::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
if (supportNativeCopy(destination_disk->getDataSourceDescription(), mode))
{
LOG_TRACE(getLogger(), "Copying {} using native copy", file_name);
copyFileToDiskNative(file_name, size, destination_disk, destination_path, mode);
}
else
{
LOG_TRACE(getLogger(), "Copying {} through buffers", file_name);
auto read_buffer = readFile(file_name);
auto write_buffer = destination_disk->writeFile(destination_path, DBMS_DEFAULT_BUFFER_SIZE, mode);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
auto read_buffer = readFile(file_name);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)

View File

@ -17,13 +17,9 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode);
virtual void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings);
virtual DataSourceDescription getDataSourceDescription() const = 0;
protected:
virtual bool supportNativeCopy(DataSourceDescription /* destination_data_source_description */, WriteMode /* mode */) const { return false; }
virtual void copyFileToDiskNative(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode) = 0;
virtual Poco::Logger * getLogger() const = 0;
};
/// Represents operations of storing to disk or uploading for writing a backup.

View File

@ -35,15 +35,18 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderDisk::readFile(const String & fi
return disk->readFile(path / file_name);
}
bool BackupReaderDisk::supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode mode) const
void BackupReaderDisk::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
return (destination_data_source_description == getDataSourceDescription()) && (mode == WriteMode::Rewrite);
}
if (write_mode == WriteMode::Rewrite)
{
LOG_TRACE(log, "Copying {}/{} from disk {} to {} by the disk", path, file_name, disk->getName(), destination_disk->getName());
disk->copyFile(path / file_name, *destination_disk, destination_path, write_settings);
return;
}
void BackupReaderDisk::copyFileToDiskNative(const String & file_name, size_t, DiskPtr destination_disk, const String & destination_path, WriteMode)
{
auto src_path = path / file_name;
disk->copyFile(src_path, *destination_disk, destination_path);
LOG_TRACE(log, "Copying {}/{} from disk {} to {} through buffers", path, file_name, disk->getName(), destination_disk->getName());
IBackupReader::copyFileToDisk(file_name, size, destination_disk, destination_path, write_mode, write_settings);
}

View File

@ -17,13 +17,10 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
protected:
bool supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode mode) const override;
void copyFileToDiskNative(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode) override;
Poco::Logger * getLogger() const override { return log; }
private:
DiskPtr disk;
std::filesystem::path path;

View File

@ -1,4 +1,5 @@
#include <Backups/BackupIO_File.h>
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -32,16 +33,19 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderFile::readFile(const String & fi
return createReadBufferFromFileBase(path / file_name, {});
}
bool BackupReaderFile::supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode mode) const
void BackupReaderFile::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
return (destination_data_source_description == getDataSourceDescription()) && (mode == WriteMode::Rewrite);
}
if (destination_disk->getDataSourceDescription() == getDataSourceDescription())
{
/// Use more optimal way.
LOG_TRACE(log, "Copying {}/{} to disk {} locally", path, file_name, destination_disk->getName());
fs::copy(path / file_name, fullPath(destination_disk, destination_path), fs::copy_options::overwrite_existing);
return;
}
void BackupReaderFile::copyFileToDiskNative(const String & file_name, size_t, DiskPtr destination_disk, const String & destination_path, WriteMode)
{
std::string abs_source_path = path / file_name;
std::string abs_destination_path = fullPath(destination_disk, destination_path);
fs::copy(abs_source_path, abs_destination_path, fs::copy_options::overwrite_existing);
LOG_TRACE(log, "Copying {}/{} to disk {} through buffers", path, file_name, destination_disk->getName());
IBackupReader::copyFileToDisk(path / file_name, size, destination_disk, destination_path, write_mode, write_settings);
}

View File

@ -15,13 +15,10 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
protected:
bool supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode mode) const override;
void copyFileToDiskNative(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode) override;
Poco::Logger * getLogger() const override { return log; }
private:
std::filesystem::path path;
Poco::Logger * log;

View File

@ -2,7 +2,7 @@
#if USE_AWS_S3
#include <Common/quoteString.h>
#include <Disks/ObjectStorages/S3/S3ParamsForNativeCopyToDisk.h>
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/BackupsIOThreadPool.h>
@ -129,20 +129,25 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
}
bool BackupReaderS3::supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode) const
void BackupReaderS3::copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings)
{
return destination_data_source_description == getDataSourceDescription();
}
LOG_TRACE(log, "Copying {} to disk {}", file_name, destination_disk->getName());
void BackupReaderS3::copyFileToDiskNative(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode)
{
S3ParamsForNativeCopyToDisk params;
params.src_bucket = s3_uri.bucket;
params.src_key = fs::path(s3_uri.key) / file_name;
params.src_size = size;
params.request_settings = request_settings;
params.scheduler = threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupReaderS3");
destination_disk->writeFileUsingNativeCopy(destination_path, mode, params);
copyS3FileToDisk(
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
s3_uri.version_id,
0,
size,
destination_disk,
destination_path,
write_mode,
read_settings,
write_settings,
request_settings,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupReaderS3"));
}

View File

@ -22,13 +22,10 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) override;
DataSourceDescription getDataSourceDescription() const override;
protected:
bool supportNativeCopy(DataSourceDescription destination_data_source_description, WriteMode mode) const override;
void copyFileToDiskNative(const String & file_name, size_t size, DiskPtr destination_disk, const String & destination_path, WriteMode mode) override;
Poco::Logger * getLogger() const override { return log; }
private:
S3::URI s3_uri;
std::shared_ptr<S3::Client> client;

View File

@ -595,6 +595,14 @@ std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum &
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
std::lock_guard lock{mutex};
++num_read_files;
return std::make_unique<ReadBufferFromMemory>(static_cast<char *>(nullptr), 0);
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
if (!info_opt)
{
@ -607,20 +615,12 @@ std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum &
const auto & info = *info_opt;
if (!info.size)
{
/// Entry's data is empty.
std::lock_guard lock{mutex};
++num_read_files;
return std::make_unique<ReadBufferFromMemory>(static_cast<char *>(nullptr), 0);
}
std::unique_ptr<SeekableReadBuffer> read_buffer;
std::unique_ptr<SeekableReadBuffer> base_read_buffer;
if (info.size > info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
/// Make `read_buffer` if there is data for this backup entry in this backup.
if (use_archives)
{
std::shared_ptr<IArchiveReader> archive_reader;
@ -638,6 +638,7 @@ std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum &
if (info.base_size)
{
/// Make `base_read_buffer` if there is data for this backup entry in the base backup.
if (!base_backup)
{
throw Exception(
@ -683,16 +684,31 @@ std::unique_ptr<SeekableReadBuffer> BackupImpl::readFile(const SizeAndChecksum &
}
}
size_t BackupImpl::copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path, WriteMode mode) const
size_t BackupImpl::copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
return copyFileToDisk(getFileSizeAndChecksum(file_name), destination_disk, destination_path, mode);
return copyFileToDisk(getFileSizeAndChecksum(file_name), destination_disk, destination_path, write_mode, write_settings);
}
size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode mode) const
size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const
{
if (open_mode != OpenMode::READ)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for reading");
if (size_and_checksum.first == 0)
{
/// Entry's data is empty.
if (write_mode == WriteMode::Rewrite)
{
/// Just create an empty file.
destination_disk->createFile(destination_path);
}
std::lock_guard lock{mutex};
++num_read_files;
return 0;
}
auto info_opt = coordination->getFileInfo(size_and_checksum);
if (!info_opt)
{
@ -710,14 +726,14 @@ size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, Dis
if (info.size && !info.base_size && !use_archives)
{
/// Data comes completely from this backup.
reader->copyFileToDisk(info.data_file_name, info.size, destination_disk, destination_path, mode);
reader->copyFileToDisk(info.data_file_name, info.size, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
else if (info.size && (info.size == info.base_size))
{
/// Data comes completely from the base backup (nothing comes from this backup).
base_backup->copyFileToDisk(std::pair{info.base_size, info.base_checksum}, destination_disk, destination_path, mode);
base_backup->copyFileToDisk(std::pair{info.base_size, info.base_checksum}, destination_disk, destination_path, write_mode, write_settings);
file_copied = true;
}
@ -732,7 +748,8 @@ size_t BackupImpl::copyFileToDisk(const SizeAndChecksum & size_and_checksum, Dis
{
/// Use the generic way to copy data. `readFile()` will update `num_read_files`.
auto read_buffer = readFile(size_and_checksum);
auto write_buffer = destination_disk->writeFile(destination_path, DBMS_DEFAULT_BUFFER_SIZE, mode);
auto write_buffer = destination_disk->writeFile(destination_path, std::min<size_t>(info.size, DBMS_DEFAULT_BUFFER_SIZE),
write_mode, write_settings);
copyData(*read_buffer, *write_buffer, info.size);
write_buffer->finalize();
}

View File

@ -75,8 +75,10 @@ public:
SizeAndChecksum getFileSizeAndChecksum(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) const override;
std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path, WriteMode mode) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode mode) const override;
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode, const WriteSettings & write_settings) const override;
void writeFile(const String & file_name, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archives; }

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Disks/WriteMode.h>
#include <IO/WriteSettings.h>
#include <memory>
#include <optional>
@ -103,8 +104,11 @@ public:
virtual std::unique_ptr<SeekableReadBuffer> readFile(const SizeAndChecksum & size_and_checksum) const = 0;
/// Copies a file from the backup to a specified destination disk. Returns the number of bytes written.
virtual size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path, WriteMode mode = WriteMode::Rewrite) const = 0;
virtual size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode mode = WriteMode::Rewrite) const = 0;
virtual size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
virtual size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite, const WriteSettings & write_settings = {}) const = 0;
/// Puts a new entry to the backup.
virtual void writeFile(const String & file_name, BackupEntryPtr entry) = 0;

View File

@ -68,9 +68,13 @@ public:
return disk.writeFile(path, buf_size, mode, settings);
}
void writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params) override
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override
{
return disk.writeFileUsingNativeCopy(path, mode, params);
disk.writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void removeFile(const std::string & path) override

View File

@ -38,11 +38,12 @@ void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const Strin
out->finalize();
}
void IDisk::writeFileUsingNativeCopy(const String &, WriteMode, const IParamsForNativeCopyToDisk &)
void IDisk::writeFileUsingCustomWriteObject(
const String &, WriteMode, std::function<size_t(const StoredObject &, WriteMode, const std::optional<ObjectAttributes> &)>)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `writeFileUsingNativeCopy()` is not implemented for disk: {}",
"Method `writeFileUsingCustomWriteObject()` is not implemented for disk: {}",
getDataSourceDescription().type);
}

View File

@ -97,15 +97,6 @@ public:
using SyncGuardPtr = std::unique_ptr<ISyncGuard>;
/**
* The parameters for IDisk::writeFileUsingNativeCopy().
*/
class IParamsForNativeCopyToDisk
{
public:
virtual ~IParamsForNativeCopyToDisk() = default;
};
/**
* A unit of storage persisting data and metadata.
* Abstract underlying storage technology.
@ -218,8 +209,14 @@ public:
WriteMode mode = WriteMode::Rewrite,
const WriteSettings & settings = {}) = 0;
/// Write a file using native copy, if supported.
virtual void writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params);
/// Write a file using a custom function to write an object to the disk's object storage.
/// This method is an alternative to writeFile(): writeFile() calls IObjectStorage::writeObject()
/// to write an object to the object storage, while this method lets the caller specify a callback for that.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function);
/// Remove file. Throws exception if file doesn't exists or it's a directory.
/// Return whether file was finally removed. (For remote disks it is not always removed).

View File

@ -68,8 +68,12 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) = 0;
/// Write a file using native copy, if supported.
virtual void writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params) = 0;
/// Write a file using a custom function to write an object to the disk's object storage.
virtual void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) = 0;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
virtual void removeFile(const std::string & path) = 0;

View File

@ -577,11 +577,15 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
return result;
}
void DiskObjectStorage::writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params)
void DiskObjectStorage::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
LOG_TEST(log, "Write file: {}", path);
auto transaction = createObjectStorageTransaction();
return transaction->writeFileUsingNativeCopy(path, mode, params);
return transaction->writeFileUsingCustomWriteObject(path, mode, std::move(custom_write_object_function));
}
void DiskObjectStorage::applyNewSettings(

View File

@ -152,7 +152,11 @@ public:
WriteMode mode,
const WriteSettings & settings) override;
void writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params) override;
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;

View File

@ -670,8 +670,13 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
void DiskObjectStorageTransaction::writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params)
void DiskObjectStorageTransaction::writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function)
{
/// This function is a simplified and adapted version of DiskObjectStorageTransaction::writeFile().
auto blob_name = object_storage.generateBlobNameForPath(path);
std::optional<ObjectAttributes> object_attributes;
@ -688,25 +693,18 @@ void DiskObjectStorageTransaction::writeFileUsingNativeCopy(const String & path,
auto object = StoredObject::create(object_storage, fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
auto create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name] (size_t count)
{
if (mode == WriteMode::Rewrite)
tx->metadata_transaction->createMetadataFile(path, blob_name, count);
else
tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
tx->metadata_transaction->commit();
};
operations_to_execute.emplace_back(std::move(write_operation));
/// We always use mode Rewrite because we simulate append using metadata and different files
return object_storage.writeObjectUsingNativeCopy(
object,
WriteMode::Rewrite,
params,
object_attributes,
std::move(create_metadata_callback));
size_t object_size = std::move(custom_write_object_function)(object, WriteMode::Rewrite, object_attributes);
/// Create metadata (see create_metadata_callback in DiskObjectStorageTransaction::writeFile()).
if (mode == WriteMode::Rewrite)
metadata_transaction->createMetadataFile(path, blob_name, object_size);
else
metadata_transaction->addBlobToMetadata(path, blob_name, object_size);
metadata_transaction->commit();
}

View File

@ -99,7 +99,12 @@ public:
const WriteSettings & settings = {},
bool autocommit = true) override;
void writeFileUsingNativeCopy(const String & path, WriteMode mode, const IParamsForNativeCopyToDisk & params) override;
/// Write a file using a custom function to write an object to the disk's object storage.
void writeFileUsingCustomWriteObject(
const String & path,
WriteMode mode,
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
custom_write_object_function) override;
void removeFile(const std::string & path) override;
void removeFileIfExists(const std::string & path) override;

View File

@ -88,12 +88,4 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return settings;
}
void IObjectStorage::writeObjectUsingNativeCopy(
const StoredObject &, WriteMode, const IParamsForNativeCopyToDisk &, std::optional<ObjectAttributes>, FinalizeCallback &&)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `writeObjectUsingNativeCopy()` is not implemented for disk: {}",
getDataSourceDescription().type);
}
}

View File

@ -25,7 +25,6 @@ namespace DB
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class IParamsForNativeCopyToDisk;
using ObjectAttributes = std::map<std::string, std::string>;
@ -41,6 +40,7 @@ struct RelativePathWithSize
};
using RelativePathsWithSize = std::vector<RelativePathWithSize>;
struct ObjectMetadata
{
uint64_t size_bytes;
@ -123,14 +123,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) = 0;
/// Write a file using native copy.
virtual void writeObjectUsingNativeCopy(
const StoredObject & object,
WriteMode mode,
const IParamsForNativeCopyToDisk & params,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {});
virtual bool isRemote() const = 0;
/// Remove object. Throws exception if object doesn't exists.

View File

@ -17,7 +17,6 @@
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Disks/ObjectStorages/S3/S3ParamsForNativeCopyToDisk.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
@ -204,41 +203,6 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
std::move(s3_buffer), std::move(finalize_callback), object.absolute_path);
}
void S3ObjectStorage::writeObjectUsingNativeCopy(
const StoredObject & object,
WriteMode mode,
const IParamsForNativeCopyToDisk & params,
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback)
{
if (mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
const auto * s3_params = typeid_cast<const S3ParamsForNativeCopyToDisk *>(&params);
if (!s3_params)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"S3ObjectStorage::writeObjectUsingNativeCopy() doesn't support params of type {}",
typeid(params).name());
auto client_ptr = client.get();
auto settings_ptr = s3_settings.get();
auto scheduler = s3_params->scheduler ? *s3_params->scheduler : threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
const auto & request_settings = s3_params->request_settings ? *s3_params->request_settings : settings_ptr->request_settings;
const String & src_bucket = s3_params->src_bucket;
const String & src_key = s3_params->src_key;
size_t src_size = s3_params->src_size;
if (src_size == static_cast<size_t>(-1))
src_size = S3::getObjectSize(*client_ptr, src_bucket, src_key, {}, request_settings);
copyS3File(client_ptr, src_bucket, src_key, 0, src_size, bucket, object.absolute_path,
request_settings, attributes, scheduler, /* for_disk_s3= */ true);
(std::move(finalize_callback))(src_size);
}
void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const
{
auto settings_ptr = s3_settings.get();

View File

@ -102,14 +102,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
/// Write a file using native copy.
void writeObjectUsingNativeCopy(
const StoredObject & object,
WriteMode mode,
const IParamsForNativeCopyToDisk & params,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {}) override;
void findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const override;
void getDirectoryContents(const std::string & path,
RelativePathsWithSize & files,

View File

@ -1,26 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Disks/IDisk.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/StorageS3Settings.h>
namespace DB
{
class S3ParamsForNativeCopyToDisk : public IParamsForNativeCopyToDisk
{
public:
String src_bucket;
String src_key;
size_t src_size = static_cast<size_t>(-1);
std::optional<ThreadPoolCallbackRunner<void>> scheduler;
std::optional<S3Settings::RequestSettings> request_settings;
};
}
#endif

View File

@ -0,0 +1,69 @@
#include <Disks/ObjectStorages/S3/copyS3FileToDisk.h>
#if USE_AWS_S3
#include <IO/S3/getObjectInfo.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/S3/copyS3File.h>
namespace DB
{
/// Copies (a part of) an S3 object to `destination_path` on `destination_disk`.
///
/// - `src_offset` defaults to 0 when not set.
/// - `src_size` defaults to the object's size minus `src_offset` when not set.
/// - `version_id` selects the version of the source object; an unset value is treated as an empty string.
///
/// If the destination disk's data source description equals an S3 description built from the client's
/// initial endpoint, the object is copied with copyS3File() through the disk's
/// writeFileUsingCustomWriteObject(). Otherwise the object is read with ReadBufferFromS3
/// and written to the disk through buffers.
void copyS3FileToDisk(
    const std::shared_ptr<const S3::Client> & s3_client,
    const String & src_bucket,
    const String & src_key,
    const std::optional<String> & version_id,
    std::optional<size_t> src_offset,
    std::optional<size_t> src_size,
    DiskPtr destination_disk,
    const String & destination_path,
    WriteMode write_mode,
    const ReadSettings & read_settings,
    const WriteSettings & write_settings,
    const S3Settings::RequestSettings & request_settings,
    ThreadPoolCallbackRunner<void> scheduler)
{
    if (!src_offset)
        src_offset = 0;

    /// The same version id must be used for the size lookup and for reading the data,
    /// otherwise the size may describe a different version than the one being copied.
    const String src_version_id = version_id.value_or("");

    /// NOTE(review): if `src_offset` exceeds the object's size this subtraction wraps around — confirm callers never do that.
    if (!src_size)
        src_size = S3::getObjectSize(*s3_client, src_bucket, src_key, src_version_id, request_settings) - *src_offset;

    auto destination_data_source_description = destination_disk->getDataSourceDescription();
    if (destination_data_source_description != DataSourceDescription{DataSourceType::S3, s3_client->getInitialEndpoint(), false, false})
    {
        LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} through buffers", src_key, destination_disk->getName());

        ReadBufferFromS3 read_buffer{s3_client, src_bucket, src_key, src_version_id, request_settings, read_settings};
        if (*src_offset)
            read_buffer.seek(*src_offset, SEEK_SET);

        /// No need for a buffer bigger than the data which is going to be written.
        auto write_buffer = destination_disk->writeFile(
            destination_path, std::min<size_t>(*src_size, DBMS_DEFAULT_BUFFER_SIZE), write_mode, write_settings);

        copyData(read_buffer, *write_buffer, *src_size);
        write_buffer->finalize();
        return;
    }

    LOG_TRACE(&Poco::Logger::get("copyS3FileToDisk"), "Copying {} to disk {} using native copy", src_key, destination_disk->getName());

    String destination_bucket = destination_disk->getObjectStorage()->getObjectsNamespace();

    /// NOTE(review): copyS3File() is called without a version id, so `version_id` is not applied on this path —
    /// confirm whether it can be passed through.
    /// Capturing by reference is fine here because the callback is only handed to
    /// writeFileUsingCustomWriteObject() below, within this function's scope.
    auto custom_write_object = [&](const StoredObject & object_, WriteMode write_mode_, const std::optional<ObjectAttributes> & object_attributes_) -> size_t
    {
        /// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
        chassert(write_mode_ == WriteMode::Rewrite);

        copyS3File(s3_client, src_bucket, src_key, *src_offset, *src_size, destination_bucket, /* destination_key= */ object_.absolute_path,
                   request_settings, object_attributes_, scheduler, /* for_disk_s3= */ true);

        /// The returned size is what gets recorded in the disk's metadata for the new object.
        return *src_size;
    };

    destination_disk->writeFileUsingCustomWriteObject(destination_path, write_mode, custom_write_object);
}
}
#endif

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Disks/IDisk.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
{
/// Copies an object from an S3 bucket to a disk of any type.
/// Depending on the disk the function can either do copying through buffers
/// (i.e. download the object by portions and then write those portions to the specified disk),
/// or perform a server-side copy.
///
/// Parameters:
/// - `s3_client`, `src_bucket`, `src_key` identify the source object.
/// - `version_id` is the version of the source object; an unset value is treated as an empty string.
/// - `src_offset` is the position in the source object to start copying from; 0 is used if it's not set.
/// - `src_size` is the number of bytes to copy; if it's not set, the object's size minus `src_offset` is used.
/// - `destination_disk`, `destination_path` specify where to write the data.
/// - `write_mode` is passed to the destination disk.
/// - `read_settings` and `write_settings` are used when copying through buffers.
/// - `request_settings` is used for the requests sent to S3.
/// - `scheduler` is passed to copyS3File() when a server-side copy is performed.
void copyS3FileToDisk(
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
const std::optional<String> & version_id,
std::optional<size_t> src_offset,
std::optional<size_t> src_size,
DiskPtr destination_disk,
const String & destination_path,
WriteMode write_mode = WriteMode::Rewrite,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {},
const S3Settings::RequestSettings & request_settings = {},
ThreadPoolCallbackRunner<void> scheduler = {});
}
#endif

View File

@ -123,9 +123,8 @@ Client::Client(
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && initial_endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
@ -133,6 +132,7 @@ Client::Client(
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, initial_endpoint(other.initial_endpoint)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)

View File

@ -109,6 +109,9 @@ public:
}
}
/// Returns the initial endpoint.
const String & getInitialEndpoint() const { return initial_endpoint; }
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
@ -198,6 +201,8 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
String initial_endpoint;
std::string explicit_region;
mutable bool detect_region = true;

View File

@ -139,7 +139,7 @@ def test_backup_to_s3_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
@ -153,7 +153,7 @@ def test_backup_to_s3_native_copy_other_bucket():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
@ -165,7 +165,7 @@ def test_backup_to_s3_native_copy_multipart():
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("BackupImpl.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log("copyS3FileToDisk.*using native copy")
assert node.contains_in_log(
f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
)