Merge pull request #64116 from ClickHouse/fix_azure_native_copy

Properly support native copy for azure
This commit is contained in:
alesapin 2024-05-20 10:04:16 +00:00 committed by GitHub
commit 4ca2c0fcd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 301 additions and 35 deletions

View File

@ -39,6 +39,7 @@ public:
std::optional<UUID> backup_uuid;
bool deduplicate_files = true;
bool allow_s3_native_copy = true;
bool allow_azure_native_copy = true;
bool use_same_s3_credentials_for_base_backup = false;
bool azure_attempt_to_create_container = true;
ReadSettings read_settings;

View File

@ -31,22 +31,28 @@ namespace ErrorCodes
BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
object_storage = std::make_unique<AzureObjectStorage>(
"BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration.container,
configuration.getConnectionURLWithContainer());
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
}
BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;
@ -76,9 +82,9 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
DiskPtr destination_disk, const String & destination_path, WriteMode write_mode)
{
auto destination_data_source_description = destination_disk->getDataSourceDescription();
if ((destination_data_source_description.type == DataSourceType::ObjectStorage)
&& (destination_data_source_description.object_storage_type == ObjectStorageType::Azure)
&& (destination_data_source_description.is_encrypted == encrypted_in_backup))
LOG_TRACE(log, "Source description {}, desctionation description {}", data_source_description.description, destination_data_source_description.description);
if (destination_data_source_description.sameKind(data_source_description)
&& destination_data_source_description.is_encrypted == encrypted_in_backup)
{
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
@ -116,12 +122,13 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool attempt_to_create_container)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURLWithContainer(), false, false}
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
@ -130,9 +137,12 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
configuration_.container);
configuration_.container,
configuration.getConnectionURLWithContainer());
client = object_storage->getAzureBlobStorageClient();
settings = object_storage->getSettings();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
}
void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@ -140,7 +150,9 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(const String & path_in_backu
{
/// Use the native copy as a more optimal way to copy a file from AzureBlobStorage to AzureBlobStorage if it's possible.
auto source_data_source_description = src_disk->getDataSourceDescription();
if (source_data_source_description.sameKind(data_source_description) && (source_data_source_description.is_encrypted == copy_encrypted))
LOG_TRACE(log, "Source description {}, desctionation description {}", source_data_source_description.description, data_source_description.description);
if (source_data_source_description.sameKind(data_source_description)
&& source_data_source_description.is_encrypted == copy_encrypted)
{
/// getBlobPath() can return more than 3 elements if the file is stored as multiple objects in AzureBlobStorage container.
/// In this case we can't use the native copy.

View File

@ -16,7 +16,12 @@ namespace DB
class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
BackupReaderAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_);
~BackupReaderAzureBlobStorage() override;
bool fileExists(const String & file_name) override;
@ -37,7 +42,13 @@ private:
class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(StorageAzureBlob::Configuration configuration_, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_, bool attempt_to_create_container);
BackupWriterAzureBlobStorage(
StorageAzureBlob::Configuration configuration_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool attempt_to_create_container);
~BackupWriterAzureBlobStorage() override;
bool fileExists(const String & file_name) override;

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
M(Bool, decrypt_files_from_encrypted_disks) \
M(Bool, deduplicate_files) \
M(Bool, allow_s3_native_copy) \
M(Bool, allow_azure_native_copy) \
M(Bool, use_same_s3_credentials_for_base_backup) \
M(Bool, azure_attempt_to_create_container) \
M(Bool, read_from_filesystem_cache) \

View File

@ -44,6 +44,9 @@ struct BackupSettings
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_s3_native_copy = true;
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_azure_native_copy = true;
/// Whether base backup to S3 should inherit credentials from the BACKUP query.
bool use_same_s3_credentials_for_base_backup = false;

View File

@ -598,6 +598,7 @@ void BackupsWorker::doBackup(
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.allow_azure_native_copy = backup_settings.allow_azure_native_copy;
backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
backup_create_params.azure_attempt_to_create_container = backup_settings.azure_attempt_to_create_container;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);

View File

@ -135,10 +135,12 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context);
auto reader = std::make_shared<BackupReaderAzureBlobStorage>(
configuration,
params.allow_azure_native_copy,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
params.backup_info,
@ -150,11 +152,13 @@ void registerBackupEngineAzureBlobStorage(BackupFactory & factory)
}
else
{
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(configuration,
params.read_settings,
params.write_settings,
params.context,
params.azure_attempt_to_create_container);
auto writer = std::make_shared<BackupWriterAzureBlobStorage>(
configuration,
params.allow_azure_native_copy,
params.read_settings,
params.write_settings,
params.context,
params.azure_attempt_to_create_container);
return std::make_unique<BackupImpl>(
params.backup_info,

View File

@ -107,11 +107,13 @@ AzureObjectStorage::AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & object_namespace_)
const String & object_namespace_,
const String & description_)
: name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
, object_namespace(object_namespace_)
, description(description_)
, log(getLogger("AzureObjectStorage"))
{
}
@ -409,7 +411,8 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
object_namespace
object_namespace,
description
);
}

View File

@ -81,7 +81,8 @@ public:
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & object_namespace_);
const String & object_namespace_,
const String & description_);
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
@ -93,7 +94,7 @@ public:
std::string getCommonKeyPrefix() const override { return ""; }
std::string getDescription() const override { return client.get()->GetUrl(); }
std::string getDescription() const override { return description; }
bool exists(const StoredObject & object) const override;
@ -172,6 +173,7 @@ private:
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
const String object_namespace; /// container + prefix
const String description; /// url + container
LoggerPtr log;
};

View File

@ -306,11 +306,14 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
bool /* skip_access_check */) -> ObjectStoragePtr
{
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
std::string endpoint_string = endpoint.getEndpoint();
return createObjectStorage<AzureObjectStorage>(
ObjectStorageType::Azure, config, config_prefix, name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix,
endpoint.prefix.empty() ? endpoint_string : endpoint_string.substr(0, endpoint_string.length() - endpoint.prefix.length()));
};
factory.registerObjectStorageType("azure_blob_storage", creator);
factory.registerObjectStorageType("azure", creator);

View File

@ -289,6 +289,7 @@ void copyAzureBlobStorageFile(
if (settings->use_native_copy)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);

View File

@ -302,8 +302,8 @@ void registerStorageAzureBlob(StorageFactory & factory)
auto settings = StorageAzureBlob::createSettings(args.getContext());
return std::make_shared<StorageAzureBlob>(
std::move(configuration),
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings),configuration.container),
configuration,
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
args.getContext(),
args.table_id,
args.columns,
@ -491,6 +491,12 @@ Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl());
}
std::string StorageAzureBlob::Configuration::getConnectionURLWithContainer() const
{
auto url = getConnectionURL();
return fs::path(url.toString()) / container;
}
bool StorageAzureBlob::Configuration::withGlobsIgnorePartitionWildcard() const
{
if (!withPartitionWildcard())

View File

@ -45,6 +45,8 @@ public:
Poco::URI getConnectionURL() const;
std::string getConnectionURLWithContainer() const;
std::string connection_url;
bool is_connection_string;

View File

@ -333,7 +333,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer());
if (configuration.format == "auto")
return StorageAzureBlob::getTableStructureAndFormatFromData(object_storage.get(), configuration, std::nullopt, context).first;
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
@ -365,7 +365,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
StoragePtr storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
context,
StorageID(getDatabaseName(), table_name),
columns,

View File

@ -39,7 +39,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
/// On worker node this filename won't contains globs
storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
context,
StorageID(getDatabaseName(), table_name),
columns,
@ -54,7 +54,7 @@ StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
storage = std::make_shared<StorageAzureBlobCluster>(
cluster_name,
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container),
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings), configuration.container, configuration.getConnectionURLWithContainer()),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,215 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
def generate_config(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
TEMPLATE = """
<clickhouse>
<storage_configuration>
<disks>
<disk_azure>
<metadata_type>local</metadata_type>
<type>object_storage</type>
<object_storage_type>azure_blob_storage</object_storage_type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<use_native_copy>true</use_native_copy>
</disk_azure>
<disk_azure_other_bucket>
<metadata_type>local</metadata_type>
<type>object_storage</type>
<object_storage_type>azure_blob_storage</object_storage_type>
<use_native_copy>true</use_native_copy>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>othercontainer</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</disk_azure_other_bucket>
<disk_azure_cache>
<type>cache</type>
<disk>disk_azure</disk>
<path>/tmp/azure_cache/</path>
<max_size>1000000000</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</disk_azure_cache>
</disks>
<policies>
<policy_azure>
<volumes>
<main>
<disk>disk_azure</disk>
</main>
</volumes>
</policy_azure>
<policy_azure_other_bucket>
<volumes>
<main>
<disk>disk_azure_other_bucket</disk>
</main>
</volumes>
</policy_azure_other_bucket>
<policy_azure_cache>
<volumes>
<main>
<disk>disk_azure_cache</disk>
</main>
</volumes>
</policy_azure_cache>
</policies>
</storage_configuration>
<backups>
<allowed_disk>disk_azure</allowed_disk>
<allowed_disk>disk_azure_cache</allowed_disk>
<allowed_disk>disk_azure_other_bucket</allowed_disk>
</backups>
</clickhouse>
"""
f.write(TEMPLATE.format(port=port))
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_config(port)
cluster.add_instance(
"node1",
main_configs=[path],
with_azurite=True,
)
cluster.add_instance(
"node2",
main_configs=[path],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(
node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
):
for i in range(try_num):
try:
if expect_error:
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
if query_on_retry is not None:
node.query(query_on_retry)
continue
def test_backup_restore_on_merge_tree_same_container(cluster):
node1 = cluster.instances["node1"]
azure_query(
node1,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
)
azure_query(node1, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_backup')"
print("BACKUP DEST", backup_destination)
azure_query(
node1,
f"BACKUP TABLE test_simple_merge_tree TO {backup_destination}",
)
assert node1.contains_in_log("using native copy")
azure_query(
node1,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored FROM {backup_destination};",
)
assert (
azure_query(node1, f"SELECT * from test_simple_merge_tree_restored") == "1\ta\n"
)
azure_query(node1, f"DROP TABLE test_simple_merge_tree")
azure_query(node1, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_on_merge_tree_different_container(cluster):
node2 = cluster.instances["node2"]
azure_query(
node2,
f"CREATE TABLE test_simple_merge_tree_different_bucket(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_other_bucket'",
)
azure_query(
node2, f"INSERT INTO test_simple_merge_tree_different_bucket VALUES (1, 'a')"
)
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_different_bucket_backup_different_bucket')"
print("BACKUP DEST", backup_destination)
azure_query(
node2,
f"BACKUP TABLE test_simple_merge_tree_different_bucket TO {backup_destination}",
)
assert not node2.contains_in_log("using native copy")
azure_query(
node2,
f"RESTORE TABLE test_simple_merge_tree_different_bucket AS test_simple_merge_tree_different_bucket_restored FROM {backup_destination};",
)
assert (
azure_query(
node2, f"SELECT * from test_simple_merge_tree_different_bucket_restored"
)
== "1\ta\n"
)
assert not node2.contains_in_log("using native copy")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")