mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 09:52:38 +00:00
Updated to use MultiVersion for BlobContainerClient in Backups and updated to get client from disk
This commit is contained in:
parent
df221f7db6
commit
91bad5bc39
@ -27,8 +27,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
//using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
|
||||
|
||||
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
|
||||
StorageAzureBlob::Configuration configuration_,
|
||||
const ReadSettings & read_settings_,
|
||||
@ -38,12 +36,13 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
|
||||
, data_source_description{DataSourceType::AzureBlobStorage, configuration_.container, false, false}
|
||||
, configuration(configuration_)
|
||||
{
|
||||
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
settings = StorageAzureBlob::createSettingsAsSharedPtr(context_);
|
||||
auto settings_as_unique_ptr = StorageAzureBlob::createSettings(context_);
|
||||
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
|
||||
std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(*client.get()),
|
||||
std::move(client_ptr),
|
||||
std::move(settings_as_unique_ptr));
|
||||
client = object_storage->getClient();
|
||||
}
|
||||
|
||||
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
|
||||
@ -89,7 +88,7 @@ std::unique_ptr<SeekableReadBuffer> BackupReaderAzureBlobStorage::readFile(const
|
||||
key = file_name;
|
||||
}
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(
|
||||
client, key, read_settings, settings->max_single_read_retries,
|
||||
client.get(), key, read_settings, settings->max_single_read_retries,
|
||||
settings->max_single_download_retries);
|
||||
}
|
||||
|
||||
@ -113,23 +112,9 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
|
||||
"Blob writing function called with unexpected blob_path.size={} or mode={}",
|
||||
blob_path.size(), mode);
|
||||
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client;
|
||||
if (configuration.container == blob_path[1])
|
||||
{
|
||||
dest_client = client;
|
||||
}
|
||||
else
|
||||
{
|
||||
StorageAzureBlob::Configuration dest_configuration = configuration;
|
||||
dest_configuration.container = blob_path[1];
|
||||
dest_configuration.blob_path = blob_path[0];
|
||||
dest_client = StorageAzureBlob::createClient(dest_configuration, /* is_read_only */ false);
|
||||
}
|
||||
|
||||
|
||||
copyAzureBlobStorageFile(
|
||||
client,
|
||||
dest_client,
|
||||
reinterpret_cast<AzureObjectStorage *>(destination_disk->getObjectStorage().get())->getClient(),
|
||||
configuration.container,
|
||||
fs::path(configuration.blob_path) / path_in_backup,
|
||||
0,
|
||||
@ -163,12 +148,13 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
|
||||
, data_source_description{DataSourceType::AzureBlobStorage,configuration_.container, false, false}
|
||||
, configuration(configuration_)
|
||||
{
|
||||
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
settings = StorageAzureBlob::createSettingsAsSharedPtr(context_);
|
||||
auto settings_as_unique_ptr = StorageAzureBlob::createSettings(context_);
|
||||
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
|
||||
std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(*client.get()),
|
||||
std::move(settings_as_unique_ptr));
|
||||
std::move(client_ptr),
|
||||
std::move(settings_as_unique_ptr));
|
||||
client = object_storage->getClient();
|
||||
}
|
||||
|
||||
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
|
||||
@ -182,23 +168,9 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
|
||||
/// In this case we can't use the native copy.
|
||||
if (auto blob_path = src_disk->getBlobPath(src_path); blob_path.size() == 2)
|
||||
{
|
||||
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client;
|
||||
if (configuration.container == blob_path[1])
|
||||
{
|
||||
src_client = client;
|
||||
}
|
||||
else
|
||||
{
|
||||
StorageAzureBlob::Configuration src_configuration = configuration;
|
||||
src_configuration.container = blob_path[1];
|
||||
src_configuration.blob_path = blob_path[0];
|
||||
src_client = StorageAzureBlob::createClient(src_configuration, /* is_read_only */ false);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorag", src_path, src_disk->getName());
|
||||
copyAzureBlobStorageFile(
|
||||
src_client,
|
||||
reinterpret_cast<AzureObjectStorage *>(src_disk->getObjectStorage().get())->getClient(),
|
||||
client,
|
||||
/* src_container */ blob_path[1],
|
||||
/* src_path */ blob_path[0],
|
||||
@ -220,26 +192,16 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
|
||||
|
||||
void BackupWriterAzureBlobStorage::copyFile(const String & destination, const String & source, size_t size)
|
||||
{
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client;
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client;
|
||||
StorageAzureBlob::Configuration src_configuration = configuration;
|
||||
src_configuration.container = source;
|
||||
src_client = StorageAzureBlob::createClient(src_configuration, /* is_read_only */ false);
|
||||
|
||||
StorageAzureBlob::Configuration dest_configuration = configuration;
|
||||
dest_configuration.container = destination;
|
||||
dest_client = StorageAzureBlob::createClient(dest_configuration, /* is_read_only */ false);
|
||||
|
||||
LOG_TRACE(log, "Copying file inside backup from {} to {} ", source, destination);
|
||||
copyAzureBlobStorageFile(
|
||||
src_client,
|
||||
dest_client,
|
||||
client,
|
||||
client,
|
||||
configuration.container,
|
||||
fs::path(configuration.blob_path),
|
||||
fs::path(source),
|
||||
0,
|
||||
size,
|
||||
/* dest_container */ destination,
|
||||
/* dest_path */ configuration.blob_path,
|
||||
/* dest_container */ configuration.container,
|
||||
/* dest_path */ destination,
|
||||
settings,
|
||||
read_settings,
|
||||
{},
|
||||
@ -303,7 +265,7 @@ std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String
|
||||
}
|
||||
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(
|
||||
client, key, read_settings, settings->max_single_read_retries,
|
||||
client.get(), key, read_settings, settings->max_single_read_retries,
|
||||
settings->max_single_download_retries);
|
||||
}
|
||||
|
||||
@ -319,7 +281,7 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin
|
||||
key = file_name;
|
||||
}
|
||||
return std::make_unique<WriteBufferFromAzureBlobStorage>(
|
||||
client,
|
||||
client.get(),
|
||||
key,
|
||||
settings->max_single_part_upload_size,
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
|
@ -28,7 +28,7 @@ public:
|
||||
|
||||
private:
|
||||
const DataSourceDescription data_source_description;
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
StorageAzureBlob::Configuration configuration;
|
||||
std::unique_ptr<AzureObjectStorage> object_storage;
|
||||
std::shared_ptr<AzureObjectStorageSettings> settings;
|
||||
@ -57,7 +57,7 @@ private:
|
||||
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
|
||||
void removeFilesBatch(const Strings & file_names);
|
||||
const DataSourceDescription data_source_description;
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
StorageAzureBlob::Configuration configuration;
|
||||
std::unique_ptr<AzureObjectStorage> object_storage;
|
||||
std::shared_ptr<AzureObjectStorageSettings> settings;
|
||||
|
@ -134,6 +134,11 @@ public:
|
||||
|
||||
bool isRemote() const override { return true; }
|
||||
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & getClient()
|
||||
{
|
||||
return client;
|
||||
}
|
||||
|
||||
private:
|
||||
const String name;
|
||||
/// client used to access the files in the Blob Storage cloud
|
||||
|
@ -43,7 +43,7 @@ namespace
|
||||
public:
|
||||
UploadHelper(
|
||||
const CreateReadBuffer & create_read_buffer_,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client_,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client_,
|
||||
size_t offset_,
|
||||
size_t total_size_,
|
||||
const String & dest_container_,
|
||||
@ -72,7 +72,7 @@ namespace
|
||||
|
||||
protected:
|
||||
std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client;
|
||||
size_t offset;
|
||||
size_t total_size;
|
||||
const String & dest_container;
|
||||
@ -170,7 +170,7 @@ namespace
|
||||
|
||||
void completeMultipartUpload()
|
||||
{
|
||||
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
|
||||
auto block_blob_client = client.get()->GetBlockBlobClient(dest_blob);
|
||||
block_blob_client.CommitBlockList(block_ids);
|
||||
}
|
||||
|
||||
@ -295,7 +295,7 @@ namespace
|
||||
if (for_disk_azure_blob_storage)
|
||||
ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);
|
||||
|
||||
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
|
||||
auto block_blob_client = client.get()->GetBlockBlobClient(dest_blob);
|
||||
task.block_id = getRandomASCIIString(64);
|
||||
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(task.data), task.size);
|
||||
block_blob_client.StageBlock(task.block_id, memory);
|
||||
@ -329,7 +329,7 @@ void copyDataToAzureBlobStorageFile(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||
const String & dest_container,
|
||||
const String & dest_blob,
|
||||
std::shared_ptr<AzureObjectStorageSettings> settings,
|
||||
@ -343,8 +343,8 @@ void copyDataToAzureBlobStorageFile(
|
||||
|
||||
|
||||
void copyAzureBlobStorageFile(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||
const String & src_container,
|
||||
const String & src_blob,
|
||||
size_t offset,
|
||||
@ -363,8 +363,8 @@ void copyAzureBlobStorageFile(
|
||||
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
|
||||
if (for_disk_azure_blob_storage)
|
||||
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
|
||||
auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);
|
||||
auto block_blob_client_dest = dest_client->GetBlockBlobClient(dest_blob);
|
||||
auto block_blob_client_src = src_client.get()->GetBlockBlobClient(src_blob);
|
||||
auto block_blob_client_dest = dest_client.get()->GetBlockBlobClient(dest_blob);
|
||||
auto uri = block_blob_client_src.GetUrl();
|
||||
block_blob_client_dest.CopyFromUri(uri);
|
||||
}
|
||||
@ -373,7 +373,7 @@ void copyAzureBlobStorageFile(
|
||||
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container, src_blob);
|
||||
auto create_read_buffer = [&]
|
||||
{
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client.get(), src_blob, read_settings, settings->max_single_read_retries,
|
||||
settings->max_single_download_retries);
|
||||
};
|
||||
|
||||
|
@ -21,8 +21,8 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
|
||||
/// Copies a file from AzureBlobStorage to AzureBlobStorage.
|
||||
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
|
||||
void copyAzureBlobStorageFile(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||
const String & src_container,
|
||||
const String & src_path,
|
||||
size_t src_offset,
|
||||
@ -45,7 +45,7 @@ void copyDataToAzureBlobStorageFile(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> & client,
|
||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client,
|
||||
const String & dest_container,
|
||||
const String & dest_bucket,
|
||||
std::shared_ptr<AzureObjectStorageSettings> settings,
|
||||
|
Loading…
Reference in New Issue
Block a user