Correct merge

This commit is contained in:
kssenii 2022-06-22 12:19:19 +02:00
parent 4178abc0a5
commit 86f34af32f
9 changed files with 27 additions and 687 deletions

View File

@ -599,7 +599,10 @@ catch (...)
DiskObjectStoragePtr DiskLocal::getObjectStorage(const String & name_)
{
auto object_storage = std::make_shared<LocalObjectStorage>();
auto metadata_storage = std::make_shared<MetadataStorageFromLocalDisk>(std::static_pointer_cast<DiskLocal>(shared_from_this()));
auto metadata_storage = std::make_shared<MetadataStorageFromLocalDisk>(
std::static_pointer_cast<DiskLocal>(shared_from_this()),
object_storage);
return std::make_shared<DiskObjectStorage>(
name_,
disk_path,

View File

@ -8,6 +8,7 @@
#include <Common/setThreadName.h>
#include <Disks/ObjectStorages/MetadataStorageFromRemoteDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromLocalDisk.h>
#include <Disks/ObjectStorages/LocalObjectStorage.h>
#include <Disks/FakeDiskTransaction.h>
namespace DB
@ -123,9 +124,14 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
MetadataStoragePtr IDisk::getMetadataStorage()
{
if (isRemote())
{
return std::make_shared<MetadataStorageFromRemoteDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), "");
}
else
return std::make_shared<MetadataStorageFromLocalDisk>(std::static_pointer_cast<IDisk>(shared_from_this()));
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<MetadataStorageFromLocalDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), object_storage);
}
}
}

View File

@ -85,7 +85,7 @@ DiskTransactionPtr DiskObjectStorage::createTransaction()
return std::make_shared<DiskObjectStorageTransaction>(
*object_storage,
*metadata_storage,
remote_fs_root_path,
object_storage_root_path,
send_metadata ? metadata_helper.get() : nullptr);
}

View File

@ -93,8 +93,6 @@ public:
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeFromObjectStorage(const std::vector<String> & paths);
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;

View File

@ -96,6 +96,12 @@ struct RemoveObjectOperation final : public IDiskObjectStorageOperation
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
auto remote_objects = metadata_storage.getRemotePaths(path);
/// FIXME:!!!
String full_path = fs::path(metadata_storage.getPath()) / path;
bool is_remote = object_storage.isRemote();
if (!is_remote)
object_storage.removeCacheIfExists(full_path);
tx->unlinkMetadata(path);
if (hardlink_count == 0)
@ -132,7 +138,7 @@ struct RemoveObjectOperation final : public IDiskObjectStorageOperation
if (remove_from_cache)
{
for (const auto & path_to_remove : paths_to_remove)
object_storage.removeFromCache(path_to_remove);
object_storage.removeCacheIfExists(path_to_remove);
}
}
@ -213,7 +219,7 @@ struct RemoveRecursiveOperation final : public IDiskObjectStorageOperation
void finalize() override
{
if (!keep_all_batch_data)
if (!keep_all_batch_data && object_storage.isRemote())
{
std::vector<std::string> remove_from_remote;
for (auto && [local_path, remote_paths] : paths_to_remove)
@ -227,7 +233,7 @@ struct RemoveRecursiveOperation final : public IDiskObjectStorageOperation
}
for (const auto & path_to_remove : path_to_remove_from_cache)
object_storage.removeFromCache(path_to_remove);
object_storage.removeCacheIfExists(path_to_remove);
}
};
@ -273,16 +279,13 @@ struct ReplaceFileOperation final : public IDiskObjectStorageOperation
struct WriteFileOperation final : public IDiskObjectStorageOperation
{
std::string path;
std::string blob_path;
WriteFileOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
const std::string & blob_path_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, blob_path(blob_path_)
{}
@ -494,7 +497,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
auto blob_path = fs::path(remote_fs_root_path) / blob_name;
operations_to_execute.emplace_back(std::make_unique<WriteFileOperation>(object_storage, metadata_storage, path, blob_path));
operations_to_execute.emplace_back(std::make_unique<WriteFileOperation>(object_storage, metadata_storage, blob_path));
auto create_metadata_callback = [tx = shared_from_this(), this, mode, path, blob_name, autocommit] (size_t count)
{

View File

@ -1,672 +0,0 @@
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <ranges>
#include <filesystem>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FS_METADATA_ERROR;
}
/// Returns the textual name of a metadata transaction state (used in error messages).
std::string toString(MetadataFromDiskTransactionState state)
{
    const char * state_name = nullptr;
    switch (state)
    {
        case MetadataFromDiskTransactionState::PREPARING:
            state_name = "PREPARING";
            break;
        case MetadataFromDiskTransactionState::FAILED:
            state_name = "FAILED";
            break;
        case MetadataFromDiskTransactionState::COMMITTED:
            state_name = "COMMITTED";
            break;
        case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK:
            state_name = "PARTIALLY_ROLLED_BACK";
            break;
    }
    if (state_name != nullptr)
        return state_name;
    /// Only reached for a value that none of the cases above covers.
    __builtin_unreachable();
}
namespace
{
std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString();
}
/// Metadata operation that sets the last-modified timestamp of `path` on the disk
/// and remembers the previous timestamp so undo() can restore it.
class SetLastModifiedOperation final : public IMetadataOperation
{
    std::string path;
    Poco::Timestamp new_timestamp;
    Poco::Timestamp old_timestamp;
    IDisk & disk;

public:
    SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
        : path(path_), new_timestamp(new_timestamp_), disk(disk_)
    {
    }

    /// Saves the current timestamp, then applies the new one.
    void execute() override
    {
        old_timestamp = disk.getLastModified(path);
        disk.setLastModified(path, new_timestamp);
    }

    /// Puts back the timestamp saved by execute().
    void undo() override { disk.setLastModified(path, old_timestamp); }
};
/// Metadata operation that removes the file at `path`.
/// The file contents are read into memory first so that undo() can write them back.
class UnlinkFileOperation final : public IMetadataOperation
{
    std::string path;
    IDisk & disk;
    std::string prev_data;

public:
    UnlinkFileOperation(const std::string & path_, IDisk & disk_) : path(path_), disk(disk_) {}

    /// Reads the whole file into `prev_data`, then removes it from the disk.
    void execute() override
    {
        auto reader = disk.readFile(path);
        readStringUntilEOF(prev_data, *reader);
        disk.removeFile(path);
    }

    /// Writes the saved contents back to `path`.
    void undo() override
    {
        auto writer = disk.writeFile(path);
        writeString(prev_data, *writer);
        writer->finalize();
    }
};
/// Metadata operation that creates a single directory; undo() removes it again.
class CreateDirectoryOperation final : public IMetadataOperation
{
    std::string path;
    IDisk & disk;

public:
    CreateDirectoryOperation(const std::string & path_, IDisk & disk_) : path(path_), disk(disk_) {}

    /// Creates the directory at `path`.
    void execute() override { disk.createDirectory(path); }

    /// Removes the directory at `path`.
    void undo() override { disk.removeDirectory(path); }
};
/// Metadata operation that creates `path` together with every missing ancestor directory.
/// The list of directories it decided to create is kept so undo() can remove them.
class CreateDirectoryRecursiveOperation final : public IMetadataOperation
{
    std::string path;
    std::vector<std::string> paths_created;
    IDisk & disk;

public:
    CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_) : path(path_), disk(disk_) {}

    /// Walks from `path` towards the root collecting every path that does not exist yet,
    /// then creates the collected directories starting from the outermost one.
    void execute() override
    {
        namespace fs = std::filesystem;

        for (fs::path current(path); !disk.exists(current); current = current.parent_path())
        {
            paths_created.push_back(current);
            if (!current.has_parent_path())
                break;
        }

        /// `paths_created` is ordered deepest-first, so iterate it backwards.
        for (auto it = paths_created.rbegin(); it != paths_created.rend(); ++it)
            disk.createDirectory(*it);
    }

    /// Removes the recorded directories, deepest first.
    void undo() override
    {
        for (const auto & created : paths_created)
            disk.removeDirectory(created);
    }
};
/// Metadata operation that removes a single directory; undo() creates it again.
class RemoveDirectoryOperation final : public IMetadataOperation
{
    std::string path;
    IDisk & disk;

public:
    RemoveDirectoryOperation(const std::string & path_, IDisk & disk_) : path(path_), disk(disk_) {}

    /// Removes the directory at `path`.
    void execute() override { disk.removeDirectory(path); }

    /// Re-creates the directory at `path`.
    void undo() override { disk.createDirectory(path); }
};
/// Metadata operation that removes a file or a directory tree.
/// execute() only moves `path` to a randomly named sibling (`temp_path`), which
/// undo() can move back; the actual deletion happens in finalize().
class RemoveRecursiveOperation final : public IMetadataOperation
{
    std::string path;
    IDisk & disk;
    std::string temp_path;

public:
    RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_), temp_path(getTempFileName(fs::path(path).parent_path()))
    {
    }

    /// Moves `path` aside; does nothing if it is neither a file nor a directory.
    void execute() override
    {
        if (disk.isFile(path))
        {
            disk.moveFile(path, temp_path);
            return;
        }
        if (disk.isDirectory(path))
            disk.moveDirectory(path, temp_path);
    }

    /// Moves the entry from `temp_path` back to `path`, if it is there.
    void undo() override
    {
        if (disk.isFile(temp_path))
        {
            disk.moveFile(temp_path, path);
            return;
        }
        if (disk.isDirectory(temp_path))
            disk.moveDirectory(temp_path, path);
    }

    /// Deletes whatever is left at `temp_path` and at `path`.
    void finalize() override
    {
        if (disk.exists(temp_path))
        {
            disk.removeRecursive(temp_path);
        }
        if (disk.exists(path))
        {
            disk.removeRecursive(path);
        }
    }
};
/// Metadata operation that creates a hard link `path_to` pointing at `path_from`;
/// undo() removes the link file.
class CreateHardlinkOperation final : public IMetadataOperation
{
    std::string path_from;
    std::string path_to;
    IDisk & disk;

public:
    CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    /// Creates the hard link on the disk.
    void execute() override { disk.createHardLink(path_from, path_to); }

    /// Removes the file at `path_to`.
    void undo() override { disk.removeFile(path_to); }
};
/// Metadata operation that moves a file from `path_from` to `path_to`;
/// undo() moves it back.
class MoveFileOperation final : public IMetadataOperation
{
    std::string path_from;
    std::string path_to;
    IDisk & disk;

public:
    MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    /// Moves the file to its destination.
    void execute() override { disk.moveFile(path_from, path_to); }

    /// Moves the file back to where it came from.
    void undo() override { disk.moveFile(path_to, path_from); }
};
/// Metadata operation that moves a directory from `path_from` to `path_to`;
/// undo() moves it back.
class MoveDirectoryOperation final : public IMetadataOperation
{
    std::string path_from;
    std::string path_to;
    IDisk & disk;

public:
    MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    /// Moves the directory to its destination.
    void execute() override { disk.moveDirectory(path_from, path_to); }

    /// Moves the directory back to where it came from.
    void undo() override { disk.moveDirectory(path_to, path_from); }
};
/// Metadata operation that replaces the file at `path_to` with the file at `path_from`.
/// A file already present at `path_to` is first moved aside to a randomly named sibling
/// (`temp_path_to`) so that undo() can put it back; finalize() removes that backup.
class ReplaceFileOperation final : public IMetadataOperation
{
private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
    std::string temp_path_to;
    /// True once the previous `path_to` has been moved to `temp_path_to`.
    bool backup_created = false;
    /// True once `disk.replaceFile(path_from, path_to)` has completed.
    bool replaced = false;

public:
    ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_)
        , path_to(path_to_)
        , disk(disk_)
        , temp_path_to(getTempFileName(fs::path(path_to).parent_path()))
    {
    }

    /// Backs up an existing destination (if any) and performs the replacement,
    /// recording which of the two steps actually completed.
    void execute() override
    {
        if (disk.exists(path_to))
        {
            disk.moveFile(path_to, temp_path_to);
            backup_created = true;
        }

        disk.replaceFile(path_from, path_to);
        replaced = true;
    }

    /// Reverts only the steps that execute() completed.
    /// The transaction's rollback also calls undo() on the operation whose execute() threw,
    /// and the destination may not have existed at all, so neither move may be assumed:
    /// moving a backup that was never created would make the rollback itself fail.
    void undo() override
    {
        if (replaced)
            disk.moveFile(path_to, path_from);

        if (backup_created)
            disk.moveFile(temp_path_to, path_to);
    }

    /// Drops the backup of the previous destination file, if one is present.
    void finalize() override
    {
        disk.removeFileIfExists(temp_path_to);
    }
};
/// Metadata operation that writes `data` to the file at `path`.
/// If the file already exists its previous contents are saved so undo() can restore them;
/// otherwise undo() removes the newly created file.
class WriteFileOperation final : public IMetadataOperation
{
private:
    std::string path;
    IDisk & disk;
    std::string data;
    /// Whether `path` existed when execute() ran.
    bool existed = false;
    /// Contents of `path` before execute() overwrote it (only filled when `existed`).
    std::string prev_data;

public:
    WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_)
        : path(path_)
        , disk(disk_)
        , data(data_)
    {}

    /// Saves the old contents (if the file exists), then writes and finalizes the new contents.
    void execute() override
    {
        if (disk.exists(path))
        {
            existed = true;
            auto buf = disk.readFile(path);
            readStringUntilEOF(prev_data, *buf);
        }

        auto buf = disk.writeFile(path);
        writeString(data, *buf);
        buf->finalize();
    }

    /// Restores the state from before execute(): removes a file that did not exist,
    /// or writes the saved contents back.
    void undo() override
    {
        if (!existed)
        {
            disk.removeFileIfExists(path);
        }
        else
        {
            auto buf = disk.writeFile(path);
            writeString(prev_data, *buf);
            /// Finalize explicitly, as execute() does, so the restored contents are
            /// completed here rather than left to the buffer's destructor.
            buf->finalize();
        }
    }
};
}
void MetadataStorageFromDiskTransaction::writeStringToFile( /// NOLINT
const std::string & path,
const std::string & data)
{
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.disk, data));
}
/// Appends `operation` to the list of operations executed by commit().
/// Throws FS_METADATA_ERROR unless the transaction is still in the PREPARING state.
void MetadataStorageFromDiskTransaction::addOperation(MetadataOperationPtr && operation)
{
    const bool still_preparing = (state == MetadataFromDiskTransactionState::PREPARING);
    if (!still_preparing)
    {
        throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot add operations to transaction in {} state, it should be in {} state",
            toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
    }

    operations.emplace_back(std::move(operation));
}
/// Executes every queued operation in order, then finalizes them and marks the
/// transaction COMMITTED.
/// Throws FS_METADATA_ERROR if the transaction is not in the PREPARING state.
/// If an operation throws an `Exception`, the state becomes FAILED, rollback() is called
/// for the operations up to and including the failed one, and the exception is rethrown.
void MetadataStorageFromDiskTransaction::commit()
{
if (state != MetadataFromDiskTransactionState::PREPARING)
throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot commit transaction in {} state, it should be in {} state",
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
{
/// The metadata mutex is held exclusively for the whole execute phase,
/// including the rollback triggered from the catch block below.
std::unique_lock lock(metadata_storage.metadata_mutex);
for (size_t i = 0; i < operations.size(); ++i)
{
try
{
operations[i]->execute();
}
/// Only `Exception` is caught here; any other exception type leaves this
/// function without the rollback below being run.
catch (Exception & ex)
{
ex.addMessage(fmt::format("While committing metadata operation #{}", i));
/// rollback() only undoes anything when the state is FAILED, so set it first.
state = MetadataFromDiskTransactionState::FAILED;
rollback(i);
throw;
}
}
}
/// Do it in "best effort" mode
/// (the lock is already released; a failing finalize() is logged and does not abort the commit)
for (size_t i = 0; i < operations.size(); ++i)
{
try
{
operations[i]->finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", i));
}
}
state = MetadataFromDiskTransactionState::COMMITTED;
}
/// Undoes operations `until_pos`, `until_pos - 1`, ..., 0 (in that order) when the
/// transaction is in the FAILED state; in any other state it does nothing.
/// If an undo throws an `Exception`, the state becomes PARTIALLY_ROLLED_BACK and the
/// exception is rethrown with the operation number appended.
void MetadataStorageFromDiskTransaction::rollback(size_t until_pos)
{
    /// Nothing to do, transaction committed or not even started to commit
    if (state != MetadataFromDiskTransactionState::FAILED)
        return;

    /// Signed index so the countdown can stop after position 0.
    int64_t position = until_pos;
    while (position >= 0)
    {
        try
        {
            operations[position]->undo();
        }
        catch (Exception & ex)
        {
            state = MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK;
            ex.addMessage(fmt::format("While rolling back operation #{}", position));
            throw;
        }
        --position;
    }
}
/// Returns the path reported by the underlying disk.
const std::string & MetadataStorageFromDisk::getPath() const
{
    const std::string & disk_path = disk->getPath();
    return disk_path;
}
/// Reports whether `path` exists on the underlying disk.
bool MetadataStorageFromDisk::exists(const std::string & path) const
{
    const bool is_present = disk->exists(path);
    return is_present;
}
/// Reports whether the underlying disk considers `path` a file.
bool MetadataStorageFromDisk::isFile(const std::string & path) const
{
    const bool is_regular_file = disk->isFile(path);
    return is_regular_file;
}
/// Reports whether the underlying disk considers `path` a directory.
bool MetadataStorageFromDisk::isDirectory(const std::string & path) const
{
    const bool is_dir = disk->isDirectory(path);
    return is_dir;
}
/// Returns the last-modified timestamp the underlying disk reports for `path`.
Poco::Timestamp MetadataStorageFromDisk::getLastModified(const std::string & path) const
{
    Poco::Timestamp modified_at = disk->getLastModified(path);
    return modified_at;
}
/// Returns the last-changed time the underlying disk reports for `path`.
time_t MetadataStorageFromDisk::getLastChanged(const std::string & path) const
{
    const time_t changed_at = disk->getLastChanged(path);
    return changed_at;
}
uint64_t MetadataStorageFromDisk::getFileSize(const String & path) const
{
auto metadata = readMetadata(path);
return metadata->getTotalSizeBytes();
}
/// Returns the entries the underlying disk lists for the directory `path`.
std::vector<std::string> MetadataStorageFromDisk::listDirectory(const std::string & path) const
{
    std::vector<std::string> entries;
    disk->listFiles(path, entries);
    return entries;
}
/// Returns the underlying disk's directory iterator for `path`.
DirectoryIteratorPtr MetadataStorageFromDisk::iterateDirectory(const std::string & path) const
{
    DirectoryIteratorPtr directory_iterator = disk->iterateDirectory(path);
    return directory_iterator;
}
/// Reads the whole file at `path` from the underlying disk and returns its contents.
std::string MetadataStorageFromDisk::readFileToString(const std::string & path) const
{
    std::string contents;
    auto reader = disk->readFile(path);
    readStringUntilEOF(contents, *reader);
    return contents;
}
/// Queues a write of a freshly constructed metadata object (no objects added) to `path`.
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
{
    auto empty_metadata = std::make_unique<DiskObjectStorageMetadata>(
        metadata_storage.disk->getPath(),
        metadata_storage.root_path_for_remote_metadata,
        path);
    writeStringToFile(path, empty_metadata->serializeToString());
}
/// Queues an operation that sets the last-modified timestamp of `path` to `timestamp`.
void MetadataStorageFromDiskTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, target_disk));
}
/// Queues an operation that removes the file at `path`.
void MetadataStorageFromDiskTransaction::unlinkFile(const std::string & path)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<UnlinkFileOperation>(path, target_disk));
}
/// Queues an operation that removes the file or directory tree at `path`.
void MetadataStorageFromDiskTransaction::removeRecursive(const std::string & path)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<RemoveRecursiveOperation>(path, target_disk));
}
/// Queues an operation that creates the directory `path`.
void MetadataStorageFromDiskTransaction::createDirectory(const std::string & path)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<CreateDirectoryOperation>(path, target_disk));
}
/// Queues an operation that creates `path` together with its missing parent directories.
void MetadataStorageFromDiskTransaction::createDicrectoryRecursive(const std::string & path)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, target_disk));
}
/// Queues an operation that removes the directory `path`.
void MetadataStorageFromDiskTransaction::removeDirectory(const std::string & path)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<RemoveDirectoryOperation>(path, target_disk));
}
/// Queues two operations: a rewrite of the metadata at `path_from` with its reference
/// count incremented, followed by the creation of the hard link `path_to`.
/// The metadata is read immediately, when this method is called.
void MetadataStorageFromDiskTransaction::createHardLink(const std::string & path_from, const std::string & path_to)
{
    auto source_metadata = metadata_storage.readMetadata(path_from);
    source_metadata->incrementRefCount();
    writeStringToFile(path_from, source_metadata->serializeToString());

    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, target_disk));
}
/// Queues an operation that moves the file `path_from` to `path_to`.
void MetadataStorageFromDiskTransaction::moveFile(const std::string & path_from, const std::string & path_to)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, target_disk));
}
/// Queues an operation that moves the directory `path_from` to `path_to`.
void MetadataStorageFromDiskTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, target_disk));
}
/// Queues an operation that replaces the file `path_to` with the file `path_from`.
void MetadataStorageFromDiskTransaction::replaceFile(const std::string & path_from, const std::string & path_to)
{
    auto & target_disk = *metadata_storage.disk;
    addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, target_disk));
}
/// Reads the metadata at `path`, marks it read-only and queues a write of the result.
void MetadataStorageFromDiskTransaction::setReadOnly(const std::string & path)
{
    auto file_metadata = metadata_storage.readMetadata(path);
    file_metadata->setReadOnly();

    const auto serialized = file_metadata->serializeToString();
    writeStringToFile(path, serialized);
}
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
{
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
metadata->addObject(blob_name, size_in_bytes);
writeStringToFile(path, metadata->serializeToString());
}
/// Adds the object `blob_name` (`size_in_bytes` bytes) to the metadata at `path` and
/// queues a write of the result. Existing metadata is loaded when `path` exists;
/// otherwise a new metadata object is started.
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
{
    DiskObjectStorageMetadataPtr metadata;

    if (!metadata_storage.exists(path))
    {
        metadata = std::make_unique<DiskObjectStorageMetadata>(
            metadata_storage.disk->getPath(),
            metadata_storage.root_path_for_remote_metadata,
            path);
    }
    else
    {
        metadata = metadata_storage.readMetadata(path);
    }

    metadata->addObject(blob_name, size_in_bytes);
    writeStringToFile(path, metadata->serializeToString());
}
/// Loads and deserializes the metadata stored at `path`.
/// The unnamed shared-lock parameter is not used in the body; it is only part of the signature.
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
{
    auto loaded = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
    auto serialized = readFileToString(path);
    loaded->deserializeFromString(serialized);
    return loaded;
}
/// Takes a shared lock on the metadata mutex and reads the metadata stored at `path`.
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadata(const std::string & path) const
{
    std::shared_lock read_lock(metadata_mutex);
    return readMetadataUnlocked(path, read_lock);
}
std::unordered_map<String, String> MetadataStorageFromDisk::getSerializedMetadata(const std::vector<String> & file_paths) const
{
std::shared_lock lock(metadata_mutex);
std::unordered_map<String, String> metadatas;
for (const auto & path : file_paths)
{
auto metadata = readMetadataUnlocked(path, lock);
metadata->resetRefCount();
WriteBufferFromOwnString buf;
metadata->serialize(buf, false);
metadatas[path] = buf.str();
}
return metadatas;
}
/// Creates a new transaction object bound to this metadata storage.
MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
{
    auto transaction = std::make_shared<MetadataStorageFromDiskTransaction>(*this);
    return transaction;
}
/// Returns the path of every blob recorded in the metadata at `path`, each one
/// prefixed with the blobs' common prefix taken from the same metadata.
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
{
    auto metadata = readMetadata(path);

    auto blobs = metadata->getBlobs();
    auto root_path = metadata->getBlobsCommonPrefix();

    std::vector<std::string> remote_paths;
    remote_paths.reserve(blobs.size());
    for (const auto & [blob_relative_path, _] : blobs)
    {
        remote_paths.push_back(fs::path(root_path) / blob_relative_path);
    }

    return remote_paths;
}
uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) const
{
auto metadata = readMetadata(path);
return metadata->getRefCount();
}
/// Returns the blob list recorded in the metadata at `path`.
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
{
    return readMetadata(path)->getBlobs();
}
/// Queues the removal of the metadata file at `path`. When the recorded reference count
/// is non-zero, a write of the metadata with the count decremented is queued first.
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
{
    auto file_metadata = metadata_storage.readMetadata(path);

    const bool has_references = file_metadata->getRefCount() != 0;
    if (has_references)
    {
        file_metadata->decrementRefCount();
        writeStringToFile(path, file_metadata->serializeToString());
    }

    unlinkFile(path);
}
}

View File

@ -61,7 +61,7 @@ void MetadataStorageFromDiskTransaction::commit()
catch (Exception & ex)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
ex.addMessage(fmt::format("While committing operation #{}", i));
ex.addMessage(fmt::format("While committing metadata operation #{}", i));
state = MetadataFromDiskTransactionState::FAILED;
rollback(i);
throw;

View File

@ -10,8 +10,9 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
MetadataStorageFromLocalDisk::MetadataStorageFromLocalDisk(DiskPtr disk_)
MetadataStorageFromLocalDisk::MetadataStorageFromLocalDisk(DiskPtr disk_, ObjectStoragePtr object_storage_)
: disk(disk_)
, object_storage(object_storage_)
{
}

View File

@ -12,7 +12,7 @@ class MetadataStorageFromLocalDisk : public IMetadataStorage
{
public:
explicit MetadataStorageFromLocalDisk(DiskPtr disk_);
explicit MetadataStorageFromLocalDisk(DiskPtr disk_, ObjectStoragePtr object_storage_);
MetadataTransactionPtr createTransaction() const override;
@ -48,6 +48,7 @@ public:
private:
DiskPtr disk;
ObjectStoragePtr object_storage;
};
class MetadataStorageFromLocalDiskTransaction final : public MetadataStorageFromDiskTransaction