Updated thread name

This commit is contained in:
Smita Kulkarni 2023-12-27 10:28:52 +01:00
parent 3e22f29b45
commit d46d914521
3 changed files with 59 additions and 55 deletions

View File

@ -35,7 +35,7 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::AzureBlobStorage, "AzureBlobStorage", false, false}
, data_source_description{DataSourceType::AzureBlobStorage, configuration_.container, false, false}
, configuration(configuration_)
{
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
@ -160,7 +160,7 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::AzureBlobStorage, "AzureBlobStorage", false, false}
, data_source_description{DataSourceType::AzureBlobStorage,configuration_.container, false, false}
, configuration(configuration_)
{
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
@ -209,7 +209,7 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
settings,
read_settings,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterAzureBlobStorage"));
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
return; /// copied!
}
}
@ -221,7 +221,7 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterAzureBlobStorage"));
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
BackupWriterAzureBlobStorage::~BackupWriterAzureBlobStorage() = default;

View File

@ -12,57 +12,54 @@
namespace DB
{
// using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
/// Represents a backup stored to Azure
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) override;
void copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) override;
private:
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
private:
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterAzureBlobStorage() override;
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length) override;
void copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length) override;
void copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length) override;
void copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
private:
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
private:
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
}

View File

@ -22,6 +22,11 @@ namespace ProfileEvents
extern const Event DiskAzureUploadPart;
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
@ -44,7 +49,8 @@ namespace
std::shared_ptr<AzureObjectStorageSettings> settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_azure_blob_storage_)
bool for_disk_azure_blob_storage_,
const Poco::Logger * log_)
: create_read_buffer(create_read_buffer_)
, client(client_)
, offset (offset_)
@ -55,7 +61,7 @@ namespace
, object_metadata(object_metadata_)
, schedule(schedule_)
, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
, log(&Poco::Logger::get("azureBlobStorageUploadHelper"))
, log(log_)
, max_single_part_upload_size(settings_.get()->max_single_part_upload_size)
{
}
@ -179,11 +185,11 @@ namespace
try
{
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);
auto buffer = std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size);
task->data = new char[part_size];
task->size = part_size;
buffer->read(task->data,part_size);
task->block_id = getRandomASCIIString(64);
size_t n = read_buffer->read(task->data,part_size);
if (n != part_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size");
schedule([this, task, task_finish_notify]()
{
@ -208,9 +214,10 @@ namespace
{
UploadPartTask task;
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);
auto buffer = std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size);
task.data = new char[part_size];
buffer->read(task.data,part_size);
size_t n = read_buffer->read(task.data,part_size);
if (n != part_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size");
task.size = part_size;
processUploadTask(task);
block_ids.emplace_back(task.block_id);
@ -274,7 +281,7 @@ void copyDataToAzureBlobStorageFile(
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_azure_blob_storage)
{
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
helper.performCopy();
}
@ -314,7 +321,7 @@ void copyAzureBlobStorageFile(
settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
helper.performCopy();
}
}