Merge pull request #58357 from ClickHouse/better-disks-configuration

Flexible configuration for disks
This commit is contained in:
Kseniia Sumarokova 2024-01-15 21:34:36 +01:00 committed by GitHub
commit ae884760db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 868 additions and 641 deletions

View File

@ -78,6 +78,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@ -117,6 +118,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml

View File

@ -127,15 +127,17 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/DiskObjectStorageCommon.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/ObjectStorageIterator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/StoredObject.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/registerDiskS3.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3Capabilities.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/diskSettings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/DiskS3Utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/ObjectStorageFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/MetadataStorageFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/RegisterDiskObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp

View File

@ -14,7 +14,7 @@ DiskFactory & DiskFactory::instance()
return factory;
}
void DiskFactory::registerDiskType(const String & disk_type, DB::DiskFactory::Creator creator)
void DiskFactory::registerDiskType(const String & disk_type, Creator creator)
{
if (!registry.emplace(disk_type, creator).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DiskFactory: the disk type '{}' is not unique", disk_type);
@ -31,7 +31,10 @@ DiskPtr DiskFactory::create(
const auto found = registry.find(disk_type);
if (found == registry.end())
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "DiskFactory: the disk '{}' has unknown disk type: {}", name, disk_type);
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"DiskFactory: the disk '{}' has unknown disk type: {}", name, disk_type);
}
const auto & disk_creator = found->second;
return disk_creator(name, config, config_prefix, context, map);

View File

@ -65,8 +65,14 @@ AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::Abstr
}
else
{
if (config.has(config_prefix + ".connection_string"))
storage_url = config.getString(config_prefix + ".connection_string");
else if (config.has(config_prefix + ".endpoint"))
storage_url = config.getString(config_prefix + ".endpoint");
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected either `connection_string` or `endpoint` in config");
}
String container_name = config.getString(config_prefix + ".container_name", "default-container");
validateContainerName(container_name);
std::optional<bool> container_already_exists {};
@ -100,11 +106,14 @@ template <class T>
std::unique_ptr<T> getAzureBlobStorageClientWithAuth(
const String & url, const String & container_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
std::string connection_str;
if (config.has(config_prefix + ".connection_string"))
{
String connection_str = config.getString(config_prefix + ".connection_string");
connection_str = config.getString(config_prefix + ".connection_string");
else if (config.has(config_prefix + ".endpoint"))
connection_str = config.getString(config_prefix + ".endpoint");
if (!connection_str.empty())
return getClientWithConnectionString<T>(connection_str, container_name);
}
if (config.has(config_prefix + ".account_key") && config.has(config_prefix + ".account_name"))
{

View File

@ -3,7 +3,6 @@
#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>
@ -64,6 +63,8 @@ public:
std::string getName() const override { return "AzureObjectStorage"; }
std::string getCommonKeyPrefix() const override { return ""; } /// No namespaces in azure.
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

View File

@ -1,67 +0,0 @@
#include "config.h"
#include <Disks/DiskFactory.h>
#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Interpreters/Context.h>
namespace DB
{
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/)
{
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
ObjectStoragePtr azure_object_storage = std::make_unique<AzureObjectStorage>(
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
String key_prefix;
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, key_prefix);
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
name,
/* no namespaces */ key_prefix,
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
config,
config_prefix
);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
azure_blob_storage_disk->startup(context, skip_access_check);
return azure_blob_storage_disk;
};
factory.registerDiskType("azure_blob_storage", creator);
}
}
#else
namespace DB
{
void registerDiskAzureBlobStorage(DiskFactory &, bool /* global_skip_access_check */) {}
}
#endif

View File

@ -1,6 +1,5 @@
#include "CachedObjectStorage.h"
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <IO/BoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>

View File

@ -24,6 +24,8 @@ public:
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
std::string getCommonKeyPrefix() const override { return object_storage->getCommonKeyPrefix(); }
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

View File

@ -39,7 +39,10 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
}
FileCacheSettings file_cache_settings;
auto predefined_configuration = config.has("cache_name") ? NamedCollectionFactory::instance().tryGet(config.getString("cache_name")) : nullptr;
auto predefined_configuration = config.has("cache_name")
? NamedCollectionFactory::instance().tryGet(config.getString("cache_name"))
: nullptr;
if (predefined_configuration)
file_cache_settings.loadFromCollection(*predefined_configuration);
else

View File

@ -1,40 +0,0 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Common/getRandomASCIIString.h>
#include <Disks/DiskLocal.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
static String getDiskMetadataPath(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context)
{
return config.getString(config_prefix + ".metadata_path", fs::path(context->getPath()) / "disks" / name / "");
}
std::pair<String, DiskPtr> prepareForLocalMetadata(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context)
{
/// where the metadata files are stored locally
auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context);
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
return std::make_pair(metadata_path, metadata_disk);
}
bool isFileWithPersistentCache(const String & path)
{
auto path_extension = std::filesystem::path(path).extension();
return path_extension == ".idx" // index files.
|| path_extension == ".mrk" || path_extension == ".mrk2" || path_extension == ".mrk3" /// mark files.
|| path_extension == ".txt" || path_extension == ".dat";
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <random>
#include <utility>
#include <Core/Types.h>
#include <Common/thread_local_rng.h>
#include <Disks/IDisk.h>
namespace DB
{
std::pair<String, DiskPtr> prepareForLocalMetadata(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context);
bool isFileWithPersistentCache(const String & path);
}

View File

@ -48,6 +48,7 @@ public:
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
, hdfs_root_path(hdfs_root_path_)
{
data_source_description.type = DataSourceType::HDFS;
data_source_description.description = hdfs_root_path_;
@ -57,6 +58,8 @@ public:
std::string getName() const override { return "HDFSObjectStorage"; }
std::string getCommonKeyPrefix() const override { return hdfs_root_path; }
DataSourceDescription getDataSourceDescription() const override
{
return data_source_description;
@ -123,8 +126,8 @@ private:
HDFSBuilderWrapper hdfs_builder;
HDFSFSPtr hdfs_fs;
SettingsPtr settings;
const std::string hdfs_root_path;
DataSourceDescription data_source_description;
};

View File

@ -1,66 +0,0 @@
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/DiskFactory.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
String uri{endpoint};
checkHDFSURL(uri);
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context->getSettingsRef().hdfs_replication
);
/// FIXME Cache currently unsupported :(
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskObjectStorage>(
name,
uri,
"DiskHDFS",
std::move(metadata_storage),
std::move(hdfs_storage),
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("hdfs", creator);
}
}

View File

@ -84,6 +84,8 @@ public:
virtual std::string getName() const = 0;
virtual std::string getCommonKeyPrefix() const = 0;
/// Object exists or not
virtual bool exists(const StoredObject & object) const = 0;

View File

@ -1,6 +1,5 @@
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
@ -28,7 +27,7 @@ LocalObjectStorage::LocalObjectStorage(String key_prefix_)
: key_prefix(std::move(key_prefix_))
, log(&Poco::Logger::get("LocalObjectStorage"))
{
data_source_description.type = DataSourceType::Local;
data_source_description.type = DataSourceType::LocalBlobStorage;
if (auto block_device_id = tryGetBlockDeviceId("/"); block_device_id.has_value())
data_source_description.description = *block_device_id;
else

View File

@ -16,12 +16,14 @@ namespace DB
class LocalObjectStorage : public IObjectStorage
{
public:
LocalObjectStorage(String key_prefix_);
explicit LocalObjectStorage(String key_prefix_);
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "LocalObjectStorage"; }
std::string getCommonKeyPrefix() const override { return key_prefix; }
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

View File

@ -1,46 +0,0 @@
#include <Disks/DiskFactory.h>
#include <Disks/loadLocalDiskConfig.h>
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
void registerDiskLocalObjectStorage(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String object_key_prefix;
UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, object_key_prefix, keep_free_space_bytes);
/// keys are mapped to the fs, object_key_prefix is a directory also
fs::create_directories(object_key_prefix);
String type = config.getString(config_prefix + ".type");
chassert(type == "local_blob_storage");
std::shared_ptr<LocalObjectStorage> local_storage = std::make_shared<LocalObjectStorage>(object_key_prefix);
MetadataStoragePtr metadata_storage;
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, object_key_prefix);
auto disk = std::make_shared<DiskObjectStorage>(
name, object_key_prefix, "Local", metadata_storage, local_storage, config, config_prefix);
disk->startup(context, global_skip_access_check);
return disk;
};
factory.registerDiskType("local_blob_storage", creator);
}
}

View File

@ -0,0 +1,121 @@
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
#include <Disks/ObjectStorages/Web/MetadataStorageFromStaticFilesWebServer.h>
#endif
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
MetadataStorageFactory & MetadataStorageFactory::instance()
{
static MetadataStorageFactory factory;
return factory;
}
void MetadataStorageFactory::registerMetadataStorageType(const std::string & metadata_type, Creator creator)
{
if (!registry.emplace(metadata_type, creator).second)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"MetadataStorageFactory: the metadata type '{}' is not unique",
metadata_type);
}
}
MetadataStoragePtr MetadataStorageFactory::create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ObjectStoragePtr object_storage,
const std::string & compatibility_type_hint) const
{
if (compatibility_type_hint.empty() && !config.has(config_prefix + ".metadata_type"))
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `metadata_type` in config");
}
const auto type = config.getString(config_prefix + ".metadata_type", compatibility_type_hint);
const auto it = registry.find(type);
if (it == registry.end())
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"MetadataStorageFactory: unknown metadata storage type: {}", type);
}
return it->second(name, config, config_prefix, object_storage);
}
static std::string getObjectKeyCompatiblePrefix(
const IObjectStorage & object_storage,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
{
return config.getString(config_prefix + ".key_compatibility_prefix", object_storage.getCommonKeyPrefix());
}
void registerMetadataStorageFromDisk(MetadataStorageFactory & factory)
{
factory.registerMetadataStorageType("local", [](
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto metadata_path = config.getString(config_prefix + ".metadata_path",
fs::path(Context::getGlobalContextInstance()->getPath()) / "disks" / name / "");
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromDisk>(metadata_disk, key_compatibility_prefix);
});
}
void registerPlainMetadataStorage(MetadataStorageFactory & factory)
{
factory.registerMetadataStorageType("plain", [](
const std::string & /* name */,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromPlainObjectStorage>(object_storage, key_compatibility_prefix);
});
}
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
void registerMetadataStorageFromStaticFilesWebServer(MetadataStorageFactory & factory)
{
factory.registerMetadataStorageType("web", [](
const std::string & /* name */,
const Poco::Util::AbstractConfiguration & /* config */,
const std::string & /* config_prefix */,
ObjectStoragePtr object_storage) -> MetadataStoragePtr
{
return std::make_shared<MetadataStorageFromStaticFilesWebServer>(assert_cast<const WebObjectStorage &>(*object_storage));
});
}
#endif
void registerMetadataStorages()
{
auto & factory = MetadataStorageFactory::instance();
registerMetadataStorageFromDisk(factory);
registerPlainMetadataStorage(factory);
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
registerMetadataStorageFromStaticFilesWebServer(factory);
#endif
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Disks/ObjectStorages/IMetadataStorage.h>
namespace DB
{
class MetadataStorageFactory final : private boost::noncopyable
{
public:
using Creator = std::function<MetadataStoragePtr(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ObjectStoragePtr object_storage)>;
static MetadataStorageFactory & instance();
void registerMetadataStorageType(const std::string & metadata_type, Creator creator);
MetadataStoragePtr create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ObjectStoragePtr object_storage,
const std::string & compatibility_type_hint) const;
private:
using Registry = std::unordered_map<String, Creator>;
Registry registry;
};
}

View File

@ -0,0 +1,283 @@
#include "config.h"
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
#if USE_AWS_S3
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
#endif
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Storages/HDFS/HDFSCommon.h>
#endif
#if USE_AZURE_BLOB_STORAGE && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#endif
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
#include <Disks/ObjectStorages/Web/WebObjectStorage.h>
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
#include <Disks/loadLocalDiskConfig.h>
#endif
#include <Interpreters/Context.h>
#include <Common/Macros.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
ObjectStorageFactory & ObjectStorageFactory::instance()
{
static ObjectStorageFactory factory;
return factory;
}
void ObjectStorageFactory::registerObjectStorageType(const std::string & type, Creator creator)
{
if (!registry.emplace(type, creator).second)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"ObjectStorageFactory: the metadata type '{}' is not unique", type);
}
}
ObjectStoragePtr ObjectStorageFactory::create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) const
{
std::string type;
if (config.has(config_prefix + ".object_storage_type"))
{
type = config.getString(config_prefix + ".object_storage_type");
}
else if (config.has(config_prefix + ".type")) /// .type -- for compatibility.
{
type = config.getString(config_prefix + ".type");
}
else
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `object_storage_type` in config");
}
const auto it = registry.find(type);
if (it == registry.end())
{
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"ObjectStorageFactory: unknown object storage type: {}", type);
}
return it->second(name, config, config_prefix, context, skip_access_check);
}
#if USE_AWS_S3
static S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
/// An empty key remains empty.
if (!uri.key.empty() && !uri.key.ends_with('/'))
uri.key.push_back('/');
return uri;
}
void registerS3ObjectStorage(ObjectStorageFactory & factory)
{
static constexpr auto disk_type = "s3";
factory.registerObjectStorageType(disk_type, [](
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) -> ObjectStoragePtr
{
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
auto object_storage = std::make_shared<S3ObjectStorage>(
std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
{
/// If `support_batch_delete` is turned on (default), check and possibly switch it off.
if (s3_capabilities.support_batch_delete && !checkBatchRemove(*object_storage, uri.key))
{
LOG_WARNING(
&Poco::Logger::get("S3ObjectStorage"),
"Storage for disk {} does not support batch delete operations, "
"so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
"To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",
name
);
object_storage->setCapabilitiesSupportBatchDelete(false);
}
}
return object_storage;
});
}
void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
{
static constexpr auto disk_type = "s3_plain";
factory.registerObjectStorageType(disk_type, [](
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
/// send_metadata changes the filenames (includes revision), while
/// s3_plain do not care about this, and expect that the file name
/// will not be changed.
///
/// And besides, send_metadata does not make sense for s3_plain.
if (config.getBool(config_prefix + ".send_metadata", false))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not supports send_metadata");
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
return std::make_shared<S3PlainObjectStorage>(
std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
});
}
#endif
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
void registerHDFSObjectStorage(ObjectStorageFactory & factory)
{
factory.registerObjectStorageType("hdfs", [](
const std::string & /* name */,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
checkHDFSURL(uri);
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context->getSettingsRef().hdfs_replication
);
return std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
});
}
#endif
#if USE_AZURE_BLOB_STORAGE && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
void registerAzureObjectStorage(ObjectStorageFactory & factory)
{
factory.registerObjectStorageType("azure_blob_storage", [](
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
return std::make_unique<AzureObjectStorage>(
name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
});
}
#endif
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
void registerWebObjectStorage(ObjectStorageFactory & factory)
{
factory.registerObjectStorageType("web", [](
const std::string & /* name */,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
if (!uri.ends_with('/'))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
try
{
Poco::URI poco_uri(uri);
}
catch (const Poco::Exception & e)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Bad URI: `{}`. Error: {}", uri, e.what());
}
return std::make_shared<WebObjectStorage>(uri, context);
});
}
void registerLocalObjectStorage(ObjectStorageFactory & factory)
{
factory.registerObjectStorageType("local_blob_storage", [](
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool /* skip_access_check */) -> ObjectStoragePtr
{
String object_key_prefix;
UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, object_key_prefix, keep_free_space_bytes);
/// keys are mapped to the fs, object_key_prefix is a directory also
fs::create_directories(object_key_prefix);
return std::make_shared<LocalObjectStorage>(object_key_prefix);
});
}
#endif
void registerObjectStorages()
{
auto & factory = ObjectStorageFactory::instance();
#if USE_AWS_S3
registerS3ObjectStorage(factory);
registerS3PlainObjectStorage(factory);
#endif
#if USE_HDFS && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
registerHDFSObjectStorage(factory);
#endif
#if USE_AZURE_BLOB_STORAGE && !defined(CLICKHOUSE_KEEPER_STANDALONE_BUILD)
registerAzureObjectStorage(factory);
#endif
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
registerWebObjectStorage(factory);
registerLocalObjectStorage(factory);
#endif
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Disks/ObjectStorages/IObjectStorage.h>
namespace DB
{
class ObjectStorageFactory final : private boost::noncopyable
{
public:
using Creator = std::function<ObjectStoragePtr(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check)>;
static ObjectStorageFactory & instance();
void registerObjectStorageType(const std::string & type, Creator creator);
ObjectStoragePtr create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) const;
private:
using Registry = std::unordered_map<String, Creator>;
Registry registry;
};
}

View File

@ -0,0 +1,87 @@
#include <Disks/DiskFactory.h>
#include <Interpreters/Context.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
#include <Disks/ObjectStorages/ObjectStorageFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
void registerObjectStorages();
void registerMetadataStorages();
static std::string getCompatibilityMetadataTypeHint(const DataSourceDescription & description)
{
switch (description.type)
{
case DataSourceType::S3:
case DataSourceType::HDFS:
case DataSourceType::LocalBlobStorage:
case DataSourceType::AzureBlobStorage:
return "local";
case DataSourceType::S3_Plain:
return "plain";
case DataSourceType::WebServer:
return "web";
default:
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"Cannot get compatibility metadata hint: "
"no such object storage type: {}", toString(description.type));
}
UNREACHABLE();
}
void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check)
{
registerObjectStorages();
registerMetadataStorages();
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
auto object_storage = ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto compatibility_metadata_type_hint = config.has("metadata_type")
? ""
: getCompatibilityMetadataTypeHint(object_storage->getDataSourceDescription());
auto metadata_storage = MetadataStorageFactory::instance().create(
name, config, config_prefix, object_storage, compatibility_metadata_type_hint);
DiskObjectStoragePtr disk = std::make_shared<DiskObjectStorage>(
name,
object_storage->getCommonKeyPrefix(),
fmt::format("Disk_{}({})", toString(object_storage->getDataSourceDescription().type), name),
std::move(metadata_storage),
std::move(object_storage),
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("object_storage", creator);
#if USE_AWS_S3
factory.registerDiskType("s3", creator); /// For compatibility
factory.registerDiskType("s3_plain", creator); /// For compatibility
#endif
#if USE_HDFS
factory.registerDiskType("hdfs", creator); /// For compatibility
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerDiskType("azure_blob_storage", creator); /// For compatibility
#endif
factory.registerDiskType("local_blob_storage", creator); /// For compatibility
factory.registerDiskType("web", creator); /// For compatibility
}
}

View File

@ -0,0 +1,127 @@
#include "DiskS3Utils.h"
#if USE_AWS_S3
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Core/ServerUUID.h>
#include <IO/S3/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
ObjectStorageKeysGeneratorPtr getKeyGenerator(
String type,
const S3::URI & uri,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
{
if (type == "s3_plain")
return createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
chassert(type == "s3");
bool storage_metadata_write_full_object_key = DiskObjectStorageMetadata::getWriteFullObjectKeySetting();
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
if (send_metadata && storage_metadata_write_full_object_key)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"s3 does not supports feature 'send_metadata' with feature 'storage_metadata_write_full_object_key'.",
config_prefix);
String object_key_compatibility_prefix = config.getString(config_prefix + ".key_compatibility_prefix", String());
String object_key_template = config.getString(config_prefix + ".key_template", String());
if (object_key_template.empty())
{
if (!object_key_compatibility_prefix.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"Setting 'key_compatibility_prefix' can be defined only with setting 'key_template'.",
config_prefix);
return createObjectStorageKeysGeneratorByPrefix(uri.key);
}
if (send_metadata)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"s3 does not supports send_metadata with setting 'key_template'.",
config_prefix);
if (!storage_metadata_write_full_object_key)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"Feature 'storage_metadata_write_full_object_key' has to be enabled in order to use setting 'key_template'.",
config_prefix);
if (!uri.key.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"URI.key is forbidden with settings 'key_template', use setting 'key_compatibility_prefix' instead'. "
"URI.key: '{}', bucket: '{}'. ",
config_prefix,
uri.key, uri.bucket);
return createObjectStorageKeysGeneratorByTemplate(object_key_template);
}
static String getServerUUID()
{
UUID server_uuid = ServerUUID::get();
if (server_uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return toString(server_uuid);
}
bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
/// NOTE: key_with_trailing_slash is the disk prefix, it is required
/// because access is done via S3ObjectStorage not via IDisk interface
/// (since we don't have disk yet).
const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
StoredObject object(path);
try
{
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
/// We don't have write access, therefore no information about batch remove.
return true;
}
try
{
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return true;
}
catch (const Exception &)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
return false;
}
}
}
#endif

View File

@ -0,0 +1,25 @@
#pragma once
#include "config.h"
#include <Core/Types.h>
#include <Common/ObjectStorageKeyGenerator.h>
#if USE_AWS_S3
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
namespace S3 { struct URI; }
ObjectStorageKeysGeneratorPtr getKeyGenerator(
String type,
const S3::URI & uri,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);
class S3ObjectStorage;
bool checkBatchRemove(S3ObjectStorage & storage, const std::string & key_with_trailing_slash);
}
#endif

View File

@ -6,7 +6,6 @@
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/WriteBufferFromS3.h>
@ -100,7 +99,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync
{
public:
S3IteratorAsync(
const std::string & bucket,
const std::string & bucket_,
const std::string & path_prefix,
std::shared_ptr<const S3::Client> client_,
size_t max_list_size)
@ -111,7 +110,7 @@ public:
"ListObjectS3")
, client(client_)
{
request.SetBucket(bucket);
request.SetBucket(bucket_);
request.SetPrefix(path_prefix);
request.SetMaxKeys(static_cast<int>(max_list_size));
}
@ -156,7 +155,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -176,9 +175,9 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
bucket,
uri.bucket,
path,
version_id,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
/* use_external_buffer */true,
@ -226,9 +225,9 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
auto settings_ptr = s3_settings.get();
return std::make_unique<ReadBufferFromS3>(
client.get(),
bucket,
uri.bucket,
object.remote_path,
version_id,
uri.version_id,
settings_ptr->request_settings,
patchSettings(read_settings));
}
@ -257,7 +256,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
return std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
uri.bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
@ -271,7 +270,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = s3_settings.get();
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
return std::make_shared<S3IteratorAsync>(uri.bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
}
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
@ -279,7 +278,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
auto settings_ptr = s3_settings.get();
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetBucket(uri.bucket);
request.SetPrefix(path);
if (max_keys)
request.SetMaxKeys(max_keys);
@ -325,12 +324,12 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetBucket(uri.bucket);
request.SetKey(object.remote_path);
auto outcome = client.get()->DeleteObject(request);
if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
uri.bucket, object.remote_path, object.local_path, object.bytes_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
throwIfUnexpectedError(outcome, if_exists);
@ -377,7 +376,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
S3::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetBucket(uri.bucket);
request.SetDelete(delkeys);
auto outcome = client.get()->DeleteObjects(request);
@ -387,7 +386,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
auto time_now = std::chrono::system_clock::now();
for (const auto & object : objects)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
uri.bucket, object.remote_path, object.local_path, object.bytes_size,
outcome_error, time_now);
}
@ -420,7 +419,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -436,7 +435,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
ObjectMetadata result;
result.size_bytes = object_info.size;
@ -457,18 +456,18 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ = dest_s3->client.get();
auto current_client = dest_s3->client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
try {
copyS3File(
client_,
bucket,
current_client,
uri.bucket,
object_from.remote_path,
0,
size,
dest_s3->bucket,
dest_s3->uri.bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
@ -499,16 +498,16 @@ void S3ObjectStorage::copyObject( // NOLINT
const WriteSettings &,
std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ = client.get();
auto current_client = client.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(client_,
bucket,
copyS3File(current_client,
uri.bucket,
object_from.remote_path,
0,
size,
bucket,
uri.bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
@ -552,10 +551,12 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
auto new_uri{uri};
new_uri.bucket = new_namespace;
return std::make_unique<S3ObjectStorage>(
std::move(new_client), std::move(new_s3_settings),
version_id, s3_capabilities, new_namespace,
endpoint, key_generator, disk_name);
std::move(new_client), std::move(new_s3_settings), new_uri, s3_capabilities, key_generator, disk_name);
}
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string & path) const

View File

@ -49,22 +49,19 @@ private:
const char * logger_name,
std::unique_ptr<S3::Client> && client_,
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_,
S3::URI uri_,
const S3Capabilities & s3_capabilities_,
String bucket_,
String connection_string,
ObjectStorageKeysGeneratorPtr key_generator_,
const String & disk_name_)
: bucket(std::move(bucket_))
: uri(uri_)
, key_generator(std::move(key_generator_))
, disk_name(disk_name_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
{
data_source_description.type = DataSourceType::S3;
data_source_description.description = connection_string;
data_source_description.description = uri_.endpoint;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
@ -85,6 +82,8 @@ public:
std::string getName() const override { return "S3ObjectStorage"; }
std::string getCommonKeyPrefix() const override { return uri.key; }
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
@ -153,7 +152,7 @@ public:
const std::string & config_prefix,
ContextPtr context) override;
std::string getObjectsNamespace() const override { return bucket; }
std::string getObjectsNamespace() const override { return uri.bucket; }
bool isRemote() const override { return true; }
@ -177,8 +176,8 @@ private:
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
private:
std::string bucket;
const S3::URI uri;
ObjectStorageKeysGeneratorPtr key_generator;
std::string disk_name;
@ -186,8 +185,6 @@ private:
MultiVersion<S3ObjectStorageSettings> s3_settings;
S3Capabilities s3_capabilities;
const String version_id;
Poco::Logger * log;
DataSourceDescription data_source_description;
};

View File

@ -19,7 +19,6 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/DiskLocal.h>
#include <Common/Macros.h>

View File

@ -1,243 +0,0 @@
#include "config.h"
#include <Common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Disks/DiskFactory.h>
#if USE_AWS_S3
#include <base/getFQDNOrHostName.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
#include <Core/ServerUUID.h>
#include <Common/Macros.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
class CheckAccess
{
public:
static bool checkBatchRemove(S3ObjectStorage & storage, const String & key_with_trailing_slash)
{
/// NOTE: key_with_trailing_slash is the disk prefix, it is required
/// because access is done via S3ObjectStorage not via IDisk interface
/// (since we don't have disk yet).
const String path = fmt::format("{}clickhouse_remove_objects_capability_{}", key_with_trailing_slash, getServerUUID());
StoredObject object(path);
try
{
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
return true; /// We don't have write access, therefore no information about batch remove.
}
try
{
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return true;
}
catch (const Exception &)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
return false;
}
}
private:
static String getServerUUID()
{
UUID server_uuid = ServerUUID::get();
if (server_uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return toString(server_uuid);
}
};
std::pair<String, ObjectStorageKeysGeneratorPtr> getPrefixAndKeyGenerator(
String type, const S3::URI & uri, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
if (type == "s3_plain")
return {uri.key, createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key)};
chassert(type == "s3");
bool storage_metadata_write_full_object_key = DiskObjectStorageMetadata::getWriteFullObjectKeySetting();
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
if (send_metadata && storage_metadata_write_full_object_key)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"s3 does not supports feature 'send_metadata' with feature 'storage_metadata_write_full_object_key'.",
config_prefix);
String object_key_compatibility_prefix = config.getString(config_prefix + ".key_compatibility_prefix", String());
String object_key_template = config.getString(config_prefix + ".key_template", String());
if (object_key_template.empty())
{
if (!object_key_compatibility_prefix.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"Setting 'key_compatibility_prefix' can be defined only with setting 'key_template'.",
config_prefix);
return {uri.key, createObjectStorageKeysGeneratorByPrefix(uri.key)};
}
if (send_metadata)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"s3 does not supports send_metadata with setting 'key_template'.",
config_prefix);
if (!storage_metadata_write_full_object_key)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"Feature 'storage_metadata_write_full_object_key' has to be enabled in order to use setting 'key_template'.",
config_prefix);
if (!uri.key.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong configuration in {}. "
"URI.key is forbidden with settings 'key_template', use setting 'key_compatibility_prefix' instead'. "
"URI.key: '{}', bucket: '{}'. ",
config_prefix,
uri.key, uri.bucket);
return {object_key_compatibility_prefix, createObjectStorageKeysGeneratorByTemplate(object_key_template)};
}
}
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
// an empty key remains empty
if (!uri.key.empty() && !uri.key.ends_with('/'))
uri.key.push_back('/');
S3Capabilities s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
std::shared_ptr<S3ObjectStorage> s3_storage;
String type = config.getString(config_prefix + ".type");
chassert(type == "s3" || type == "s3_plain");
auto [object_key_compatibility_prefix, object_key_generator] = getPrefixAndKeyGenerator(type, uri, config, config_prefix);
MetadataStoragePtr metadata_storage;
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
if (type == "s3_plain")
{
/// send_metadata changes the filenames (includes revision), while
/// s3_plain do not care about this, and expect that the file name
/// will not be changed.
///
/// And besides, send_metadata does not make sense for s3_plain.
if (config.getBool(config_prefix + ".send_metadata", false))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not supports send_metadata");
s3_storage = std::make_shared<S3PlainObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, object_key_generator, name);
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, object_key_compatibility_prefix);
}
else
{
s3_storage = std::make_shared<S3ObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, object_key_generator, name);
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, object_key_compatibility_prefix);
}
/// NOTE: should we still perform this check for clickhouse-disks?
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!skip_access_check)
{
/// If `support_batch_delete` is turned on (default), check and possibly switch it off.
if (s3_capabilities.support_batch_delete && !CheckAccess::checkBatchRemove(*s3_storage, uri.key))
{
LOG_WARNING(
&Poco::Logger::get("registerDiskS3"),
"Storage for disk {} does not support batch delete operations, "
"so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
"To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",
name
);
s3_storage->setCapabilitiesSupportBatchDelete(false);
}
}
DiskObjectStoragePtr s3disk = std::make_shared<DiskObjectStorage>(
name,
uri.key, /// might be empty
type == "s3" ? "DiskS3" : "DiskS3Plain",
std::move(metadata_storage),
std::move(s3_storage),
config,
config_prefix);
s3disk->startup(context, skip_access_check);
return s3disk;
};
factory.registerDiskType("s3", creator);
factory.registerDiskType("s3_plain", creator);
}
}
#else
void registerDiskS3(DB::DiskFactory &, bool /* global_skip_access_check */) {}
#endif

View File

@ -33,6 +33,8 @@ public:
std::string getName() const override { return "WebObjectStorage"; }
std::string getCommonKeyPrefix() const override { return ""; }
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT

View File

@ -1,65 +0,0 @@
#include <Poco/Exception.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Disks/DiskFactory.h>
#include <Disks/ObjectStorages/Web/WebObjectStorage.h>
#include <Disks/ObjectStorages/Web/MetadataStorageFromStaticFilesWebServer.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Common/assert_cast.h>
#include <Common/Macros.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
{
auto creator = [global_skip_access_check](
const String & disk_name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr
{
String uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
if (!uri.ends_with('/'))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "URI must end with '/', but '{}' doesn't.", uri);
try
{
Poco::URI poco_uri(uri);
}
catch (const Poco::Exception & e)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Bad URI: `{}`. Error: {}", uri, e.what());
}
auto object_storage = std::make_shared<WebObjectStorage>(uri, context);
auto metadata_storage = std::make_shared<MetadataStorageFromStaticFilesWebServer>(assert_cast<const WebObjectStorage &>(*object_storage));
std::string root_path;
DiskPtr disk = std::make_shared<DiskObjectStorage>(
disk_name,
root_path,
"DiskWebServer",
metadata_storage,
object_storage,
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;
};
factory.registerDiskType("web", creator);
}
}

View File

@ -9,27 +9,12 @@ namespace DB
void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_SSL
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check);
#endif
#if USE_HDFS
void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check);
#endif
void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check);
void registerDiskCache(DiskFactory & factory, bool global_skip_access_check);
void registerDiskLocalObjectStorage(DiskFactory & factory, bool global_skip_access_check);
void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check);
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
@ -40,27 +25,13 @@ void registerDisks(bool global_skip_access_check)
registerDiskLocal(factory, global_skip_access_check);
#if USE_AWS_S3
registerDiskS3(factory, global_skip_access_check);
#endif
#if USE_AZURE_BLOB_STORAGE
registerDiskAzureBlobStorage(factory, global_skip_access_check);
#endif
#if USE_SSL
registerDiskEncrypted(factory, global_skip_access_check);
#endif
#if USE_HDFS
registerDiskHDFS(factory, global_skip_access_check);
#endif
registerDiskWebServer(factory, global_skip_access_check);
registerDiskCache(factory, global_skip_access_check);
registerDiskLocalObjectStorage(factory, global_skip_access_check);
registerDiskObjectStorage(factory, global_skip_access_check);
}
#else
@ -71,9 +42,7 @@ void registerDisks(bool global_skip_access_check)
registerDiskLocal(factory, global_skip_access_check);
#if USE_AWS_S3
registerDiskS3(factory, global_skip_access_check);
#endif
registerDiskObjectStorage(factory, global_skip_access_check);
}
#endif

View File

@ -0,0 +1,15 @@
<clickhouse>
<storage_configuration>
<disks>
<s3_disk_02963>
<type>object_storage</type>
<object_storage_type>s3</object_storage_type>
<path>s3_disk/</path>
<endpoint>http://localhost:11111/test/common/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_02963>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -176,6 +176,7 @@ if [[ -n "$EXPORT_S3_STORAGE_POLICIES" ]]; then
ln -sf $SRC_PATH/config.d/storage_conf.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/storage_conf_02944.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/storage_conf_02963.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/s3_cache_new.xml $DEST_SERVER_PATH/users.d/
fi

View File

@ -0,0 +1,56 @@
-- Tags: no-fasttest
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test1/');
drop table test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk='s3_disk_02963';
drop table test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1',
type = object_storage,
object_storage_type = s3,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test2',
type = object_storage,
object_storage_type = s3,
metadata_storage_type = local,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test3',
type = object_storage,
object_storage_type = s3,
metadata_type = lll,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse); -- { serverError UNKNOWN_ELEMENT_IN_CONFIG }
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test4',
type = object_storage,
object_storage_type = kkk,
metadata_type = local,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse); -- { serverError UNKNOWN_ELEMENT_IN_CONFIG }
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test5',
type = kkk,
object_storage_type = s3,
metadata_type = local,
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse); -- { serverError UNKNOWN_ELEMENT_IN_CONFIG }