Fix issues with endpoint and prefix

This commit is contained in:
Smita Kulkarni 2024-02-21 20:04:12 +01:00
parent 973e17851a
commit 29b7bf64d4
2 changed files with 101 additions and 17 deletions

View File

@ -23,8 +23,32 @@ namespace ErrorCodes
struct AzureBlobStorageEndpoint
{
const String storage_account_url;
const String account_name;
const String container_name;
const String prefix;
const std::optional<bool> container_already_exists;
String getEndpoint()
{
String url = storage_account_url;
if (!account_name.empty())
url += "/" + account_name;
url += "/" + container_name + "/" + prefix;
return url;
}
String getEndpointWithoutContainer()
{
String url = storage_account_url;
if (!account_name.empty())
url += "/" + account_name;
return url;
}
};
@ -58,28 +82,64 @@ void validateContainerName(const String & container_name)
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
std::string storage_url;
if (config.has(config_prefix + ".storage_account_url"))
String storage_url;
String account_name;
String container_name;
String prefix;
if (config.has(config_prefix + ".endpoint"))
{
String endpoint = config.getString(config_prefix + ".endpoint");
/// For authenitication methods other than managed identity (Eg: SAS, Workload Identity),
/// account name is not present in the endpoint
/// 'endpoint_contains_account_name' bool is used to understand how to split the endpoint (default : true)
bool endpoint_contains_account_name = config.getBool(config_prefix + ".endpoint_contains_account_name", true);
size_t pos = endpoint.find("//");
assert (pos != std::string::npos);
if (endpoint_contains_account_name)
{
size_t pos_begin = endpoint.find('/', pos+2);
size_t pos_end = endpoint.find('/',pos_begin+1);
account_name = endpoint.substr(pos_begin+1,(pos_end-pos_begin)-1);
size_t cont_pos_end = endpoint.find('/', pos_end+1);
container_name = endpoint.substr(pos_end+1,(cont_pos_end-pos_end)-1);
prefix = endpoint.substr(cont_pos_end+1);
storage_url = endpoint.substr(0,pos_begin);
}
else
{
size_t pos_begin = endpoint.find('/', pos+2);
size_t pos_end = endpoint.find('/',pos_begin+1);
container_name = endpoint.substr(pos_begin+1,(pos_end-pos_begin)-1);
container_name = endpoint.substr(pos_end+1);
storage_url = endpoint.substr(0,pos_begin);
}
}
else if (config.has(config_prefix + ".connection_string"))
{
storage_url = config.getString(config_prefix + ".connection_string");
container_name = config.getString(config_prefix + ".container_name");
}
else if (config.has(config_prefix + ".storage_account_url"))
{
storage_url = config.getString(config_prefix + ".storage_account_url");
validateStorageAccountUrl(storage_url);
container_name = config.getString(config_prefix + ".container_name");
}
else
{
if (config.has(config_prefix + ".connection_string"))
storage_url = config.getString(config_prefix + ".connection_string");
else if (config.has(config_prefix + ".endpoint"))
storage_url = config.getString(config_prefix + ".endpoint");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected either `connection_string` or `endpoint` in config");
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected either `storage_account_url` or `connection_string` or `endpoint` in config");
String container_name = config.getString(config_prefix + ".container_name", "default-container");
validateContainerName(container_name);
if (!container_name.empty())
validateContainerName(container_name);
std::optional<bool> container_already_exists {};
if (config.has(config_prefix + ".container_already_exists"))
container_already_exists = {config.getBool(config_prefix + ".container_already_exists")};
return {storage_url, container_name, container_already_exists};
return {storage_url, account_name, container_name, prefix, container_already_exists};
}
@ -133,15 +193,13 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
{
auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
auto container_name = endpoint.container_name;
auto final_url = container_name.empty()
? endpoint.storage_account_url
: (std::filesystem::path(endpoint.storage_account_url) / container_name).string();
auto final_url = endpoint.getEndpoint();
if (endpoint.container_already_exists.value_or(false))
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
auto blob_service_client = getAzureBlobStorageClientWithAuth<BlobServiceClient>(
endpoint.storage_account_url, container_name, config, config_prefix);
endpoint.getEndpointWithoutContainer(), container_name, config, config_prefix);
try
{

View File

@ -632,3 +632,29 @@ def test_endpoint(cluster):
)
assert 10 == int(node.query("SELECT count() FROM test"))
def test_endpoint_new_container(cluster):
node = cluster.instances[NODE_NAME]
account_name = "devstoreaccount1"
container_name = "cont3"
data_prefix = "data_prefix"
port = cluster.azurite_port
node.query(
f"""
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test (a Int32)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(
type = azure_blob_storage,
endpoint = 'http://azurite1:{port}/{account_name}/{container_name}/{data_prefix}',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
skip_access_check = 0);
INSERT INTO test SELECT number FROM numbers(10);
"""
)
assert 10 == int(node.query("SELECT count() FROM test"))