Merge pull request #60251 from ClickHouse/Fix_endpoint_for_azureblobstorage

Azure Blob Storage : Fix issues endpoint and prefix
This commit is contained in:
SmitaRKulkarni 2024-03-01 10:23:49 +01:00 committed by GitHub
commit 89fff6852a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 244 additions and 35 deletions

View File

@ -19,6 +19,8 @@ CREATE TABLE azure_blob_storage_table (name String, value UInt32)
### Engine parameters
- `endpoint` — AzureBlobStorage endpoint URL with container & prefix. Optionally can contain account_name if the authentication method used needs it. (http://azurite1:{port}/[account_name]{container_name}/{data_prefix}) or these parameters can be provided separately using storage_account_url, account_name & container. For specifying prefix, endpoint should be used.
- `endpoint_contains_account_name` - This flag is used to specify if endpoint contains account_name as it is only needed for certain authentication methods. (Default : true)
- `connection_string|storage_account_url` — connection_string includes account name & key ([Create connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#configure-a-connection-string-for-an-azure-storage-account)) or you could also provide the storage account url here and account name & account key as separate parameters (see parameters account_name & account_key)
- `container_name` - Container name
- `blobpath` - file path. Supports following wildcards in readonly mode: `*`, `**`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings.

View File

@ -1242,7 +1242,9 @@ Configuration markup:
```
Connection parameters:
* `storage_account_url` - **Required**, Azure Blob Storage account URL, like `http://account.blob.core.windows.net` or `http://azurite1:10000/devstoreaccount1`.
* `endpoint` — AzureBlobStorage endpoint URL with container & prefix. Optionally can contain account_name if the authentication method used needs it. (`http://account.blob.core.windows.net:{port}/[account_name]{container_name}/{data_prefix}`) or these parameters can be provided separately using storage_account_url, account_name & container. For specifying prefix, endpoint should be used.
* `endpoint_contains_account_name` - This flag is used to specify if endpoint contains account_name as it is only needed for certain authentication methods. (Default : true)
* `storage_account_url` - Required if endpoint is not specified, Azure Blob Storage account URL, like `http://account.blob.core.windows.net` or `http://azurite1:10000/devstoreaccount1`.
* `container_name` - Target container name, defaults to `default-container`.
* `container_already_exists` - If set to `false`, a new container `container_name` is created in the storage account, if set to `true`, disk connects to the container directly, and if left unset, disk connects to the account, checks if the container `container_name` exists, and creates it if it doesn't exist yet.

View File

@ -20,13 +20,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct AzureBlobStorageEndpoint
{
const String storage_account_url;
const String container_name;
const std::optional<bool> container_already_exists;
};
void validateStorageAccountUrl(const String & storage_account_url)
{
@ -58,28 +51,89 @@ void validateContainerName(const String & container_name)
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
std::string storage_url;
if (config.has(config_prefix + ".storage_account_url"))
String storage_url;
String account_name;
String container_name;
String prefix;
if (config.has(config_prefix + ".endpoint"))
{
String endpoint = config.getString(config_prefix + ".endpoint");
/// For some authentication methods account name is not present in the endpoint
/// 'endpoint_contains_account_name' bool is used to understand how to split the endpoint (default : true)
bool endpoint_contains_account_name = config.getBool(config_prefix + ".endpoint_contains_account_name", true);
size_t pos = endpoint.find("//");
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected '//' in endpoint");
if (endpoint_contains_account_name)
{
size_t acc_pos_begin = endpoint.find('/', pos+2);
if (acc_pos_begin == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected account_name in endpoint");
storage_url = endpoint.substr(0,acc_pos_begin);
size_t acc_pos_end = endpoint.find('/',acc_pos_begin+1);
if (acc_pos_end == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected container_name in endpoint");
account_name = endpoint.substr(acc_pos_begin+1,(acc_pos_end-acc_pos_begin)-1);
size_t cont_pos_end = endpoint.find('/', acc_pos_end+1);
if (cont_pos_end != std::string::npos)
{
container_name = endpoint.substr(acc_pos_end+1,(cont_pos_end-acc_pos_end)-1);
prefix = endpoint.substr(cont_pos_end+1);
}
else
{
container_name = endpoint.substr(acc_pos_end+1);
}
}
else
{
size_t cont_pos_begin = endpoint.find('/', pos+2);
if (cont_pos_begin == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected container_name in endpoint");
storage_url = endpoint.substr(0,cont_pos_begin);
size_t cont_pos_end = endpoint.find('/',cont_pos_begin+1);
if (cont_pos_end != std::string::npos)
{
container_name = endpoint.substr(cont_pos_begin+1,(cont_pos_end-cont_pos_begin)-1);
prefix = endpoint.substr(cont_pos_end+1);
}
else
{
container_name = endpoint.substr(cont_pos_begin+1);
}
}
}
else if (config.has(config_prefix + ".connection_string"))
{
storage_url = config.getString(config_prefix + ".connection_string");
container_name = config.getString(config_prefix + ".container_name");
}
else if (config.has(config_prefix + ".storage_account_url"))
{
storage_url = config.getString(config_prefix + ".storage_account_url");
validateStorageAccountUrl(storage_url);
container_name = config.getString(config_prefix + ".container_name");
}
else
{
if (config.has(config_prefix + ".connection_string"))
storage_url = config.getString(config_prefix + ".connection_string");
else if (config.has(config_prefix + ".endpoint"))
storage_url = config.getString(config_prefix + ".endpoint");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected either `connection_string` or `endpoint` in config");
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected either `storage_account_url` or `connection_string` or `endpoint` in config");
String container_name = config.getString(config_prefix + ".container_name", "default-container");
validateContainerName(container_name);
if (!container_name.empty())
validateContainerName(container_name);
std::optional<bool> container_already_exists {};
if (config.has(config_prefix + ".container_already_exists"))
container_already_exists = {config.getBool(config_prefix + ".container_already_exists")};
return {storage_url, container_name, container_already_exists};
return {storage_url, account_name, container_name, prefix, container_already_exists};
}
@ -133,15 +187,13 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
{
auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
auto container_name = endpoint.container_name;
auto final_url = container_name.empty()
? endpoint.storage_account_url
: (std::filesystem::path(endpoint.storage_account_url) / container_name).string();
auto final_url = endpoint.getEndpoint();
if (endpoint.container_already_exists.value_or(false))
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
auto blob_service_client = getAzureBlobStorageClientWithAuth<BlobServiceClient>(
endpoint.storage_account_url, container_name, config, config_prefix);
endpoint.getEndpointWithoutContainer(), container_name, config, config_prefix);
try
{

View File

@ -10,9 +10,46 @@
namespace DB
{
struct AzureBlobStorageEndpoint
{
const String storage_account_url;
const String account_name;
const String container_name;
const String prefix;
const std::optional<bool> container_already_exists;
String getEndpoint()
{
String url = storage_account_url;
if (!account_name.empty())
url += "/" + account_name;
if (!container_name.empty())
url += "/" + container_name;
if (!prefix.empty())
url += "/" + prefix;
return url;
}
String getEndpointWithoutContainer()
{
String url = storage_account_url;
if (!account_name.empty())
url += "/" + account_name;
return url;
}
};
std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/);
}

View File

@ -93,11 +93,11 @@ AzureObjectStorage::AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & container_)
const String & object_namespace_)
: name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
, container(container_)
, object_namespace(object_namespace_)
, log(getLogger("AzureObjectStorage"))
{
}
@ -379,7 +379,7 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
container
object_namespace
);
}

View File

@ -67,7 +67,7 @@ public:
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_,
const String & container_);
const String & object_namespace_);
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
@ -130,7 +130,7 @@ public:
const std::string & config_prefix,
ContextPtr context) override;
String getObjectsNamespace() const override { return container ; }
String getObjectsNamespace() const override { return object_namespace ; }
std::unique_ptr<IObjectStorage> cloneObjectStorage(
const std::string & new_namespace,
@ -154,7 +154,7 @@ private:
/// client used to access the files in the Blob Storage cloud
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
const String container;
const String object_namespace; /// container + prefix
LoggerPtr log;
};

View File

@ -213,12 +213,12 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
String container_name = config.getString(config_prefix + ".container_name", "default-container");
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
return std::make_unique<AzureObjectStorage>(
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
container_name);
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
});
}

View File

@ -613,7 +613,8 @@ def test_endpoint(cluster):
container_client = cluster.blob_service_client.get_container_client(container_name)
container_client.create_container()
node.query(
azure_query(
node,
f"""
DROP TABLE IF EXISTS test SYNC;
@ -622,13 +623,128 @@ def test_endpoint(cluster):
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}/{account_name}/{container_name}/{data_prefix}',
endpoint_contains_account_name = 'true',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
container_already_exists = 1,
skip_access_check = 0);
INSERT INTO test SELECT number FROM numbers(10);
"""
""",
)
assert 10 == int(node.query("SELECT count() FROM test"))
def test_endpoint_new_container(cluster):
node = cluster.instances[NODE_NAME]
account_name = "devstoreaccount1"
container_name = "cont3"
data_prefix = "data_prefix"
port = cluster.azurite_port
azure_query(
node,
f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}/{account_name}/{container_name}/{data_prefix}',
endpoint_contains_account_name = 'true',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
INSERT INTO test SELECT number FROM numbers(10);
""",
)
assert 10 == int(node.query("SELECT count() FROM test"))
def test_endpoint_without_prefix(cluster):
node = cluster.instances[NODE_NAME]
account_name = "devstoreaccount1"
container_name = "cont4"
port = cluster.azurite_port
azure_query(
node,
f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}/{account_name}/{container_name}',
endpoint_contains_account_name = 'true',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
INSERT INTO test SELECT number FROM numbers(10);
""",
)
assert 10 == int(node.query("SELECT count() FROM test"))
def test_endpoint_error_check(cluster):
node = cluster.instances[NODE_NAME]
account_name = "devstoreaccount1"
port = cluster.azurite_port
query = f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}/{account_name}',
endpoint_contains_account_name = 'true',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
"""
expected_err_msg = "Expected container_name in endpoint"
assert expected_err_msg in azure_query(node, query, expect_error="true")
query = f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}',
endpoint_contains_account_name = 'true',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
"""
expected_err_msg = "Expected account_name in endpoint"
assert expected_err_msg in azure_query(node, query, expect_error="true")
query = f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}',
endpoint_contains_account_name = 'false',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
"""
expected_err_msg = "Expected container_name in endpoint"
assert expected_err_msg in azure_query(node, query, expect_error="true")