Add shortcut for backups

This commit is contained in:
alesapin 2022-08-19 16:58:30 +02:00
parent 7b460b5f85
commit d8664c3227
46 changed files with 334 additions and 90 deletions

View File

@ -2,6 +2,7 @@
#include <Disks/IDisk.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Poco/File.h>
#include <Common/filesystemHelpers.h>
namespace DB
@ -44,4 +45,24 @@ std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer(
return createReadBufferFromFileBase(file_path, /* settings= */ {});
}
/// Describes the data source this entry reads from.
/// An entry bound to a disk reports the disk's own description. Otherwise the file
/// is described as plain local data (neither encrypted nor cached), identified by
/// the id of its block device or, when that id cannot be obtained, by the file path.
DataSourceDescription BackupEntryFromImmutableFile::getDataSourceDescription() const
{
    if (disk)
        return disk->getDataSourceDescription();

    DataSourceDescription local_description;
    local_description.type = DataSourceType::Local;
    /// Fall back to the path itself when the device id is unavailable.
    local_description.description = tryGetBlockDeviceId(file_path).value_or(file_path);
    local_description.is_encrypted = false;
    local_description.is_cached = false;
    return local_description;
}
}

View File

@ -39,6 +39,8 @@ public:
String getFilePath() const override { return file_path; }
DiskPtr getDisk() const { return disk; }
DataSourceDescription getDataSourceDescription() const override;
private:
const DiskPtr disk;
const String file_path;

View File

@ -24,6 +24,11 @@ public:
return "";
}
/// The entry's data is held in memory: RAM type, empty description,
/// not encrypted and not cached.
DataSourceDescription getDataSourceDescription() const override
{
    return DataSourceDescription{
        .type = DataSourceType::RAM,
        .description = "",
        .is_encrypted = false,
        .is_cached = false,
    };
}
private:
const String data;
const std::optional<UInt128> checksum;

View File

@ -36,4 +36,5 @@ BackupEntryFromSmallFile::BackupEntryFromSmallFile(
: BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_)
{
}
}

View File

@ -25,7 +25,6 @@ public:
String getFilePath() const override { return file_path; }
DiskPtr getDisk() const { return disk; }
private:
const DiskPtr disk;
const String file_path;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Disks/DiskType.h>
namespace DB
{
@ -15,6 +16,7 @@ public:
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
virtual std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
};
/// Represents operations of storing to disk or uploading for writing a backup.
@ -27,7 +29,14 @@ public:
virtual bool fileContentsEqual(const String & file_name, const String & expected_file_contents) = 0;
virtual std::unique_ptr<WriteBuffer> writeFile(const String & file_name) = 0;
virtual void removeFiles(const Strings & file_names) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
/// Tells whether copyFileNative() can be used for data coming from the given data source.
/// The base implementation never supports it; the argument is ignored.
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
{
return false;
}
virtual void copyFileNative(const String & file_name_from, const String & file_name_to);
};

View File

@ -77,4 +77,26 @@ void BackupWriterDisk::removeFiles(const Strings & file_names)
disk->removeDirectory(path);
}
/// The writer stores files on `disk`, so it is described by that disk's description.
DataSourceDescription BackupWriterDisk::getDataSourceDescription() const
{
    auto disk_description = disk->getDataSourceDescription();
    return disk_description;
}
/// The reader takes files from `disk`, so it is described by that disk's description.
DataSourceDescription BackupReaderDisk::getDataSourceDescription() const
{
    auto disk_description = disk->getDataSourceDescription();
    return disk_description;
}
/// Native copy is possible only when the source is described exactly like the destination disk.
bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_description) const
{
    const auto destination_description = disk->getDataSourceDescription();
    return data_source_description == destination_description;
}
/// Copies `file_name_from` to `path / file_name_to` using the disk's own copy operation,
/// creating the destination's parent directories first.
/// NOTE(review): `file_name_from` is resolved on this writer's disk; presumably callers only
/// get here after supportNativeCopy() matched the source with this disk - confirm that an
/// equal description always means the same disk.
void BackupWriterDisk::copyFileNative(const String & file_name_from, const String & file_name_to)
{
    const auto destination = path / file_name_to;
    const auto destination_dir = destination.parent_path();

    disk->createDirectories(destination_dir);
    disk->copyFile(file_name_from, *disk, destination);
}
}

View File

@ -17,6 +17,7 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private:
DiskPtr disk;
@ -34,6 +35,11 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(const String & file_name_from, const String & file_name_to) override;
private:
DiskPtr disk;
std::filesystem::path path;

View File

@ -2,6 +2,7 @@
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
#include <Common/filesystemHelpers.h>
namespace fs = std::filesystem;
@ -79,4 +80,49 @@ void BackupWriterFile::removeFiles(const Strings & file_names)
fs::remove(path);
}
/// Describes the backup destination as plain local storage (not encrypted, not cached).
/// The description is the block device id of `path`, or `path` itself when the id
/// cannot be obtained.
DataSourceDescription BackupWriterFile::getDataSourceDescription() const
{
    DataSourceDescription result;
    result.type = DataSourceType::Local;
    result.is_encrypted = false;
    result.is_cached = false;

    const auto device_id = tryGetBlockDeviceId(path);
    if (device_id)
        result.description = *device_id;
    else
        result.description = path;

    return result;
}
/// Describes the backup source as plain local storage (not encrypted, not cached).
/// The description is the block device id of `path`, or `path` itself when the id
/// cannot be obtained.
DataSourceDescription BackupReaderFile::getDataSourceDescription() const
{
    DataSourceDescription result;
    result.type = DataSourceType::Local;
    result.is_encrypted = false;
    result.is_cached = false;

    const auto device_id = tryGetBlockDeviceId(path);
    if (device_id)
        result.description = *device_id;
    else
        result.description = path;

    return result;
}
/// Native copy is possible only when the source is described exactly like this writer's destination.
bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_description) const
{
    const auto own_description = getDataSourceDescription();
    return data_source_description == own_description;
}
/// Copies `file_name_from` to `path / file_name_to` with std::filesystem, creating the
/// destination's parent directories first. Existing files are overwritten and
/// directories are copied recursively.
void BackupWriterFile::copyFileNative(const String & file_name_from, const String & file_name_to)
{
    const auto destination = path / file_name_to;
    fs::create_directories(destination.parent_path());

    const auto options = fs::copy_options::recursive | fs::copy_options::overwrite_existing;
    fs::copy(file_name_from, destination, options);
}
}

View File

@ -15,6 +15,7 @@ public:
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
std::unique_ptr<SeekableReadBuffer> readFile(const String & file_name) override;
DataSourceDescription getDataSourceDescription() const override;
private:
std::filesystem::path path;
@ -31,6 +32,11 @@ public:
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(const String & file_name_from, const String & file_name_to) override;
private:
std::filesystem::path path;
};

View File

@ -116,6 +116,12 @@ public:
return data_file_name;
}
/// The entry's data comes from the backup's reader, so the reader's description is reported.
DataSourceDescription getDataSourceDescription() const override
{
    const auto & backup_reader = backup->reader;
    return backup_reader->getDataSourceDescription();
}
private:
const std::shared_ptr<const BackupImpl> backup;
const String archive_suffix;
@ -776,40 +782,52 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
if (!is_data_file_required)
return; /// We copy data only if it's a new combination of size & checksum.
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
std::unique_ptr<WriteBuffer> out;
if (use_archives)
/// We need to copy whole file without archive, we can do it faster
/// if source and destination are compatible
if (!use_archives && info.base_size == 0)
{
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
copyData(*read_buffer, *out);
out->finalize();
auto writer_description = writer->getDataSourceDescription();
auto reader_description = entry->getDataSourceDescription();
/// Should be much faster than writing data through server
if (writer->supportNativeCopy(reader_description))
writer->copyFileNative(entry->getFilePath(), info.data_file_name);
}
else
{
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
if (!num_files_written)
checkLockFile(true);
if (use_archives)
{
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
copyData(*read_buffer, *out);
out->finalize();
}
else
{
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
}
}
++num_files_written;

View File

@ -25,6 +25,12 @@ public:
return getInternalBackupEntry()->getFilePath();
}
/// Forwards to the internal backup entry.
DataSourceDescription getDataSourceDescription() const override
{
    const auto internal_entry = getInternalBackupEntry();
    return internal_entry->getDataSourceDescription();
}
private:
BackupEntryPtr getInternalBackupEntry() const
{

View File

@ -4,6 +4,7 @@
#include <memory>
#include <optional>
#include <vector>
#include <Disks/DiskType.h>
namespace DB
{
@ -26,6 +27,8 @@ public:
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0;
virtual String getFilePath() const = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
};
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;

View File

@ -46,7 +46,7 @@ namespace
void checkPath(const String & disk_name, const DiskPtr & disk, fs::path & path)
{
path = path.lexically_normal();
if (!path.is_relative() && (disk->getType() == DiskType::Local))
if (!path.is_relative() && (disk->getDataSourceDescription().type == DataSourceType::Local))
path = path.lexically_proximate(disk->getPath());
bool path_ok = path.empty() || (path.is_relative() && (*path.begin() != ".."));

View File

@ -79,6 +79,22 @@ String getBlockDeviceId([[maybe_unused]] const String & path)
#endif
}
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path)
{
#if defined(OS_LINUX)
struct stat sb;
if (lstat(path.c_str(), &sb))
return {};
WriteBufferFromOwnString ss;
ss << major(sb.st_dev) << ":" << minor(sb.st_dev);
return ss.str();
#else
return {};
#endif
}
#if !defined(OS_LINUX)
[[noreturn]]
#endif

View File

@ -25,6 +25,8 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
#endif
String getBlockDeviceId([[maybe_unused]] const String & path);
std::optional<String> tryGetBlockDeviceId([[maybe_unused]] const String & path);
enum class BlockDeviceType
{
UNKNOWN = 0, // we were unable to determine device type

View File

@ -72,7 +72,7 @@ public:
void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType getType() const override { return delegate->getType(); }
DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
bool isRemote() const override { return delegate->isRemote(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }

View File

@ -234,7 +234,13 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType getType() const override { return DiskType::Encrypted; }
/// Same description as the wrapped disk, but flagged as encrypted.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription description = delegate->getDataSourceDescription();
    description.is_encrypted = true;
    return description;
}
bool isRemote() const override { return delegate->isRemote(); }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -230,14 +230,14 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
if (bytes == 0)
{
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name));
LOG_DEBUG(logger, "Reserving 0 bytes on disk {}", backQuote(name));
++reservation_count;
return {unreserved_space};
}
if (unreserved_space >= bytes)
{
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
LOG_DEBUG(logger, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
@ -497,6 +497,14 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
{
data_source_description.type = DataSourceType::Local;
if (auto block_device_id = tryGetBlockDeviceId(disk_path); block_device_id.has_value())
data_source_description.description = *block_device_id;
else
data_source_description.description = disk_path;
data_source_description.is_encrypted = false;
data_source_description.is_cached = false;
}
DiskLocal::DiskLocal(
@ -507,6 +515,11 @@ DiskLocal::DiskLocal(
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
/// Returns a copy of the stored `data_source_description` member.
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
}
void DiskLocal::startup(ContextPtr)
{
try
@ -615,7 +628,6 @@ DiskObjectStoragePtr DiskLocal::createDiskObjectStorage()
"Local",
metadata_storage,
object_storage,
DiskType::Local,
false,
/* threadpool_size */16
);

View File

@ -101,7 +101,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::Local; }
DataSourceDescription getDataSourceDescription() const override;
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }
@ -145,14 +146,13 @@ private:
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger;
DataSourceDescription data_source_description;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
std::atomic<bool> broken{false};
std::atomic<bool> readonly{false};
std::unique_ptr<DiskLocalCheckThread> disk_checker;

View File

@ -91,7 +91,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType getType() const override { return DiskType::RAM; }
DataSourceDescription getDataSourceDescription() const override { return DataSourceDescription{DataSourceType::RAM, "", false, false}; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }

View File

@ -5,40 +5,44 @@
namespace DB
{
enum class DiskType
enum class DataSourceType
{
Local,
RAM,
S3,
HDFS,
Encrypted,
WebServer,
AzureBlobStorage,
Cache,
};
inline String toString(DiskType disk_type)
inline String toString(DataSourceType data_source_type)
{
switch (disk_type)
switch (data_source_type)
{
case DiskType::Local:
case DataSourceType::Local:
return "local";
case DiskType::RAM:
case DataSourceType::RAM:
return "memory";
case DiskType::S3:
case DataSourceType::S3:
return "s3";
case DiskType::HDFS:
case DataSourceType::HDFS:
return "hdfs";
case DiskType::Encrypted:
return "encrypted";
case DiskType::WebServer:
case DataSourceType::WebServer:
return "web";
case DiskType::AzureBlobStorage:
case DataSourceType::AzureBlobStorage:
return "azure_blob_storage";
case DiskType::Cache:
return "cache";
}
__builtin_unreachable();
}
/// Describes where data physically lives, so that two sources can be compared.
/// Every field has a default, so a default-constructed description never carries
/// indeterminate values into operator==.
struct DataSourceDescription
{
    /// Kind of the underlying storage.
    DataSourceType type = DataSourceType::Local;
    /// Identifier of the concrete storage; its content depends on `type`.
    std::string description;

    /// True when the data is stored encrypted.
    bool is_encrypted = false;
    /// True when the data is accessed through a cache layer.
    bool is_cached = false;

    /// Defined out of line.
    bool operator==(const DataSourceDescription & o) const;
};
}

View File

@ -113,7 +113,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
void IDisk::truncateFile(const String &, size_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getDataSourceDescription().type);
}
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const

View File

@ -227,7 +227,7 @@ public:
virtual NameSet getCacheLayersNames() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getCacheLayersNames()` is not implemented for disk: {}", getDataSourceDescription().type);
}
/// Returns a list of storage objects (contains path, size, ...).
@ -235,7 +235,7 @@ public:
/// be multiple files in remote fs for single clickhouse file.
virtual StoredObjects getStorageObjects(const String &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getDataSourceDescription().type);
}
/// For one local path there might be multiple remote paths in case of Log family engines.
@ -243,7 +243,7 @@ public:
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getRemotePathsRecursive() not implemented for disk: {}`", getDataSourceDescription().type);
}
/// Batch request to remove multiple files.
@ -271,8 +271,8 @@ public:
/// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc.
virtual DiskType getType() const = 0;
/// Return data source description
virtual DataSourceDescription getDataSourceDescription() const = 0;
/// Involves network interaction.
virtual bool isRemote() const = 0;
@ -357,7 +357,7 @@ public:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method createDiskObjectStorage() is not implemented for disk type: {}",
getType());
getDataSourceDescription().type);
}
virtual bool supportsStat() const { return false; }

View File

@ -32,6 +32,10 @@ AzureObjectStorage::AzureObjectStorage(
, settings(std::move(settings_))
, log(&Poco::Logger::get("AzureObjectStorage"))
{
data_source_description.type = DataSourceType::AzureBlobStorage;
data_source_description.description = client.get()->GetUrl();
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* path */)

View File

@ -58,6 +58,8 @@ public:
AzureClientPtr && client_,
SettingsPtr && settings_);
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "AzureObjectStorage"; }
bool exists(const StoredObject & object) const override;
@ -129,6 +131,8 @@ private:
MultiVersion<AzureObjectStorageSettings> settings;
Poco::Logger * log;
DataSourceDescription data_source_description;
};
}

View File

@ -90,7 +90,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata,
copy_thread_pool_size
);

View File

@ -34,6 +34,13 @@ CachedObjectStorage::CachedObjectStorage(
cache->initialize();
}
/// Same description as the wrapped object storage, but flagged as cached.
DataSourceDescription CachedObjectStorage::getDataSourceDescription() const
{
    DataSourceDescription description = object_storage->getDataSourceDescription();
    description.is_cached = true;
    return description;
}
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
{
return cache->hash(path);

View File

@ -20,6 +20,8 @@ class CachedObjectStorage final : public IObjectStorage
public:
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const FileCacheSettings & cache_settings_, const String & cache_config_name_);
DataSourceDescription getDataSourceDescription() const override;
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
bool exists(const StoredObject & object) const override;

View File

@ -103,14 +103,12 @@ DiskObjectStorage::DiskObjectStorage(
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
@ -469,7 +467,6 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
getName(),
metadata_storage,
object_storage,
disk_type,
send_metadata,
threadpool_size);
}

View File

@ -34,14 +34,13 @@ public:
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
DiskType disk_type_,
bool send_metadata_,
uint64_t thread_pool_size_);
/// Create fake transaction
DiskTransactionPtr createTransaction() override;
DiskType getType() const override { return disk_type; }
DataSourceDescription getDataSourceDescription() const override { return object_storage->getDataSourceDescription(); }
bool supportZeroCopyReplication() const override { return true; }
@ -206,7 +205,6 @@ private:
const String object_storage_root_path;
Poco::Logger * log;
const DiskType disk_type;
MetadataStoragePtr metadata_storage;
ObjectStoragePtr object_storage;

View File

@ -48,10 +48,20 @@ public:
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
{}
{
data_source_description.type = DataSourceType::HDFS;
data_source_description.description = hdfs_root_path_;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
std::string getName() const override { return "HDFSObjectStorage"; }
/// Returns the description prepared in the constructor: HDFS type, the HDFS root path
/// as the description, neither cached nor encrypted.
DataSourceDescription getDataSourceDescription() const override
{
return data_source_description;
}
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
@ -121,6 +131,8 @@ private:
HDFSFSPtr hdfs_fs;
SettingsPtr settings;
DataSourceDescription data_source_description;
};
}

View File

@ -49,7 +49,6 @@ void registerDiskHDFS(DiskFactory & factory)
"DiskHDFS",
std::move(metadata_storage),
std::move(hdfs_storage),
DiskType::HDFS,
/* send_metadata = */ false,
copy_thread_pool_size);

View File

@ -15,6 +15,7 @@
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool.h>
#include <Common/FileCache.h>
#include <Disks/WriteMode.h>
@ -58,6 +59,8 @@ class IObjectStorage
public:
IObjectStorage() = default;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual std::string getName() const = 0;
/// Object exists or not

View File

@ -28,6 +28,14 @@ namespace ErrorCodes
/// Sets up the logger and describes the storage as plain local data (neither cached nor
/// encrypted), identified by the block device id of "/" or by "/" itself when the id
/// cannot be obtained.
LocalObjectStorage::LocalObjectStorage()
    : log(&Poco::Logger::get("LocalObjectStorage"))
{
    data_source_description.type = DataSourceType::Local;
    data_source_description.description = tryGetBlockDeviceId("/").value_or("/");
    data_source_description.is_cached = false;
    data_source_description.is_encrypted = false;
}
bool LocalObjectStorage::exists(const StoredObject & object) const

View File

@ -17,6 +17,8 @@ class LocalObjectStorage : public IObjectStorage
public:
LocalObjectStorage();
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "LocalObjectStorage"; }
bool exists(const StoredObject & object) const override;
@ -86,6 +88,7 @@ public:
private:
Poco::Logger * log;
DataSourceDescription data_source_description;
};
}

View File

@ -2,6 +2,8 @@
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
@ -25,6 +27,7 @@
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
@ -511,7 +514,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
return std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
version_id, s3_capabilities, new_namespace);
version_id, s3_capabilities, new_namespace,
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
}
}

View File

@ -48,13 +48,23 @@ public:
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_,
const S3Capabilities & s3_capabilities_,
String bucket_)
String bucket_,
String connection_string)
: bucket(bucket_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
{
data_source_description.type = DataSourceType::S3;
data_source_description.description = connection_string;
data_source_description.is_cached = false;
data_source_description.is_encrypted = false;
}
/// Returns the description prepared in the constructor: S3 type, the connection string
/// as the description, neither cached nor encrypted.
DataSourceDescription getDataSourceDescription() const override
{
return data_source_description;
}
std::string getName() const override { return "S3ObjectStorage"; }
@ -169,6 +179,8 @@ private:
S3Capabilities s3_capabilities;
const String version_id;
DataSourceDescription data_source_description;
};
}

View File

@ -133,7 +133,7 @@ void registerDiskS3(DiskFactory & factory)
auto s3_storage = std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket);
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
bool skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);
@ -162,7 +162,6 @@ void registerDiskS3(DiskFactory & factory)
"DiskS3",
std::move(metadata_storage),
std::move(s3_storage),
DiskType::S3,
send_metadata,
copy_thread_pool_size);

View File

@ -20,6 +20,16 @@ class WebObjectStorage : public IObjectStorage, WithContext
public:
WebObjectStorage(const String & url_, ContextPtr context_);
/// Describes the storage as a web server identified by its URL, neither encrypted nor cached.
DataSourceDescription getDataSourceDescription() const override
{
    DataSourceDescription web_description;
    web_description.type = DataSourceType::WebServer;
    web_description.description = url;
    web_description.is_encrypted = false;
    web_description.is_cached = false;
    return web_description;
}
std::string getName() const override { return "WebObjectStorage"; }
bool exists(const StoredObject & object) const override;

View File

@ -47,7 +47,6 @@ void registerDiskWebServer(DiskFactory & factory)
"DiskWebServer",
metadata_storage,
object_storage,
DiskType::WebServer,
/* send_metadata */false,
/* threadpool_size */16);
};

View File

@ -415,7 +415,7 @@ std::string DataPartStorageOnDisk::getDiskName() const
std::string DataPartStorageOnDisk::getDiskType() const
{
return toString(volume->getDisk()->getType());
return toString(volume->getDisk()->getDataSourceDescription().type);
}
bool DataPartStorageOnDisk::isStoredOnRemoteDisk() const

View File

@ -458,11 +458,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
Disks disks = data.getDisks();
for (const auto & data_disk : disks)
if (data_disk->supportZeroCopyReplication())
capability.push_back(toString(data_disk->getType()));
capability.push_back(toString(data_disk->getDataSourceDescription().type));
}
else if (disk->supportZeroCopyReplication())
{
capability.push_back(toString(disk->getType()));
capability.push_back(toString(disk->getDataSourceDescription().type));
}
}
if (!capability.empty())

View File

@ -7605,7 +7605,7 @@ void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_nam
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(),
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
@ -7795,11 +7795,11 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
const String & path)
{
const auto settings = getSettings();
auto disk_type = disk->getType();
auto data_source_description = disk->getDataSourceDescription();
if (!(disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication))
return nullptr;
String replica = getSharedDataReplica(part, disk_type);
String replica = getSharedDataReplica(part, data_source_description.type);
/// We can't fetch part when none replicas have this part on a same type remote disk
if (replica.empty())
@ -7808,9 +7808,8 @@ DataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
return executeFetchShared(replica, part.name, disk, path);
}
String StorageReplicatedMergeTree::getSharedDataReplica(
const IMergeTreeDataPart & part, DiskType disk_type) const
const IMergeTreeDataPart & part, DataSourceType data_source_type) const
{
String best_replica;
@ -7818,7 +7817,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if (!zookeeper)
return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk_type), getTableSharedID(), part.name,
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(data_source_type), getTableSharedID(), part.name,
zookeeper_path);
std::set<String> replicas;
@ -7929,7 +7928,7 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
return getZeroCopyPartPath(*getSettings(), toString(disk->getType()), getTableSharedID(), part_name, zookeeper_path)[0];
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
@ -8334,7 +8333,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
String id = disk->getUniqueId(checksums);
bool can_remove = false;
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, toString(disk->getType()), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
detached_replica_name, toString(disk->getDataSourceDescription().type), zookeeper, local_context->getReplicatedMergeTreeSettings(), &Poco::Logger::get("StorageReplicatedMergeTree"),
detached_zookeeper_path);
keep_shared = !can_remove;

View File

@ -286,7 +286,7 @@ public:
DataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const;
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
inline String getReplicaName() const { return replica_name; }

View File

@ -54,7 +54,7 @@ Pipe StorageSystemDisks::read(
col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace());
col_type->insert(toString(disk_ptr->getType()));
col_type->insert(toString(disk_ptr->getDataSourceDescription().type));
String cache_path;
if (disk_ptr->supportsCache())