mirror of https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00

Finish changes

This commit is contained in:
parent eb294075a6
commit 383d40ea03
@@ -6,6 +6,7 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int UNKNOWN_ELEMENT_IN_CONFIG;
+    extern const int NO_ELEMENTS_IN_CONFIG;
 }
 
 DiskFactory & DiskFactory::instance()
@@ -14,7 +15,7 @@ DiskFactory & DiskFactory::instance()
     return factory;
 }
 
-void DiskFactory::registerDiskType(const String & disk_type, DB::DiskFactory::Creator creator)
+void DiskFactory::registerDiskType(const String & disk_type, Creator creator)
 {
     if (!registry.emplace(disk_type, creator).second)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "DiskFactory: the disk type '{}' is not unique", disk_type);
@@ -31,7 +32,10 @@ DiskPtr DiskFactory::create(
 
     const auto found = registry.find(disk_type);
     if (found == registry.end())
-        throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "DiskFactory: the disk '{}' has unknown disk type: {}", name, disk_type);
+    {
+        throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
+                        "DiskFactory: the disk '{}' has unknown disk type: {}", name, disk_type);
+    }
 
     const auto & disk_creator = found->second;
     return disk_creator(name, config, config_prefix, context, map);
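Context for the DiskFactory hunks above: the factory is a name-to-creator registry; registerDiskType() inserts into it and create() dispatches through it. A minimal self-contained sketch of the same registry pattern (simplified signatures, illustrative names only, not the actual ClickHouse declarations):

#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

struct Disk { std::string name; };
using DiskPtr = std::shared_ptr<Disk>;

class Factory
{
public:
    using Creator = std::function<DiskPtr(const std::string & name)>;

    void registerType(const std::string & type, Creator creator)
    {
        // emplace() returns {iterator, bool}; `false` means the key already existed.
        if (!registry.emplace(type, std::move(creator)).second)
            throw std::logic_error("type '" + type + "' is not unique");
    }

    DiskPtr create(const std::string & type, const std::string & name) const
    {
        auto it = registry.find(type);
        if (it == registry.end())
            throw std::runtime_error("unknown type: " + type);
        return it->second(name);  // invoke the registered creator
    }

private:
    std::map<std::string, Creator> registry;
};

The emplace-and-check idiom gives duplicate detection for free: std::map::emplace refuses to overwrite an existing key and reports that through its returned bool.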
@@ -63,9 +63,7 @@ public:
 
     std::string getName() const override { return "AzureObjectStorage"; }
 
-    std::string getBasePath() const override { return ""; } /// No namespaces in azure.
-
-    std::string getTypeName() const override { return "azure_blob_storage"; }
+    std::string getDataPrefix() const override { return ""; } /// No namespaces in azure.
 
     bool exists(const StoredObject & object) const override;
 
@@ -24,12 +24,10 @@ public:
 
     std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
 
-    std::string getBasePath() const override { return object_storage->getBasePath(); }
+    std::string getDataPrefix() const override { return object_storage->getDataPrefix(); }
 
     bool exists(const StoredObject & object) const override;
 
-    std::string getTypeName() const override { return object_storage->getTypeName(); }
-
     std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
         const StoredObject & object,
         const ReadSettings & read_settings = ReadSettings{},
@@ -39,7 +39,10 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check */)
     }
 
     FileCacheSettings file_cache_settings;
-    auto predefined_configuration = config.has("cache_name") ? NamedCollectionFactory::instance().tryGet(config.getString("cache_name")) : nullptr;
+    auto predefined_configuration = config.has("cache_name")
+        ? NamedCollectionFactory::instance().tryGet(config.getString("cache_name"))
+        : nullptr;
 
     if (predefined_configuration)
         file_cache_settings.loadFromCollection(*predefined_configuration);
     else
@@ -58,9 +58,7 @@ public:
 
     std::string getName() const override { return "HDFSObjectStorage"; }
 
-    std::string getBasePath() const override { return hdfs_root_path; }
-
-    std::string getTypeName() const override { return "hdfs"; }
+    std::string getDataPrefix() const override { return hdfs_root_path; }
 
     DataSourceDescription getDataSourceDescription() const override
     {
@@ -84,9 +84,7 @@ public:
 
     virtual std::string getName() const = 0;
 
-    virtual std::string getBasePath() const = 0;
-
-    virtual std::string getTypeName() const = 0;
+    virtual std::string getDataPrefix() const = 0;
 
     /// Object exists or not
     virtual bool exists(const StoredObject & object) const = 0;
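The same three-line edit repeats across all of the storage headers in this commit: getBasePath() and getTypeName() are folded into a single getDataPrefix(), with the storage's kind now carried by the structured DataSourceDescription rather than a free-form string. A rough sketch of the resulting interface shape (illustrative only, using the enum values that appear later in this diff):

#include <string>

enum class DataSourceType { S3, HDFS, LocalBlobStorage, AzureBlobStorage, WebServer };

struct DataSourceDescription { DataSourceType type; std::string description; };

// Before: the base path doubled as the "data prefix", and the type was a string.
// After: one accessor for the object-key prefix, and the type comes from the
// structured description instead of string comparison.
class IObjectStorageSketch
{
public:
    virtual ~IObjectStorageSketch() = default;
    virtual std::string getDataPrefix() const = 0;              // replaces getBasePath()/getTypeName()
    virtual DataSourceDescription getDataSourceDescription() const = 0;
};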
@@ -27,7 +27,7 @@ LocalObjectStorage::LocalObjectStorage(String key_prefix_)
     : key_prefix(std::move(key_prefix_))
     , log(&Poco::Logger::get("LocalObjectStorage"))
 {
-    data_source_description.type = DataSourceType::Local;
+    data_source_description.type = DataSourceType::LocalBlobStorage;
     if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
         data_source_description.description = *block_device_id;
     else
@@ -22,9 +22,7 @@ public:
 
     std::string getName() const override { return "LocalObjectStorage"; }
 
-    std::string getBasePath() const override { return key_prefix; }
-
-    std::string getTypeName() const override { return "local_blob_storage"; }
+    std::string getDataPrefix() const override { return key_prefix; }
 
     bool exists(const StoredObject & object) const override;
 
@@ -41,7 +41,7 @@ MetadataStoragePtr MetadataStorageFactory::create(
         throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `metadata_type` in config");
     }
 
-    const auto type = config.getString(config_prefix + ".metadata_type");
+    const auto type = config.getString(config_prefix + ".metadata_type", compatibility_type_hint);
     const auto it = registry.find(type);
 
     if (it == registry.end())
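The one-line change above turns the compatibility hint into a getString() default: Poco's AbstractConfiguration::getString(key, default) returns the fallback when the key is absent, so pre-existing disk configs that never declared metadata_type still resolve to the hinted implementation. A small stand-in sketch of the same fallback logic (the map-based helper is hypothetical, for illustration only):

#include <map>
#include <string>

// Stand-in for Poco::Util::AbstractConfiguration::getString(key, default).
std::string getString(const std::map<std::string, std::string> & config,
                      const std::string & key, const std::string & def)
{
    auto it = config.find(key);
    return it == config.end() ? def : it->second;
}

// With compatibility_type_hint == "local", an old config that never
// specified `metadata_type` still resolves to the "local" implementation.
std::string resolveMetadataType(const std::map<std::string, std::string> & config,
                                const std::string & compatibility_type_hint)
{
    return getString(config, "metadata_type", compatibility_type_hint);
}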
@@ -53,9 +53,17 @@ MetadataStoragePtr MetadataStorageFactory::create(
     return it->second(name, config, config_prefix, object_storage);
 }
 
+static std::string getObjectStoragePrefix(
+    const IObjectStorage & object_storage,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & config_prefix)
+{
+    return config.getString(config_prefix + ".key_compatibility_prefix", object_storage.getDataPrefix());
+}
+
 void registerMetadataStorageFromDisk(MetadataStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerMetadataStorageType("local", [](
         const std::string & name,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -65,35 +73,42 @@ void registerMetadataStorageFromDisk(MetadataStorageFactory & factory)
             fs::path(Context::getGlobalContextInstance()->getPath()) / "disks" / name / "");
         fs::create_directories(metadata_path);
         auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
-        return std::make_shared<MetadataStorageFromDisk>(metadata_disk, object_storage->getBasePath());
-    };
-    factory.registerMetadataStorageType("local", creator);
+        auto key_compatibility_prefix = getObjectStoragePrefix(*object_storage, config, config_prefix);
+        return std::make_shared<MetadataStorageFromDisk>(metadata_disk, key_compatibility_prefix);
+    });
 }
 
-void registerMetadataStorageFromDiskPlain(MetadataStorageFactory & factory)
+void registerPlainMetadataStorage(MetadataStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerMetadataStorageType("plain", [](
         const std::string & /* name */,
-        const Poco::Util::AbstractConfiguration & /* config */,
-        const std::string & /* config_prefix */,
+        const Poco::Util::AbstractConfiguration & config,
+        const std::string & config_prefix,
         ObjectStoragePtr object_storage) -> MetadataStoragePtr
     {
-        return std::make_shared<MetadataStorageFromPlainObjectStorage>(object_storage, object_storage->getBasePath());
-    };
-    factory.registerMetadataStorageType("plain", creator);
+        auto key_compatibility_prefix = getObjectStoragePrefix(*object_storage, config, config_prefix);
+        return std::make_shared<MetadataStorageFromPlainObjectStorage>(object_storage, key_compatibility_prefix);
+    });
 }
 
 void registerMetadataStorageFromStaticFilesWebServer(MetadataStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerMetadataStorageType("web", [](
         const std::string & /* name */,
         const Poco::Util::AbstractConfiguration & /* config */,
         const std::string & /* config_prefix */,
         ObjectStoragePtr object_storage) -> MetadataStoragePtr
     {
         return std::make_shared<MetadataStorageFromStaticFilesWebServer>(assert_cast<const WebObjectStorage &>(*object_storage));
-    };
-    factory.registerMetadataStorageType("web", creator);
+    });
 }
 
+void registerMetadataStorages()
+{
+    auto & factory = MetadataStorageFactory::instance();
+    registerMetadataStorageFromDisk(factory);
+    registerPlainMetadataStorage(factory);
+    registerMetadataStorageFromStaticFilesWebServer(factory);
+}
+
 }
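Two mechanical patterns in the hunk above: each register function now passes its lambda directly into registerMetadataStorageType() instead of first binding it to a local `creator` variable, and the new registerMetadataStorages() groups the individual calls behind one entry point. A toy reduction of that shape (names and bodies purely illustrative):

#include <functional>
#include <iostream>
#include <map>
#include <string>

using Creator = std::function<void()>;
std::map<std::string, Creator> registry;

void registerMetadataStorageType(const std::string & type, Creator creator)
{
    registry.emplace(type, std::move(creator));
}

// Before: `auto creator = [](...){...}; registerMetadataStorageType("local", creator);`
// After: the lambda is passed inline, so each registration reads top-down,
// and one aggregate function registers everything.
void registerAll()
{
    registerMetadataStorageType("local", [] { std::cout << "local\n"; });
    registerMetadataStorageType("plain", [] { std::cout << "plain\n"; });
    registerMetadataStorageType("web",   [] { std::cout << "web\n"; });
}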
@@ -2,7 +2,7 @@
 #include <Disks/ObjectStorages/ObjectStorageFactory.h>
 #include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
 #include <Disks/ObjectStorages/S3/diskSettings.h>
-#include <Disks/ObjectStorages/S3/checkBatchRemove.h>
+#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
 #include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
 #include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
 #include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
@@ -28,13 +28,12 @@ ObjectStorageFactory & ObjectStorageFactory::instance()
     return factory;
 }
 
-void ObjectStorageFactory::registerObjectStorageType(const std::string & metadata_type, Creator creator)
+void ObjectStorageFactory::registerObjectStorageType(const std::string & type, Creator creator)
 {
-    if (!registry.emplace(metadata_type, creator).second)
+    if (!registry.emplace(type, creator).second)
     {
         throw Exception(ErrorCodes::LOGICAL_ERROR,
-                        "ObjectStorageFactory: the metadata type '{}' is not unique",
-                        metadata_type);
+                        "ObjectStorageFactory: the metadata type '{}' is not unique", type);
     }
 }
 
@@ -46,7 +45,7 @@ ObjectStoragePtr ObjectStorageFactory::create(
     bool skip_access_check) const
 {
     std::string type;
-    if (config.has(config_prefix + ".obejct_storage_type"))
+    if (config.has(config_prefix + ".object_storage_type"))
     {
         type = config.getString(config_prefix + ".object_storage_type");
     }
@@ -76,15 +75,20 @@ static S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config,
 {
     String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
     S3::URI uri(endpoint);
-    if (!uri.key.ends_with('/'))
+
+    /// An empty key remains empty.
+    if (!uri.key.empty() && !uri.key.ends_with('/'))
         uri.key.push_back('/');
+
     return uri;
 }
 
 #if USE_AWS_S3
 void registerS3ObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    static constexpr auto disk_type = "s3";
+
+    factory.registerObjectStorageType(disk_type, [](
         const std::string & name,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
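The new empty-key guard in getS3URI() is easy to miss but load-bearing: previously an endpoint whose key part was empty still got '/' appended, producing a spurious one-character prefix, and with key templates an empty key is now legitimate and must stay empty. A standalone sketch of the corrected normalization:

#include <cassert>
#include <string>

// Append a trailing slash to a non-empty object-key prefix;
// an empty key must stay empty (the whole bucket is the prefix).
std::string normalizeKeyPrefix(std::string key)
{
    if (!key.empty() && key.back() != '/')
        key.push_back('/');
    return key;
}

int main()
{
    assert(normalizeKeyPrefix("") == "");          // unchanged, no stray "/"
    assert(normalizeKeyPrefix("data") == "data/");
    assert(normalizeKeyPrefix("data/") == "data/");
}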
@@ -95,10 +99,10 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
         auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
         auto settings = getSettings(config, config_prefix, context);
         auto client = getClient(config, config_prefix, context, *settings);
+        auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
 
         auto object_storage = std::make_shared<S3ObjectStorage>(
-            std::move(client), std::move(settings), uri.version_id,
-            s3_capabilities, uri.bucket, uri.endpoint, uri.key, name);
+            std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
 
         /// NOTE: should we still perform this check for clickhouse-disks?
         if (!skip_access_check)
@@ -117,13 +121,14 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
             }
         }
         return object_storage;
-    };
-    factory.registerObjectStorageType("s3", creator);
+    });
 }
 
 void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    static constexpr auto disk_type = "s3_plain";
+
+    factory.registerObjectStorageType(disk_type, [](
         const std::string & name,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -142,19 +147,18 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
         auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
         auto settings = getSettings(config, config_prefix, context);
         auto client = getClient(config, config_prefix, context, *settings);
+        auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
 
         return std::make_shared<S3PlainObjectStorage>(
-            std::move(client), std::move(settings), uri.version_id,
-            s3_capabilities, uri.bucket, uri.endpoint, uri.key, name);
-    };
-    factory.registerObjectStorageType("s3_plain", creator);
+            std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
+    });
 }
 #endif
 
 #if USE_HDFS
 void registerHDFSObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerObjectStorageType("hdfs", [](
         const std::string & /* name */,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -173,15 +177,14 @@ void registerHDFSObjectStorage(ObjectStorageFactory & factory)
         );
 
         return std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
-    };
-    factory.registerObjectStorageType("hdfs", creator);
+    });
 }
 #endif
 
 #if USE_AZURE_BLOB_STORAGE
 void registerAzureObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerObjectStorageType("azure_blob_storage", [](
         const std::string & name,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -193,14 +196,13 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
             getAzureBlobContainerClient(config, config_prefix),
             getAzureBlobStorageSettings(config, config_prefix, context));
 
-    };
-    factory.registerObjectStorageType("azure_blob_storage", creator);
+    });
 }
 #endif
 
 void registerWebObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerObjectStorageType("web", [](
         const std::string & /* name */,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -222,13 +224,12 @@ void registerWebObjectStorage(ObjectStorageFactory & factory)
         }
 
         return std::make_shared<WebObjectStorage>(uri, context);
-    };
-    factory.registerObjectStorageType("web", creator);
+    });
 }
 
 void registerLocalObjectStorage(ObjectStorageFactory & factory)
 {
-    auto creator = [](
+    factory.registerObjectStorageType("local_blob_storage", [](
         const std::string & name,
         const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
@@ -241,8 +242,28 @@ void registerLocalObjectStorage(ObjectStorageFactory & factory)
         /// keys are mapped to the fs, object_key_prefix is a directory also
         fs::create_directories(object_key_prefix);
         return std::make_shared<LocalObjectStorage>(object_key_prefix);
-    };
-    factory.registerObjectStorageType("local", creator);
+    });
 }
 
+void registerObjectStorages()
+{
+    auto & factory = ObjectStorageFactory::instance();
+
+#if USE_AWS_S3
+    registerS3ObjectStorage(factory);
+    registerS3PlainObjectStorage(factory);
+#endif
+
+#if USE_HDFS
+    registerHDFSObjectStorage(factory);
+#endif
+
+#if USE_AZURE_BLOB_STORAGE
+    registerAzureObjectStorage(factory);
+#endif
+
+    registerLocalObjectStorage(factory);
+    registerWebObjectStorage(factory);
+}
+
 }
@@ -17,7 +17,7 @@ public:
 
     static ObjectStorageFactory & instance();
 
-    void registerObjectStorageType(const std::string & metadata_type, Creator creator);
+    void registerObjectStorageType(const std::string & type, Creator creator);
 
     ObjectStoragePtr create(
         const std::string & name,
@@ -11,23 +11,35 @@ namespace ErrorCodes
     extern const int UNKNOWN_ELEMENT_IN_CONFIG;
 }
 
-static std::string getCompatibilityMetadataTypeHint(const std::string & object_storage_type)
+void registerObjectStorages();
+void registerMetadataStorages();
+
+static std::string getCompatibilityMetadataTypeHint(const DataSourceDescription & description)
 {
-    /// TODO: change type name to enum
-    if (object_storage_type == "s3"
-        || object_storage_type == "hdfs"
-        || object_storage_type == "azure_blob_storage"
-        || object_storage_type == "local_blob_storage")
-        return "local";
-    else if (object_storage_type == "s3_plain")
-        return "plain";
-    else if (object_storage_type == "web")
-        return "web";
-    throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown object storage type: {}", object_storage_type);
+    switch (description.type)
+    {
+        case DataSourceType::S3:
+        case DataSourceType::HDFS:
+        case DataSourceType::LocalBlobStorage:
+        case DataSourceType::AzureBlobStorage:
+            return "local";
+        case DataSourceType::S3_Plain:
+            return "plain";
+        case DataSourceType::WebServer:
+            return "web";
+        default:
+            throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
+                            "Cannot get compatibility metadata hint: "
+                            "no such object storage type: {}", toString(description.type));
+    }
+    UNREACHABLE();
 }
 
 void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check)
 {
+    registerObjectStorages();
+    registerMetadataStorages();
+
     auto creator = [global_skip_access_check](
         const String & name,
         const Poco::Util::AbstractConfiguration & config,
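The switch-based rewrite above also settles the old '/// TODO: change type name to enum': dispatching on DataSourceType gives the compiler a chance to flag unhandled enumerators (e.g. with -Wswitch-enum), where the old string chain would silently fall through to the throw. A compilable sketch of the same dispatch (enumerators taken from this diff; std exception types standing in for ClickHouse's Exception):

#include <stdexcept>
#include <string>

enum class DataSourceType { S3, S3_Plain, HDFS, LocalBlobStorage, AzureBlobStorage, WebServer, Other };

std::string getCompatibilityMetadataTypeHint(DataSourceType type)
{
    switch (type)
    {
        case DataSourceType::S3:
        case DataSourceType::HDFS:
        case DataSourceType::LocalBlobStorage:
        case DataSourceType::AzureBlobStorage:
            return "local";   // these keep their metadata on a local disk
        case DataSourceType::S3_Plain:
            return "plain";
        case DataSourceType::WebServer:
            return "web";
        default:
            throw std::invalid_argument("no such object storage type");
    }
}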
@@ -37,13 +49,17 @@ void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check)
     {
         bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
         auto object_storage = ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
-        auto compatibility_metadata_type_hint = config.has("metadata_type") ? "" : getCompatibilityMetadataTypeHint(object_storage->getTypeName());
-        auto metadata_storage = MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, compatibility_metadata_type_hint);
+        auto compatibility_metadata_type_hint = config.has("metadata_type")
+            ? ""
+            : getCompatibilityMetadataTypeHint(object_storage->getDataSourceDescription());
+
+        auto metadata_storage = MetadataStorageFactory::instance().create(
+            name, config, config_prefix, object_storage, compatibility_metadata_type_hint);
 
         DiskObjectStoragePtr disk = std::make_shared<DiskObjectStorage>(
             name,
-            object_storage->getBasePath(),
-            "Disk" + object_storage->getTypeName(),
+            object_storage->getDataPrefix(),
+            fmt::format("Disk_{}({})", toString(object_storage->getDataSourceDescription().type), name),
             std::move(metadata_storage),
             std::move(object_storage),
             config,
@@ -52,6 +68,7 @@ void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check)
         disk->startup(context, skip_access_check);
         return disk;
     };
 
     factory.registerDiskType("object_storage", creator);
+#if USE_AWS_S3
     factory.registerDiskType("s3", creator); /// For compatibility
src/Disks/ObjectStorages/S3/DiskS3Utils.cpp (new file, 117 lines)
@@ -0,0 +1,117 @@
#include "DiskS3Utils.h"
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
|
||||
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ObjectStorageKeysGeneratorPtr getKeyGenerator(
|
||||
String type,
|
||||
const S3::URI & uri,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix)
|
||||
{
|
||||
if (type == "s3_plain")
|
||||
return createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
|
||||
|
||||
chassert(type == "s3");
|
||||
|
||||
bool storage_metadata_write_full_object_key = DiskObjectStorageMetadata::getWriteFullObjectKeySetting();
|
||||
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
|
||||
|
||||
if (send_metadata && storage_metadata_write_full_object_key)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Wrong configuration in {}. "
|
||||
"s3 does not supports feature 'send_metadata' with feature 'storage_metadata_write_full_object_key'.",
|
||||
config_prefix);
|
||||
|
||||
String object_key_compatibility_prefix = config.getString(config_prefix + ".key_compatibility_prefix", String());
|
||||
String object_key_template = config.getString(config_prefix + ".key_template", String());
|
||||
|
||||
if (object_key_template.empty())
|
||||
{
|
||||
if (!object_key_compatibility_prefix.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Wrong configuration in {}. "
|
||||
"Setting 'key_compatibility_prefix' can be defined only with setting 'key_template'.",
|
||||
config_prefix);
|
||||
|
||||
return createObjectStorageKeysGeneratorByPrefix(uri.key);
|
||||
}
|
||||
|
||||
if (send_metadata)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Wrong configuration in {}. "
|
||||
"s3 does not supports send_metadata with setting 'key_template'.",
|
||||
config_prefix);
|
||||
|
||||
if (!storage_metadata_write_full_object_key)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Wrong configuration in {}. "
|
||||
"Feature 'storage_metadata_write_full_object_key' has to be enabled in order to use setting 'key_template'.",
|
||||
config_prefix);
|
||||
|
||||
if (!uri.key.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Wrong configuration in {}. "
|
||||
"URI.key is forbidden with settings 'key_template', use setting 'key_compatibility_prefix' instead'. "
|
||||
"URI.key: '{}', bucket: '{}'. ",
|
||||
config_prefix,
|
||||
uri.key, uri.bucket);
|
||||
|
||||
return createObjectStorageKeysGeneratorByTemplate(object_key_template);
|
||||
}
|
||||
|
||||
static String getServerUUID()
|
||||
{
|
||||
UUID server_uuid = ServerUUID::get();
|
||||
if (server_uuid == UUIDHelpers::Nil)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
|
||||
return toString(server_uuid);
|
||||
}
|
||||
|
||||
bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
|
||||
{
|
||||
/// NOTE: key_with_trailing_slash is the disk prefix, it is required
|
||||
/// because access is done via S3ObjectStorage not via IDisk interface
|
||||
/// (since we don't have disk yet).
|
||||
const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
|
||||
StoredObject object(path);
|
||||
try
|
||||
{
|
||||
auto file = storage.writeObject(object, WriteMode::Rewrite);
|
||||
file->write("test", 4);
|
||||
file->finalize();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
try
|
||||
{
|
||||
storage.removeObject(object);
|
||||
}
|
||||
catch (...) // NOLINT(bugprone-empty-catch)
|
||||
{
|
||||
}
|
||||
/// We don't have write access, therefore no information about batch remove.
|
||||
return true;
|
||||
}
|
||||
try
|
||||
{
|
||||
/// Uses `DeleteObjects` request (batch delete).
|
||||
storage.removeObjects({object});
|
||||
return true;
|
||||
}
|
||||
catch (const Exception &)
|
||||
{
|
||||
try
|
||||
{
|
||||
storage.removeObject(object);
|
||||
}
|
||||
catch (...) // NOLINT(bugprone-empty-catch)
|
||||
{
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
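getKeyGenerator() in the new file selects among three key-generation strategies: "as is with prefix" for s3_plain disks (logical paths map 1:1 to object keys), random keys under a prefix for ordinary s3 disks, and template-driven keys when key_template is configured. The createObjectStorageKeysGenerator* factories are ClickHouse's own; below is a simplified toy model of the first two strategies to show the difference (all names illustrative):

#include <functional>
#include <random>
#include <string>

using KeyGenerator = std::function<std::string(const std::string & path)>;

// s3_plain: the logical path itself becomes the object key under the prefix.
KeyGenerator asIsWithPrefix(std::string prefix)
{
    return [prefix](const std::string & path) { return prefix + path; };
}

// s3: keys are random, so renames only touch local metadata, not S3 objects.
KeyGenerator randomByPrefix(std::string prefix)
{
    return [prefix](const std::string &)
    {
        static std::mt19937_64 rng{std::random_device{}()};
        return prefix + "r" + std::to_string(rng());
    };
}

Random keys are what let plain s3 disks rename or move files by rewriting only local metadata; s3_plain trades that away for a human-readable bucket layout.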
src/Disks/ObjectStorages/S3/DiskS3Utils.h (new file, 20 lines)
@@ -0,0 +1,20 @@
+#pragma once
+#include <Core/Types.h>
+#include <Common/ObjectStorageKeyGenerator.h>
+#include <IO/S3/URI.h>
+
+namespace Poco::Util { class AbstractConfiguration; }
+
+namespace DB
+{
+
+ObjectStorageKeysGeneratorPtr getKeyGenerator(
+    String type,
+    const S3::URI & uri,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & config_prefix);
+
+class S3ObjectStorage;
+bool checkBatchRemove(S3ObjectStorage & storage, const std::string & key_with_trailing_slash);
+
+}
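checkBatchRemove(), relocated into DiskS3Utils from its own translation unit, is a capability probe: upload a throwaway object, try to delete it with a batch DeleteObjects request, and report whether the endpoint accepts batch deletion (some S3-compatible services reject it). Its control flow, reduced to a skeleton over a toy storage interface (every name below is illustrative):

#include <exception>

struct StorageLike
{
    virtual ~StorageLike() = default;
    virtual void writeProbeObject() = 0;           // may throw: no write access
    virtual void batchRemoveProbe() = 0;           // may throw: batch delete unsupported
    virtual void removeProbeQuietly() noexcept = 0;
};

bool probeBatchRemove(StorageLike & storage)
{
    try { storage.writeProbeObject(); }
    catch (...)
    {
        storage.removeProbeQuietly();
        return true;   // cannot write, so nothing can be learned: assume supported
    }
    try { storage.batchRemoveProbe(); return true; }
    catch (const std::exception &)
    {
        storage.removeProbeQuietly();
        return false;  // batch delete rejected by this endpoint
    }
}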
@@ -99,7 +99,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync
 {
 public:
     S3IteratorAsync(
-        const std::string & bucket,
+        const std::string & bucket_,
         const std::string & path_prefix,
         std::shared_ptr<const S3::Client> client_,
         size_t max_list_size)
@@ -110,7 +110,7 @@ public:
             "ListObjectS3")
         , client(client_)
     {
-        request.SetBucket(bucket);
+        request.SetBucket(bucket_);
         request.SetPrefix(path_prefix);
         request.SetMaxKeys(static_cast<int>(max_list_size));
     }
@@ -155,7 +155,7 @@ private:
 bool S3ObjectStorage::exists(const StoredObject & object) const
 {
     auto settings_ptr = s3_settings.get();
-    return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+    return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
 }
 
 std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@@ -175,9 +175,9 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
     {
         return std::make_unique<ReadBufferFromS3>(
             client.get(),
-            bucket,
+            uri.bucket,
             path,
-            version_id,
+            uri.version_id,
             settings_ptr->request_settings,
             disk_read_settings,
             /* use_external_buffer */true,
@@ -225,9 +225,9 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
     auto settings_ptr = s3_settings.get();
     return std::make_unique<ReadBufferFromS3>(
         client.get(),
-        bucket,
+        uri.bucket,
         object.remote_path,
-        version_id,
+        uri.version_id,
         settings_ptr->request_settings,
         patchSettings(read_settings));
 }
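A pattern worth noting in these S3ObjectStorage.cpp hunks: every operation begins with `auto settings_ptr = s3_settings.get();`, taking one immutable snapshot that the whole operation then uses, so settings can be hot-swapped concurrently without readers holding a lock. A minimal sketch of such a snapshot holder built on shared_ptr atomics (not ClickHouse's actual MultiVersion implementation):

#include <atomic>
#include <memory>

template <typename T>
class MultiVersionSketch
{
public:
    explicit MultiVersionSketch(T initial)
        : current(std::make_shared<const T>(std::move(initial))) {}

    // Readers grab a stable snapshot; later set() calls do not affect it.
    std::shared_ptr<const T> get() const { return std::atomic_load(&current); }

    void set(T value) { std::atomic_store(&current, std::make_shared<const T>(std::move(value))); }

private:
    std::shared_ptr<const T> current;
};

struct Settings { int list_object_keys_size = 1000; };

// Usage mirrors the diff: `auto settings_ptr = s3_settings.get();`
// and every request within the operation then reads the same settings_ptr.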
@@ -256,7 +256,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT
 
     return std::make_unique<WriteBufferFromS3>(
         client.get(),
-        bucket,
+        uri.bucket,
         object.remote_path,
         buf_size,
         settings_ptr->request_settings,
@@ -270,7 +270,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT
 ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
 {
     auto settings_ptr = s3_settings.get();
-    return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
+    return std::make_shared<S3IteratorAsync>(uri.bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
 }
 
 void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
@@ -278,7 +278,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
     auto settings_ptr = s3_settings.get();
 
     S3::ListObjectsV2Request request;
-    request.SetBucket(bucket);
+    request.SetBucket(uri.bucket);
     request.SetPrefix(path);
     if (max_keys)
         request.SetMaxKeys(max_keys);
@@ -324,12 +324,12 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
     ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
     ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
     S3::DeleteObjectRequest request;
-    request.SetBucket(bucket);
+    request.SetBucket(uri.bucket);
     request.SetKey(object.remote_path);
     auto outcome = client.get()->DeleteObject(request);
     if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
         blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
-            bucket, object.remote_path, object.local_path, object.bytes_size,
+            uri.bucket, object.remote_path, object.local_path, object.bytes_size,
             outcome.IsSuccess() ? nullptr : &outcome.GetError());
 
     throwIfUnexpectedError(outcome, if_exists);
@@ -376,7 +376,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
         ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
         ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
         S3::DeleteObjectsRequest request;
-        request.SetBucket(bucket);
+        request.SetBucket(uri.bucket);
         request.SetDelete(delkeys);
         auto outcome = client.get()->DeleteObjects(request);
 
@@ -386,7 +386,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
             auto time_now = std::chrono::system_clock::now();
             for (const auto & object : objects)
                 blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
-                    bucket, object.remote_path, object.local_path, object.bytes_size,
+                    uri.bucket, object.remote_path, object.local_path, object.bytes_size,
                     outcome_error, time_now);
         }
 
@@ -419,7 +419,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
 std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
 {
     auto settings_ptr = s3_settings.get();
-    auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
+    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
 
     if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
         return {};
@@ -435,7 +435,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
 ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
 {
     auto settings_ptr = s3_settings.get();
-    auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
+    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
 
     ObjectMetadata result;
     result.size_bytes = object_info.size;
@@ -456,18 +456,18 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
     /// Shortcut for S3
     if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
     {
-        auto client_ = dest_s3->client.get();
+        auto current_client = dest_s3->client.get();
         auto settings_ptr = s3_settings.get();
-        auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+        auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
         auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
         try {
             copyS3File(
-                client_,
-                bucket,
+                current_client,
+                uri.bucket,
                 object_from.remote_path,
                 0,
                 size,
-                dest_s3->bucket,
+                dest_s3->uri.bucket,
                 object_to.remote_path,
                 settings_ptr->request_settings,
                 patchSettings(read_settings),
@@ -498,16 +498,16 @@ void S3ObjectStorage::copyObject( // NOLINT
     const WriteSettings &,
     std::optional<ObjectAttributes> object_to_attributes)
 {
-    auto client_ = client.get();
+    auto current_client = client.get();
     auto settings_ptr = s3_settings.get();
-    auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+    auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
     auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
-    copyS3File(client_,
-        bucket,
+    copyS3File(current_client,
+        uri.bucket,
         object_from.remote_path,
         0,
         size,
-        bucket,
+        uri.bucket,
         object_to.remote_path,
         settings_ptr->request_settings,
         patchSettings(read_settings),
@@ -551,10 +551,12 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
     auto new_s3_settings = getSettings(config, config_prefix, context);
     auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
-    String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
+
+    auto new_uri{uri};
+    new_uri.bucket = new_namespace;
+
     return std::make_unique<S3ObjectStorage>(
-        std::move(new_client), std::move(new_s3_settings),
-        version_id, s3_capabilities, new_namespace,
-        endpoint, key_generator, disk_name);
+        std::move(new_client), std::move(new_s3_settings), new_uri, s3_capabilities, key_generator, disk_name);
 }
 
 ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string & path) const
@@ -49,22 +49,19 @@ private:
         const char * logger_name,
         std::unique_ptr<S3::Client> && client_,
         std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
-        String version_id_,
+        S3::URI uri_,
         const S3Capabilities & s3_capabilities_,
-        String bucket_,
-        String connection_string,
         ObjectStorageKeysGeneratorPtr key_generator_,
         const String & disk_name_)
-        : bucket(std::move(bucket_))
+        : uri(uri_)
         , key_generator(std::move(key_generator_))
         , disk_name(disk_name_)
         , client(std::move(client_))
         , s3_settings(std::move(s3_settings_))
         , s3_capabilities(s3_capabilities_)
-        , version_id(std::move(version_id_))
     {
         data_source_description.type = DataSourceType::S3;
-        data_source_description.description = connection_string;
+        data_source_description.description = uri_.endpoint;
         data_source_description.is_cached = false;
         data_source_description.is_encrypted = false;
 
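The constructor hunk above is a parameter-object refactor: the loose String arguments version_id_, bucket_, and connection_string collapse into one S3::URI value, eliminating a row of identically typed positional strings that were easy to pass in the wrong order. Schematically (struct fields per this diff, class reduced to a stub):

#include <string>
#include <utility>

struct URI
{
    std::string endpoint;
    std::string bucket;
    std::string key;
    std::string version_id;
};

// Before (sketch): several positional strings -- bucket, connection string,
// version id -- all of the same type, so a swap compiles silently.
// After (sketch): one coherent value object.
class S3ObjectStorageSketch
{
public:
    explicit S3ObjectStorageSketch(URI uri_) : uri(std::move(uri_)) {}
    std::string getObjectsNamespace() const { return uri.bucket; }

private:
    const URI uri;
};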
@@ -85,9 +82,7 @@ public:
 
     std::string getName() const override { return "S3ObjectStorage"; }
 
-    std::string getBasePath() const override { return object_key_prefix; }
-
-    std::string getTypeName() const override { return "s3"; }
+    std::string getDataPrefix() const override { return uri.key; }
 
     bool exists(const StoredObject & object) const override;
 
@@ -157,7 +152,7 @@ public:
         const std::string & config_prefix,
         ContextPtr context) override;
 
-    std::string getObjectsNamespace() const override { return bucket; }
+    std::string getObjectsNamespace() const override { return uri.bucket; }
 
     bool isRemote() const override { return true; }
 
@@ -181,8 +176,8 @@ private:
     void removeObjectImpl(const StoredObject & object, bool if_exists);
     void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
 
-private:
-    std::string bucket;
+    const S3::URI uri;
 
     ObjectStorageKeysGeneratorPtr key_generator;
     std::string disk_name;
 
@@ -190,8 +185,6 @@ private:
     MultiVersion<S3ObjectStorageSettings> s3_settings;
     S3Capabilities s3_capabilities;
 
-    const String version_id;
-
     Poco::Logger * log;
     DataSourceDescription data_source_description;
 };
@@ -205,8 +198,6 @@ class S3PlainObjectStorage : public S3ObjectStorage
 public:
     std::string getName() const override { return "S3PlainObjectStorage"; }
 
-    std::string getTypeName() const override { return "s3_plain"; }
-
     template <class ...Args>
     explicit S3PlainObjectStorage(Args && ...args)
         : S3ObjectStorage("S3PlainObjectStorage", std::forward<Args>(args)...)
src/Disks/ObjectStorages/S3/S3Utils.h (new, empty file)

@@ -1,64 +0,0 @@
-#include <Disks/ObjectStorages/S3/checkBatchRemove.h>
-#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
-#include <Disks/ObjectStorages/StoredObject.h>
-#include <Core/Types.h>
-#include <Core/ServerUUID.h>
-#include <IO/WriteHelpers.h>
-#include <IO/WriteBufferFromFile.h>
-
-namespace DB
-{
-
-static String getServerUUID()
-{
-    UUID server_uuid = ServerUUID::get();
-    if (server_uuid == UUIDHelpers::Nil)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
-    return toString(server_uuid);
-}
-
-bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
-{
-    /// NOTE: key_with_trailing_slash is the disk prefix, it is required
-    /// because access is done via S3ObjectStorage not via IDisk interface
-    /// (since we don't have disk yet).
-    const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
-    StoredObject object(path);
-    try
-    {
-        auto file = storage.writeObject(object, WriteMode::Rewrite);
-        file->write("test", 4);
-        file->finalize();
-    }
-    catch (...)
-    {
-        try
-        {
-            storage.removeObject(object);
-        }
-        catch (...) // NOLINT(bugprone-empty-catch)
-        {
-        }
-        /// We don't have write access, therefore no information about batch remove.
-        return true;
-    }
-    try
-    {
-        /// Uses `DeleteObjects` request (batch delete).
-        storage.removeObjects({object});
-        return true;
-    }
-    catch (const Exception &)
-    {
-        try
-        {
-            storage.removeObject(object);
-        }
-        catch (...) // NOLINT(bugprone-empty-catch)
-        {
-        }
-        return false;
-    }
-}
-
-}
@ -1,11 +0,0 @@
|
||||
#pragma once
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class S3ObjectStorage;
|
||||
|
||||
bool checkBatchRemove(S3ObjectStorage & storage, const std::string & key_with_trailing_slash);
|
||||
|
||||
}
|
@@ -33,9 +33,7 @@ public:
 
     std::string getName() const override { return "WebObjectStorage"; }
 
-    std::string getBasePath() const override { return ""; }
-
-    std::string getTypeName() const override { return "web"; }
+    std::string getDataPrefix() const override { return ""; }
 
     bool exists(const StoredObject & object) const override;
 