mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Implement encrypted disk transaction and fix shared merge tree with encrypted disk
This commit is contained in:
parent
2ac600b642
commit
2a3362e0c8
@ -203,18 +203,19 @@ private:
|
||||
};
|
||||
|
||||
DiskEncrypted::DiskEncrypted(
|
||||
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_)
|
||||
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_))
|
||||
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_)
|
||||
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), use_fake_transaction_)
|
||||
{
|
||||
}
|
||||
|
||||
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
|
||||
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_)
|
||||
: IDisk(name_)
|
||||
, delegate(settings_->wrapped_disk)
|
||||
, encrypted_name(name_)
|
||||
, disk_path(settings_->disk_path)
|
||||
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
|
||||
, current_settings(std::move(settings_))
|
||||
, use_fake_transaction(use_fake_transaction_)
|
||||
{
|
||||
delegate->createDirectories(disk_path);
|
||||
}
|
||||
@ -416,7 +417,7 @@ void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
|
||||
const DisksMap & map) -> DiskPtr
|
||||
{
|
||||
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
|
||||
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
|
||||
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map, config.getBool(config_prefix + ".use_fake_transaction", true));
|
||||
disk->startup(context, skip_access_check);
|
||||
return disk;
|
||||
};
|
||||
|
@ -6,22 +6,14 @@
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include <Disks/FakeDiskTransaction.h>
|
||||
#include <Disks/DiskEncryptedTransaction.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadBufferFromFileBase;
|
||||
class WriteBufferFromFileBase;
|
||||
namespace FileEncryption { enum class Algorithm; }
|
||||
|
||||
struct DiskEncryptedSettings
|
||||
{
|
||||
DiskPtr wrapped_disk;
|
||||
String disk_path;
|
||||
std::unordered_map<UInt64, String> keys;
|
||||
UInt64 current_key_id;
|
||||
FileEncryption::Algorithm current_algorithm;
|
||||
};
|
||||
|
||||
/// Encrypted disk ciphers all written files on the fly and writes the encrypted files to an underlying (normal) disk.
|
||||
/// And when we read files from an encrypted disk it deciphers them automatically,
|
||||
@ -29,8 +21,8 @@ struct DiskEncryptedSettings
|
||||
class DiskEncrypted : public IDisk
|
||||
{
|
||||
public:
|
||||
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
|
||||
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
|
||||
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_);
|
||||
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_);
|
||||
|
||||
const String & getName() const override { return encrypted_name; }
|
||||
const String & getPath() const override { return disk_absolute_path; }
|
||||
@ -69,7 +61,6 @@ public:
|
||||
delegate->createDirectories(wrapped_path);
|
||||
}
|
||||
|
||||
|
||||
void clearDirectory(const String & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
@ -293,8 +284,17 @@ public:
|
||||
{
|
||||
/// Need to overwrite explicetly because this disk change
|
||||
/// a lot of "delegate" methods.
|
||||
|
||||
if (use_fake_transaction)
|
||||
{
|
||||
return std::make_shared<FakeDiskTransaction>(*this);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto delegate_transaction = delegate->createTransaction();
|
||||
return std::make_shared<DiskEncryptedTransaction>(delegate_transaction, disk_path, *current_settings.get(), delegate.get());
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 getTotalSpace() const override
|
||||
{
|
||||
@ -342,6 +342,7 @@ private:
|
||||
const String disk_path;
|
||||
const String disk_absolute_path;
|
||||
MultiVersion<DiskEncryptedSettings> current_settings;
|
||||
bool use_fake_transaction;
|
||||
};
|
||||
|
||||
}
|
||||
|
123
src/Disks/DiskEncryptedTransaction.cpp
Normal file
123
src/Disks/DiskEncryptedTransaction.cpp
Normal file
@ -0,0 +1,123 @@
|
||||
#include <Disks/DiskEncryptedTransaction.h>
|
||||
#include <IO/FileEncryptionCommon.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <boost/algorithm/hex.hpp>
|
||||
#include <IO/ReadBufferFromEncryptedFile.h>
|
||||
#include <IO/ReadBufferFromFileDecorator.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromEncryptedFile.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int DATA_ENCRYPTION_ERROR;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
FileEncryption::Header readHeader(ReadBufferFromFileBase & read_buffer)
|
||||
{
|
||||
try
|
||||
{
|
||||
FileEncryption::Header header;
|
||||
header.read(read_buffer);
|
||||
return header;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("While reading the header of encrypted file " + quoteString(read_buffer.getFileName()));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
String getCurrentKey(const String & path, const DiskEncryptedSettings & settings)
|
||||
{
|
||||
auto it = settings.keys.find(settings.current_key_id);
|
||||
if (it == settings.keys.end())
|
||||
throw Exception(
|
||||
ErrorCodes::DATA_ENCRYPTION_ERROR,
|
||||
"Not found a key with the current ID {} required to cipher file {}",
|
||||
settings.current_key_id,
|
||||
quoteString(path));
|
||||
|
||||
return it->second;
|
||||
}
|
||||
|
||||
String getKey(const String & path, const FileEncryption::Header & header, const DiskEncryptedSettings & settings)
|
||||
{
|
||||
auto it = settings.keys.find(header.key_id);
|
||||
if (it == settings.keys.end())
|
||||
throw Exception(
|
||||
ErrorCodes::DATA_ENCRYPTION_ERROR,
|
||||
"Not found a key with ID {} required to decipher file {}",
|
||||
header.key_id,
|
||||
quoteString(path));
|
||||
|
||||
String key = it->second;
|
||||
if (FileEncryption::calculateKeyHash(key) != header.key_hash)
|
||||
throw Exception(
|
||||
ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key with ID {}, could not decipher file {}", header.key_id, quoteString(path));
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
|
||||
{
|
||||
auto wrapped_from_path = wrappedPath(from_file_path);
|
||||
auto wrapped_to_path = wrappedPath(to_file_path);
|
||||
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile(
|
||||
const std::string & path,
|
||||
size_t buf_size,
|
||||
WriteMode mode,
|
||||
const WriteSettings & settings,
|
||||
bool autocommit)
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
FileEncryption::Header header;
|
||||
String key;
|
||||
UInt64 old_file_size = 0;
|
||||
if (mode == WriteMode::Append && delegate_disk->exists(path))
|
||||
{
|
||||
old_file_size = delegate_disk->getFileSize(path);
|
||||
if (old_file_size)
|
||||
{
|
||||
/// Append mode: we continue to use the same header.
|
||||
auto read_buffer = delegate_disk->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize));
|
||||
header = readHeader(*read_buffer);
|
||||
key = getKey(path, header, current_settings);
|
||||
}
|
||||
}
|
||||
if (!old_file_size)
|
||||
{
|
||||
/// Rewrite mode: we generate a new header.
|
||||
key = getCurrentKey(path, current_settings);
|
||||
header.algorithm = current_settings.current_algorithm;
|
||||
header.key_id = current_settings.current_key_id;
|
||||
header.key_hash = FileEncryption::calculateKeyHash(key);
|
||||
header.init_vector = FileEncryption::InitVector::random();
|
||||
}
|
||||
auto buffer = delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings, autocommit);
|
||||
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size);
|
||||
|
||||
}
|
||||
void DiskEncryptedTransaction::writeFileUsingCustomWriteObject(
|
||||
const String &,
|
||||
WriteMode,
|
||||
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `writeFileUsingCustomWriteObject()` is not implemented");
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
242
src/Disks/DiskEncryptedTransaction.h
Normal file
242
src/Disks/DiskEncryptedTransaction.h
Normal file
@ -0,0 +1,242 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/IDiskTransaction.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/DiskCommitTransactionOptions.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace FileEncryption { enum class Algorithm; }
|
||||
|
||||
struct DiskEncryptedSettings
|
||||
{
|
||||
DiskPtr wrapped_disk;
|
||||
String disk_path;
|
||||
std::unordered_map<UInt64, String> keys;
|
||||
UInt64 current_key_id;
|
||||
FileEncryption::Algorithm current_algorithm;
|
||||
};
|
||||
|
||||
class DiskEncryptedTransaction : public IDiskTransaction
|
||||
{
|
||||
public:
|
||||
explicit DiskEncryptedTransaction(DiskTransactionPtr delegate_transaction_, const std::string & disk_path_, DiskEncryptedSettings current_settings_, IDisk * delegate_disk_)
|
||||
: delegate_transaction(delegate_transaction_)
|
||||
, disk_path(disk_path_)
|
||||
, current_settings(current_settings_)
|
||||
, delegate_disk(delegate_disk_)
|
||||
{}
|
||||
|
||||
/// Tries to commit all accumulated operations simultaneously.
|
||||
/// If something fails rollback and throw exception.
|
||||
void commit(const TransactionCommitOptionsVariant & options = NoCommitOptions{}) override // NOLINT
|
||||
{
|
||||
delegate_transaction->commit(options);
|
||||
}
|
||||
|
||||
void undo() override
|
||||
{
|
||||
delegate_transaction->undo();
|
||||
}
|
||||
|
||||
TransactionCommitOutcomeVariant tryCommit(const TransactionCommitOptionsVariant & options) override
|
||||
{
|
||||
return delegate_transaction->tryCommit(options);
|
||||
}
|
||||
|
||||
~DiskEncryptedTransaction() override = default;
|
||||
|
||||
/// Create directory.
|
||||
void createDirectory(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->createDirectory(wrapped_path);
|
||||
}
|
||||
|
||||
/// Create directory and all parent directories if necessary.
|
||||
void createDirectories(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->createDirectories(wrapped_path);
|
||||
}
|
||||
|
||||
/// Remove all files from the directory. Directories are not removed.
|
||||
void clearDirectory(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->clearDirectory(wrapped_path);
|
||||
}
|
||||
|
||||
/// Move directory from `from_path` to `to_path`.
|
||||
void moveDirectory(const std::string & from_path, const std::string & to_path) override
|
||||
{
|
||||
auto wrapped_from_path = wrappedPath(from_path);
|
||||
auto wrapped_to_path = wrappedPath(to_path);
|
||||
delegate_transaction->moveDirectory(wrapped_from_path, wrapped_to_path);
|
||||
}
|
||||
|
||||
void moveFile(const std::string & from_path, const std::string & to_path) override
|
||||
{
|
||||
auto wrapped_from_path = wrappedPath(from_path);
|
||||
auto wrapped_to_path = wrappedPath(to_path);
|
||||
delegate_transaction->moveFile(wrapped_from_path, wrapped_to_path);
|
||||
|
||||
}
|
||||
|
||||
void createFile(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->createFile(wrapped_path);
|
||||
}
|
||||
|
||||
/// Move the file from `from_path` to `to_path`.
|
||||
/// If a file with `to_path` path already exists, it will be replaced.
|
||||
void replaceFile(const std::string & from_path, const std::string & to_path) override
|
||||
{
|
||||
auto wrapped_from_path = wrappedPath(from_path);
|
||||
auto wrapped_to_path = wrappedPath(to_path);
|
||||
delegate_transaction->replaceFile(wrapped_from_path, wrapped_to_path);
|
||||
}
|
||||
|
||||
/// Only copy of several files supported now. Disk interface support copy to another disk
|
||||
/// but it's impossible to implement correctly in transactions because other disk can
|
||||
/// use different metadata storage.
|
||||
/// TODO: maybe remove it at all, we don't want copies
|
||||
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override;
|
||||
|
||||
/// Open the file for write and return WriteBufferFromFileBase object.
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
||||
const std::string & path,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
WriteMode mode = WriteMode::Rewrite,
|
||||
const WriteSettings & settings = {},
|
||||
bool autocommit = true) override;
|
||||
|
||||
/// Write a file using a custom function to write an object to the disk's object storage.
|
||||
void writeFileUsingCustomWriteObject(
|
||||
const String & path,
|
||||
WriteMode mode,
|
||||
std::function<size_t(const StoredObject & object, WriteMode mode, const std::optional<ObjectAttributes> & object_attributes)>
|
||||
custom_write_object_function) override;
|
||||
|
||||
/// Remove file. Throws exception if file doesn't exists or it's a directory.
|
||||
void removeFile(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeFile(wrapped_path);
|
||||
}
|
||||
|
||||
/// Remove file if it exists.
|
||||
void removeFileIfExists(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeFileIfExists(wrapped_path);
|
||||
}
|
||||
|
||||
/// Remove directory. Throws exception if it's not a directory or if directory is not empty.
|
||||
void removeDirectory(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeDirectory(wrapped_path);
|
||||
}
|
||||
|
||||
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists.
|
||||
void removeRecursive(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeRecursive(wrapped_path);
|
||||
}
|
||||
|
||||
/// Remove file. Throws exception if file doesn't exists or if directory is not empty.
|
||||
/// Differs from removeFile for S3/HDFS disks
|
||||
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
|
||||
void removeSharedFile(const std::string & path, bool keep_shared_data) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeSharedFile(wrapped_path, keep_shared_data);
|
||||
}
|
||||
|
||||
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists.
|
||||
/// Differs from removeRecursive for S3/HDFS disks
|
||||
/// Second bool param is a flag to remove (false) or keep (true) shared data on S3.
|
||||
/// Third param determines which files cannot be removed even if second is true.
|
||||
void removeSharedRecursive(const std::string & path, bool keep_all_shared_data, const NameSet & file_names_remove_metadata_only) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeSharedRecursive(wrapped_path, keep_all_shared_data, file_names_remove_metadata_only);
|
||||
}
|
||||
|
||||
/// Remove file or directory if it exists.
|
||||
/// Differs from removeFileIfExists for S3/HDFS disks
|
||||
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
|
||||
void removeSharedFileIfExists(const std::string & path, bool keep_shared_data) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->removeSharedFileIfExists(wrapped_path, keep_shared_data);
|
||||
}
|
||||
|
||||
/// Batch request to remove multiple files.
|
||||
/// May be much faster for blob storage.
|
||||
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3.
|
||||
/// Third param determines which files cannot be removed even if second is true.
|
||||
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override
|
||||
{
|
||||
for (const auto & file : files)
|
||||
{
|
||||
auto wrapped_path = wrappedPath(file.path);
|
||||
bool keep = keep_all_batch_data || file_names_remove_metadata_only.contains(fs::path(file.path).filename());
|
||||
if (file.if_exists)
|
||||
delegate_transaction->removeSharedFileIfExists(wrapped_path, keep);
|
||||
else
|
||||
delegate_transaction->removeSharedFile(wrapped_path, keep);
|
||||
}
|
||||
}
|
||||
|
||||
/// Set last modified time to file or directory at `path`.
|
||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->setLastModified(wrapped_path, timestamp);
|
||||
}
|
||||
|
||||
/// Just chmod.
|
||||
void chmod(const String & path, mode_t mode) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->chmod(wrapped_path, mode);
|
||||
}
|
||||
|
||||
/// Set file at `path` as read-only.
|
||||
void setReadOnly(const std::string & path) override
|
||||
{
|
||||
auto wrapped_path = wrappedPath(path);
|
||||
delegate_transaction->setReadOnly(wrapped_path);
|
||||
}
|
||||
|
||||
/// Create hardlink from `src_path` to `dst_path`.
|
||||
void createHardLink(const std::string & src_path, const std::string & dst_path) override
|
||||
{
|
||||
auto wrapped_src_path = wrappedPath(src_path);
|
||||
auto wrapped_dst_path = wrappedPath(dst_path);
|
||||
delegate_transaction->createHardLink(wrapped_src_path, wrapped_dst_path);
|
||||
}
|
||||
|
||||
private:
|
||||
String wrappedPath(const String & path) const
|
||||
{
|
||||
// if path starts_with disk_path -> got already wrapped path
|
||||
if (!disk_path.empty() && path.starts_with(disk_path))
|
||||
return path;
|
||||
return disk_path + path;
|
||||
}
|
||||
|
||||
DiskTransactionPtr delegate_transaction;
|
||||
std::string disk_path;
|
||||
DiskEncryptedSettings current_settings;
|
||||
IDisk * delegate_disk;
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user