Merge branch 'i_object_storage' of github.com:ClickHouse/ClickHouse into i_object_storage

This commit is contained in:
alesapin 2022-05-20 11:49:09 +02:00
commit 159d4a41c2
30 changed files with 440 additions and 1340 deletions

View File

@ -66,27 +66,27 @@ AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::Abstr
template <class T> template <class T>
std::shared_ptr<T> getClientWithConnectionString(const String & connection_str, const String & container_name) = delete; std::unique_ptr<T> getClientWithConnectionString(const String & connection_str, const String & container_name) = delete;
template<> template<>
std::shared_ptr<BlobServiceClient> getClientWithConnectionString( std::unique_ptr<BlobServiceClient> getClientWithConnectionString(
const String & connection_str, const String & /*container_name*/) const String & connection_str, const String & /*container_name*/)
{ {
return std::make_shared<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(connection_str)); return std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(connection_str));
} }
template<> template<>
std::shared_ptr<BlobContainerClient> getClientWithConnectionString( std::unique_ptr<BlobContainerClient> getClientWithConnectionString(
const String & connection_str, const String & container_name) const String & connection_str, const String & container_name)
{ {
return std::make_shared<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(connection_str, container_name)); return std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(connection_str, container_name));
} }
template <class T> template <class T>
std::shared_ptr<T> getAzureBlobStorageClientWithAuth( std::unique_ptr<T> getAzureBlobStorageClientWithAuth(
const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix) const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{ {
if (config.has(config_prefix + ".connection_string")) if (config.has(config_prefix + ".connection_string"))
@ -101,15 +101,15 @@ std::shared_ptr<T> getAzureBlobStorageClientWithAuth(
config.getString(config_prefix + ".account_name"), config.getString(config_prefix + ".account_name"),
config.getString(config_prefix + ".account_key") config.getString(config_prefix + ".account_key")
); );
return std::make_shared<T>(url, storage_shared_key_credential); return std::make_unique<T>(url, storage_shared_key_credential);
} }
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>(); auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
return std::make_shared<T>(url, managed_identity_credential); return std::make_unique<T>(url, managed_identity_credential);
} }
std::shared_ptr<BlobContainerClient> getAzureBlobContainerClient( std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix) const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{ {
auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix); auto endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
@ -136,10 +136,20 @@ std::shared_ptr<BlobContainerClient> getAzureBlobContainerClient(
} }
} }
return std::make_shared<BlobContainerClient>( return std::make_unique<BlobContainerClient>(
blob_service_client->CreateBlobContainer(container_name).Value); blob_service_client->CreateBlobContainer(container_name).Value);
} }
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
{
return std::make_unique<AzureObjectStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3)
);
}
} }
#endif #endif

View File

@ -4,15 +4,17 @@
#if USE_AZURE_BLOB_STORAGE #if USE_AZURE_BLOB_STORAGE
#include <Disks/IDiskRemote.h>
#include <azure/storage/blobs.hpp> #include <azure/storage/blobs.hpp>
#include <Disks/AzureObjectStorage.h>
namespace DB namespace DB
{ {
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient( std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> getAzureBlobContainerClient(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix); const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/);
} }
#endif #endif

View File

@ -1,168 +0,0 @@
#include <Disks/AzureBlobStorage/DiskAzureBlobStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
}
DiskAzureBlobStorageSettings::DiskAzureBlobStorageSettings(
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_,
int thread_pool_size_) :
max_single_part_upload_size(max_single_part_upload_size_),
min_bytes_for_seek(min_bytes_for_seek_),
max_single_read_retries(max_single_read_retries_),
max_single_download_retries(max_single_download_retries_),
thread_pool_size(thread_pool_size_) {}
DiskAzureBlobStorage::DiskAzureBlobStorage(
const String & name_,
DiskPtr metadata_disk_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
SettingsPtr settings_,
GetDiskSettings settings_getter_) :
IDiskRemote(name_, "", metadata_disk_, nullptr, "DiskAzureBlobStorage", settings_->thread_pool_size),
blob_container_client(blob_container_client_),
current_settings(std::move(settings_)),
settings_getter(settings_getter_) {}
std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
const String & path,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto settings = current_settings.get();
auto metadata = readMetadata(path);
LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
blob_container_client, metadata.remote_fs_root_path, metadata.remote_fs_objects,
settings->max_single_read_retries, settings->max_single_download_retries, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(reader_impl));
}
else
{
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), current_settings.get()->min_bytes_for_seek);
}
}
std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
const String & path,
size_t buf_size,
WriteMode mode,
const WriteSettings &)
{
auto blob_path = path + "_" + getRandomASCIIString(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. AzureBlob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), blob_path);
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
blob_container_client,
blob_path,
current_settings.get()->max_single_part_upload_size,
buf_size);
auto create_metadata_callback = [this, path, mode, blob_path] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_path, count] (Metadata & metadata) { metadata.addObject(blob_path, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(create_metadata_callback), blob_path);
}
DiskType DiskAzureBlobStorage::getType() const
{
return DiskType::AzureBlobStorage;
}
bool DiskAzureBlobStorage::isRemote() const
{
return true;
}
bool DiskAzureBlobStorage::supportZeroCopyReplication() const
{
return true;
}
bool DiskAzureBlobStorage::checkUniqueId(const String & id) const
{
Azure::Storage::Blobs::ListBlobsOptions blobs_list_options;
blobs_list_options.Prefix = id;
blobs_list_options.PageSizeHint = 1;
auto blobs_list_response = blob_container_client->ListBlobs(blobs_list_options);
auto blobs_list = blobs_list_response.Blobs;
for (const auto & blob : blobs_list)
{
if (id == blob.Name)
return true;
}
return false;
}
void DiskAzureBlobStorage::removeFromRemoteFS(const std::vector<String> & paths)
{
for (const auto & path : paths)
{
try
{
auto delete_info = blob_container_client->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
}
catch (const Azure::Storage::StorageException & e)
{
LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
throw;
}
}
}
void DiskAzureBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
{
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
current_settings.set(std::move(new_settings));
if (AsyncExecutor * exec = dynamic_cast<AsyncExecutor*>(&getExecutor()))
exec->setMaxThreads(current_settings.get()->thread_pool_size);
}
}
#endif

View File

@ -1,86 +0,0 @@
#pragma once
#include <Common/config.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/IDiskRemote.h>
#include <IO/ReadBufferFromAzureBlobStorage.h>
#include <IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/blobs.hpp>
namespace DB
{
struct DiskAzureBlobStorageSettings final
{
DiskAzureBlobStorageSettings(
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int max_single_read_retries,
int max_single_download_retries,
int thread_pool_size_);
size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
UInt64 min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
size_t thread_pool_size;
};
class DiskAzureBlobStorage final : public IDiskRemote
{
public:
using SettingsPtr = std::unique_ptr<DiskAzureBlobStorageSettings>;
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextPtr)>;
DiskAzureBlobStorage(
const String & name_,
DiskPtr metadata_disk_,
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
SettingsPtr settings_,
GetDiskSettings settings_getter_);
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode,
const WriteSettings & settings) override;
DiskType getType() const override;
bool isRemote() const override;
bool supportZeroCopyReplication() const override;
bool checkUniqueId(const String & id) const override;
void removeFromRemoteFS(const std::vector<String> & paths) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;
private:
/// client used to access the files in the Blob Storage cloud
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
MultiVersion<DiskAzureBlobStorageSettings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
};
}
#endif

View File

@ -7,9 +7,9 @@
#include <Disks/DiskRestartProxy.h> #include <Disks/DiskRestartProxy.h>
#include <Disks/DiskCacheWrapper.h> #include <Disks/DiskCacheWrapper.h>
#include <Disks/RemoteDisksCommon.h> #include <Disks/RemoteDisksCommon.h>
#include <Disks/AzureBlobStorage/DiskAzureBlobStorage.h>
#include <Disks/AzureBlobStorage/AzureBlobStorageAuth.h> #include <Disks/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Disks/AzureObjectStorage.h>
#include <Disks/DiskObjectStorage.h>
namespace DB namespace DB
{ {
@ -26,14 +26,12 @@ constexpr char test_file[] = "test.txt";
constexpr char test_str[] = "test"; constexpr char test_str[] = "test";
constexpr size_t test_str_size = 4; constexpr size_t test_str_size = 4;
void checkWriteAccess(IDisk & disk) void checkWriteAccess(IDisk & disk)
{ {
auto file = disk.writeFile(test_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite); auto file = disk.writeFile(test_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write(test_str, test_str_size); file->write(test_str, test_str_size);
} }
void checkReadAccess(IDisk & disk) void checkReadAccess(IDisk & disk)
{ {
auto file = disk.readFile(test_file); auto file = disk.readFile(test_file);
@ -43,7 +41,6 @@ void checkReadAccess(IDisk & disk)
throw Exception("No read access to disk", ErrorCodes::PATH_ACCESS_DENIED); throw Exception("No read access to disk", ErrorCodes::PATH_ACCESS_DENIED);
} }
void checkReadWithOffset(IDisk & disk) void checkReadWithOffset(IDisk & disk)
{ {
auto file = disk.readFile(test_file); auto file = disk.readFile(test_file);
@ -56,24 +53,11 @@ void checkReadWithOffset(IDisk & disk)
throw Exception("Failed to read file with offset", ErrorCodes::PATH_ACCESS_DENIED); throw Exception("Failed to read file with offset", ErrorCodes::PATH_ACCESS_DENIED);
} }
void checkRemoveAccess(IDisk & disk) void checkRemoveAccess(IDisk & disk)
{ {
disk.removeFile(test_file); disk.removeFile(test_file);
} }
std::unique_ptr<DiskAzureBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
{
return std::make_unique<DiskAzureBlobStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".thread_pool_size", 16)
);
}
} }
void registerDiskAzureBlobStorage(DiskFactory & factory) void registerDiskAzureBlobStorage(DiskFactory & factory)
@ -87,12 +71,27 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
{ {
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context); auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskAzureBlobStorage>( FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
ObjectStoragePtr azure_object_storage = std::make_unique<AzureObjectStorage>(
std::move(cache),
name, name,
metadata_disk,
getAzureBlobContainerClient(config, config_prefix), getAzureBlobContainerClient(config, config_prefix),
getSettings(config, config_prefix, context), getAzureBlobStorageSettings(config, config_prefix, context));
getSettings
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
name,
/* no namespaces */"",
"DiskAzureBlobStorage",
metadata_disk,
std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata,
copy_thread_pool_size
); );
if (!config.getBool(config_prefix + ".skip_access_check", false)) if (!config.getBool(config_prefix + ".skip_access_check", false))
@ -103,9 +102,17 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
checkRemoveAccess(*azure_blob_storage_disk); checkRemoveAccess(*azure_blob_storage_disk);
} }
#ifdef NDEBUG
bool use_cache = true;
#else
/// Current cache implementation lead to allocations in destructor of
/// read buffer.
bool use_cache = false;
#endif
azure_blob_storage_disk->startup(context); azure_blob_storage_disk->startup(context);
if (config.getBool(config_prefix + ".cache_enabled", true)) if (config.getBool(config_prefix + ".cache_enabled", use_cache))
{ {
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/"); String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
azure_blob_storage_disk = wrapWithCache(azure_blob_storage_disk, "azure-blob-storage-cache", cache_path, metadata_path); azure_blob_storage_disk = wrapWithCache(azure_blob_storage_disk, "azure-blob-storage-cache", cache_path, metadata_path);

View File

@ -0,0 +1,218 @@
#include <Disks/AzureObjectStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <IO/ReadBufferFromAzureBlobStorage.h>
#include <IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/AzureBlobStorage/AzureBlobStorageAuth.h>
namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int UNSUPPORTED_METHOD;
}
AzureObjectStorage::AzureObjectStorage(
FileCachePtr && cache_,
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_)
: IObjectStorage(std::move(cache_))
, name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
{
}
bool AzureObjectStorage::exists(const std::string & uri) const
{
auto client_ptr = client.get();
/// What a shame, no Exists method...
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = uri;
options.PageSizeHint = 1;
auto blobs_list_response = client_ptr->ListBlobs(options);
auto blobs_list = blobs_list_response.Blobs;
for (const auto & blob : blobs_list)
{
if (uri == blob.Name)
return true;
}
return false;
}
std::unique_ptr<SeekableReadBuffer> AzureObjectStorage::readObject( /// NOLINT
const std::string & path,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto settings_ptr = settings.get();
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(), path, settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries, read_settings.remote_fs_buffer_size);
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
const std::string & common_path_prefix,
const BlobsPathToSize & blobs_to_read,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto settings_ptr = settings.get();
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
client.get(), common_path_prefix, blobs_to_read,
settings_ptr->max_single_read_retries, settings_ptr->max_single_download_retries, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(reader_impl));
}
else
{
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(reader_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings_ptr->min_bytes_for_seek);
}
}
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NOLINT
const std::string & path,
WriteMode mode,
std::optional<ObjectAttributes>,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings &)
{
if (mode != WriteMode::Rewrite)
throw Exception("Azure storage doesn't support append", ErrorCodes::UNSUPPORTED_METHOD);
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
path,
settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
}
void AzureObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & children) const
{
auto client_ptr = client.get();
Azure::Storage::Blobs::ListBlobsOptions blobs_list_options;
blobs_list_options.Prefix = path;
auto blobs_list_response = client_ptr->ListBlobs(blobs_list_options);
auto blobs_list = blobs_list_response.Blobs;
for (const auto & blob : blobs_list)
children.emplace_back(blob.Name, blob.BlobSize);
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void AzureObjectStorage::removeObject(const std::string & path)
{
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
}
void AzureObjectStorage::removeObjects(const std::vector<std::string> & paths)
{
auto client_ptr = client.get();
for (const auto & path : paths)
{
auto delete_info = client_ptr->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
}
}
void AzureObjectStorage::removeObjectIfExists(const std::string & path)
{
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
}
void AzureObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
{
auto client_ptr = client.get();
for (const auto & path : paths)
auto delete_info = client_ptr->DeleteBlob(path);
}
ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) const
{
auto client_ptr = client.get();
auto blob_client = client_ptr->GetBlobClient(path);
auto properties = blob_client.GetProperties().Value;
ObjectMetadata result;
result.size_bytes = properties.BlobSize;
if (!properties.Metadata.empty())
{
result.attributes.emplace();
for (const auto & [key, value] : properties.Metadata)
(*result.attributes)[key] = value;
}
result.last_modified.emplace(properties.LastModified.time_since_epoch().count());
return result;
}
void AzureObjectStorage::copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ptr = client.get();
auto dest_blob_client = client_ptr->GetBlobClient(object_to);
auto source_blob_client = client_ptr->GetBlobClient(object_from);
Azure::Storage::Blobs::CopyBlobFromUriOptions copy_options;
if (object_to_attributes.has_value())
{
for (const auto & [key, value] : *object_to_attributes)
copy_options.Metadata[key] = value;
}
dest_blob_client.CopyFromUri(source_blob_client.GetUrl(), copy_options);
}
void AzureObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto new_settings = getAzureBlobStorageSettings(config, config_prefix, context);
auto new_client = getAzureBlobContainerClient(config, config_prefix);
client.set(std::move(new_client));
settings.set(std::move(new_settings));
}
std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(const std::string &, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
return std::make_unique<AzureObjectStorage>(
nullptr,
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context)
);
}
}
#endif

View File

@ -0,0 +1,113 @@
#pragma once
#include <Common/config.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IObjectStorage.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
struct AzureObjectStorageSettings
{
AzureObjectStorageSettings(
uint64_t max_single_part_upload_size_,
uint64_t min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_)
: max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
{
}
size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
uint64_t min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
};
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
using AzureClientPtr = std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient>;
class AzureObjectStorage : public IObjectStorage
{
public:
using SettingsPtr = std::unique_ptr<AzureObjectStorageSettings>;
AzureObjectStorage(
FileCachePtr && cache_,
const String & name_,
AzureClientPtr && client_,
SettingsPtr && settings_);
bool exists(const std::string & uri) const override;
std::unique_ptr<SeekableReadBuffer> readObject( /// NOLINT
const std::string & path,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const std::string & common_path_prefix,
const BlobsPathToSize & blobs_to_read,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, BlobsPathToSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const std::string & path) override;
void removeObjects(const std::vector<std::string> & paths) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectsIfExist(const std::vector<std::string> & paths) override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override {}
void startup() override {}
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
String getObjectsNamespace() const override { return ""; }
std::unique_ptr<IObjectStorage> cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override;
private:
const String name;
/// client used to access the files in the Blob Storage cloud
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
};
}
#endif

View File

@ -15,6 +15,7 @@
#include <Disks/IO/ThreadPoolRemoteFSReader.h> #include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Common/FileCache.h> #include <Common/FileCache.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB namespace DB
{ {
@ -875,7 +876,7 @@ void DiskObjectStorageMetadataHelper::findLastRevision()
LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name); LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name);
} }
int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path)
{ {
const std::string path = source_path + SCHEMA_VERSION_OBJECT; const std::string path = source_path + SCHEMA_VERSION_OBJECT;
int version = 0; int version = 0;

View File

@ -298,7 +298,7 @@ public:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const; void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const;
void findLastRevision(); void findLastRevision();
int readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const; static int readSchemaVersion(IObjectStorage * object_storage, const String & source_path);
void saveSchemaVersion(const int & version) const; void saveSchemaVersion(const int & version) const;
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const; void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
void migrateFileToRestorableSchema(const String & path) const; void migrateFileToRestorableSchema(const String & path) const;

View File

@ -5,8 +5,10 @@
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{extern const int DEADLOCK_AVOIDED; {
extern const int DEADLOCK_AVOIDED;
} }
using Millis = std::chrono::milliseconds; using Millis = std::chrono::milliseconds;

View File

@ -9,8 +9,13 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Disks/IDiskRemote.h> #include <Disks/IDisk.h>
#include <Disks/IObjectStorage.h>
#include <IO/ReadBufferFromFile.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h> #include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h> #include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h> #include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h> #include <Disks/IO/ReadBufferFromRemoteFSGather.h>
@ -173,7 +178,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{ {
auto reader = IDiskRemote::getThreadPoolReader(); auto reader = IObjectStorage::getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek); return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
} }
else else

View File

@ -1,10 +1,13 @@
#pragma once #pragma once
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <Core/UUID.h> #include <Core/UUID.h>
#include <set> #include <set>
#include <Interpreters/Context_fwd.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
namespace DB namespace DB
{ {

View File

@ -35,7 +35,6 @@ bool HDFSObjectStorage::exists(const std::string & hdfs_uri) const
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String remote_fs_object_path = hdfs_uri.substr(begin_of_path); const String remote_fs_object_path = hdfs_uri.substr(begin_of_path);
return (0 == hdfsExists(hdfs_fs.get(), remote_fs_object_path.c_str())); return (0 == hdfsExists(hdfs_fs.get(), remote_fs_object_path.c_str()));
} }
std::unique_ptr<SeekableReadBuffer> HDFSObjectStorage::readObject( /// NOLINT std::unique_ptr<SeekableReadBuffer> HDFSObjectStorage::readObject( /// NOLINT
@ -72,9 +71,9 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS API doesn't support custom attributes/metadata for stored objects");
/// Single O_WRONLY in libhdfs adds O_TRUNC /// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(path, auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
config, settings->replication, buf_size, path, config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND); mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), path); return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), path);
} }

View File

@ -1,8 +0,0 @@
#pragma once
#include
namespace DB
{
}

View File

@ -1,702 +0,0 @@
#include <Disks/IDiskRemote.h>
#include "Disks/DiskFactory.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <Common/checkStackSize.h>
#include <boost/algorithm/string.hpp>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Common/FileCache.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DISK_INDEX;
extern const int UNKNOWN_FORMAT;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;;
extern const int FILE_DOESNT_EXIST;
extern const int BAD_FILE_TYPE;
}
IDiskRemote::Metadata IDiskRemote::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
updater(result);
result.save(sync);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
metadata_disk_->removeFile(metadata_file_path_);
return result;
}
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
{
if (overwrite || !metadata_disk_->exists(metadata_file_path_))
{
return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
}
else
{
auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
if (result.read_only)
throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
return result;
}
}
void IDiskRemote::Metadata::load()
{
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, *buf);
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', *buf);
UInt32 remote_fs_objects_count;
readIntText(remote_fs_objects_count, *buf);
assertChar('\t', *buf);
readIntText(total_size, *buf);
assertChar('\n', *buf);
remote_fs_objects.resize(remote_fs_objects_count);
for (size_t i = 0; i < remote_fs_objects_count; ++i)
{
String remote_fs_object_path;
size_t remote_fs_object_size;
readIntText(remote_fs_object_size, *buf);
assertChar('\t', *buf);
readEscapedString(remote_fs_object_path, *buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', *buf);
remote_fs_objects[i].relative_path = remote_fs_object_path;
remote_fs_objects[i].bytes_size = remote_fs_object_size;
}
readIntText(ref_count, *buf);
assertChar('\n', *buf);
if (version >= VERSION_READ_ONLY_FLAG)
{
readBoolText(read_only, *buf);
assertChar('\n', *buf);
}
}
/// Load metadata by path or create empty if `create` flag is set.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_)
: remote_fs_root_path(remote_fs_root_path_)
, metadata_file_path(metadata_file_path_)
, metadata_disk(metadata_disk_)
{
}
void IDiskRemote::Metadata::addObject(const String & path, size_t size)
{
total_size += size;
remote_fs_objects.emplace_back(path, size);
}
void IDiskRemote::Metadata::saveToBuffer(WriteBuffer & buf, bool sync)
{
writeIntText(VERSION_RELATIVE_PATHS, buf);
writeChar('\n', buf);
writeIntText(remote_fs_objects.size(), buf);
writeChar('\t', buf);
writeIntText(total_size, buf);
writeChar('\n', buf);
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
{
writeIntText(remote_fs_object_size, buf);
writeChar('\t', buf);
writeEscapedString(remote_fs_object_path, buf);
writeChar('\n', buf);
}
writeIntText(ref_count, buf);
writeChar('\n', buf);
writeBoolText(read_only, buf);
writeChar('\n', buf);
buf.finalize();
if (sync)
buf.sync();
}
/// Fsync metadata file if 'sync' flag is set.
void IDiskRemote::Metadata::save(bool sync)
{
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
saveToBuffer(*buf, sync);
}
std::string IDiskRemote::Metadata::serializeToString()
{
WriteBufferFromOwnString write_buf;
saveToBuffer(write_buf, false);
return write_buf.str();
}
IDiskRemote::Metadata IDiskRemote::readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const
{
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
}
IDiskRemote::Metadata IDiskRemote::readMetadata(const String & path) const
{
std::shared_lock lock(metadata_mutex);
return readMetadataUnlocked(path, lock);
}
IDiskRemote::Metadata IDiskRemote::readUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
IDiskRemote::Metadata IDiskRemote::readUpdateStoreMetadataAndRemove(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
}
IDiskRemote::Metadata IDiskRemote::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, IDiskRemote::MetadataUpdater updater)
{
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
{
std::unique_lock lock(metadata_mutex);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
else
{
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
}
IDiskRemote::Metadata IDiskRemote::createAndStoreMetadata(const String & path, bool sync)
{
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
}
IDiskRemote::Metadata IDiskRemote::createUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std::vector<std::string> & file_paths) const
{
std::unordered_map<String, String> metadatas;
std::shared_lock lock(metadata_mutex);
for (const auto & path : file_paths)
{
IDiskRemote::Metadata metadata = readMetadataUnlocked(path, lock);
metadata.ref_count = 0;
metadatas[path] = metadata.serializeToString();
}
return metadatas;
}
void IDiskRemote::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
if (!metadata_disk->exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
if (!metadata_disk->isFile(path))
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
try
{
auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
{
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
{
paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);
if (cache)
{
auto key = cache->hash(remote_fs_object_path);
cache->remove(key);
}
}
return false;
}
else /// In other case decrement number of references, save metadata and delete hardlink.
{
--metadata.ref_count;
}
return true;
};
readUpdateStoreMetadataAndRemove(path, false, metadata_updater);
/// If there is no references - delete content from remote FS.
}
catch (const Exception & e)
{
/// If it's impossible to read meta - just remove it from FS.
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
{
LOG_WARNING(log,
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
backQuote(path), e.nested() ? e.nested()->message() : e.message());
metadata_disk->removeFile(path);
}
else
throw;
}
}
void IDiskRemote::removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (metadata_disk->isFile(path))
{
removeMetadata(path, paths_to_remove[path]);
}
else
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
removeMetadataRecursive(it->path(), paths_to_remove);
metadata_disk->removeDirectory(path);
}
}
std::vector<String> IDiskRemote::getRemotePaths(const String & local_path) const
{
auto metadata = readMetadata(local_path);
std::vector<String> remote_paths;
for (const auto & [remote_path, _] : metadata.remote_fs_objects)
remote_paths.push_back(fs::path(metadata.remote_fs_root_path) / remote_path);
return remote_paths;
}
void IDiskRemote::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
{
/// Protect against concurrent delition of files (for example because of a merge).
if (metadata_disk->isFile(local_path))
{
try
{
paths_map.emplace_back(local_path, getRemotePaths(local_path));
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
return;
throw;
}
}
else
{
DiskDirectoryIteratorPtr it;
try
{
it = iterateDirectory(local_path);
}
catch (const fs::filesystem_error & e)
{
if (e.code() == std::errc::no_such_file_or_directory)
return;
throw;
}
for (; it->isValid(); it->next())
IDiskRemote::getRemotePathsRecursive(fs::path(local_path) / it->name(), paths_map);
}
}
DiskPtr DiskRemoteReservation::getDisk(size_t i) const
{
if (i != 0)
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
return disk;
}
void DiskRemoteReservation::update(UInt64 new_size)
{
std::lock_guard lock(disk->reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
DiskRemoteReservation::~DiskRemoteReservation()
{
try
{
std::lock_guard lock(disk->reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
IDiskRemote::IDiskRemote(
const String & name_,
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
FileCachePtr cache_,
const String & log_name_,
size_t thread_pool_size)
: IDisk(std::make_unique<AsyncExecutor>(log_name_, thread_pool_size))
, log(&Poco::Logger::get(log_name_))
, name(name_)
, remote_fs_root_path(remote_fs_root_path_)
, metadata_disk(metadata_disk_)
, cache(cache_)
{
}
String IDiskRemote::getCacheBasePath() const
{
return cache ? cache->getBasePath() : "";
}
bool IDiskRemote::exists(const String & path) const
{
return metadata_disk->exists(path);
}
bool IDiskRemote::isFile(const String & path) const
{
return metadata_disk->isFile(path);
}
void IDiskRemote::createFile(const String & path)
{
createAndStoreMetadata(path, false);
}
size_t IDiskRemote::getFileSize(const String & path) const
{
return readMetadata(path).total_size;
}
void IDiskRemote::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
metadata_disk->moveFile(from_path, to_path);
}
void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
{
const String tmp_path = to_path + ".old";
moveFile(to_path, tmp_path);
moveFile(from_path, to_path);
removeFile(tmp_path);
}
else
moveFile(from_path, to_path);
}
void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
}
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
if (metadata_disk->exists(path))
{
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
}
}
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
std::unordered_map<String, std::vector<String>> paths_to_remove;
for (const auto & file : files)
{
bool skip = file.if_exists && !metadata_disk->exists(file.path);
if (!skip)
removeMetadata(file.path, paths_to_remove[file.path]);
}
if (!keep_all_batch_data)
{
std::vector<String> remove_from_remote;
for (auto && [path, remote_paths] : paths_to_remove)
{
if (!file_names_remove_metadata_only.contains(fs::path(path).filename()))
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
}
removeFromRemoteFS(remove_from_remote);
}
}
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
std::unordered_map<String, std::vector<String>> paths_to_remove;
removeMetadataRecursive(path, paths_to_remove);
if (!keep_all_batch_data)
{
std::vector<String> remove_from_remote;
for (auto && [local_path, remote_paths] : paths_to_remove)
{
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
}
removeFromRemoteFS(remove_from_remote);
}
}
void IDiskRemote::setReadOnly(const String & path)
{
/// We should store read only flag inside metadata file (instead of using FS flag),
/// because we modify metadata file when create hard-links from it.
readUpdateAndStoreMetadata(path, false, [] (Metadata & metadata) { metadata.read_only = true; return true; });
}
bool IDiskRemote::isDirectory(const String & path) const
{
return metadata_disk->isDirectory(path);
}
void IDiskRemote::createDirectory(const String & path)
{
metadata_disk->createDirectory(path);
}
void IDiskRemote::createDirectories(const String & path)
{
metadata_disk->createDirectories(path);
}
void IDiskRemote::clearDirectory(const String & path)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (isFile(it->path()))
removeFile(it->path());
}
void IDiskRemote::removeDirectory(const String & path)
{
metadata_disk->removeDirectory(path);
}
DiskDirectoryIteratorPtr IDiskRemote::iterateDirectory(const String & path)
{
return metadata_disk->iterateDirectory(path);
}
void IDiskRemote::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}
void IDiskRemote::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
metadata_disk->setLastModified(path, timestamp);
}
Poco::Timestamp IDiskRemote::getLastModified(const String & path)
{
return metadata_disk->getLastModified(path);
}
void IDiskRemote::createHardLink(const String & src_path, const String & dst_path)
{
readUpdateAndStoreMetadata(src_path, false, [] (Metadata & metadata) { metadata.ref_count++; return true; });
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
}
ReservationPtr IDiskRemote::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
return {};
return std::make_unique<DiskRemoteReservation>(std::static_pointer_cast<IDiskRemote>(shared_from_this()), bytes);
}
bool IDiskRemote::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
if (bytes == 0)
{
LOG_TRACE(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
++reservation_count;
return true;
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_TRACE(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return true;
}
return false;
}
String IDiskRemote::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
auto metadata = readMetadata(path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].relative_path;
return id;
}
AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
{
constexpr size_t pool_size = 50;
constexpr size_t queue_size = 1000000;
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(pool_size, queue_size);
return reader;
}
UInt32 IDiskRemote::getRefCount(const String & path) const
{
return readMetadata(path).ref_count;
}
ThreadPool & IDiskRemote::getThreadPoolWriter()
{
constexpr size_t pool_size = 100;
constexpr size_t queue_size = 1000000;
static ThreadPool writer(pool_size, pool_size, queue_size);
return writer;
}
}

View File

@ -1,302 +0,0 @@
#pragma once
#include <Common/config.h>
#include <atomic>
#include <Common/FileCache_fwd.h>
#include <Disks/DiskFactory.h>
#include <Disks/Executor.h>
#include <utility>
#include <mutex>
#include <shared_mutex>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <filesystem>
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
class IAsynchronousReader;
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
/// Base Disk class for remote FS's, which are not posix-compatible (e.g. DiskS3, DiskHDFS, DiskBlobStorage)
class IDiskRemote : public IDisk
{
friend class DiskRemoteReservation;
public:
IDiskRemote(
const String & name_,
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
FileCachePtr cache_,
const String & log_name_,
size_t thread_pool_size);
struct Metadata;
using MetadataUpdater = std::function<bool(Metadata & metadata)>;
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_disk->getPath(); }
String getCacheBasePath() const final override;
std::vector<String> getRemotePaths(const String & local_path) const final override;
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
/// Methods for working with metadata. For some operations (like hardlink
/// creation) metadata can be updated concurrently from multiple threads
/// (file actually rewritten on disk). So additional RW lock is required for
/// metadata read and write, but not for create new metadata.
Metadata readMetadata(const String & path) const;
Metadata readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const;
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
Metadata readUpdateStoreMetadataAndRemove(const String & path, bool sync, MetadataUpdater updater);
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);
Metadata createAndStoreMetadata(const String & path, bool sync);
Metadata createUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
void createFile(const String & path) override;
size_t getFileSize(const String & path) const override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void removeFile(const String & path) override { removeSharedFile(path, false); }
void removeFileIfExists(const String & path) override { removeSharedFileIfExists(path, false); }
void removeRecursive(const String & path) override { removeSharedRecursive(path, false, {}); }
void removeSharedFile(const String & path, bool delete_metadata_only) override;
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
void setReadOnly(const String & path) override;
bool isDirectory(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
void removeDirectory(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override = 0;
virtual void removeFromRemoteFS(const std::vector<String> & paths) = 0;
static AsynchronousReaderPtr getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
UInt32 getRefCount(const String & path) const override;
/// Return metadata for each file path. Also, before serialization reset
/// ref_count for each metadata to zero. This function used only for remote
/// fetches/sends in replicated engines. That's why we reset ref_count to zero.
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
protected:
Poco::Logger * log;
const String name;
const String remote_fs_root_path;
DiskPtr metadata_disk;
FileCachePtr cache;
private:
void removeMetadata(const String & path, std::vector<String> & paths_to_remove);
void removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
bool tryReserve(UInt64 bytes);
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
mutable std::shared_mutex metadata_mutex;
};
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
/// Remote FS (S3, HDFS) metadata file layout:
/// FS objects, their number and total size of all FS objects.
/// Each FS object represents a file path in remote FS and its size.
struct IDiskRemote::Metadata
{
using Updater = std::function<bool(IDiskRemote::Metadata & metadata)>;
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
/// Remote FS objects paths and their sizes.
std::vector<BlobPathWithSize> remote_fs_objects;
/// URI
const String & remote_fs_root_path;
/// Relative path to metadata file on local FS.
const String metadata_file_path;
DiskPtr metadata_disk;
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
/// Number of references (hardlinks) to this metadata file.
///
/// FIXME: Why we are tracking it explicetly, without
/// info from filesystem????
UInt32 ref_count = 0;
/// Flag indicates that file is read only.
bool read_only = false;
Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_);
void addObject(const String & path, size_t size);
static Metadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static Metadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static Metadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
/// Serialize metadata to string (very same with saveToBuffer)
std::string serializeToString();
private:
/// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false);
void saveToBuffer(WriteBuffer & buffer, bool sync);
void load();
};
class DiskRemoteReservation final : public IReservation
{
public:
DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override;
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override;
~DiskRemoteReservation() override;
private:
RemoteDiskPtr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
public:
explicit AsyncExecutor(const String & name_, int thread_pool_size)
: name(name_)
, pool(ThreadPool(thread_pool_size)) {}
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
pool.scheduleOrThrowOnError(
[promise, task]()
{
try
{
task();
promise->set_value();
}
catch (...)
{
tryLogCurrentException("Failed to run async task");
try
{
promise->set_exception(std::current_exception());
}
catch (...) {}
}
});
return promise->get_future();
}
void setMaxThreads(size_t threads)
{
pool.setMaxThreads(threads);
}
private:
String name;
ThreadPool pool;
};
}

View File

@ -1,6 +1,5 @@
#include "ReadBufferFromRemoteFSGather.h" #include "ReadBufferFromRemoteFSGather.h"
#include <Disks/IDiskRemote.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <Disks/IO/ReadBufferFromWebServer.h> #include <Disks/IO/ReadBufferFromWebServer.h>

View File

@ -1,9 +1,9 @@
#pragma once #pragma once
#include <Common/config.h> #include <Common/config.h>
#include <Disks/IDiskRemote.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Disks/IObjectStorage.h>
#if USE_AZURE_BLOB_STORAGE #if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp> #include <azure/storage/blobs.hpp>
@ -146,7 +146,7 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
{ {
public: public:
ReadBufferFromAzureBlobStorageGather( ReadBufferFromAzureBlobStorageGather(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const std::string & common_path_prefix_, const std::string & common_path_prefix_,
const BlobsPathToSize & blobs_to_read_, const BlobsPathToSize & blobs_to_read_,
size_t max_single_read_retries_, size_t max_single_read_retries_,
@ -162,7 +162,7 @@ public:
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override; SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
private: private:
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client; std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
size_t max_single_read_retries; size_t max_single_read_retries;
size_t max_single_download_retries; size_t max_single_download_retries;
}; };

View File

@ -2,7 +2,6 @@
#include <Common/config.h> #include <Common/config.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Disks/IDiskRemote.h>
#include <utility> #include <utility>

View File

@ -4,7 +4,6 @@
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h> #include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IDiskRemote.h>
namespace DB namespace DB

View File

@ -2,7 +2,6 @@
#include <Common/config.h> #include <Common/config.h>
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h> #include <IO/WriteBufferFromFileDecorator.h>

View File

@ -80,7 +80,6 @@ void registerDiskS3(DiskFactory & factory)
FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context); FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>( ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>(
std::move(cache), getClient(config, config_prefix, context), std::move(cache), getClient(config, config_prefix, context),
getSettings(config, config_prefix, context), getSettings(config, config_prefix, context),

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & path_, const String & path_,
size_t max_single_read_retries_, size_t max_single_read_retries_,
size_t max_single_download_retries_, size_t max_single_download_retries_,

View File

@ -17,8 +17,8 @@ class ReadBufferFromAzureBlobStorage : public SeekableReadBuffer, public WithFil
{ {
public: public:
explicit ReadBufferFromAzureBlobStorage( ReadBufferFromAzureBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & path_, const String & path_,
size_t max_single_read_retries_, size_t max_single_read_retries_,
size_t max_single_download_retries_, size_t max_single_download_retries_,
@ -41,7 +41,7 @@ private:
void initialize(); void initialize();
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream; std::unique_ptr<Azure::Core::IO::BodyStream> data_stream;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client; std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client; std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client;
const String path; const String path;

View File

@ -12,14 +12,18 @@ namespace DB
{ {
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage( WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_, const String & blob_path_,
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
size_t buf_size_) : size_t buf_size_,
BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0), std::optional<std::map<std::string, std::string>> attributes_)
blob_container_client(blob_container_client_), : BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0)
max_single_part_upload_size(max_single_part_upload_size_), , blob_container_client(blob_container_client_)
blob_path(blob_path_) {} , max_single_part_upload_size(max_single_part_upload_size_)
, blob_path(blob_path_)
, attributes(attributes_)
{
}
WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage() WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
@ -29,6 +33,15 @@ WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
void WriteBufferFromAzureBlobStorage::finalizeImpl() void WriteBufferFromAzureBlobStorage::finalizeImpl()
{ {
if (attributes.has_value())
{
auto blob_client = blob_container_client->GetBlobClient(blob_path);
Azure::Storage::Metadata metadata;
for (const auto & [key, value] : *attributes)
metadata[key] = value;
blob_client.SetMetadata(metadata);
}
const size_t max_tries = 3; const size_t max_tries = 3;
for (size_t i = 0; i < max_tries; ++i) for (size_t i = 0; i < max_tries; ++i)
{ {

View File

@ -19,11 +19,12 @@ class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>
{ {
public: public:
explicit WriteBufferFromAzureBlobStorage( WriteBufferFromAzureBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_, const String & blob_path_,
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
size_t buf_size_); size_t buf_size_,
std::optional<std::map<std::string, std::string>> attributes_ = {});
~WriteBufferFromAzureBlobStorage() override; ~WriteBufferFromAzureBlobStorage() override;
@ -32,9 +33,10 @@ public:
private: private:
void finalizeImpl() override; void finalizeImpl() override;
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client; std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
size_t max_single_part_upload_size; size_t max_single_part_upload_size;
const String blob_path; const String blob_path;
std::optional<std::map<std::string, std::string>> attributes;
}; };
} }

View File

@ -29,7 +29,7 @@
#include <Storages/CompressionCodecSelector.h> #include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h> #include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h> #include <Disks/DiskLocal.h>
#include <Disks/IDiskRemote.h> #include <Disks/IObjectStorage.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h> #include <Interpreters/ActionLocksManager.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h> #include <Interpreters/ExternalLoaderXMLConfigRepository.h>
@ -313,7 +313,7 @@ struct ContextSharedPart
/// since it may use per-user MemoryTracker which will be destroyed here. /// since it may use per-user MemoryTracker which will be destroyed here.
try try
{ {
IDiskRemote::getThreadPoolWriter().wait(); IObjectStorage::getThreadPoolWriter().wait();
} }
catch (...) catch (...)
{ {

View File

@ -1,7 +1,6 @@
#include <Storages/MergeTree/DataPartsExchange.h> #include <Storages/MergeTree/DataPartsExchange.h>
#include <Formats/NativeWriter.h> #include <Formats/NativeWriter.h>
#include <Disks/IDiskRemote.h>
#include <Disks/SingleDiskVolume.h> #include <Disks/SingleDiskVolume.h>
#include <Disks/createVolume.h> #include <Disks/createVolume.h>
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>

View File

@ -1,7 +1,6 @@
#include <Storages/System/StorageSystemDisks.h> #include <Storages/System/StorageSystemDisks.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Disks/IDiskRemote.h>
namespace DB namespace DB
{ {

View File

@ -2,11 +2,9 @@
-- { echo } -- { echo }
SYSTEM DROP FILESYSTEM CACHE;
SET enable_filesystem_cache_on_write_operations=0; SET enable_filesystem_cache_on_write_operations=0;
SET max_memory_usage='20G'; SET max_memory_usage='20G';
SYSTEM DROP FILESYSTEM CACHE;
SELECT count() FROM test.hits_s3; SELECT count() FROM test.hits_s3;
SELECT count() FROM test.hits_s3 WHERE AdvEngineID != 0; SELECT count() FROM test.hits_s3 WHERE AdvEngineID != 0;
SELECT sum(AdvEngineID), count(), avg(ResolutionWidth) FROM test.hits_s3 ; SELECT sum(AdvEngineID), count(), avg(ResolutionWidth) FROM test.hits_s3 ;