Initial draft version of adding backup support to AzureBlobStorage

Smita Kulkarni 2023-11-20 10:56:10 +01:00
parent a6ebeb8422
commit 961bf074da
16 changed files with 1132 additions and 2 deletions

View File

@@ -33,11 +33,13 @@ void BackupFactory::registerBackupEngine(const String & engine_name, const Creat
void registerBackupEnginesFileAndDisk(BackupFactory &);
void registerBackupEngineS3(BackupFactory &);
void registerBackupEngineAzureBlobStorage(BackupFactory &);
void registerBackupEngines(BackupFactory & factory)
{
registerBackupEnginesFileAndDisk(factory);
registerBackupEngineS3(factory);
registerBackupEngineAzureBlobStorage(factory);
}
BackupFactory::BackupFactory()

View File

@@ -0,0 +1,336 @@
#include <Backups/BackupIO_AzureBlobStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/SharedThreadPools.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageAzureBlobCluster.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/AzureBlobStorage/copyAzureBlobStorageFile.h>
#include <Disks/IDisk.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
//using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::AzureBlobStorage, "AzureBlobStorage", false, false}
, configuration(configuration_)
{
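/// Keep both a raw container client (used for reads and native server-side copies) and an AzureObjectStorage wrapper (used for existence and metadata checks).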
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
settings = StorageAzureBlob::createSettingsAsSharedPtr(context_);
auto settings_as_unique_ptr = StorageAzureBlob::createSettings(context_);
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(*client.get()),
std::move(settings_as_unique_ptr));
}
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
bool BackupReaderAzureBlobStorage::fileExists(const String & file_name)
{
String key;
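/// Names starting with '.' (e.g. the ".backup" metadata file) are resolved relative to the configured blob path; any other name is assumed to already be a full key.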
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return object_storage->exists(StoredObject(key));
}
UInt64 BackupReaderAzureBlobStorage::getFileSize(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
return object_metadata.size_bytes;
}
std::unique_ptr<SeekableReadBuffer> BackupReaderAzureBlobStorage::readFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
}
void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
{
LOG_INFO(&Poco::Logger::get("BackupReaderAzureBlobStorage"), "Enter copyFileToDisk");
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
/// We don't check for `has_throttling` here because the native copy almost doesn't use network.
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if (destination_data_source_description.sameKind(data_source_description)
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
{
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes) -> size_t
{
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
if (blob_path.size() != 2 || mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Blob writing function called with unexpected blob_path.size={} or mode={}",
blob_path.size(), mode);
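/// The disk passes blob_path as {key, container}: blob_path[0] is the destination blob and blob_path[1] is its container.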
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client;
if (configuration.container == blob_path[1])
{
dest_client = client;
}
else
{
StorageAzureBlob::Configuration dest_configuration = configuration;
dest_configuration.container = blob_path[1];
dest_configuration.blob_path = blob_path[0];
dest_client = StorageAzureBlob::createClient(dest_configuration, /* is_read_only */ false);
}
copyAzureBlobStorageFile(
client,
dest_client,
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
0,
file_size,
/* dest_bucket= */ blob_path[1],
/* dest_key= */ blob_path[0],
settings,
read_settings,
object_attributes,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderAzureBlobStorage"),
/* for_disk_azure_blob_storage= */ true);
return file_size;
};
destination_disk->writeFileUsingBlobWritingFunction(destination_path, write_mode, write_blob_function);
return; /// copied!
}
/// Fallback to copy through buffers.
BackupReaderDefault::copyFileToDisk(path_in_backup, file_size, encrypted_in_backup, destination_disk, destination_path, write_mode);
}
BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::AzureBlobStorage, "AzureBlobStorage", false, false}
, configuration(configuration_)
{
client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
settings = StorageAzureBlob::createSettingsAsSharedPtr(context_);
auto settings_as_unique_ptr = StorageAzureBlob::createSettings(context_);
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(*client.get()),
std::move(settings_as_unique_ptr));
}
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length)
{
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
auto source_data_source_description = src_disk->getDataSourceDescription();
if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
{
/// getBlobPath() can return more than 2 elements if the file is stored as multiple objects in the AzureBlobStorage container.
/// In this case we can't use the native copy.
if (auto blob_path = src_disk->getBlobPath(src_path); blob_path.size() == 2)
{
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client;
if (configuration.container == blob_path[1])
{
src_client = client;
}
else
{
StorageAzureBlob::Configuration src_configuration = configuration;
src_configuration.container = blob_path[1];
src_configuration.blob_path = blob_path[0];
src_client = StorageAzureBlob::createClient(src_configuration, /* is_read_only */ false);
}
LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorag", src_path, src_disk->getName());
copyAzureBlobStorageFile(
src_client,
client,
/* src_bucket= */ blob_path[1],
/* src_key= */ blob_path[0],
start_pos,
length,
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
settings,
read_settings,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterAzureBlobStorage"));
return; /// copied!
}
}
/// Fallback to copy through buffers.
BackupWriterDefault::copyFileFromDisk(path_in_backup, src_disk, src_path, copy_encrypted, start_pos, length);
}
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterAzureBlobStorage"));
}
BackupWriterAzureBlobStorage::~BackupWriterAzureBlobStorage() = default;
bool BackupWriterAzureBlobStorage::fileExists(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
LOG_INFO(&Poco::Logger::get("BackupWriterAzureBlobStorage"), "Result fileExists {} ", object_storage->exists(StoredObject(key)));
return object_storage->exists(StoredObject(key));
}
UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)
{
LOG_INFO(&Poco::Logger::get("BackupWriterAzureBlobStorage"), "Enter getFileSize");
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
RelativePathsWithMetadata children;
object_storage->listObjects(key, children, /*max_keys*/ 0);
if (children.empty())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Object {} must exist", key);
return children[0].metadata.size_bytes;
}
std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String & file_name, size_t /*expected_file_size*/)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
}
std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client,
key,
settings->max_single_part_upload_size,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings);
}
void BackupWriterAzureBlobStorage::removeFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
StoredObject object(key);
object_storage->removeObjectIfExists(object);
}
void BackupWriterAzureBlobStorage::removeFiles(const Strings & keys)
{
StoredObjects objects;
for (const auto & key : keys)
objects.emplace_back(key);
object_storage->removeObjectsIfExist(objects);
}
void BackupWriterAzureBlobStorage::removeFilesBatch(const Strings & keys)
{
StoredObjects objects;
for (const auto & key : keys)
objects.emplace_back(key);
object_storage->removeObjectsIfExist(objects);
}
}
#endif

View File

@@ -0,0 +1,69 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Backups/BackupIO_Default.h>
#include <Disks/DiskType.h>
#include <Storages/StorageAzureBlobCluster.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
// using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
/// Represents a backup stored in Azure Blob Storage.
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) override;
private:
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
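/// Represents a backup being written to Azure Blob Storage.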
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length) override;
void copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
private:
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<AzureObjectStorageSettings> settings;
};
}
#endif

View File

@@ -492,6 +492,7 @@ void BackupImpl::checkBackupDoesntExist() const
else
file_name_to_check_existence = ".backup";
LOG_INFO(&Poco::Logger::get("BackupImpl"), "checkBackupDoesntExist 1");
if (writer->fileExists(file_name_to_check_existence))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", backup_name_for_logging);
@@ -499,6 +500,7 @@ void BackupImpl::checkBackupDoesntExist() const
if (!is_internal_backup)
{
assert(!lock_file_name.empty());
LOG_INFO(&Poco::Logger::get("BackupImpl"), "checkBackupDoesntExist 2");
if (writer->fileExists(lock_file_name))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} is being written already", backup_name_for_logging);
}
@@ -522,6 +524,8 @@ bool BackupImpl::checkLockFile(bool throw_if_failed) const
if (throw_if_failed)
{
LOG_INFO(&Poco::Logger::get("BackupImpl"), "checkLockFile");
if (!writer->fileExists(lock_file_name))
{
throw Exception(
@@ -886,12 +890,12 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)
}
else if (src_disk && from_immutable_file)
{
LOG_TRACE(log, "Writing backup for file {} from {} (disk {}): data file #{}", info.data_file_name, src_file_desc, src_disk->getName(), info.data_file_index);
LOG_INFO(log, "Writing backup for file {} from {} (disk {}): data file #{}", info.data_file_name, src_file_desc, src_disk->getName(), info.data_file_index);
writer->copyFileFromDisk(info.data_file_name, src_disk, src_file_path, info.encrypted_by_disk, info.base_size, info.size - info.base_size);
}
else
{
LOG_TRACE(log, "Writing backup for file {} from {}: data file #{}", info.data_file_name, src_file_desc, info.data_file_index);
LOG_INFO(log, "Writing backup for file {} from {}: data file #{}", info.data_file_name, src_file_desc, info.data_file_index);
auto create_read_buffer = [entry, read_settings = writer->getReadSettings()] { return entry->getReadBuffer(read_settings); };
writer->copyDataToFile(info.data_file_name, create_read_buffer, info.base_size, info.size - info.base_size);
}

View File

@@ -0,0 +1,134 @@
#include "config.h"
#include <Backups/BackupFactory.h>
#include <Common/Exception.h>
#if USE_AZURE_BLOB_STORAGE
#include <Backups/BackupIO_AzureBlobStorage.h>
#include <Storages/StorageAzureBlob.h>
#include <Backups/BackupImpl.h>
#include <IO/Archives/hasRegisteredArchiveFileExtension.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
#endif
namespace DB
{
namespace fs = std::filesystem;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
}
#if USE_AZURE_BLOB_STORAGE
namespace
{
String removeFileNameFromURL(String & url)
{
Poco::URI url2{url};
String path = url2.getPath();
size_t slash_pos = path.find_last_of('/');
String file_name = path.substr(slash_pos + 1);
path.resize(slash_pos + 1);
url2.setPath(path);
url = url2.toString();
return file_name;
}
}
#endif
void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
{
auto creator_fn = []([[maybe_unused]] const BackupFactory::CreateParams & params) -> std::unique_ptr<IBackup>
{
#if USE_AZURE_BLOB_STORAGE
const String & id_arg = params.backup_info.id_arg;
const auto & args = params.backup_info.args;
LOG_INFO(&Poco::Logger::get("registerBackupEngineAzureBlobStorage"), "Begin id_arg={} args.size={}", id_arg, args.size());
StorageAzureBlob::Configuration configuration;
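/// Expected engine arguments: AzureBlobStorage(connection_string, container, blob_path, format); only this 4-argument form is handled in this draft.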
if (args.size() == 4)
{
configuration.connection_url = args[0].safeGet<String>();
configuration.is_connection_string = true;
configuration.container = args[1].safeGet<String>();
configuration.blob_path = args[2].safeGet<String>();
configuration.format = args[3].safeGet<String>();
LOG_TRACE(&Poco::Logger::get("registerBackupEngineAzureBlobStorage"), "configuration.connection_url = {}"
"configuration.container = {}"
"configuration.blob_path = {}"
"configuration.format = {}",
configuration.connection_url, configuration.container, configuration.blob_path, configuration.format);
}
BackupImpl::ArchiveParams archive_params;
if (hasRegisteredArchiveFileExtension(configuration.blob_path))
{
if (params.is_internal_backup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Using archives with backups on clusters is disabled");
archive_params.archive_name = removeFileNameFromURL(configuration.blob_path);
archive_params.compression_method = params.compression_method;
archive_params.compression_level = params.compression_level;
archive_params.password = params.password;
}
else
{
if (!params.password.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Password is not applicable, backup cannot be encrypted");
}
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
archive_params,
params.base_backup_info,
reader,
params.context,
/*params.use_same_s3_credentials_for_base_backup*/ false);
}
else
{
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
archive_params,
params.base_backup_info,
writer,
params.context,
params.is_internal_backup,
params.backup_coordination,
params.backup_uuid,
params.deduplicate_files,
/*params.use_same_s3_credentials_for_base_backup*/ false);
}
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "AzureBlobStorage support is disabled");
#endif
};
factory.registerBackupEngine("AzureBlobStorage", creator_fn);
}
}

View File

@@ -87,6 +87,7 @@ add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/Resource)
add_headers_and_sources(clickhouse_common_io IO/S3)
add_headers_and_sources(clickhouse_common_io IO/AzureBlobStorage)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@@ -139,6 +140,7 @@ endif()
if (TARGET ch_contrib::azure_sdk)
add_headers_and_sources(dbms Disks/ObjectStorages/AzureBlobStorage)
add_headers_and_sources(dbms IO/AzureBlobStorage)
endif()
if (TARGET ch_contrib::hdfs)
@@ -485,6 +487,7 @@ if (TARGET ch_contrib::aws_s3)
endif()
if (TARGET ch_contrib::azure_sdk)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk)
dbms_target_link_libraries (PRIVATE ch_contrib::azure_sdk)
endif()

View File

@@ -361,6 +361,10 @@ The server successfully detected this situation and will download merged part fr
M(S3PutObject, "Number of S3 API PutObject calls.") \
M(S3GetObject, "Number of S3 API GetObject calls.") \
\
M(AzureUploadPart, "Number of Azure blob storage API UploadPart calls.") \
M(DiskAzureUploadPart, "Number of Disk Azure blob storage API UploadPart calls.") \
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls.") \
M(DiskAzureCopyObject, "Number of Disk Azure blob storage API CopyObject calls.") \
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
\

View File

@@ -0,0 +1,324 @@
#include <IO/AzureBlobStorage/copyAzureBlobStorageFile.h>
#if USE_AZURE_BLOB_STORAGE
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/StdStreamFromReadBuffer.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <Common/getRandomASCIIString.h>
#include <IO/SharedThreadPools.h>
namespace ProfileEvents
{
extern const Event AzureCopyObject;
extern const Event AzureUploadPart;
extern const Event DiskAzureCopyObject;
extern const Event DiskAzureUploadPart;
}
namespace DB
{
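/// Copies below this size are done with a single server-side CopyFromUri call; larger ones are re-uploaded in staged blocks (see copyAzureBlobStorageFile below).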
size_t max_single_operation_copy_size = 256 * 1024 * 1024;
namespace
{
class UploadHelper
{
public:
UploadHelper(
const CreateReadBuffer & create_read_buffer_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client_,
size_t offset_,
size_t total_size_,
const String & dest_bucket_,
const String & dest_key_,
std::shared_ptr<AzureObjectStorageSettings> settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_azure_blob_storage_)
: create_read_buffer(create_read_buffer_)
, client(client_)
, offset(offset_)
, total_size(total_size_)
, dest_bucket(dest_bucket_)
, dest_key(dest_key_)
, settings(settings_)
, object_metadata(object_metadata_)
, schedule(schedule_)
, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
, log(&Poco::Logger::get("azureBlobStorageUploadHelper"))
, max_single_part_upload_size(settings_->max_single_part_upload_size)
{
}
~UploadHelper() {}
protected:
std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client;
size_t offset;
size_t total_size;
const String & dest_bucket;
const String & dest_key;
std::shared_ptr<AzureObjectStorageSettings> settings;
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunner<void> schedule;
bool for_disk_azure_blob_storage;
const Poco::Logger * log;
size_t max_single_part_upload_size;
struct UploadPartTask
{
char *data = nullptr;
size_t size = 0;
std::string block_id;
bool is_finished = false;
std::exception_ptr exception;
~UploadPartTask()
{
delete [] data; /// `data` is allocated with new char[] in uploadPart(), so it must not be released with free().
}
};
size_t normal_part_size;
std::vector<std::string> block_ids;
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar;
public:
void performCopy()
{
performMultipartUpload();
}
void completeMultipartUpload()
{
auto block_blob_client = client->GetBlockBlobClient(dest_key);
block_blob_client.CommitBlockList(block_ids);
}
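/// Uploads the source range part by part: each part is staged as an Azure block and the collected block ids are committed at the end (see completeMultipartUpload above).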
void performMultipartUpload()
{
normal_part_size = 1024;
size_t position = offset;
size_t end_position = offset + total_size;
try
{
while (position < end_position)
{
size_t next_position = std::min(position + normal_part_size, end_position);
size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.
uploadPart(position, part_size);
position = next_position;
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
waitForAllBackgroundTasks();
throw;
}
waitForAllBackgroundTasks();
completeMultipartUpload();
}
void uploadPart(size_t part_offset, size_t part_size)
{
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Size: {}", dest_bucket, dest_key, part_size);
if (!part_size)
{
LOG_TRACE(log, "Skipping writing an empty part.");
return;
}
if (schedule)
{
UploadPartTask * task = nullptr;
{
std::lock_guard lock(bg_tasks_mutex);
task = &bg_tasks.emplace_back();
++num_added_bg_tasks;
}
/// Notify waiting thread when task finished
auto task_finish_notify = [this, task]()
{
std::lock_guard lock(bg_tasks_mutex);
task->is_finished = true;
++num_finished_bg_tasks;
/// Notification under mutex is important here.
/// Otherwise, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
};
try
{
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);
auto buffer = std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size);
task->data = new char[part_size];
task->size = part_size;
buffer->read(task->data, part_size);
task->block_id = getRandomASCIIString(64);
schedule([this, task, task_finish_notify]()
{
try
{
processUploadTask(*task);
}
catch (...)
{
task->exception = std::current_exception();
}
task_finish_notify();
}, Priority{});
}
catch (...)
{
task_finish_notify();
throw;
}
}
else
{
UploadPartTask task;
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);
auto buffer = std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size);
task.data = new char[part_size];
buffer->read(task.data, part_size);
task.size = part_size;
processUploadTask(task);
block_ids.emplace_back(task.block_id);
}
}
void processUploadTask(UploadPartTask & task)
{
auto block_id = processUploadPartRequest(task);
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
task.block_id = block_id;
LOG_TRACE(log, "Writing part finished. Bucket: {}, Key: {}, block_id: {}, Parts: {}", dest_bucket, dest_key, block_id, bg_tasks.size());
}
String processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureUploadPart);
if (for_disk_azure_blob_storage)
ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);
auto block_blob_client = client->GetBlockBlobClient(dest_key);
task.block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(task.data), task.size);
block_blob_client.StageBlock(task.block_id, memory);
return task.block_id;
}
void waitForAllBackgroundTasks()
{
if (!schedule)
return;
std::unique_lock lock(bg_tasks_mutex);
/// Suppress warnings because bg_tasks_mutex is actually held, but TSA annotations do not understand std::unique_lock.
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks);
for (auto & task : tasks)
{
if (task.exception)
std::rethrow_exception(task.exception);
block_ids.emplace_back(task.block_id);
}
}
};
}
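/// Copies data from an arbitrary seekable source; unlike copyAzureBlobStorageFile(), this always goes through the staged-block upload path regardless of size.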
void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> & dest_client,
const String & dest_bucket,
const String & dest_key,
std::shared_ptr<AzureObjectStorageSettings> settings,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_azure_blob_storage)
{
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage};
helper.performCopy();
}
void copyAzureBlobStorageFile(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client,
const String & src_bucket,
const String & src_key,
size_t offset,
size_t size,
const String & dest_bucket,
const String & dest_key,
std::shared_ptr<AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_azure_blob_storage)
{
if (size < max_single_operation_copy_size)
{
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (for_disk_azure_blob_storage)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
auto block_blob_client_src = src_client->GetBlockBlobClient(src_key);
auto block_blob_client_dest = dest_client->GetBlockBlobClient(dest_key);
auto uri = block_blob_client_src.GetUrl();
block_blob_client_dest.CopyFromUri(uri);
}
else
{
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Bucket: {}, Key: {}", src_bucket, src_key);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_azure_blob_storage};
helper.performCopy();
}
}
}
#endif

View File

@ -0,0 +1,58 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Storages/StorageAzureBlobCluster.h>
#include <Storages/StorageAzureBlob.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <base/types.h>
#include <functional>
#include <memory>
namespace DB
{
class SeekableReadBuffer;
using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// Copies a file from AzureBlobStorage to AzureBlobStorage.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyAzureBlobStorageFile(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> dest_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,
size_t src_size,
const String & dest_bucket,
const String & dest_key,
std::shared_ptr<AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_azure_blob_storage = false);
/// Copies data from any seekable source to AzureBlobStorage.
/// The same functionality can be done by using the function copyData() and the class WriteBufferFromAzureBlobStorage,
/// however copyDataToAzureBlobStorageFile() is faster and spends less memory.
/// The callback `create_read_buffer` can be called from multiple threads in parallel, so it should be thread-safe.
/// The parameters `offset` and `size` specify a part in the source to copy.
void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> & client,
const String & dest_bucket,
const String & dest_key,
std::shared_ptr<AzureObjectStorageSettings> settings,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_azure_blob_storage = false);
}
#endif
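
For orientation, here is a minimal sketch (not part of this commit) of how a caller might drive copyDataToAzureBlobStorageFile() with the signature declared above. The helper function name is an illustrative assumption; the client and settings are assumed to come from StorageAzureBlob::createClient() / StorageAzureBlob::createSettingsAsSharedPtr(), as in BackupWriterAzureBlobStorage. The real call site in this commit is BackupWriterAzureBlobStorage::copyDataToFile().

#include <IO/AzureBlobStorage/copyAzureBlobStorageFile.h>
#include <IO/ReadBufferFromString.h>

namespace DB
{

/// Illustrative helper (hypothetical): upload an in-memory string as a single blob.
void uploadStringAsBlob(
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> & client,
    std::shared_ptr<AzureObjectStorageSettings> settings,
    const String & container,
    const String & key,
    const String & data)
{
    /// The callback may be invoked from several upload threads, so it must be able to
    /// produce independent read buffers; reading from an immutable string is safe.
    auto create_read_buffer = [&data]() -> std::unique_ptr<SeekableReadBuffer>
    {
        return std::make_unique<ReadBufferFromString>(data);
    };

    copyDataToAzureBlobStorageFile(
        create_read_buffer,
        /* offset = */ 0,
        /* size = */ data.size(),
        client,
        container,
        key,
        settings,
        /* object_metadata = */ std::nullopt,
        /* schedule_ = */ {},   /// empty runner: parts are staged synchronously in the calling thread
        /* for_disk_azure_blob_storage = */ false);
}

}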

View File

@@ -258,6 +258,17 @@ AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(ContextPtr loca
return settings_ptr;
}
std::shared_ptr<AzureObjectStorageSettings> StorageAzureBlob::createSettingsAsSharedPtr(ContextPtr local_context)
{
const auto & context_settings = local_context->getSettingsRef();
auto settings_ptr = std::make_shared<AzureObjectStorageSettings>();
settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
return settings_ptr;
}
void registerStorageAzureBlob(StorageFactory & factory)
{
factory.registerStorage("AzureBlobStorage", [](const StorageFactory::Arguments & args)

View File

@@ -80,6 +80,7 @@ public:
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);
static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);
static std::shared_ptr<AzureObjectStorageSettings> createSettingsAsSharedPtr(ContextPtr local_context);
static void processNamedCollectionResult(StorageAzureBlob::Configuration & configuration, const NamedCollection & collection);

View File

@@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@@ -0,0 +1,11 @@
<clickhouse>
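<!-- Integration-test tuning: keep background processing pool sleeps minimal and give backups/restores extra threads. -->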
<background_processing_pool_thread_sleep_seconds>1</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_thread_sleep_seconds_random_part>0</background_processing_pool_thread_sleep_seconds_random_part>
<background_processing_pool_thread_sleep_seconds_if_nothing_to_do>0.0</background_processing_pool_thread_sleep_seconds_if_nothing_to_do>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>1</background_processing_pool_task_sleep_seconds_when_no_work_max>
<background_processing_pool_task_sleep_seconds_when_no_work_multiplier>1</background_processing_pool_task_sleep_seconds_when_no_work_multiplier>
<background_processing_pool_task_sleep_seconds_when_no_work_random_part>0</background_processing_pool_task_sleep_seconds_when_no_work_random_part>
<backup_threads>16</backup_threads>
<restore_threads>16</restore_threads>
</clickhouse>

View File

@@ -0,0 +1,13 @@
<!-- Sometimes Azurite is very slow, and the profiler makes it even worse -->
<clickhouse>
<profiles>
<default>
<query_profiler_real_time_period_ns>0</query_profiler_real_time_period_ns>
<query_profiler_cpu_time_period_ns>0</query_profiler_cpu_time_period_ns>
<load_marks_asynchronously>0</load_marks_asynchronously>
<backup_restore_keeper_max_retries>1000</backup_restore_keeper_max_retries>
<backup_restore_keeper_retry_initial_backoff_ms>1</backup_restore_keeper_retry_initial_backoff_ms>
<backup_restore_keeper_retry_max_backoff_ms>1</backup_restore_keeper_retry_max_backoff_ms>
</default>
</profiles>
</clickhouse>

View File

@@ -0,0 +1,8 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@@ -0,0 +1,151 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
user_configs=["configs/disable_profilers.xml", "configs/users.xml"],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(
node, query, expect_error="false", try_num=10, settings={}, query_on_retry=None
):
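# Azurite is occasionally flaky, so retry queries that fail with known transient transport errors.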
for i in range(try_num):
try:
if expect_error == "true":
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
if query_on_retry is not None:
node.query(query_on_retry)
continue
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(
str(connection_string)
)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def put_azure_file_content(filename, port, data):
container_name = "cont"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
try:
container_client = blob_service_client.create_container(container_name)
except:
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
buf = io.BytesIO(data)
blob_client.upload_blob(buf)
@pytest.fixture(autouse=True, scope="function")
def delete_all_files(cluster):
port = cluster.env_variables["AZURITE_PORT"]
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
containers = blob_service_client.list_containers()
for container in containers:
container_client = blob_service_client.get_container_client(container)
blob_list = container_client.list_blobs()
for blob in blob_list:
print(blob)
blob_client = container_client.get_blob_client(blob)
blob_client.delete_blob()
assert len(list(container_client.list_blobs())) == 0
yield
def test_create_table_connection_string(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_create_connection_string', 'CSV')",
)
def test_backup_restore(cluster):
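# End-to-end check: create a table backed by Azurite, BACKUP it to another blob, then RESTORE it under a new name and verify the data.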
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_c.csv', 'CSV')",
)
azure_query(node, f"INSERT INTO test_simple_write_connection_string VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write_c.csv", port))
assert get_azure_file_content("test_simple_write_c.csv", port) == '1,"a"\n'
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_c_backup.csv', 'CSV')"
azure_query(node,f"BACKUP TABLE test_simple_write_connection_string TO {backup_destination}")
print (get_azure_file_content("test_simple_write_c_backup.csv.backup", port))
azure_query(node, f"RESTORE TABLE test_simple_write_connection_string AS test_simple_write_connection_string_restored FROM {backup_destination};")
assert(azure_query(node,f"SELECT * from test_simple_write_connection_string_restored") == "1\ta\n")