mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #56744 from MikhailBurdukov/native_copy_for_s3_disks
Enabled s3 `copyObject` for copy between s3 disks.
This commit is contained in:
commit
0dfe530a7f
@ -406,7 +406,7 @@ RESTORE TABLE data AS data_restored FROM Disk('s3_plain', 'cloud_backup');
|
||||
:::note
|
||||
But keep in mind that:
|
||||
- This disk should not be used for `MergeTree` itself, only for `BACKUP`/`RESTORE`
|
||||
- If your tables are backed by S3 storage, it doesn't use `CopyObject` calls to copy parts to the destination bucket, instead, it downloads and uploads them, which is very inefficient. Prefer to use `BACKUP ... TO S3(<endpoint>)` syntax for this use-case.
|
||||
- If your tables are backed by S3 storage and types of the disks are different, it doesn't use `CopyObject` calls to copy parts to the destination bucket, instead, it downloads and uploads them, which is very inefficient. Prefer to use `BACKUP ... TO S3(<endpoint>)` syntax for this use-case.
|
||||
:::
|
||||
|
||||
## Alternatives
|
||||
|
@ -115,6 +115,7 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/LocalDirectorySyncGuard.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/TemporaryFileOnDisk.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/loadLocalDiskConfig.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/DiskType.cpp
|
||||
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/IObjectStorage.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp
|
||||
|
@ -46,6 +46,17 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
|
||||
send_metadata ? metadata_helper.get() : nullptr);
|
||||
}
|
||||
|
||||
DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk)
|
||||
{
|
||||
return std::make_shared<MultipleDisksObjectStorageTransaction>(
|
||||
*object_storage,
|
||||
*metadata_storage,
|
||||
*to_disk.getObjectStorage(),
|
||||
*to_disk.getMetadataStorage(),
|
||||
send_metadata ? metadata_helper.get() : nullptr);
|
||||
}
|
||||
|
||||
|
||||
DiskObjectStorage::DiskObjectStorage(
|
||||
const String & name_,
|
||||
const String & object_key_prefix_,
|
||||
@ -179,12 +190,13 @@ void DiskObjectStorage::copyFile( /// NOLINT
|
||||
const std::function<void()> & cancellation_hook
|
||||
)
|
||||
{
|
||||
if (this == &to_disk)
|
||||
if (getDataSourceDescription() == to_disk.getDataSourceDescription())
|
||||
{
|
||||
/// It may use s3-server-side copy
|
||||
auto transaction = createObjectStorageTransaction();
|
||||
transaction->copyFile(from_file_path, to_file_path);
|
||||
transaction->commit();
|
||||
/// It may use s3-server-side copy
|
||||
auto & to_disk_object_storage = dynamic_cast<DiskObjectStorage &>(to_disk);
|
||||
auto transaction = createObjectStorageTransactionToAnotherDisk(to_disk_object_storage);
|
||||
transaction->copyFile(from_file_path, to_file_path);
|
||||
transaction->commit();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -222,6 +222,7 @@ private:
|
||||
/// Create actual disk object storage transaction for operations
|
||||
/// execution.
|
||||
DiskTransactionPtr createObjectStorageTransaction();
|
||||
DiskTransactionPtr createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk);
|
||||
|
||||
String getReadResourceName() const;
|
||||
String getWriteResourceName() const;
|
||||
|
@ -38,6 +38,29 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
|
||||
, metadata_helper(metadata_helper_)
|
||||
{}
|
||||
|
||||
|
||||
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
|
||||
MetadataTransactionPtr metadata_transaction_)
|
||||
: object_storage(object_storage_)
|
||||
, metadata_storage(metadata_storage_)
|
||||
, metadata_transaction(metadata_transaction_)
|
||||
, metadata_helper(metadata_helper_)
|
||||
{}
|
||||
|
||||
MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
IObjectStorage& destination_object_storage_,
|
||||
IMetadataStorage& destination_metadata_storage_,
|
||||
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
|
||||
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction())
|
||||
, destination_object_storage(destination_object_storage_)
|
||||
, destination_metadata_storage(destination_metadata_storage_)
|
||||
{}
|
||||
|
||||
namespace
|
||||
{
|
||||
/// Operation which affects only metadata. Simplest way to
|
||||
@ -485,10 +508,12 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
std::string to_path;
|
||||
|
||||
StoredObjects created_objects;
|
||||
IObjectStorage& destination_object_storage;
|
||||
|
||||
CopyFileObjectStorageOperation(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
IObjectStorage & destination_object_storage_,
|
||||
const ReadSettings & read_settings_,
|
||||
const WriteSettings & write_settings_,
|
||||
const std::string & from_path_,
|
||||
@ -498,6 +523,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
, write_settings(write_settings_)
|
||||
, from_path(from_path_)
|
||||
, to_path(to_path_)
|
||||
, destination_object_storage(destination_object_storage_)
|
||||
{}
|
||||
|
||||
std::string getInfoForLog() const override
|
||||
@ -515,7 +541,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
auto object_key = object_storage.generateObjectKeyForPath(to_path);
|
||||
auto object_to = StoredObject(object_key.serialize());
|
||||
|
||||
object_storage.copyObject(object_from, object_to, read_settings, write_settings);
|
||||
object_storage.copyObjectToAnotherObjectStorage(object_from, object_to,read_settings,write_settings, destination_object_storage);
|
||||
|
||||
tx->addBlobToMetadata(to_path, object_key, object_from.bytes_size);
|
||||
|
||||
@ -526,7 +552,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
|
||||
void undo() override
|
||||
{
|
||||
for (const auto & object : created_objects)
|
||||
object_storage.removeObject(object);
|
||||
destination_object_storage.removeObject(object);
|
||||
}
|
||||
|
||||
void finalize() override
|
||||
@ -859,7 +885,13 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
|
||||
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
|
||||
{
|
||||
operations_to_execute.emplace_back(
|
||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
}
|
||||
|
||||
void MultipleDisksObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
|
||||
{
|
||||
operations_to_execute.emplace_back(
|
||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, destination_object_storage, read_settings, write_settings, from_file_path, to_file_path));
|
||||
}
|
||||
|
||||
void DiskObjectStorageTransaction::commit()
|
||||
|
@ -50,9 +50,9 @@ using DiskObjectStorageOperations = std::vector<DiskObjectStorageOperation>;
|
||||
///
|
||||
/// If something wrong happen on step 1 or 2 reverts all applied operations.
|
||||
/// If finalize failed -- nothing is reverted, garbage is left in blob storage.
|
||||
struct DiskObjectStorageTransaction final : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
|
||||
struct DiskObjectStorageTransaction : public IDiskTransaction, std::enable_shared_from_this<DiskObjectStorageTransaction>
|
||||
{
|
||||
private:
|
||||
protected:
|
||||
IObjectStorage & object_storage;
|
||||
IMetadataStorage & metadata_storage;
|
||||
|
||||
@ -63,6 +63,12 @@ private:
|
||||
|
||||
DiskObjectStorageOperations operations_to_execute;
|
||||
|
||||
DiskObjectStorageTransaction(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
|
||||
MetadataTransactionPtr metadata_transaction_);
|
||||
|
||||
public:
|
||||
DiskObjectStorageTransaction(
|
||||
IObjectStorage & object_storage_,
|
||||
@ -118,6 +124,21 @@ public:
|
||||
void createHardLink(const std::string & src_path, const std::string & dst_path) override;
|
||||
};
|
||||
|
||||
struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTransaction, std::enable_shared_from_this<MultipleDisksObjectStorageTransaction>
|
||||
{
|
||||
IObjectStorage& destination_object_storage;
|
||||
IMetadataStorage& destination_metadata_storage;
|
||||
|
||||
MultipleDisksObjectStorageTransaction(
|
||||
IObjectStorage & object_storage_,
|
||||
IMetadataStorage & metadata_storage_,
|
||||
IObjectStorage& destination_object_storage,
|
||||
IMetadataStorage& destination_metadata_storage,
|
||||
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
|
||||
|
||||
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
|
||||
};
|
||||
|
||||
using DiskObjectStorageTransactionPtr = std::shared_ptr<DiskObjectStorageTransaction>;
|
||||
|
||||
}
|
||||
|
@ -458,27 +458,39 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
/// Shortcut for S3
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
auto client_ = client.get();
|
||||
auto client_ = dest_s3->client.get();
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(
|
||||
client.get(),
|
||||
bucket,
|
||||
object_from.remote_path,
|
||||
0,
|
||||
size,
|
||||
dest_s3->bucket,
|
||||
object_to.remote_path,
|
||||
settings_ptr->request_settings,
|
||||
patchSettings(read_settings),
|
||||
BlobStorageLogWriter::create(disk_name),
|
||||
object_to_attributes,
|
||||
scheduler,
|
||||
/* for_disk_s3= */ true);
|
||||
try {
|
||||
copyS3File(
|
||||
client_,
|
||||
bucket,
|
||||
object_from.remote_path,
|
||||
0,
|
||||
size,
|
||||
dest_s3->bucket,
|
||||
object_to.remote_path,
|
||||
settings_ptr->request_settings,
|
||||
patchSettings(read_settings),
|
||||
BlobStorageLogWriter::create(disk_name),
|
||||
object_to_attributes,
|
||||
scheduler,
|
||||
/* for_disk_s3= */ true);
|
||||
return;
|
||||
}
|
||||
catch (S3Exception & exc)
|
||||
{
|
||||
/// If authentication/permissions error occurs then fallthrough to copy with buffer.
|
||||
if (exc.getS3ErrorCode() != Aws::S3::S3Errors::ACCESS_DENIED)
|
||||
throw;
|
||||
LOG_WARNING(&Poco::Logger::get("S3ObjectStorage"),
|
||||
"S3-server-side copy object from the disk {} to the disk {} can not be performed: {}\n",
|
||||
getName(), dest_s3->getName(), exc.what());
|
||||
}
|
||||
}
|
||||
else
|
||||
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
|
||||
|
||||
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObject( // NOLINT
|
||||
|
@ -58,5 +58,6 @@
|
||||
<allowed_disk>disk_s3</allowed_disk>
|
||||
<allowed_disk>disk_s3_plain</allowed_disk>
|
||||
<allowed_disk>disk_s3_cache</allowed_disk>
|
||||
<allowed_disk>disk_s3_other_bucket</allowed_disk>
|
||||
</backups>
|
||||
</clickhouse>
|
||||
|
@ -184,6 +184,32 @@ def test_backup_to_disk(storage_policy, to_disk):
|
||||
check_backup_and_restore(storage_policy, backup_destination)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"storage_policy, to_disk",
|
||||
[
|
||||
pytest.param(
|
||||
"policy_s3",
|
||||
"disk_s3_other_bucket",
|
||||
id="from_s3_to_s3",
|
||||
),
|
||||
pytest.param(
|
||||
"policy_s3_other_bucket",
|
||||
"disk_s3",
|
||||
id="from_s3_to_s3_other_bucket",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_backup_from_s3_to_s3_disk_native_copy(storage_policy, to_disk):
|
||||
backup_name = new_backup_name()
|
||||
backup_destination = f"Disk('{to_disk}', '{backup_name}')"
|
||||
(backup_events, restore_events) = check_backup_and_restore(
|
||||
storage_policy, backup_destination
|
||||
)
|
||||
|
||||
assert backup_events["S3CopyObject"] > 0
|
||||
assert restore_events["S3CopyObject"] > 0
|
||||
|
||||
|
||||
def test_backup_to_s3():
|
||||
storage_policy = "default"
|
||||
backup_name = new_backup_name()
|
||||
|
@ -2,3 +2,5 @@ s3_plain_native_copy
|
||||
Single operation copy has completed.
|
||||
s3_plain_no_native_copy
|
||||
Single part upload has completed.
|
||||
copy from s3_plain_native_copy to s3_plain_another
|
||||
Single operation copy has completed.
|
||||
|
@ -24,5 +24,20 @@ function run_test_for_disk()
|
||||
clickhouse-disks -C "$config" --disk "$disk" remove $CLICKHOUSE_DATABASE/test.copy
|
||||
}
|
||||
|
||||
function run_test_copy_from_s3_to_s3(){
|
||||
local disk_src=$1 && shift
|
||||
local disk_dest=$1 && shift
|
||||
|
||||
echo "copy from $disk_src to $disk_dest"
|
||||
clickhouse-disks -C "$config" --disk "$disk_src" write --input "$config" $CLICKHOUSE_DATABASE/test
|
||||
|
||||
clickhouse-disks -C "$config" --log-level test copy --disk-from "$disk_src" --disk-to "$disk_dest" $CLICKHOUSE_DATABASE/test $CLICKHOUSE_DATABASE/test.copy |& {
|
||||
grep -o -e "Single part upload has completed." -e "Single operation copy has completed."
|
||||
}
|
||||
clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy/test
|
||||
clickhouse-disks -C "$config" --disk "$disk_dest" remove $CLICKHOUSE_DATABASE/test.copy
|
||||
}
|
||||
|
||||
run_test_for_disk s3_plain_native_copy
|
||||
run_test_for_disk s3_plain_no_native_copy
|
||||
run_test_copy_from_s3_to_s3 s3_plain_native_copy s3_plain_another
|
||||
|
@ -8,6 +8,13 @@
|
||||
<secret_access_key>clickhouse</secret_access_key>
|
||||
<s3_allow_native_copy>true</s3_allow_native_copy>
|
||||
</s3_plain_native_copy>
|
||||
<s3_plain_another>
|
||||
<type>s3_plain</type>
|
||||
<endpoint>http://localhost:11111/test/clickhouse-disks/</endpoint>
|
||||
<access_key_id>clickhouse</access_key_id>
|
||||
<secret_access_key>clickhouse</secret_access_key>
|
||||
<s3_allow_native_copy>true</s3_allow_native_copy>
|
||||
</s3_plain_another>
|
||||
|
||||
<s3_plain_no_native_copy>
|
||||
<type>s3_plain</type>
|
||||
|
Loading…
Reference in New Issue
Block a user