Address PR review

This commit is contained in:
Antonio Andelic 2024-05-27 08:07:05 +00:00
parent 4f165733a4
commit 8f775037bf
10 changed files with 72 additions and 30 deletions

View File

@ -188,6 +188,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
fs::path(s3_uri.key) / path_in_backup,
0,
file_size,
/* dest_s3_client= */ destination_disk->getObjectStorage()->getS3StorageClient(),
/* dest_bucket= */ blob_path[1],
/* dest_key= */ blob_path[0],
s3_settings.request_settings,
@ -195,8 +196,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
blob_storage_log,
object_attributes,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
/* for_disk_s3= */ true,
destination_disk->getObjectStorage()->getS3StorageClient());
/* for_disk_s3= */ true);
return file_size;
};
@ -258,15 +258,15 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
/* src_key= */ blob_path[0],
start_pos,
length,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
/* dest_s3_client= */ client,
/* dest_bucket= */ s3_uri.bucket,
/* dest_key= */ fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
/*for_disk_s3=*/false,
client);
/*for_disk_s3=*/false);
return; /// copied!
}
}
@ -284,8 +284,9 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
/* src_key= */ fs::path(s3_uri.key) / source,
0,
size,
s3_uri.bucket,
fs::path(s3_uri.key) / destination,
/* dest_s3_client= */ client,
/* dest_bucket= */ s3_uri.bucket,
/* dest_key= */ fs::path(s3_uri.key) / destination,
s3_settings.request_settings,
read_settings,
blob_storage_log,

View File

@ -350,10 +350,12 @@ public:
return delegate;
}
ObjectStoragePtr getObjectStorage() override
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() const override
{
return delegate->getObjectStorage();
return delegate->getS3StorageClient();
}
#endif
private:
String wrappedPath(const String & path) const

View File

@ -14,7 +14,6 @@
#include <Disks/DirectoryIterator.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include <Poco/Timestamp.h>
@ -471,6 +470,17 @@ public:
virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
#if USE_AWS_S3
virtual std::shared_ptr<const S3::Client> getS3StorageClient() const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method getS3StorageClient() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
#endif
protected:
friend class DiskDecorator;

View File

@ -582,6 +582,12 @@ UInt64 DiskObjectStorage::getRevision() const
return metadata_helper->getRevision();
}
#if USE_AWS_S3
std::shared_ptr<const S3::Client> DiskObjectStorage::getS3StorageClient() const
{
return object_storage->getS3StorageClient();
}
#endif
DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const
{

View File

@ -6,6 +6,8 @@
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Common/re2.h>
#include "config.h"
namespace CurrentMetrics
{
@ -210,6 +212,10 @@ public:
bool supportsChmod() const override { return metadata_storage->supportsChmod(); }
void chmod(const String & path, mode_t mode) override;
#if USE_AWS_S3
std::shared_ptr<const S3::Client> getS3StorageClient() const override;
#endif
private:
/// Create actual disk object storage transaction for operations

View File

@ -4,6 +4,7 @@
#include <map>
#include <mutex>
#include <optional>
#include <filesystem>
#include <Poco/Timestamp.h>
#include <Poco/Util/AbstractConfiguration.h>

View File

@ -495,13 +495,14 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
try
{
copyS3File(
current_client,
uri.bucket,
object_from.remote_path,
0,
size,
dest_s3->uri.bucket,
object_to.remote_path,
/*src_s3_client=*/current_client,
/*src_bucket=*/uri.bucket,
/*src_key=*/object_from.remote_path,
/*src_offset=*/0,
/*src_size=*/size,
/*dest_s3_client=*/current_client,
/*dest_bucket=*/dest_s3->uri.bucket,
/*dest_key=*/object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
@ -535,13 +536,15 @@ void S3ObjectStorage::copyObject( // NOLINT
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(current_client,
uri.bucket,
object_from.remote_path,
0,
size,
uri.bucket,
object_to.remote_path,
copyS3File(
/*src_s3_client=*/current_client,
/*src_bucket=*/uri.bucket,
/*src_key=*/object_from.remote_path,
/*src_offset=*/0,
/*src_size=*/size,
/*dest_s3_client=*/current_client,
/*dest_bucket=*/uri.bucket,
/*dest_key=*/object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),

View File

@ -654,7 +654,16 @@ namespace
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_,
std::function<void()> fallback_method_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, getLogger("copyS3File"))
: UploadHelper(
client_ptr_,
dest_bucket_,
dest_key_,
request_settings_,
object_metadata_,
schedule_,
for_disk_s3_,
blob_storage_log_,
getLogger("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
@ -869,6 +878,7 @@ void copyS3File(
const String & src_key,
size_t src_offset,
size_t src_size,
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -876,8 +886,7 @@ void copyS3File(
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunnerUnsafe<void> schedule,
bool for_disk_s3,
std::shared_ptr<const S3::Client> dest_s3_client)
bool for_disk_s3)
{
if (!dest_s3_client)
dest_s3_client = src_s3_client;

View File

@ -36,6 +36,7 @@ void copyS3File(
const String & src_key,
size_t src_offset,
size_t src_size,
std::shared_ptr<const S3::Client> dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -43,8 +44,7 @@ void copyS3File(
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
bool for_disk_s3 = false,
std::shared_ptr<const S3::Client> dest_s3_client = nullptr);
bool for_disk_s3 = false);
/// Copies data from any seekable source to S3.
/// The same functionality can be done by using the function copyData() and the class WriteBufferFromS3

View File

@ -28,6 +28,10 @@ node = cluster.add_instance(
def setup_minio_users():
# create 2 extra users with restricted access
# miniorestricted1 - full access to bucket 'root', no access to other buckets
# miniorestricted2 - full access to bucket 'root2', no access to other buckets
# storage policy 'policy_s3_restricted' defines a policy for storing files inside bucket 'root' using 'miniorestricted1' user
for user, bucket in [("miniorestricted1", "root"), ("miniorestricted2", "root2")]:
print(
cluster.exec_in_container(