mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 13:32:13 +00:00
Addressed review comments
This commit is contained in:
parent
788eb48707
commit
f551081dd4
@ -104,7 +104,7 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
|
|||||||
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
|
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
|
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
|
||||||
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes) -> size_t
|
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
|
||||||
{
|
{
|
||||||
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
|
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
|
||||||
if (blob_path.size() != 2 || mode != WriteMode::Rewrite)
|
if (blob_path.size() != 2 || mode != WriteMode::Rewrite)
|
||||||
@ -123,7 +123,6 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
|
|||||||
/* dest_path */ blob_path[0],
|
/* dest_path */ blob_path[0],
|
||||||
settings,
|
settings,
|
||||||
read_settings,
|
read_settings,
|
||||||
object_attributes,
|
|
||||||
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
|
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
|
||||||
/* for_disk_azure_blob_storage= */ true);
|
/* for_disk_azure_blob_storage= */ true);
|
||||||
|
|
||||||
@ -180,7 +179,6 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
|
|||||||
fs::path(configuration.blob_path) / path_in_backup,
|
fs::path(configuration.blob_path) / path_in_backup,
|
||||||
settings,
|
settings,
|
||||||
read_settings,
|
read_settings,
|
||||||
{},
|
|
||||||
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
|
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
|
||||||
return; /// copied!
|
return; /// copied!
|
||||||
}
|
}
|
||||||
@ -204,14 +202,13 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const St
|
|||||||
/* dest_path */ destination,
|
/* dest_path */ destination,
|
||||||
settings,
|
settings,
|
||||||
read_settings,
|
read_settings,
|
||||||
{},
|
|
||||||
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
|
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
|
||||||
/* for_disk_azure_blob_storage= */ true);
|
/* for_disk_azure_blob_storage= */ true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
|
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
|
||||||
{
|
{
|
||||||
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings, {},
|
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings,
|
||||||
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
|
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,10 +47,9 @@ namespace
|
|||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client_,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client_,
|
||||||
size_t offset_,
|
size_t offset_,
|
||||||
size_t total_size_,
|
size_t total_size_,
|
||||||
const String & dest_container_,
|
const String & dest_container_for_logging_,
|
||||||
const String & dest_blob_,
|
const String & dest_blob_,
|
||||||
MultiVersion<AzureObjectStorageSettings> settings_,
|
MultiVersion<AzureObjectStorageSettings> settings_,
|
||||||
const std::optional<std::map<String, String>> & object_metadata_,
|
|
||||||
ThreadPoolCallbackRunner<void> schedule_,
|
ThreadPoolCallbackRunner<void> schedule_,
|
||||||
bool for_disk_azure_blob_storage_,
|
bool for_disk_azure_blob_storage_,
|
||||||
const Poco::Logger * log_)
|
const Poco::Logger * log_)
|
||||||
@ -58,10 +57,9 @@ namespace
|
|||||||
, client(client_)
|
, client(client_)
|
||||||
, offset (offset_)
|
, offset (offset_)
|
||||||
, total_size (total_size_)
|
, total_size (total_size_)
|
||||||
, dest_container(dest_container_)
|
, dest_container_for_logging(dest_container_for_logging_)
|
||||||
, dest_blob(dest_blob_)
|
, dest_blob(dest_blob_)
|
||||||
, settings(settings_)
|
, settings(settings_)
|
||||||
, object_metadata(object_metadata_)
|
|
||||||
, schedule(schedule_)
|
, schedule(schedule_)
|
||||||
, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
|
, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
|
||||||
, log(log_)
|
, log(log_)
|
||||||
@ -76,10 +74,9 @@ namespace
|
|||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client;
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client;
|
||||||
size_t offset;
|
size_t offset;
|
||||||
size_t total_size;
|
size_t total_size;
|
||||||
const String & dest_container;
|
const String & dest_container_for_logging;
|
||||||
const String & dest_blob;
|
const String & dest_blob;
|
||||||
MultiVersion<AzureObjectStorageSettings> settings;
|
MultiVersion<AzureObjectStorageSettings> settings;
|
||||||
const std::optional<std::map<String, String>> & object_metadata;
|
|
||||||
ThreadPoolCallbackRunner<void> schedule;
|
ThreadPoolCallbackRunner<void> schedule;
|
||||||
bool for_disk_azure_blob_storage;
|
bool for_disk_azure_blob_storage;
|
||||||
const Poco::Logger * log;
|
const Poco::Logger * log;
|
||||||
@ -208,7 +205,7 @@ namespace
|
|||||||
|
|
||||||
void uploadPart(size_t part_offset, size_t part_size)
|
void uploadPart(size_t part_offset, size_t part_size)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, Size: {}", dest_container, dest_blob, part_size);
|
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, Size: {}", dest_container_for_logging, dest_blob, part_size);
|
||||||
|
|
||||||
if (!part_size)
|
if (!part_size)
|
||||||
{
|
{
|
||||||
@ -287,7 +284,7 @@ namespace
|
|||||||
|
|
||||||
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
|
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
|
||||||
task.block_id = block_id;
|
task.block_id = block_id;
|
||||||
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, block_id: {}, Parts: {}", dest_container, dest_blob, block_id, bg_tasks.size());
|
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, block_id: {}, Parts: {}", dest_container_for_logging, dest_blob, block_id, bg_tasks.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
String processUploadPartRequest(UploadPartTask & task)
|
String processUploadPartRequest(UploadPartTask & task)
|
||||||
@ -331,14 +328,13 @@ void copyDataToAzureBlobStorageFile(
|
|||||||
size_t offset,
|
size_t offset,
|
||||||
size_t size,
|
size_t size,
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||||
const String & dest_container,
|
const String & dest_container_for_logging,
|
||||||
const String & dest_blob,
|
const String & dest_blob,
|
||||||
MultiVersion<AzureObjectStorageSettings> settings,
|
MultiVersion<AzureObjectStorageSettings> settings,
|
||||||
const std::optional<std::map<String, String>> & object_metadata,
|
|
||||||
ThreadPoolCallbackRunner<void> schedule,
|
ThreadPoolCallbackRunner<void> schedule,
|
||||||
bool for_disk_azure_blob_storage)
|
bool for_disk_azure_blob_storage)
|
||||||
{
|
{
|
||||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container, dest_blob, settings, object_metadata, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
|
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
|
||||||
helper.performCopy();
|
helper.performCopy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -346,15 +342,14 @@ void copyDataToAzureBlobStorageFile(
|
|||||||
void copyAzureBlobStorageFile(
|
void copyAzureBlobStorageFile(
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||||
const String & src_container,
|
const String & src_container_for_logging,
|
||||||
const String & src_blob,
|
const String & src_blob,
|
||||||
size_t offset,
|
size_t offset,
|
||||||
size_t size,
|
size_t size,
|
||||||
const String & dest_container,
|
const String & dest_container_for_logging,
|
||||||
const String & dest_blob,
|
const String & dest_blob,
|
||||||
MultiVersion<AzureObjectStorageSettings> settings,
|
MultiVersion<AzureObjectStorageSettings> settings,
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
const std::optional<std::map<String, String>> & object_metadata,
|
|
||||||
ThreadPoolCallbackRunner<void> schedule,
|
ThreadPoolCallbackRunner<void> schedule,
|
||||||
bool for_disk_azure_blob_storage)
|
bool for_disk_azure_blob_storage)
|
||||||
{
|
{
|
||||||
@ -390,14 +385,14 @@ void copyAzureBlobStorageFile(
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container, src_blob);
|
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
|
||||||
auto create_read_buffer = [&]
|
auto create_read_buffer = [&]
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client.get(), src_blob, read_settings, settings.get()->max_single_read_retries,
|
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client.get(), src_blob, read_settings, settings.get()->max_single_read_retries,
|
||||||
settings.get()->max_single_download_retries);
|
settings.get()->max_single_download_retries);
|
||||||
};
|
};
|
||||||
|
|
||||||
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container, dest_blob, settings, object_metadata, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
|
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
|
||||||
helper.performCopy();
|
helper.performCopy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,15 +23,14 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
|
|||||||
void copyAzureBlobStorageFile(
|
void copyAzureBlobStorageFile(
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & src_client,
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
|
||||||
const String & src_container,
|
const String & src_container_for_logging,
|
||||||
const String & src_blob,
|
const String & src_blob,
|
||||||
size_t src_offset,
|
size_t src_offset,
|
||||||
size_t src_size,
|
size_t src_size,
|
||||||
const String & dest_container,
|
const String & dest_container_for_logging,
|
||||||
const String & dest_blob,
|
const String & dest_blob,
|
||||||
MultiVersion<AzureObjectStorageSettings> settings,
|
MultiVersion<AzureObjectStorageSettings> settings,
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
|
|
||||||
ThreadPoolCallbackRunner<void> schedule_ = {},
|
ThreadPoolCallbackRunner<void> schedule_ = {},
|
||||||
bool for_disk_azure_blob_storage = false);
|
bool for_disk_azure_blob_storage = false);
|
||||||
|
|
||||||
@ -46,10 +45,9 @@ void copyDataToAzureBlobStorageFile(
|
|||||||
size_t offset,
|
size_t offset,
|
||||||
size_t size,
|
size_t size,
|
||||||
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client,
|
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> & client,
|
||||||
const String & dest_container,
|
const String & dest_container_for_logging,
|
||||||
const String & dest_blob,
|
const String & dest_blob,
|
||||||
MultiVersion<AzureObjectStorageSettings> settings,
|
MultiVersion<AzureObjectStorageSettings> settings,
|
||||||
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
|
|
||||||
ThreadPoolCallbackRunner<void> schedule_ = {},
|
ThreadPoolCallbackRunner<void> schedule_ = {},
|
||||||
bool for_disk_azure_blob_storage = false);
|
bool for_disk_azure_blob_storage = false);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user