mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge pull request #32886 from kssenii/azure-rename
Rename files and code from BlobStorage to AzureBlobStorage
This commit is contained in:
commit
70937088e9
@ -107,7 +107,7 @@ if (USE_AWS_S3)
|
||||
endif()
|
||||
|
||||
if (USE_AZURE_BLOB_STORAGE)
|
||||
add_headers_and_sources(dbms Disks/BlobStorage)
|
||||
add_headers_and_sources(dbms Disks/AzureBlobStorage)
|
||||
endif()
|
||||
|
||||
if (USE_HDFS)
|
||||
|
@ -470,7 +470,7 @@
|
||||
M(497, ACCESS_DENIED) \
|
||||
M(498, LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED) \
|
||||
M(499, S3_ERROR) \
|
||||
M(500, BLOB_STORAGE_ERROR) \
|
||||
M(500, AZURE_BLOB_STORAGE_ERROR) \
|
||||
M(501, CANNOT_CREATE_DATABASE) \
|
||||
M(502, CANNOT_SIGQUEUE) \
|
||||
M(503, AGGREGATE_FUNCTION_THROW) \
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include <Disks/BlobStorage/BlobStorageAuth.h>
|
||||
#include <Disks/AzureBlobStorage/AzureBlobStorageAuth.h>
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
@ -17,7 +17,7 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
struct BlobStorageEndpoint
|
||||
struct AzureBlobStorageEndpoint
|
||||
{
|
||||
const String storage_account_url;
|
||||
const String container_name;
|
||||
@ -41,18 +41,18 @@ void validateContainerName(const String & container_name)
|
||||
auto len = container_name.length();
|
||||
if (len < 3 || len > 64)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Blob Storage container name is not valid, should have length between 3 and 64, but has length: {}", len);
|
||||
"AzureBlob Storage container name is not valid, should have length between 3 and 64, but has length: {}", len);
|
||||
|
||||
const auto * container_name_pattern_str = R"([a-z][a-z0-9-]+)";
|
||||
static const RE2 container_name_pattern(container_name_pattern_str);
|
||||
|
||||
if (!re2::RE2::FullMatch(container_name, container_name_pattern))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Blob Storage container name is not valid, should follow the format: {}, got: {}", container_name_pattern_str, container_name);
|
||||
"AzureBlob Storage container name is not valid, should follow the format: {}, got: {}", container_name_pattern_str, container_name);
|
||||
}
|
||||
|
||||
|
||||
BlobStorageEndpoint processBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
String storage_account_url = config.getString(config_prefix + ".storage_account_url");
|
||||
validateStorageAccountUrl(storage_account_url);
|
||||
@ -86,7 +86,7 @@ std::shared_ptr<BlobContainerClient> getClientWithConnectionString(
|
||||
|
||||
|
||||
template <class T>
|
||||
std::shared_ptr<T> getBlobStorageClientWithAuth(
|
||||
std::shared_ptr<T> getAzureBlobStorageClientWithAuth(
|
||||
const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
if (config.has(config_prefix + ".connection_string"))
|
||||
@ -109,19 +109,19 @@ std::shared_ptr<T> getBlobStorageClientWithAuth(
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<BlobContainerClient> getBlobContainerClient(
|
||||
std::shared_ptr<BlobContainerClient> getAzureBlobContainerClient(
|
||||
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
auto endpoint = processBlobStorageEndpoint(config, config_prefix);
|
||||
auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
|
||||
auto container_name = endpoint.container_name;
|
||||
auto final_url = endpoint.storage_account_url
|
||||
+ (endpoint.storage_account_url.back() == '/' ? "" : "/")
|
||||
+ container_name;
|
||||
|
||||
if (endpoint.container_already_exists.value_or(false))
|
||||
return getBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
|
||||
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
|
||||
|
||||
auto blob_service_client = getBlobStorageClientWithAuth<BlobServiceClient>(endpoint.storage_account_url, container_name, config, config_prefix);
|
||||
auto blob_service_client = getAzureBlobStorageClientWithAuth<BlobServiceClient>(endpoint.storage_account_url, container_name, config, config_prefix);
|
||||
|
||||
if (!endpoint.container_already_exists.has_value())
|
||||
{
|
||||
@ -132,7 +132,7 @@ std::shared_ptr<BlobContainerClient> getBlobContainerClient(
|
||||
for (const auto & blob_container : blob_containers)
|
||||
{
|
||||
if (blob_container.Name == endpoint.container_name)
|
||||
return getBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
|
||||
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getBlobContainerClient(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient(
|
||||
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
|
||||
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
#include <Disks/BlobStorage/DiskBlobStorage.h>
|
||||
#include <Disks/AzureBlobStorage/DiskAzureBlobStorage.h>
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
@ -15,11 +15,11 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BLOB_STORAGE_ERROR;
|
||||
extern const int AZURE_BLOB_STORAGE_ERROR;
|
||||
}
|
||||
|
||||
|
||||
DiskBlobStorageSettings::DiskBlobStorageSettings(
|
||||
DiskAzureBlobStorageSettings::DiskAzureBlobStorageSettings(
|
||||
UInt64 max_single_part_upload_size_,
|
||||
UInt64 min_bytes_for_seek_,
|
||||
int max_single_read_retries_,
|
||||
@ -32,11 +32,11 @@ DiskBlobStorageSettings::DiskBlobStorageSettings(
|
||||
thread_pool_size(thread_pool_size_) {}
|
||||
|
||||
|
||||
class BlobStoragePathKeeper : public RemoteFSPathKeeper
|
||||
class AzureBlobStoragePathKeeper : public RemoteFSPathKeeper
|
||||
{
|
||||
public:
|
||||
/// RemoteFSPathKeeper constructed with a placeholder argument for chunk_limit, it is unused in this class
|
||||
BlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
|
||||
AzureBlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
|
||||
|
||||
void addPath(const String & path) override
|
||||
{
|
||||
@ -47,19 +47,19 @@ public:
|
||||
};
|
||||
|
||||
|
||||
DiskBlobStorage::DiskBlobStorage(
|
||||
DiskAzureBlobStorage::DiskAzureBlobStorage(
|
||||
const String & name_,
|
||||
DiskPtr metadata_disk_,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
SettingsPtr settings_,
|
||||
GetDiskSettings settings_getter_) :
|
||||
IDiskRemote(name_, "", metadata_disk_, "DiskBlobStorage", settings_->thread_pool_size),
|
||||
IDiskRemote(name_, "", metadata_disk_, "DiskAzureBlobStorage", settings_->thread_pool_size),
|
||||
blob_container_client(blob_container_client_),
|
||||
current_settings(std::move(settings_)),
|
||||
settings_getter(settings_getter_) {}
|
||||
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
|
||||
const String & path,
|
||||
const ReadSettings & read_settings,
|
||||
std::optional<size_t> /*estimated_size*/) const
|
||||
@ -71,7 +71,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
|
||||
|
||||
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||
|
||||
auto reader_impl = std::make_unique<ReadBufferFromBlobStorageGather>(
|
||||
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
|
||||
path, blob_container_client, metadata, settings->max_single_read_retries,
|
||||
settings->max_single_download_retries, read_settings, threadpool_read);
|
||||
|
||||
@ -88,7 +88,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
|
||||
const String & path,
|
||||
size_t buf_size,
|
||||
WriteMode mode)
|
||||
@ -96,38 +96,38 @@ std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
|
||||
auto metadata = readOrCreateMetaForWriting(path, mode);
|
||||
auto blob_path = path + "_" + getRandomASCIIString(8); /// NOTE: path contains the tmp_* prefix in the blob name
|
||||
|
||||
LOG_TRACE(log, "{} to file by path: {}. Blob Storage path: {}",
|
||||
LOG_TRACE(log, "{} to file by path: {}. AzureBlob Storage path: {}",
|
||||
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), blob_path);
|
||||
|
||||
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(
|
||||
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
|
||||
blob_container_client,
|
||||
blob_path,
|
||||
current_settings.get()->max_single_part_upload_size,
|
||||
buf_size);
|
||||
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
|
||||
}
|
||||
|
||||
|
||||
DiskType DiskBlobStorage::getType() const
|
||||
DiskType DiskAzureBlobStorage::getType() const
|
||||
{
|
||||
return DiskType::BlobStorage;
|
||||
return DiskType::AzureBlobStorage;
|
||||
}
|
||||
|
||||
|
||||
bool DiskBlobStorage::isRemote() const
|
||||
bool DiskAzureBlobStorage::isRemote() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool DiskBlobStorage::supportZeroCopyReplication() const
|
||||
bool DiskAzureBlobStorage::supportZeroCopyReplication() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool DiskBlobStorage::checkUniqueId(const String & id) const
|
||||
bool DiskAzureBlobStorage::checkUniqueId(const String & id) const
|
||||
{
|
||||
Azure::Storage::Blobs::ListBlobsOptions blobs_list_options;
|
||||
blobs_list_options.Prefix = id;
|
||||
@ -146,9 +146,9 @@ bool DiskBlobStorage::checkUniqueId(const String & id) const
|
||||
}
|
||||
|
||||
|
||||
void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
void DiskAzureBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
auto * paths_keeper = dynamic_cast<BlobStoragePathKeeper *>(fs_paths_keeper.get());
|
||||
auto * paths_keeper = dynamic_cast<AzureBlobStoragePathKeeper *>(fs_paths_keeper.get());
|
||||
|
||||
if (paths_keeper)
|
||||
{
|
||||
@ -158,7 +158,7 @@ void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
{
|
||||
auto delete_info = blob_container_client->DeleteBlob(path);
|
||||
if (!delete_info.Value.Deleted)
|
||||
throw Exception(ErrorCodes::BLOB_STORAGE_ERROR, "Failed to delete file in Blob Storage: {}", path);
|
||||
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
|
||||
}
|
||||
catch (const Azure::Storage::StorageException& e)
|
||||
{
|
||||
@ -170,13 +170,13 @@ void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
|
||||
}
|
||||
|
||||
|
||||
RemoteFSPathKeeperPtr DiskBlobStorage::createFSPathKeeper() const
|
||||
RemoteFSPathKeeperPtr DiskAzureBlobStorage::createFSPathKeeper() const
|
||||
{
|
||||
return std::make_shared<BlobStoragePathKeeper>();
|
||||
return std::make_shared<AzureBlobStoragePathKeeper>();
|
||||
}
|
||||
|
||||
|
||||
void DiskBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
|
||||
void DiskAzureBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
|
||||
{
|
||||
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
|
||||
|
@ -1,14 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
#include <Common/config.h>
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <Disks/IDiskRemote.h>
|
||||
#include <IO/ReadBufferFromBlobStorage.h>
|
||||
#include <IO/WriteBufferFromBlobStorage.h>
|
||||
#include <IO/ReadBufferFromAzureBlobStorage.h>
|
||||
#include <IO/WriteBufferFromAzureBlobStorage.h>
|
||||
#include <IO/SeekAvoidingReadBuffer.h>
|
||||
|
||||
#include <azure/identity/managed_identity_credential.hpp>
|
||||
@ -18,9 +16,9 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct DiskBlobStorageSettings final
|
||||
struct DiskAzureBlobStorageSettings final
|
||||
{
|
||||
DiskBlobStorageSettings(
|
||||
DiskAzureBlobStorageSettings(
|
||||
UInt64 max_single_part_upload_size_,
|
||||
UInt64 min_bytes_for_seek_,
|
||||
int max_single_read_retries,
|
||||
@ -35,14 +33,14 @@ struct DiskBlobStorageSettings final
|
||||
};
|
||||
|
||||
|
||||
class DiskBlobStorage final : public IDiskRemote
|
||||
class DiskAzureBlobStorage final : public IDiskRemote
|
||||
{
|
||||
public:
|
||||
|
||||
using SettingsPtr = std::unique_ptr<DiskBlobStorageSettings>;
|
||||
using SettingsPtr = std::unique_ptr<DiskAzureBlobStorageSettings>;
|
||||
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextPtr)>;
|
||||
|
||||
DiskBlobStorage(
|
||||
DiskAzureBlobStorage(
|
||||
const String & name_,
|
||||
DiskPtr metadata_disk_,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
@ -78,7 +76,7 @@ private:
|
||||
/// client used to access the files in the Blob Storage cloud
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
|
||||
|
||||
MultiVersion<DiskBlobStorageSettings> current_settings;
|
||||
MultiVersion<DiskAzureBlobStorageSettings> current_settings;
|
||||
/// Gets disk settings from context.
|
||||
GetDiskSettings settings_getter;
|
||||
};
|
@ -9,8 +9,8 @@
|
||||
#include <Disks/DiskRestartProxy.h>
|
||||
#include <Disks/DiskCacheWrapper.h>
|
||||
#include <Disks/RemoteDisksCommon.h>
|
||||
#include <Disks/BlobStorage/DiskBlobStorage.h>
|
||||
#include <Disks/BlobStorage/BlobStorageAuth.h>
|
||||
#include <Disks/AzureBlobStorage/DiskAzureBlobStorage.h>
|
||||
#include <Disks/AzureBlobStorage/AzureBlobStorageAuth.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -62,9 +62,9 @@ void checkRemoveAccess(IDisk & disk)
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
|
||||
std::unique_ptr<DiskAzureBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
|
||||
{
|
||||
return std::make_unique<DiskBlobStorageSettings>(
|
||||
return std::make_unique<DiskAzureBlobStorageSettings>(
|
||||
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
|
||||
config.getInt(config_prefix + ".max_single_read_retries", 3),
|
||||
@ -74,7 +74,7 @@ std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractC
|
||||
}
|
||||
|
||||
|
||||
void registerDiskBlobStorage(DiskFactory & factory)
|
||||
void registerDiskAzureBlobStorage(DiskFactory & factory)
|
||||
{
|
||||
auto creator = [](
|
||||
const String & name,
|
||||
@ -85,33 +85,33 @@ void registerDiskBlobStorage(DiskFactory & factory)
|
||||
{
|
||||
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
|
||||
|
||||
std::shared_ptr<IDisk> blob_storage_disk = std::make_shared<DiskBlobStorage>(
|
||||
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskAzureBlobStorage>(
|
||||
name,
|
||||
metadata_disk,
|
||||
getBlobContainerClient(config, config_prefix),
|
||||
getAzureBlobContainerClient(config, config_prefix),
|
||||
getSettings(config, config_prefix, context),
|
||||
getSettings
|
||||
);
|
||||
|
||||
if (!config.getBool(config_prefix + ".skip_access_check", false))
|
||||
{
|
||||
checkWriteAccess(*blob_storage_disk);
|
||||
checkReadAccess(*blob_storage_disk);
|
||||
checkReadWithOffset(*blob_storage_disk);
|
||||
checkRemoveAccess(*blob_storage_disk);
|
||||
checkWriteAccess(*azure_blob_storage_disk);
|
||||
checkReadAccess(*azure_blob_storage_disk);
|
||||
checkReadWithOffset(*azure_blob_storage_disk);
|
||||
checkRemoveAccess(*azure_blob_storage_disk);
|
||||
}
|
||||
|
||||
blob_storage_disk->startup();
|
||||
azure_blob_storage_disk->startup();
|
||||
|
||||
if (config.getBool(config_prefix + ".cache_enabled", true))
|
||||
{
|
||||
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
|
||||
blob_storage_disk = wrapWithCache(blob_storage_disk, "blob-storage-cache", cache_path, metadata_path);
|
||||
azure_blob_storage_disk = wrapWithCache(azure_blob_storage_disk, "azure-blob-storage-cache", cache_path, metadata_path);
|
||||
}
|
||||
|
||||
return std::make_shared<DiskRestartProxy>(blob_storage_disk);
|
||||
return std::make_shared<DiskRestartProxy>(azure_blob_storage_disk);
|
||||
};
|
||||
factory.registerDiskType("blob_storage", creator);
|
||||
factory.registerDiskType("azure_blob_storage", creator);
|
||||
}
|
||||
|
||||
}
|
||||
@ -121,7 +121,7 @@ void registerDiskBlobStorage(DiskFactory & factory)
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void registerDiskBlobStorage(DiskFactory &) {}
|
||||
void registerDiskAzureBlobStorage(DiskFactory &) {}
|
||||
|
||||
}
|
||||
|
@ -10,10 +10,10 @@ enum class DiskType
|
||||
Local,
|
||||
RAM,
|
||||
S3,
|
||||
BlobStorage,
|
||||
HDFS,
|
||||
Encrypted,
|
||||
WebServer,
|
||||
AzureBlobStorage,
|
||||
};
|
||||
|
||||
inline String toString(DiskType disk_type)
|
||||
@ -26,14 +26,14 @@ inline String toString(DiskType disk_type)
|
||||
return "memory";
|
||||
case DiskType::S3:
|
||||
return "s3";
|
||||
case DiskType::BlobStorage:
|
||||
return "blob_storage";
|
||||
case DiskType::HDFS:
|
||||
return "hdfs";
|
||||
case DiskType::Encrypted:
|
||||
return "encrypted";
|
||||
case DiskType::WebServer:
|
||||
return "web";
|
||||
case DiskType::AzureBlobStorage:
|
||||
return "azure_blob_storage";
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
#include <IO/ReadBufferFromBlobStorage.h>
|
||||
#include <IO/ReadBufferFromAzureBlobStorage.h>
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
@ -35,9 +35,9 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
|
||||
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
SeekableReadBufferPtr ReadBufferFromBlobStorageGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
|
||||
{
|
||||
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, path, max_single_read_retries,
|
||||
return std::make_unique<ReadBufferFromAzureBlobStorage>(blob_container_client, path, max_single_read_retries,
|
||||
max_single_download_retries, settings.remote_fs_buffer_size, threadpool_read, read_until_position_);
|
||||
}
|
||||
#endif
|
||||
|
@ -102,11 +102,11 @@ private:
|
||||
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
/// Reads data from Blob Storage using paths stored in metadata.
|
||||
class ReadBufferFromBlobStorageGather final : public ReadBufferFromRemoteFSGather
|
||||
/// Reads data from AzureBlob Storage using paths stored in metadata.
|
||||
class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFSGather
|
||||
{
|
||||
public:
|
||||
ReadBufferFromBlobStorageGather(
|
||||
ReadBufferFromAzureBlobStorageGather(
|
||||
const String & path_,
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
IDiskRemote::Metadata metadata_,
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "WriteIndirectBufferFromRemoteFS.h"
|
||||
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromBlobStorage.h>
|
||||
#include <IO/WriteBufferFromAzureBlobStorage.h>
|
||||
#include <Storages/HDFS/WriteBufferFromHDFS.h>
|
||||
#include <IO/WriteBufferFromHTTP.h>
|
||||
|
||||
@ -60,7 +60,7 @@ class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
template
|
||||
class WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>;
|
||||
class WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>;
|
||||
#endif
|
||||
|
||||
#if USE_HDFS
|
||||
|
@ -15,7 +15,7 @@ void registerDiskS3(DiskFactory & factory);
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
void registerDiskBlobStorage(DiskFactory & factory);
|
||||
void registerDiskAzureBlobStorage(DiskFactory & factory);
|
||||
#endif
|
||||
|
||||
#if USE_SSL
|
||||
@ -41,7 +41,7 @@ void registerDisks()
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
registerDiskBlobStorage(factory);
|
||||
registerDiskAzureBlobStorage(factory);
|
||||
#endif
|
||||
|
||||
#if USE_SSL
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <IO/ReadBufferFromBlobStorage.h>
|
||||
#include <IO/ReadBufferFromAzureBlobStorage.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <base/sleep.h>
|
||||
@ -22,7 +22,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
|
||||
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
const String & path_,
|
||||
size_t max_single_read_retries_,
|
||||
@ -48,7 +48,7 @@ ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
|
||||
}
|
||||
|
||||
|
||||
bool ReadBufferFromBlobStorage::nextImpl()
|
||||
bool ReadBufferFromAzureBlobStorage::nextImpl()
|
||||
{
|
||||
if (read_until_position)
|
||||
{
|
||||
@ -102,7 +102,7 @@ bool ReadBufferFromBlobStorage::nextImpl()
|
||||
}
|
||||
|
||||
|
||||
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
|
||||
off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
|
||||
{
|
||||
if (initialized)
|
||||
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||
@ -119,13 +119,13 @@ off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
|
||||
}
|
||||
|
||||
|
||||
off_t ReadBufferFromBlobStorage::getPosition()
|
||||
off_t ReadBufferFromAzureBlobStorage::getPosition()
|
||||
{
|
||||
return offset - available();
|
||||
}
|
||||
|
||||
|
||||
void ReadBufferFromBlobStorage::initialize()
|
||||
void ReadBufferFromAzureBlobStorage::initialize()
|
||||
{
|
||||
if (initialized)
|
||||
return;
|
@ -14,11 +14,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromBlobStorage : public SeekableReadBuffer
|
||||
class ReadBufferFromAzureBlobStorage : public SeekableReadBuffer
|
||||
{
|
||||
public:
|
||||
|
||||
explicit ReadBufferFromBlobStorage(
|
||||
explicit ReadBufferFromAzureBlobStorage(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
const String & path_,
|
||||
size_t max_single_read_retries_,
|
||||
@ -55,7 +55,7 @@ private:
|
||||
char * data_ptr;
|
||||
size_t data_capacity;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("ReadBufferFromBlobStorage");
|
||||
Poco::Logger * log = &Poco::Logger::get("ReadBufferFromAzureBlobStorage");
|
||||
};
|
||||
|
||||
}
|
@ -4,7 +4,7 @@
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <IO/WriteBufferFromBlobStorage.h>
|
||||
#include <IO/WriteBufferFromAzureBlobStorage.h>
|
||||
#include <Disks/RemoteDisksCommon.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
WriteBufferFromBlobStorage::WriteBufferFromBlobStorage(
|
||||
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
const String & blob_path_,
|
||||
size_t max_single_part_upload_size_,
|
||||
@ -23,13 +23,13 @@ WriteBufferFromBlobStorage::WriteBufferFromBlobStorage(
|
||||
blob_path(blob_path_) {}
|
||||
|
||||
|
||||
WriteBufferFromBlobStorage::~WriteBufferFromBlobStorage()
|
||||
WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
|
||||
{
|
||||
finalize();
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromBlobStorage::nextImpl()
|
||||
void WriteBufferFromAzureBlobStorage::nextImpl()
|
||||
{
|
||||
if (!offset())
|
||||
return;
|
||||
@ -54,7 +54,7 @@ void WriteBufferFromBlobStorage::nextImpl()
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromBlobStorage::finalizeImpl()
|
||||
void WriteBufferFromAzureBlobStorage::finalizeImpl()
|
||||
{
|
||||
next();
|
||||
|
@ -17,17 +17,17 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class WriteBufferFromBlobStorage : public BufferWithOwnMemory<WriteBuffer>
|
||||
class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>
|
||||
{
|
||||
public:
|
||||
|
||||
explicit WriteBufferFromBlobStorage(
|
||||
explicit WriteBufferFromAzureBlobStorage(
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
|
||||
const String & blob_path_,
|
||||
size_t max_single_part_upload_size_,
|
||||
size_t buf_size_);
|
||||
|
||||
~WriteBufferFromBlobStorage() override;
|
||||
~WriteBufferFromAzureBlobStorage() override;
|
||||
|
||||
void nextImpl() override;
|
||||
|
@ -2,7 +2,7 @@
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<blob_storage_disk>
|
||||
<type>blob_storage</type>
|
||||
<type>azure_blob_storage</type>
|
||||
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
|
||||
<container_name>cont</container_name>
|
||||
<skip_access_check>false</skip_access_check>
|
@ -2,7 +2,7 @@
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<blob_storage_disk>
|
||||
<type>blob_storage</type>
|
||||
<type>azure_blob_storage</type>
|
||||
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
|
||||
<container_name>cont</container_name>
|
||||
<container_already_exists>false</container_already_exists>
|
@ -12,7 +12,7 @@ CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/storage_conf.
|
||||
|
||||
NODE_NAME = "node"
|
||||
TABLE_NAME = "blob_storage_table"
|
||||
BLOB_STORAGE_DISK = "blob_storage_disk"
|
||||
AZURE_BLOB_STORAGE_DISK = "blob_storage_disk"
|
||||
LOCAL_DISK = "hdd"
|
||||
CONTAINER_NAME = "cont"
|
||||
|
||||
@ -196,7 +196,7 @@ def test_move_partition_to_another_disk(cluster):
|
||||
node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{LOCAL_DISK}'")
|
||||
assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"
|
||||
|
||||
node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{BLOB_STORAGE_DISK}'")
|
||||
node.query(f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{AZURE_BLOB_STORAGE_DISK}'")
|
||||
assert node.query(f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(8192)"
|
||||
|
||||
|
||||
@ -327,7 +327,7 @@ def test_restart_during_load(cluster):
|
||||
def restart_disk():
|
||||
for iii in range(0, 2):
|
||||
logging.info(f"Restarting disk, attempt {iii}")
|
||||
node.query(f"SYSTEM RESTART DISK {BLOB_STORAGE_DISK}")
|
||||
node.query(f"SYSTEM RESTART DISK {AZURE_BLOB_STORAGE_DISK}")
|
||||
logging.info(f"Disk restarted, attempt {iii}")
|
||||
time.sleep(0.5)
|
||||
|
Loading…
Reference in New Issue
Block a user