Enable connecting directly to storage accounts and creating new containers

This commit is contained in:
Jakub Kuklis 2021-12-01 14:34:38 +00:00
parent 910db6ca57
commit 2b7725bfc7
2 changed files with 59 additions and 8 deletions

View File

@ -25,4 +25,4 @@ endif()
if (USE_AZURE_BLOB_STORAGE)
message (STATUS "Using Azure Blob Storage - ${USE_AZURE_BLOB_STORAGE}")
endif()
endif()

View File

@ -6,12 +6,15 @@
#if USE_AZURE_BLOB_STORAGE
#include <optional>
#include <re2/re2.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <Disks/BlobStorage/DiskBlobStorage.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskCacheWrapper.h>
#include <Disks/RemoteDisksCommon.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <re2/re2.h>
namespace DB
@ -91,6 +94,57 @@ std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractC
}
/// Describes which Azure Blob Storage container a disk should be attached to.
/// Filled in by processBlobStorageEndpoint() from the disk configuration.
///
/// Members are intentionally non-const: const data members would make the
/// struct non-assignable and would silently turn moves into copies. Use a
/// `const BlobStorageEndpoint` where immutability is required.
struct BlobStorageEndpoint
{
    /// URL of the storage account (config key "storage_account_url").
    String storage_account_url;

    /// Name of the container inside the storage account (config key "container_name").
    String container_name;

    /// Value of the config key "container_already_exists";
    /// left empty when the key is absent from the configuration.
    std::optional<bool> container_already_exists;
};
/// Reads the Blob Storage endpoint description stored under `config_prefix`.
///
/// Keys consulted (all relative to `config_prefix`):
///   - "storage_account_url"      — read without a default value;
///   - "container_name"           — falls back to "default_container";
///   - "container_already_exists" — optional; the result field stays empty when the key is absent.
BlobStorageEndpoint processBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
    const String account_url = config.getString(config_prefix + ".storage_account_url");
    const String container = config.getString(config_prefix + ".container_name", "default_container");

    /// Only populated when the key is explicitly present in the configuration.
    std::optional<bool> already_exists;
    if (const String exists_key = config_prefix + ".container_already_exists"; config.has(exists_key))
        already_exists = config.getBool(exists_key);

    return BlobStorageEndpoint{account_url, container, already_exists};
}
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getBlobContainerClient(const BlobStorageEndpoint & endpoint)
{
using namespace Azure::Storage::Blobs;
auto credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
auto final_url = endpoint.storage_account_url
+ (endpoint.storage_account_url.back() == '/' ? "" : "/")
+ endpoint.container_name;
if (endpoint.container_already_exists.value_or(false))
return std::make_shared<BlobContainerClient>(final_url, credential);
auto blob_service_client = Azure::Storage::Blobs::BlobServiceClient(endpoint.storage_account_url, credential);
if (!endpoint.container_already_exists.has_value())
{
ListBlobContainersOptions blob_containers_list_options;
blob_containers_list_options.Prefix = endpoint.container_name;
blob_containers_list_options.PageSizeHint = 1;
auto blob_containers = blob_service_client.ListBlobContainers().BlobContainers;
for (const auto & blob_container : blob_containers)
{
if (blob_container.Name == endpoint.container_name)
return std::make_shared<BlobContainerClient>(final_url, credential);
}
}
return std::make_shared<BlobContainerClient>(
blob_service_client.CreateBlobContainer(endpoint.container_name).Value);
}
void registerDiskBlobStorage(DiskFactory & factory)
{
auto creator = [](
@ -100,11 +154,8 @@ void registerDiskBlobStorage(DiskFactory & factory)
ContextPtr context,
const DisksMap & /*map*/)
{
auto endpoint_url = config.getString(config_prefix + ".endpoint");
// validate_endpoint_url(endpoint_url);
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
auto blob_container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(endpoint_url, managed_identity_credential);
auto endpoint_details = processBlobStorageEndpoint(config, config_prefix);
auto blob_container_client = getBlobContainerClient(endpoint_details);
/// where the metadata files are stored locally
auto metadata_path = config.getString(config_prefix + ".metadata_path", context->getPath() + "disks/" + name + "/");