Merge pull request #56988 from ClickHouse/Azure_backup

Backup & Restore support for AzureBlobStorage
Vitaly Baranov 2024-02-09 14:45:35 +01:00 committed by GitHub
commit 09e727d48b
22 changed files with 1325 additions and 13 deletions

View File

@ -451,3 +451,24 @@ To disallow concurrent backup/restore, you can use these settings respectively.
The default value for both is true, so by default concurrent backups and restores are allowed.
When these settings are false on a cluster, only one backup or restore is allowed to run on the cluster at a time.
## Configuring BACKUP/RESTORE to use an AzureBlobStorage Endpoint
To write backups to an AzureBlobStorage container you need the following pieces of information:
- AzureBlobStorage endpoint connection string / URL,
- Container,
- Path,
- Account name (if URL is specified),
- Account key (if URL is specified).
The destination for a backup will be specified like this:
```
AzureBlobStorage('<connection string>/<url>', '<container>', '<path>', '<account name>', '<account key>')
```
```sql
BACKUP TABLE data TO AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
'test_container', 'data_backup');
RESTORE TABLE data AS data_restored FROM AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
'test_container', 'data_backup');
```
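The engine also accepts a named collection holding the connection details as its first argument (resolved from `named_collections` in the server configuration, as implemented in `registerBackupEngineAzureBlobStorage` below); the collection name here is only an example:
```sql
BACKUP TABLE data TO AzureBlobStorage(azure_conf1, 'data_backup');
```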

View File

@ -33,11 +33,13 @@ void BackupFactory::registerBackupEngine(const String & engine_name, const Creat
void registerBackupEnginesFileAndDisk(BackupFactory &);
void registerBackupEngineS3(BackupFactory &);
void registerBackupEngineAzureBlobStorage(BackupFactory &);
void registerBackupEngines(BackupFactory & factory)
{
registerBackupEnginesFileAndDisk(factory);
registerBackupEngineS3(factory);
registerBackupEngineAzureBlobStorage(factory);
}
BackupFactory::BackupFactory()

View File

@ -0,0 +1,320 @@
#include <Backups/BackupIO_AzureBlobStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <IO/SharedThreadPools.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageAzureBlobCluster.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/AzureBlobStorage/copyAzureBlobStorageFile.h>
#include <Disks/IDisk.h>
#include <Disks/DiskType.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
}
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
bool BackupReaderAzureBlobStorage::fileExists(const String & file_name)
{
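/// File names that start with a dot (e.g. the ".backup" metadata file) are resolved relative to the backup's blob_path;
/// all other names are assumed to already be full keys. The writer below uses the same convention.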
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return object_storage->exists(StoredObject(key));
}
UInt64 BackupReaderAzureBlobStorage::getFileSize(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
return object_metadata.size_bytes;
}
std::unique_ptr<SeekableReadBuffer> BackupReaderAzureBlobStorage::readFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
}
void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
{
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if ((destination_data_source_description.type == DataSourceType::ObjectStorage)
&& (destination_data_source_description.object_storage_type == ObjectStorageType::Azure)
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
{
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
{
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
if (blob_path.size() != 2 || mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Blob writing function called with unexpected blob_path.size={} or mode={}",
blob_path.size(), mode);
copyAzureBlobStorageFile(
client,
destination_disk->getObjectStorage()->getAzureBlobStorageClient(),
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
0,
file_size,
/* dest_container */ blob_path[1],
/* dest_path */ blob_path[0],
settings,
read_settings,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
/* for_disk_azure_blob_storage= */ true);
return file_size;
};
destination_disk->writeFileUsingBlobWritingFunction(destination_path, write_mode, write_blob_function);
return; /// copied!
}
/// Fallback to copy through buffers.
BackupReaderDefault::copyFileToDisk(path_in_backup, file_size, encrypted_in_backup, destination_disk, destination_path, write_mode);
}
BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
}
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length)
{
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
auto source_data_source_description = src_disk->getDataSourceDescription();
if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
{
/// getBlobPath() can return more than 2 elements if the file is stored as multiple objects in an AzureBlobStorage container.
/// In that case we can't use the native copy.
if (auto blob_path = src_disk->getBlobPath(src_path); blob_path.size() == 2)
{
LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorag", src_path, src_disk->getName());
copyAzureBlobStorageFile(
src_disk->getObjectStorage()->getAzureBlobStorageClient(),
client,
/* src_container */ blob_path[1],
/* src_path */ blob_path[0],
start_pos,
length,
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
settings,
read_settings,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
return; /// copied!
}
}
/// Fallback to copy through buffers.
BackupWriterDefault::copyFileFromDisk(path_in_backup, src_disk, src_path, copy_encrypted, start_pos, length);
}
void BackupWriterAzureBlobStorage::copyFile(const String & destination, const String & source, size_t size)
{
LOG_TRACE(log, "Copying file inside backup from {} to {} ", source, destination);
copyAzureBlobStorageFile(
client,
client,
configuration.container,
fs::path(source),
0,
size,
/* dest_container */ configuration.container,
/* dest_path */ destination,
settings,
read_settings,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
/* for_disk_azure_blob_storage= */ true);
}
void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToAzureBlobStorageFile(create_read_buffer, start_pos, length, client, configuration.container, path_in_backup, settings,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
}
BackupWriterAzureBlobStorage::~BackupWriterAzureBlobStorage() = default;
bool BackupWriterAzureBlobStorage::fileExists(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return object_storage->exists(StoredObject(key));
}
UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
RelativePathsWithMetadata children;
object_storage->listObjects(key, children, /*max_keys*/ 0);
if (children.empty())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Object {} must exist", key);
return children[0].metadata.size_bytes;
}
std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String & file_name, size_t /*expected_file_size*/)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
}
std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client,
key,
settings->max_single_part_upload_size,
settings->max_unexpected_write_error_retries,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings);
}
void BackupWriterAzureBlobStorage::removeFile(const String & file_name)
{
String key;
if (startsWith(file_name, "."))
{
key = configuration.blob_path + file_name;
}
else
{
key = file_name;
}
StoredObject object(key);
object_storage->removeObjectIfExists(object);
}
void BackupWriterAzureBlobStorage::removeFiles(const Strings & file_names)
{
StoredObjects objects;
for (const auto & file_name : file_names)
objects.emplace_back(file_name);
object_storage->removeObjectsIfExist(objects);
}
void BackupWriterAzureBlobStorage::removeFilesBatch(const Strings & file_names)
{
StoredObjects objects;
for (const auto & file_name : file_names)
objects.emplace_back(file_name);
object_storage->removeObjectsIfExist(objects);
}
}
#endif

View File

@ -0,0 +1,68 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Backups/BackupIO_Default.h>
#include <Disks/DiskType.h>
#include <Storages/StorageAzureBlobCluster.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
/// Represents a backup stored to Azure
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
void copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) override;
private:
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<const AzureObjectStorageSettings> settings;
};
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length) override;
void copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
bool copy_encrypted, UInt64 start_pos, UInt64 length) override;
void copyFile(const String & destination, const String & source, size_t size) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
private:
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureBlob::Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<const AzureObjectStorageSettings> settings;
};
}
#endif

View File

@ -939,12 +939,12 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)
}
else if (src_disk && from_immutable_file)
{
LOG_TRACE(log, "Writing backup for file {} from {} (disk {}): data file #{}", info.data_file_name, src_file_desc, src_disk->getName(), info.data_file_index);
LOG_INFO(log, "Writing backup for file {} from {} (disk {}): data file #{}", info.data_file_name, src_file_desc, src_disk->getName(), info.data_file_index);
writer->copyFileFromDisk(info.data_file_name, src_disk, src_file_path, info.encrypted_by_disk, info.base_size, info.size - info.base_size);
}
else
{
LOG_TRACE(log, "Writing backup for file {} from {}: data file #{}", info.data_file_name, src_file_desc, info.data_file_index);
LOG_INFO(log, "Writing backup for file {} from {}: data file #{}", info.data_file_name, src_file_desc, info.data_file_index);
auto create_read_buffer = [entry, read_settings = writer->getReadSettings()] { return entry->getReadBuffer(read_settings); };
writer->copyDataToFile(info.data_file_name, create_read_buffer, info.base_size, info.size - info.base_size);
}

View File

@ -0,0 +1,172 @@
#include "config.h"
#include <Backups/BackupFactory.h>
#include <Common/Exception.h>
#if USE_AZURE_BLOB_STORAGE
#include <Backups/BackupIO_AzureBlobStorage.h>
#include <Storages/StorageAzureBlob.h>
#include <Backups/BackupImpl.h>
#include <IO/Archives/hasRegisteredArchiveFileExtension.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
#if USE_AZURE_BLOB_STORAGE
namespace
{
String removeFileNameFromURL(String & url)
{
Poco::URI url2{url};
String path = url2.getPath();
size_t slash_pos = path.find_last_of('/');
String file_name = path.substr(slash_pos + 1);
path.resize(slash_pos + 1);
url2.setPath(path);
url = url2.toString();
return file_name;
}
}
#endif
void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
{
auto creator_fn = []([[maybe_unused]] const BackupFactory::CreateParams & params) -> std::unique_ptr<IBackup>
{
#if USE_AZURE_BLOB_STORAGE
const String & id_arg = params.backup_info.id_arg;
const auto & args = params.backup_info.args;
StorageAzureBlob::Configuration configuration;
if (!id_arg.empty())
{
const auto & config = params.context->getConfigRef();
auto config_prefix = "named_collections." + id_arg;
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", id_arg);
if (config.has(config_prefix + ".connection_string"))
{
configuration.connection_url = config.getString(config_prefix + ".connection_string");
configuration.is_connection_string = true;
configuration.container = config.getString(config_prefix + ".container");
}
else
{
configuration.connection_url = config.getString(config_prefix + ".storage_account_url");
configuration.is_connection_string = false;
configuration.container = config.getString(config_prefix + ".container");
configuration.account_name = config.getString(config_prefix + ".account_name");
configuration.account_key = config.getString(config_prefix + ".account_key");
}
if (args.size() > 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Backup AzureBlobStorage requires 1 or 2 arguments: named_collection, [filename]");
if (args.size() == 1)
configuration.blob_path = args[0].safeGet<String>();
}
else
{
if (args.size() == 3)
{
configuration.connection_url = args[0].safeGet<String>();
configuration.is_connection_string = true;
configuration.container = args[1].safeGet<String>();
configuration.blob_path = args[2].safeGet<String>();
}
else if (args.size() == 5)
{
configuration.connection_url = args[0].safeGet<String>();
configuration.is_connection_string = false;
configuration.container = args[1].safeGet<String>();
configuration.blob_path = args[2].safeGet<String>();
configuration.account_name = args[3].safeGet<String>();
configuration.account_key = args[4].safeGet<String>();
}
else
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Backup AzureBlobStorage requires 3 or 5 arguments: connection string>/<url, container, path, [account name], [account key]");
}
}
BackupImpl::ArchiveParams archive_params;
if (hasRegisteredArchiveFileExtension(configuration.blob_path))
{
if (params.is_internal_backup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Using archives with backups on clusters is disabled");
archive_params.archive_name = removeFileNameFromURL(configuration.blob_path);
archive_params.compression_method = params.compression_method;
archive_params.compression_level = params.compression_level;
archive_params.password = params.password;
}
else
{
if (!params.password.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Password is not applicable, backup cannot be encrypted");
}
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
archive_params,
params.base_backup_info,
reader,
params.context,
/* use_same_s3_credentials_for_base_backup */ false);
}
else
{
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
archive_params,
params.base_backup_info,
writer,
params.context,
params.is_internal_backup,
params.backup_coordination,
params.backup_uuid,
params.deduplicate_files,
/* use_same_s3_credentials_for_base_backup */ false);
}
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "AzureBlobStorage support is disabled");
#endif
};
factory.registerBackupEngine("AzureBlobStorage", creator_fn);
}
}
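Because `hasRegisteredArchiveFileExtension` routes paths such as `data_backup.zip` through the archive branch above, pointing the destination at an archive-named blob writes the whole backup as a single archive. A sketch (the connection string is elided; the container and path are examples):
```sql
BACKUP TABLE data TO AzureBlobStorage('<connection string>', 'test_container', 'data_backup.zip');
```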

View File

@ -89,6 +89,7 @@ add_headers_and_sources(clickhouse_common_io Common/SSH)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/S3)
add_headers_and_sources(clickhouse_common_io IO/AzureBlobStorage)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@ -141,6 +142,7 @@ endif()
if (TARGET ch_contrib::azure_sdk)
add_headers_and_sources(dbms Disks/ObjectStorages/AzureBlobStorage)
add_headers_and_sources(dbms IO/AzureBlobStorage)
endif()
if (TARGET ch_contrib::hdfs)
@ -496,6 +498,7 @@ if (TARGET ch_contrib::aws_s3)
endif()
if (TARGET ch_contrib::azure_sdk)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk)
dbms_target_link_libraries (PRIVATE ch_contrib::azure_sdk)
endif()

View File

@ -384,6 +384,10 @@ The server successfully detected this situation and will download merged part fr
M(S3PutObject, "Number of S3 API PutObject calls.") \
M(S3GetObject, "Number of S3 API GetObject calls.") \
\
M(AzureUploadPart, "Number of Azure blob storage API UploadPart calls.") \
M(DiskAzureUploadPart, "Number of Disk Azure blob storage API UploadPart calls.") \
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls.") \
M(DiskAzureCopyObject, "Number of Disk Azure blob storage API CopyObject calls.") \
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
\

View File

@ -82,7 +82,8 @@ class IColumn;
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of concurrently loaded parts in a multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_single_part_copy_size, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, azure_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write.", 0) \

View File

@ -166,6 +166,9 @@ std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Po
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getUInt64(config_prefix + ".max_upload_part_size", 5ULL * 1024 * 1024 * 1024),
config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size),
config.getBool(config_prefix + ".use_native_copy", false),
config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries)
);
}
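These keys are read from the disk's configuration subtree; a minimal sketch of where they would live for an Azure disk (the disk name is hypothetical and the endpoint/credential elements are omitted):
```xml
<clickhouse>
    <storage_configuration>
        <disks>
            <blob_storage_disk>
                <type>azure_blob_storage</type>
                <!-- endpoint and credentials omitted -->
                <max_upload_part_size>5368709120</max_upload_part_size>
                <max_single_part_copy_size>268435456</max_single_part_copy_size>
                <use_native_copy>true</use_native_copy>
            </blob_storage_disk>
        </disks>
    </storage_configuration>
</clickhouse>
```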

View File

@ -92,10 +92,12 @@ private:
AzureObjectStorage::AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_)
SettingsPtr && settings_,
const String & container_)
: name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
, container(container_)
, log(getLogger("AzureObjectStorage"))
{
}
@ -376,7 +378,8 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
return std::make_unique<AzureObjectStorage>(
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context)
getAzureBlobStorageSettings(config, config_prefix, context),
container
);
}

View File

@ -24,12 +24,18 @@ struct AzureObjectStorageSettings
int max_single_read_retries_,
int max_single_download_retries_,
int list_object_keys_size_,
size_t max_upload_part_size_,
size_t max_single_part_copy_size_,
bool use_native_copy_,
size_t max_unexpected_write_error_retries_)
: max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, list_object_keys_size(list_object_keys_size_)
, max_upload_part_size(max_upload_part_size_)
, max_single_part_copy_size(max_single_part_copy_size_)
, use_native_copy(use_native_copy_)
, max_unexpected_write_error_retries(max_unexpected_write_error_retries_)
{
}
@ -41,6 +47,10 @@ struct AzureObjectStorageSettings
size_t max_single_read_retries = 3;
size_t max_single_download_retries = 3;
int list_object_keys_size = 1000;
size_t min_upload_part_size = 16 * 1024 * 1024;
size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
size_t max_single_part_copy_size = 256 * 1024 * 1024;
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
};
@ -56,7 +66,8 @@ public:
AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_);
SettingsPtr && settings_,
const String & container_);
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
@ -119,7 +130,7 @@ public:
const std::string & config_prefix,
ContextPtr context) override;
String getObjectsNamespace() const override { return ""; }
String getObjectsNamespace() const override { return container; }
std::unique_ptr<IObjectStorage> cloneObjectStorage(
const std::string & new_namespace,
@ -131,11 +142,19 @@ public:
bool isRemote() const override { return true; }
std::shared_ptr<const AzureObjectStorageSettings> getSettings() { return settings.get(); }
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() override
{
return client.get();
}
private:
const String name;
/// Client used to access the files in the Blob Storage cloud.
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
const String container;
LoggerPtr log;
};

View File

@ -3,6 +3,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include "config.h"
namespace Poco
{
@ -120,6 +121,13 @@ public:
static bool canUseReadThroughCache(const ReadSettings & settings);
#if USE_AZURE_BLOB_STORAGE
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() override
{
return object_storage->getAzureBlobStorageClient();
}
#endif
private:
FileCacheKey getCacheKey(const std::string & path) const;

View File

@ -23,11 +23,22 @@
#include <Disks/DirectoryIterator.h>
#include <Common/ThreadPool.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/Exception.h>
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Common/MultiVersion.h>
#include <azure/storage/blobs.hpp>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
@ -214,6 +225,14 @@ public:
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;
#if USE_AZURE_BLOB_STORAGE
virtual std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This function is only implemented for AzureBlobStorage");
}
#endif
private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr remote_read_throttler;

View File

@ -213,10 +213,12 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
String container_name = config.getString(config_prefix + ".container_name", "default-container");
return std::make_unique<AzureObjectStorage>(
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
getAzureBlobStorageSettings(config, config_prefix, context),
container_name);
});
}

View File

@ -0,0 +1,340 @@
#include <IO/AzureBlobStorage/copyAzureBlobStorageFile.h>
#if USE_AZURE_BLOB_STORAGE
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/StdStreamFromReadBuffer.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <Common/getRandomASCIIString.h>
#include <IO/SharedThreadPools.h>
namespace ProfileEvents
{
extern const Event AzureCopyObject;
extern const Event AzureUploadPart;
extern const Event DiskAzureCopyObject;
extern const Event DiskAzureUploadPart;
}
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int AZURE_BLOB_STORAGE_ERROR;
}
namespace
{
class UploadHelper
{
public:
UploadHelper(
const CreateReadBuffer & create_read_buffer_,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
size_t offset_,
size_t total_size_,
const String & dest_container_for_logging_,
const String & dest_blob_,
std::shared_ptr<const AzureObjectStorageSettings> settings_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_azure_blob_storage_,
const Poco::Logger * log_)
: create_read_buffer(create_read_buffer_)
, client(client_)
, offset(offset_)
, total_size(total_size_)
, dest_container_for_logging(dest_container_for_logging_)
, dest_blob(dest_blob_)
, settings(settings_)
, schedule(schedule_)
, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
, log(log_)
, max_single_part_upload_size(settings_->max_single_part_upload_size)
{
}
virtual ~UploadHelper() = default;
protected:
std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
size_t offset;
size_t total_size;
const String & dest_container_for_logging;
const String & dest_blob;
std::shared_ptr<const AzureObjectStorageSettings> settings;
ThreadPoolCallbackRunner<void> schedule;
bool for_disk_azure_blob_storage;
const Poco::Logger * log;
size_t max_single_part_upload_size;
struct UploadPartTask
{
size_t part_offset;
size_t part_size;
std::vector<std::string> block_ids;
bool is_finished = false;
std::exception_ptr exception;
};
size_t normal_part_size;
std::vector<std::string> block_ids;
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar;
void calculatePartSize()
{
auto max_upload_part_size = settings->max_upload_part_size;
if (!max_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
/// We've calculated the size of a normal part (the final part can be smaller).
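/// For example, copying 12 GiB with the default max_upload_part_size of 5 GiB yields parts of 5 GiB, 5 GiB and 2 GiB.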
normal_part_size = max_upload_part_size;
}
public:
void performCopy()
{
performMultipartUpload();
}
void completeMultipartUpload()
{
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
block_blob_client.CommitBlockList(block_ids);
}
void performMultipartUpload()
{
calculatePartSize();
size_t position = offset;
size_t end_position = offset + total_size;
try
{
while (position < end_position)
{
size_t next_position = std::min(position + normal_part_size, end_position);
size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.
uploadPart(position, part_size);
position = next_position;
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
waitForAllBackgroundTasks();
throw;
}
waitForAllBackgroundTasks();
completeMultipartUpload();
}
void uploadPart(size_t part_offset, size_t part_size)
{
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, Size: {}", dest_container_for_logging, dest_blob, part_size);
if (!part_size)
{
LOG_TRACE(log, "Skipping writing an empty part.");
return;
}
if (schedule)
{
UploadPartTask * task = nullptr;
{
std::lock_guard lock(bg_tasks_mutex);
task = &bg_tasks.emplace_back();
++num_added_bg_tasks;
}
/// Notify waiting thread when task finished
auto task_finish_notify = [this, task]()
{
std::lock_guard lock(bg_tasks_mutex);
task->is_finished = true;
++num_finished_bg_tasks;
/// Notification under mutex is important here.
/// Otherwise, WriteBuffer could be destroyed in between
/// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one();
};
try
{
task->part_offset = part_offset;
task->part_size = part_size;
schedule([this, task, task_finish_notify]()
{
try
{
processUploadPartRequest(*task);
}
catch (...)
{
task->exception = std::current_exception();
}
task_finish_notify();
}, Priority{});
}
catch (...)
{
task_finish_notify();
throw;
}
}
else
{
UploadPartTask task;
task.part_offset = part_offset;
task.part_size = part_size;
processUploadPartRequest(task);
block_ids.insert(block_ids.end(), task.block_ids.begin(), task.block_ids.end());
}
}
void processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureUploadPart);
if (for_disk_azure_blob_storage)
ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
while (!read_buffer->eof())
{
auto size = read_buffer->available();
if (size > 0)
{
auto block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(read_buffer->position()), size);
block_blob_client.StageBlock(block_id, memory);
task.block_ids.emplace_back(block_id);
read_buffer->ignore(size);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id);
}
}
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size());
}
void waitForAllBackgroundTasks()
{
if (!schedule)
return;
std::unique_lock lock(bg_tasks_mutex);
/// Suppress warnings because bg_tasks_mutex is actually held, but TSA annotations do not understand std::unique_lock.
bg_tasks_condvar.wait(lock, [this]() { return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks);
for (auto & task : tasks)
{
if (task.exception)
std::rethrow_exception(task.exception);
block_ids.insert(block_ids.end(), task.block_ids.begin(), task.block_ids.end());
}
}
};
}
void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_azure_blob_storage)
{
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
helper.performCopy();
}
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t offset,
size_t size,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_azure_blob_storage)
{
if (settings->use_native_copy)
{
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (for_disk_azure_blob_storage)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);
auto block_blob_client_dest = dest_client->GetBlockBlobClient(dest_blob);
auto source_uri = block_blob_client_src.GetUrl();
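/// Blobs smaller than max_single_part_copy_size are copied with a single synchronous CopyFromUri call;
/// larger ones use the asynchronous StartCopyFromUri below and are polled until the copy finishes.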
if (size < settings->max_single_part_copy_size)
{
block_blob_client_dest.CopyFromUri(source_uri);
}
else
{
Azure::Storage::Blobs::StartBlobCopyOperation operation = block_blob_client_dest.StartCopyFromUri(source_uri);
// Wait for the operation to finish, checking the status every 100 milliseconds.
auto copy_response = operation.PollUntilDone(std::chrono::milliseconds(100));
auto properties_model = copy_response.Value;
if (properties_model.CopyStatus.HasValue()
&& properties_model.CopyStatus.Value() != Azure::Storage::Blobs::Models::CopyStatus::Success)
{
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} failed", src_blob, dest_blob);
}
}
}
else
{
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
helper.performCopy();
}
}
}
#endif

View File

@ -0,0 +1,56 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Storages/StorageAzureBlobCluster.h>
#include <Storages/StorageAzureBlob.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <base/types.h>
#include <functional>
#include <memory>
namespace DB
{
class SeekableReadBuffer;
using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// Copies a file from AzureBlobStorage to AzureBlobStorage.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t src_offset,
size_t src_size,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_azure_blob_storage = false);
/// Copies data from any seekable source to AzureBlobStorage.
/// The same functionality can be achieved with copyData() and WriteBufferFromAzureBlobStorage,
/// however copyDataToAzureBlobStorageFile() is faster and uses less memory.
/// The callback `create_read_buffer` can be called from multiple threads in parallel, so it should be thread-safe.
/// The parameters `offset` and `size` specify a part in the source to copy.
void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_azure_blob_storage = false);
}
#endif
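A minimal call sketch for the native-copy entry point; the client handles, settings object and names below are assumed to exist in the caller and are not prescribed by this header:
```cpp
// Hypothetical usage: copy the whole blob "data/part1.bin" between two containers.
// src_client/dest_client are std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>,
// settings is a std::shared_ptr<const AzureObjectStorageSettings>, read_settings a ReadSettings.
copyAzureBlobStorageFile(
    src_client, dest_client,
    /* src_container_for_logging */ "cont",
    /* src_blob */ "data/part1.bin",
    /* src_offset */ 0,
    /* src_size */ blob_size,
    /* dest_container_for_logging */ "cont_backup",
    /* dest_blob */ "data/part1.bin",
    settings, read_settings);
```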

View File

@ -297,7 +297,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
return std::make_shared<StorageAzureBlob>(
std::move(configuration),
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings)),
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings),configuration.container),
args.getContext(),
args.table_id,
args.columns,

View File

@ -262,7 +262,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container);
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context, false);
}
@ -293,7 +293,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
StoragePtr storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
context,
StorageID(getDatabaseName(), table_name),
columns,

View File

@ -40,7 +40,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
/// On the worker node this filename won't contain globs
storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
context,
StorageID(getDatabaseName(), table_name),
columns,
@ -55,7 +55,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
storage = std::make_shared<StorageAzureBlobCluster>(
cluster_name,
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,270 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
def generate_cluster_def(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/named_collections.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(
f"""<clickhouse>
<named_collections>
<azure_conf1>
<connection_string>DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:{port}/devstoreaccount1;</connection_string>
<container>cont</container>
<format>CSV</format>
</azure_conf1>
<azure_conf2>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container>cont</container>
<format>CSV</format>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</azure_conf2>
</named_collections>
</clickhouse>
"""
)
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_cluster_def(port)
cluster.add_instance(
"node",
main_configs=[path],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(
node, query, expect_error="false", try_num=10, settings={}, query_on_retry=None
):
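# Azurite sporadically drops connections, so transient transport errors are retried
# with a linear backoff (time.sleep(i) before the next attempt).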
for i in range(try_num):
try:
if expect_error == "true":
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
if query_on_retry is not None:
node.query(query_on_retry)
continue
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(
str(connection_string)
)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def put_azure_file_content(filename, port, data):
container_name = "cont"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
try:
container_client = blob_service_client.create_container(container_name)
except Exception:  # the container may already exist
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
buf = io.BytesIO(data)
blob_client.upload_blob(buf)
@pytest.fixture(autouse=True, scope="function")
def delete_all_files(cluster):
port = cluster.env_variables["AZURITE_PORT"]
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
containers = blob_service_client.list_containers()
for container in containers:
container_client = blob_service_client.get_container_client(container)
blob_list = container_client.list_blobs()
for blob in blob_list:
print(blob)
blob_client = container_client.get_blob_client(blob)
blob_client.delete_blob()
assert len(list(container_client.list_blobs())) == 0
yield
def test_backup_restore(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_c.csv', 'CSV')",
)
azure_query(
node, f"INSERT INTO test_simple_write_connection_string VALUES (1, 'a')"
)
print(get_azure_file_content("test_simple_write_c.csv", port))
assert get_azure_file_content("test_simple_write_c.csv", port) == '1,"a"\n'
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_c_backup.csv')"
azure_query(
node,
f"BACKUP TABLE test_simple_write_connection_string TO {backup_destination}",
)
print(get_azure_file_content("test_simple_write_c_backup.csv.backup", port))
azure_query(
node,
f"RESTORE TABLE test_simple_write_connection_string AS test_simple_write_connection_string_restored FROM {backup_destination};",
)
assert (
azure_query(node, f"SELECT * from test_simple_write_connection_string_restored")
== "1\ta\n"
)
def test_backup_restore_diff_container(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_simple_write_connection_string_cont1 (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_c_cont1.csv', 'CSV')",
)
azure_query(
node, f"INSERT INTO test_simple_write_connection_string_cont1 VALUES (1, 'a')"
)
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont1', 'test_simple_write_c_backup_cont1.csv')"
azure_query(
node,
f"BACKUP TABLE test_simple_write_connection_string_cont1 TO {backup_destination}",
)
azure_query(
node,
f"RESTORE TABLE test_simple_write_connection_string_cont1 AS test_simple_write_connection_string_restored_cont1 FROM {backup_destination};",
)
assert (
azure_query(
node, f"SELECT * from test_simple_write_connection_string_restored_cont1"
)
== "1\ta\n"
)
def test_backup_restore_with_named_collection_azure_conf1(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write.csv', 'CSV')",
)
azure_query(node, f"INSERT INTO test_write_connection_string VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write.csv", port))
assert get_azure_file_content("test_simple_write.csv", port) == '1,"a"\n'
backup_destination = (
f"AzureBlobStorage(azure_conf1, 'test_simple_write_nc_backup.csv')"
)
azure_query(
node,
f"BACKUP TABLE test_write_connection_string TO {backup_destination}",
)
print(get_azure_file_content("test_simple_write_nc_backup.csv.backup", port))
azure_query(
node,
f"RESTORE TABLE test_write_connection_string AS test_write_connection_string_restored FROM {backup_destination};",
)
assert (
azure_query(node, f"SELECT * from test_write_connection_string_restored")
== "1\ta\n"
)
def test_backup_restore_with_named_collection_azure_conf2(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_write_connection_string_2 (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_write_2.csv', 'CSV')",
)
azure_query(node, f"INSERT INTO test_write_connection_string_2 VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write_2.csv", port))
assert get_azure_file_content("test_simple_write_2.csv", port) == '1,"a"\n'
backup_destination = (
f"AzureBlobStorage(azure_conf2, 'test_simple_write_nc_backup_2.csv')"
)
azure_query(
node,
f"BACKUP TABLE test_write_connection_string_2 TO {backup_destination}",
)
print(get_azure_file_content("test_simple_write_nc_backup_2.csv.backup", port))
azure_query(
node,
f"RESTORE TABLE test_write_connection_string_2 AS test_write_connection_string_restored_2 FROM {backup_destination};",
)
assert (
azure_query(node, f"SELECT * from test_write_connection_string_restored_2")
== "1\ta\n"
)