Merge pull request #64153 from ClickHouse/fix-backup-without-native-copy

Correctly fallback during backup copy
Antonio Andelic 2024-05-28 13:11:06 +00:00 committed by GitHub
commit 9f242391b4
15 changed files with 330 additions and 62 deletions
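
In short: copyS3File() now takes separate source and destination S3 clients (the destination defaults to the source when none is given), and the streaming fallback is wrapped in a single lambda that runs both when native copy is disabled and when the server rejects the server-side copy, e.g. with AccessDenied because the backup uses different credentials than the disk. A minimal sketch of that control flow, using simplified stand-in names rather than the real ClickHouse signatures:

#include <functional>

// native_copy: attempts the server-side copy (CopyObject / UploadPartCopy) and reports success;
// fallback_method: reads the object with the source client and re-uploads it with the
//                  destination client (what copyDataToS3File does in this commit).
void copy_with_fallback(
    bool allow_native_copy,
    const std::function<bool()> & native_copy,
    const std::function<void()> & fallback_method)
{
    if (!allow_native_copy)
    {
        fallback_method();
        return;
    }

    if (!native_copy())   // e.g. AccessDenied across credentials, or multipart copy unsupported
        fallback_method();
}
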


@ -188,6 +188,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
fs::path(s3_uri.key) / path_in_backup,
0,
file_size,
/* dest_s3_client= */ destination_disk->getS3StorageClient(),
/* dest_bucket= */ blob_path[1],
/* dest_key= */ blob_path[0],
s3_settings.request_settings,
@ -252,18 +253,20 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
{
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
copyS3File(
client,
src_disk->getS3StorageClient(),
/* src_bucket */ blob_path[1],
/* src_key= */ blob_path[0],
start_pos,
length,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
/* dest_s3_client= */ client,
/* dest_bucket= */ s3_uri.bucket,
/* dest_key= */ fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
/*for_disk_s3=*/false);
return; /// copied!
}
}
@ -281,8 +284,9 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
/* src_key= */ fs::path(s3_uri.key) / source,
0,
size,
s3_uri.bucket,
fs::path(s3_uri.key) / destination,
/* dest_s3_client= */ client,
/* dest_bucket= */ s3_uri.bucket,
/* dest_key= */ fs::path(s3_uri.key) / destination,
s3_settings.request_settings,
read_settings,
blob_storage_log,


@ -350,6 +350,13 @@ public:
return delegate;
}
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() const override
{
return delegate->getS3StorageClient();
}
#endif
private:
String wrappedPath(const String & path) const
{


@ -14,7 +14,6 @@
#include <Disks/DirectoryIterator.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include <Poco/Timestamp.h>
@ -116,13 +115,18 @@ public:
/// Default constructor.
IDisk(const String & name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: name(name_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, CurrentMetrics::IDiskCopierThreadsScheduled, config.getUInt(config_prefix + ".thread_pool_size", 16))
, copying_thread_pool(
CurrentMetrics::IDiskCopierThreads,
CurrentMetrics::IDiskCopierThreadsActive,
CurrentMetrics::IDiskCopierThreadsScheduled,
config.getUInt(config_prefix + ".thread_pool_size", 16))
{
}
explicit IDisk(const String & name_)
: name(name_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, CurrentMetrics::IDiskCopierThreadsScheduled, 16)
, copying_thread_pool(
CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, CurrentMetrics::IDiskCopierThreadsScheduled, 16)
{
}
@ -466,6 +470,17 @@ public:
virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
#if USE_AWS_S3
virtual std::shared_ptr<const S3::Client> getS3StorageClient() const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method getS3StorageClient() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
#endif
protected:
friend class DiskDecorator;


@ -127,6 +127,13 @@ public:
}
#endif
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() override
{
return object_storage->getS3StorageClient();
}
#endif
private:
FileCacheKey getCacheKey(const std::string & path) const;


@ -582,6 +582,12 @@ UInt64 DiskObjectStorage::getRevision() const
return metadata_helper->getRevision();
}
#if USE_AWS_S3
std::shared_ptr<const S3::Client> DiskObjectStorage::getS3StorageClient() const
{
return object_storage->getS3StorageClient();
}
#endif
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const
{


@ -6,6 +6,8 @@
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Common/re2.h>
#include "config.h"
namespace CurrentMetrics
{
@ -210,6 +212,10 @@ public:
bool supportsChmod() const override { return metadata_storage->supportsChmod(); }
void chmod(const String & path, mode_t mode) override;
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() const override;
#endif
private:
/// Create actual disk object storage transaction for operations


@ -1,10 +1,10 @@
#pragma once
#include <filesystem>
#include <string>
#include <map>
#include <mutex>
#include <optional>
#include <filesystem>
#include <Poco/Timestamp.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -31,6 +31,10 @@
#include <azure/storage/blobs.hpp>
#endif
#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
namespace DB
{
@ -257,6 +261,13 @@ public:
}
#endif
#if USE_AWS_S3
virtual std::shared_ptr<const S3::Client> getS3StorageClient()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This function is only implemented for S3ObjectStorage");
}
#endif
private:
mutable std::mutex throttlers_mutex;


@ -495,13 +495,14 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
try
{
copyS3File(
current_client,
uri.bucket,
object_from.remote_path,
0,
size,
dest_s3->uri.bucket,
object_to.remote_path,
/*src_s3_client=*/current_client,
/*src_bucket=*/uri.bucket,
/*src_key=*/object_from.remote_path,
/*src_offset=*/0,
/*src_size=*/size,
/*dest_s3_client=*/current_client,
/*dest_bucket=*/dest_s3->uri.bucket,
/*dest_key=*/object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
@ -535,13 +536,15 @@ void S3ObjectStorage::copyObject( // NOLINT
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(current_client,
uri.bucket,
object_from.remote_path,
0,
size,
uri.bucket,
object_to.remote_path,
copyS3File(
/*src_s3_client=*/current_client,
/*src_bucket=*/uri.bucket,
/*src_key=*/object_from.remote_path,
/*src_offset=*/0,
/*src_size=*/size,
/*dest_s3_client=*/current_client,
/*dest_bucket=*/uri.bucket,
/*dest_key=*/object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
@ -617,6 +620,11 @@ ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string & p
return key_generator->generate(path, /* is_directory */ false);
}
std::shared_ptr<const S3::Client> S3ObjectStorage::getS3StorageClient()
{
return client.get();
}
}
#endif
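
Most of the remaining C++ changes are plumbing so the backup code can obtain the S3 client a disk actually uses: the defaults on IDisk and IObjectStorage throw NOT_IMPLEMENTED, and each wrapper (DiskEncrypted, CachedObjectStorage, DiskObjectStorage) simply forwards getS3StorageClient() to the object it wraps until the call reaches S3ObjectStorage, which owns the client. A small self-contained sketch of that forwarding pattern, with hypothetical stand-in types rather than the ClickHouse classes:

#include <memory>
#include <stdexcept>
#include <utility>

struct S3Client {};

struct DiskBase                      // plays the role of IDisk / IObjectStorage
{
    virtual ~DiskBase() = default;
    virtual std::shared_ptr<const S3Client> getS3StorageClient() const
    {
        // mirrors the NOT_IMPLEMENTED default added in this commit
        throw std::runtime_error("getS3StorageClient() is not implemented for this disk type");
    }
};

struct S3Disk : DiskBase             // plays the role of S3ObjectStorage
{
    std::shared_ptr<const S3Client> client = std::make_shared<S3Client>();
    std::shared_ptr<const S3Client> getS3StorageClient() const override { return client; }
};

struct DiskWrapper : DiskBase        // plays the role of DiskEncrypted / CachedObjectStorage / DiskObjectStorage
{
    std::shared_ptr<DiskBase> delegate;
    explicit DiskWrapper(std::shared_ptr<DiskBase> delegate_) : delegate(std::move(delegate_)) {}
    std::shared_ptr<const S3Client> getS3StorageClient() const override { return delegate->getS3StorageClient(); }
};
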


@ -168,6 +168,7 @@ public:
bool isReadOnly() const override { return s3_settings.get()->read_only; }
std::shared_ptr<const S3::Client> getS3StorageClient() override;
private:
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);


@ -3,6 +3,8 @@
#include "config.h"
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <filesystem>
#include <shared_mutex>
namespace Poco


@ -652,14 +652,25 @@ namespace
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, getLogger("copyS3File"))
BlobStorageLogWriterPtr blob_storage_log_,
std::function<void()> fallback_method_)
: UploadHelper(
client_ptr_,
dest_bucket_,
dest_key_,
request_settings_,
object_metadata_,
schedule_,
for_disk_s3_,
blob_storage_log_,
getLogger("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
, size(src_size_)
, supports_multipart_copy(client_ptr_->supportsMultiPartCopy())
, read_settings(read_settings_)
, fallback_method(std::move(fallback_method_))
{
}
@ -682,14 +693,7 @@ namespace
size_t size;
bool supports_multipart_copy;
const ReadSettings read_settings;
CreateReadBuffer getSourceObjectReadBuffer()
{
return [&]
{
return std::make_unique<ReadBufferFromS3>(client_ptr, src_bucket, src_key, "", request_settings, read_settings);
};
}
std::function<void()> fallback_method;
void performSingleOperationCopy()
{
@ -744,28 +748,21 @@ namespace
if (outcome.GetError().GetExceptionName() == "EntityTooLarge" ||
outcome.GetError().GetExceptionName() == "InvalidRequest" ||
outcome.GetError().GetExceptionName() == "InvalidArgument" ||
outcome.GetError().GetExceptionName() == "AccessDenied" ||
(outcome.GetError().GetExceptionName() == "InternalError" &&
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::GATEWAY_TIMEOUT &&
outcome.GetError().GetMessage().contains("use the Rewrite method in the JSON API")))
{
if (!supports_multipart_copy)
if (!supports_multipart_copy || outcome.GetError().GetExceptionName() == "AccessDenied")
{
LOG_INFO(log, "Multipart upload using copy is not supported, will try regular upload for Bucket: {}, Key: {}, Object size: {}",
dest_bucket,
dest_key,
size);
copyDataToS3File(
getSourceObjectReadBuffer(),
offset,
size,
client_ptr,
LOG_INFO(
log,
"Multipart upload using copy is not supported, will try regular upload for Bucket: {}, Key: {}, Object size: "
"{}",
dest_bucket,
dest_key,
request_settings,
blob_storage_log,
object_metadata,
schedule,
for_disk_s3);
size);
fallback_method();
break;
}
else
@ -859,17 +856,29 @@ void copyDataToS3File(
ThreadPoolCallbackRunnerUnsafe<void> schedule,
bool for_disk_s3)
{
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
CopyDataToFileHelper helper{
create_read_buffer,
offset,
size,
dest_s3_client,
dest_bucket,
dest_key,
settings,
object_metadata,
schedule,
for_disk_s3,
blob_storage_log};
helper.performCopy();
}
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & src_s3_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,
size_t src_size,
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -879,19 +888,50 @@ void copyS3File(
ThreadPoolCallbackRunnerUnsafe<void> schedule,
bool for_disk_s3)
{
if (settings.allow_native_copy)
if (!dest_s3_client)
dest_s3_client = src_s3_client;
std::function<void()> fallback_method = [&]
{
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
helper.performCopy();
}
else
auto create_read_buffer
= [&] { return std::make_unique<ReadBufferFromS3>(src_s3_client, src_bucket, src_key, "", settings, read_settings); };
copyDataToS3File(
create_read_buffer,
src_offset,
src_size,
dest_s3_client,
dest_bucket,
dest_key,
settings,
blob_storage_log,
object_metadata,
schedule,
for_disk_s3);
};
if (!settings.allow_native_copy)
{
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
};
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, blob_storage_log, object_metadata, schedule, for_disk_s3);
fallback_method();
return;
}
CopyFileHelper helper{
src_s3_client,
src_bucket,
src_key,
src_offset,
src_size,
dest_bucket,
dest_key,
settings,
read_settings,
object_metadata,
schedule,
for_disk_s3,
blob_storage_log,
std::move(fallback_method)};
helper.performCopy();
}
}
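
The fallback lambda added above is where the two clients meet: the read buffer is built with the source client while the upload goes through the destination client, so the copy still works when no single credential can see both buckets. A hypothetical, self-contained sketch of that composition (stand-in helpers, not the real ReadBufferFromS3 / copyDataToS3File API):

#include <functional>
#include <memory>
#include <string>
#include <vector>

struct S3Client {};
using Blob = std::vector<char>;

// stand-ins for what ReadBufferFromS3 and copyDataToS3File do in the real code
Blob read_object(const S3Client &, const std::string & /*bucket*/, const std::string & /*key*/) { return {}; }
void upload_object(const S3Client &, const std::string & /*bucket*/, const std::string & /*key*/, const Blob &) {}

std::function<void()> make_fallback(
    std::shared_ptr<const S3Client> src_client, std::string src_bucket, std::string src_key,
    std::shared_ptr<const S3Client> dest_client, std::string dest_bucket, std::string dest_key)
{
    // capture everything by value so the deferred copy stays valid when it finally runs
    return [=]
    {
        upload_object(*dest_client, dest_bucket, dest_key,
                      read_object(*src_client, src_bucket, src_key));
    };
}
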


@ -31,11 +31,12 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
///
/// read_settings - is used for throttling in case of native copy is not possible
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & src_s3_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,
size_t src_size,
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,


@ -513,6 +513,7 @@ class ClickHouseCluster:
self.minio_redirect_host = "proxy1"
self.minio_redirect_ip = None
self.minio_redirect_port = 8080
self.minio_docker_id = self.get_instance_docker_id(self.minio_host)
self.spark_session = None


@ -0,0 +1,22 @@
<?xml version="1.0"?>
<clickhouse>
<storage_configuration>
<disks>
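<!-- this disk authenticates as the restricted user 'miniorestricted1', which only has access to bucket 'root' -->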
<disk_s3_restricted_user>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/disks/disk_s3_restricted_user/</endpoint>
<access_key_id>miniorestricted1</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3_restricted_user>
</disks>
<policies>
<policy_s3_restricted>
<volumes>
<main>
<disk>disk_s3_restricted_user</disk>
</main>
</volumes>
</policy_s3_restricted>
</policies>
</storage_configuration>
</clickhouse>


@ -3,8 +3,11 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import uuid
import os
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
@ -20,13 +23,127 @@ node = cluster.add_instance(
],
with_minio=True,
with_zookeeper=True,
stay_alive=True,
)
def setup_minio_users():
# create 2 extra users with restricted access
# miniorestricted1 - full access to bucket 'root', no access to other buckets
# miniorestricted2 - full access to bucket 'root2', no access to other buckets
# storage policy 'policy_s3_restricted' defines a policy for storing files inside bucket 'root' using 'miniorestricted1' user
for user, bucket in [("miniorestricted1", "root"), ("miniorestricted2", "root2")]:
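# register the MinIO server under the alias 'root' with the admin credentials,
# so the 'mc admin' commands below can create the restricted users and policies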
print(
cluster.exec_in_container(
cluster.minio_docker_id,
[
"mc",
"alias",
"set",
"root",
"http://minio1:9001",
"minio",
"minio123",
],
)
)
policy = f"""
{{
"Version": "2012-10-17",
"Statement": [
{{
"Effect": "Allow",
"Principal": {{
"AWS": [
"*"
]
}},
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket",
"s3:ListBucketMultipartUploads"
],
"Resource": [
"arn:aws:s3:::{bucket}"
]
}},
{{
"Effect": "Allow",
"Principal": {{
"AWS": [
"*"
]
}},
"Action": [
"s3:AbortMultipartUpload",
"s3:DeleteObject",
"s3:GetObject",
"s3:ListMultipartUploadParts",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::{bucket}/*"
]
}}
]
}}"""
cluster.exec_in_container(
cluster.minio_docker_id,
["bash", "-c", f"cat >/tmp/{bucket}_policy.json <<EOL{policy}"],
)
cluster.exec_in_container(
cluster.minio_docker_id, ["cat", f"/tmp/{bucket}_policy.json"]
)
print(
cluster.exec_in_container(
cluster.minio_docker_id,
["mc", "admin", "user", "add", "root", user, "minio123"],
)
)
print(
cluster.exec_in_container(
cluster.minio_docker_id,
[
"mc",
"admin",
"policy",
"create",
"root",
f"{bucket}only",
f"/tmp/{bucket}_policy.json",
],
)
)
print(
cluster.exec_in_container(
cluster.minio_docker_id,
[
"mc",
"admin",
"policy",
"attach",
"root",
f"{bucket}only",
"--user",
user,
],
)
)
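# restart ClickHouse so it picks up the extra disk/policy that authenticates as the restricted user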
node.stop_clickhouse()
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "disk_s3_restricted_user.xml"),
"/etc/clickhouse-server/config.d/disk_s3_restricted_user.xml",
)
node.start_clickhouse()
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
setup_minio_users()
yield
finally:
cluster.shutdown()
@ -137,6 +254,7 @@ def check_system_tables(backup_query_id=None):
("disk_s3_cache", "ObjectStorage", "S3", "Local"),
("disk_s3_other_bucket", "ObjectStorage", "S3", "Local"),
("disk_s3_plain", "ObjectStorage", "S3", "Plain"),
("disk_s3_restricted_user", "ObjectStorage", "S3", "Local"),
)
assert len(expected_disks) == len(disks)
for expected_disk in expected_disks:
@ -588,3 +706,22 @@ def test_user_specific_auth(start_cluster):
)
node.query("DROP TABLE IF EXISTS test.specific_auth")
def test_backup_to_s3_different_credentials():
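# The table data is written through 'policy_s3_restricted' (user 'miniorestricted1', bucket 'root' only),
# while the backup destination below authenticates as 'miniorestricted2' (bucket 'root2' only), so a native
# server-side copy between them cannot work and the copy has to fall back to streaming through the server.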
storage_policy = "policy_s3_restricted"
def check_backup_restore(allow_s3_native_copy):
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root2/data/backups/{backup_name}', 'miniorestricted2', 'minio123')"
settings = {"allow_s3_native_copy": allow_s3_native_copy}
(backup_events, _) = check_backup_and_restore(
storage_policy,
backup_destination,
backup_settings=settings,
restore_settings=settings,
)
check_system_tables(backup_events["query_id"])
check_backup_restore(False)
check_backup_restore(True)