mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
First try new interface
This commit is contained in:
parent
81dca513de
commit
56a57e649f
@ -175,7 +175,7 @@ bool DiskObjectStorage::isFile(const String & path) const
|
||||
void DiskObjectStorage::createFile(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createEmptyMetadataFile(path, tx);
|
||||
tx->createEmptyMetadataFile(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -205,7 +205,7 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
|
||||
}
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->moveFile(from_path, to_path, tx);
|
||||
tx->moveFile(from_path, to_path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_
|
||||
auto blobs = metadata_storage->getRemotePaths(to_path);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->replaceFile(from_path, to_path, tx);
|
||||
tx->replaceFile(from_path, to_path);
|
||||
tx->commit();
|
||||
|
||||
removeFromRemoteFS(blobs);
|
||||
@ -292,7 +292,7 @@ void DiskObjectStorage::createHardLink(const String & src_path, const String & d
|
||||
|
||||
/// Create FS hardlink to metadata file.
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createHardLink(src_path, dst_path, tx);
|
||||
tx->createHardLink(src_path, dst_path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -307,7 +307,7 @@ void DiskObjectStorage::setReadOnly(const String & path)
|
||||
/// We should store read only flag inside metadata file (instead of using FS flag),
|
||||
/// because we modify metadata file when create hard-links from it.
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->setReadOnly(path, tx);
|
||||
tx->setReadOnly(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -321,7 +321,7 @@ bool DiskObjectStorage::isDirectory(const String & path) const
|
||||
void DiskObjectStorage::createDirectory(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createDirectory(path, tx);
|
||||
tx->createDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -329,7 +329,7 @@ void DiskObjectStorage::createDirectory(const String & path)
|
||||
void DiskObjectStorage::createDirectories(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createDicrectoryRecursive(path, tx);
|
||||
tx->createDicrectoryRecursive(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -345,7 +345,7 @@ void DiskObjectStorage::clearDirectory(const String & path)
|
||||
void DiskObjectStorage::removeDirectory(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->removeDirectory(path, tx);
|
||||
tx->removeDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -366,7 +366,7 @@ void DiskObjectStorage::listFiles(const String & path, std::vector<String> & fil
|
||||
void DiskObjectStorage::setLastModified(const String & path, const Poco::Timestamp & timestamp)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->setLastModified(path, timestamp, tx);
|
||||
tx->setLastModified(path, timestamp);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -391,8 +391,9 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
|
||||
{
|
||||
auto remote_objects = metadata_storage->getRemotePaths(path);
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
uint32_t hardlink_count = metadata_storage->unlinkAndGetHardlinkCount(path, tx);
|
||||
tx->unlinkMetadata(path);
|
||||
tx->commit();
|
||||
uint32_t hardlink_count = metadata_storage->getHardlinkCount(path);
|
||||
|
||||
if (hardlink_count == 0)
|
||||
{
|
||||
@ -413,7 +414,7 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
|
||||
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", path);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->unlinkFile(path, tx);
|
||||
tx->unlinkFile(path);
|
||||
tx->commit();
|
||||
}
|
||||
else
|
||||
@ -436,7 +437,7 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
|
||||
removeMetadataRecursive(it->path(), paths_to_remove);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->removeDirectory(path, tx);
|
||||
tx->removeDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
}
|
||||
@ -556,9 +557,10 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
if (mode == WriteMode::Rewrite)
|
||||
metadata_storage->createMetadataFile(path, blob_name, count, tx);
|
||||
tx->createMetadataFile(path, blob_name, count);
|
||||
else
|
||||
metadata_storage->addBlobToMetadata(path, blob_name, count, tx);
|
||||
tx->addBlobToMetadata(path, blob_name, count);
|
||||
|
||||
tx->commit();
|
||||
};
|
||||
|
||||
|
@ -223,7 +223,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
||||
restoreFileOperations(source_object_storage, information);
|
||||
|
||||
auto tx = disk->metadata_storage->createTransaction();
|
||||
disk->metadata_storage->unlinkFile(RESTORE_FILE_NAME, tx);
|
||||
tx->unlinkFile(RESTORE_FILE_NAME);
|
||||
tx->commit();
|
||||
|
||||
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
|
||||
@ -424,7 +424,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(IObjectSt
|
||||
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage);
|
||||
|
||||
auto tx = disk->metadata_storage->createTransaction();
|
||||
disk->metadata_storage->addBlobToMetadata(path, relative_key, meta.size_bytes, tx);
|
||||
tx->addBlobToMetadata(path, relative_key, meta.size_bytes);
|
||||
tx->commit();
|
||||
|
||||
LOG_TRACE(disk->log, "Restored file {}", path);
|
||||
@ -438,7 +438,7 @@ void DiskObjectStorage::onFreeze(const String & path)
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
WriteBufferFromOwnString revision_file_buf ;
|
||||
writeIntText(metadata_helper->revision_counter.load(), revision_file_buf);
|
||||
metadata_storage->writeMetadataToFile(path + "revision.txt", tx, revision_file_buf.str());
|
||||
tx->writeMetadataToFile(path + "revision.txt", revision_file_buf.str());
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -558,10 +558,10 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
||||
|
||||
/// to_path may exist and non-empty in case for example abrupt restart, so remove it before rename
|
||||
if (disk->metadata_storage->exists(to_path))
|
||||
disk->metadata_storage->removeRecursive(to_path, tx);
|
||||
tx->removeRecursive(to_path);
|
||||
|
||||
disk->createDirectories(directoryPath(to_path));
|
||||
disk->metadata_storage->moveDirectory(from_path, to_path, tx);
|
||||
tx->moveDirectory(from_path, to_path);
|
||||
}
|
||||
tx->commit();
|
||||
}
|
||||
|
@ -25,12 +25,51 @@ struct IMetadataOperation
|
||||
|
||||
using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
|
||||
|
||||
class IMetadataStorage;
|
||||
|
||||
struct IMetadataTransaction : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
virtual void addOperation(MetadataOperationPtr && operation) = 0;
|
||||
virtual void commit() = 0;
|
||||
virtual void rollback() = 0;
|
||||
|
||||
virtual const IMetadataStorage & getStorageForNonTransactionalReads() const = 0;
|
||||
|
||||
/// Create empty file in metadata storage
|
||||
virtual void createEmptyMetadataFile(const std::string & path) = 0;
|
||||
|
||||
/// Write metadata string to file
|
||||
virtual void writeMetadataToFile(const std::string & path, const std::string & data) = 0;
|
||||
|
||||
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
|
||||
|
||||
virtual void setReadOnly(const std::string & path) = 0;
|
||||
|
||||
virtual void unlinkFile(const std::string & path) = 0;
|
||||
|
||||
virtual void createDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual void createDicrectoryRecursive(const std::string & path) = 0;
|
||||
|
||||
virtual void removeDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual void removeRecursive(const std::string & path) = 0;
|
||||
|
||||
virtual void createHardLink(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void moveFile(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void moveDirectory(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void replaceFile(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
||||
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
|
||||
|
||||
/// Add to new blob to metadata file (way to implement appends)
|
||||
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
|
||||
|
||||
/// Unlink file and return amount of hardlinks left
|
||||
virtual void unlinkMetadata(const std::string & path) = 0;
|
||||
|
||||
virtual ~IMetadataTransaction() = default;
|
||||
};
|
||||
@ -48,49 +87,26 @@ public:
|
||||
virtual MetadataTransactionPtr createTransaction() const = 0;
|
||||
|
||||
virtual const std::string & getPath() const = 0;
|
||||
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
|
||||
virtual bool isFile(const std::string & path) const = 0;
|
||||
|
||||
virtual bool isDirectory(const std::string & path) const = 0;
|
||||
|
||||
virtual uint64_t getFileSize(const std::string & path) const = 0;
|
||||
|
||||
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
|
||||
|
||||
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
|
||||
|
||||
virtual DirectoryIteratorPtr iterateDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual uint32_t getHardlinkCount(const std::string & path) const = 0;
|
||||
|
||||
/// Create empty file in metadata storage
|
||||
virtual void createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Read metadata file to string from path
|
||||
virtual std::string readMetadataFileToString(const std::string & path) const = 0;
|
||||
|
||||
/// Write metadata string to file
|
||||
virtual void writeMetadataToFile(
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data) = 0;
|
||||
|
||||
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void setReadOnly(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void unlinkFile(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual ~IMetadataStorage() = default;
|
||||
|
||||
@ -104,15 +120,6 @@ public:
|
||||
|
||||
/// Return [(remote_path, size_in_bytes), ...] for metadata path
|
||||
virtual BlobsPathToSize getBlobs(const std::string & path) const = 0;
|
||||
|
||||
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
||||
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Add to new blob to metadata file (way to implement appends)
|
||||
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Unlink file and return amount of hardlinks left
|
||||
virtual uint32_t unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
};
|
||||
|
||||
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
|
||||
|
@ -27,8 +27,6 @@ std::string toString(MetadataFromDiskTransactionState state)
|
||||
return "FAILED";
|
||||
case MetadataFromDiskTransactionState::COMMITTED:
|
||||
return "COMMITTED";
|
||||
case MetadataFromDiskTransactionState::ROLLED_BACK:
|
||||
return "ROLLED_BACK";
|
||||
case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK:
|
||||
return "PARTIALLY_ROLLED_BACK";
|
||||
}
|
||||
@ -192,7 +190,7 @@ public:
|
||||
{
|
||||
if (disk.isFile(path))
|
||||
disk.moveFile(path, temp_path);
|
||||
if (disk.isDirectory(path))
|
||||
else if (disk.isDirectory(path))
|
||||
disk.moveDirectory(path, temp_path);
|
||||
}
|
||||
|
||||
@ -366,12 +364,11 @@ public:
|
||||
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::writeMetadataToFile( /// NOLINT
|
||||
void MetadataStorageFromDiskTransaction::writeMetadataToFile( /// NOLINT
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<WriteFileOperation>(path, *disk, data));
|
||||
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.disk, data));
|
||||
}
|
||||
|
||||
|
||||
@ -391,7 +388,7 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
|
||||
|
||||
{
|
||||
std::unique_lock lock(commit_mutex);
|
||||
std::unique_lock lock(metadata_storage.metadata_mutex);
|
||||
for (size_t i = 0; i < operations.size(); ++i)
|
||||
{
|
||||
try
|
||||
@ -401,8 +398,8 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
catch (Exception & ex)
|
||||
{
|
||||
ex.addMessage(fmt::format("While committing operation #{}", i));
|
||||
failed_operation_index = i;
|
||||
state = MetadataFromDiskTransactionState::FAILED;
|
||||
rollback(i);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@ -424,19 +421,15 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
state = MetadataFromDiskTransactionState::COMMITTED;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::rollback()
|
||||
void MetadataStorageFromDiskTransaction::rollback(size_t until_pos)
|
||||
{
|
||||
/// Otherwise everything is alright
|
||||
if (state == MetadataFromDiskTransactionState::FAILED)
|
||||
{
|
||||
if (!failed_operation_index.has_value())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction in failed state, but has not failed operations. It's a bug");
|
||||
|
||||
for (int64_t i = failed_operation_index.value(); i >= 0; --i)
|
||||
for (int64_t i = until_pos; i >= 0; --i)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::unique_lock lock(commit_mutex);
|
||||
operations[i]->undo();
|
||||
}
|
||||
catch (Exception & ex)
|
||||
@ -451,23 +444,6 @@ void MetadataStorageFromDiskTransaction::rollback()
|
||||
{
|
||||
/// Nothing to do, transaction committed or not even started to commit
|
||||
}
|
||||
|
||||
state = MetadataFromDiskTransactionState::ROLLED_BACK;
|
||||
}
|
||||
|
||||
MetadataStorageFromDiskTransaction::~MetadataStorageFromDiskTransaction()
|
||||
{
|
||||
if (state == MetadataFromDiskTransactionState::FAILED)
|
||||
{
|
||||
try
|
||||
{
|
||||
rollback();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const std::string & MetadataStorageFromDisk::getPath() const
|
||||
@ -523,96 +499,96 @@ std::string MetadataStorageFromDisk::readMetadataFileToString(const std::string
|
||||
return result;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
|
||||
{
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
writeMetadataToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *disk));
|
||||
addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::unlinkFile(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::unlinkFile(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<UnlinkFileOperation>(path, *disk));
|
||||
addOperation(std::make_unique<UnlinkFileOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::removeRecursive(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::removeRecursive(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<RemoveRecursiveOperation>(path, *disk));
|
||||
addOperation(std::make_unique<RemoveRecursiveOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createDirectory(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createDirectory(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<CreateDirectoryOperation>(path, *disk));
|
||||
addOperation(std::make_unique<CreateDirectoryOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createDicrectoryRecursive(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *disk));
|
||||
addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::removeDirectory(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::removeDirectory(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<RemoveDirectoryOperation>(path, *disk));
|
||||
addOperation(std::make_unique<RemoveDirectoryOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createHardLink(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
auto metadata = readMetadata(path_from);
|
||||
auto metadata = metadata_storage.readMetadata(path_from);
|
||||
|
||||
metadata->incrementRefCount();
|
||||
|
||||
writeMetadataToFile(path_from, transaction, metadata->serializeToString());
|
||||
writeMetadataToFile(path_from, metadata->serializeToString());
|
||||
|
||||
transaction->addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::moveFile(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::replaceFile(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::setReadOnly(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::setReadOnly(const std::string & path)
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
auto metadata = metadata_storage.readMetadata(path);
|
||||
metadata->setReadOnly();
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeMetadataToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
||||
{
|
||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
metadata->addObject(blob_name, size_in_bytes);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeMetadataToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
||||
{
|
||||
DiskObjectStorageMetadataPtr metadata;
|
||||
if (exists(path))
|
||||
if (metadata_storage.exists(path))
|
||||
{
|
||||
metadata = readMetadata(path);
|
||||
metadata = metadata_storage.readMetadata(path);
|
||||
}
|
||||
else
|
||||
{
|
||||
metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
}
|
||||
|
||||
metadata->addObject(blob_name, size_in_bytes);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeMetadataToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
|
||||
@ -646,6 +622,11 @@ std::unordered_map<String, String> MetadataStorageFromDisk::getSerializedMetadat
|
||||
return metadatas;
|
||||
}
|
||||
|
||||
MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
@ -666,25 +647,22 @@ uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) con
|
||||
return metadata->getRefCount();
|
||||
}
|
||||
|
||||
|
||||
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
return metadata->getBlobs();
|
||||
}
|
||||
|
||||
|
||||
uint32_t MetadataStorageFromDisk::unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
auto metadata = metadata_storage.readMetadata(path);
|
||||
uint32_t ref_count = metadata->getRefCount();
|
||||
if (ref_count != 0)
|
||||
{
|
||||
metadata->decrementRefCount();
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeMetadataToFile(path, metadata->serializeToString());
|
||||
}
|
||||
unlinkFile(path, transaction);
|
||||
return ref_count;
|
||||
unlinkFile(path);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,35 +13,16 @@ enum class MetadataFromDiskTransactionState
|
||||
PREPARING,
|
||||
FAILED,
|
||||
COMMITTED,
|
||||
ROLLED_BACK,
|
||||
PARTIALLY_ROLLED_BACK,
|
||||
};
|
||||
|
||||
std::string toString(MetadataFromDiskTransactionState state);
|
||||
|
||||
struct MetadataStorageFromDiskTransaction final : public IMetadataTransaction
|
||||
{
|
||||
private:
|
||||
std::optional<size_t> failed_operation_index;
|
||||
|
||||
std::shared_mutex & commit_mutex;
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
|
||||
public:
|
||||
explicit MetadataStorageFromDiskTransaction(std::shared_mutex & commit_mutex_)
|
||||
: commit_mutex(commit_mutex_)
|
||||
{}
|
||||
|
||||
void addOperation(MetadataOperationPtr && operation) override;
|
||||
void commit() override;
|
||||
void rollback() override;
|
||||
|
||||
~MetadataStorageFromDiskTransaction() override;
|
||||
};
|
||||
|
||||
class MetadataStorageFromDisk final : public IMetadataStorage
|
||||
{
|
||||
private:
|
||||
friend struct MetadataStorageFromDiskTransaction;
|
||||
|
||||
DiskPtr disk;
|
||||
std::string root_path_for_remote_metadata;
|
||||
mutable std::shared_mutex metadata_mutex;
|
||||
@ -53,10 +34,7 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
MetadataTransactionPtr createTransaction() const override
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromDiskTransaction>(metadata_mutex);
|
||||
}
|
||||
MetadataTransactionPtr createTransaction() const override;
|
||||
|
||||
const std::string & getPath() const override;
|
||||
|
||||
@ -84,45 +62,69 @@ public:
|
||||
|
||||
uint32_t getHardlinkCount(const std::string & path) const override;
|
||||
|
||||
void writeMetadataToFile(
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data) override;
|
||||
|
||||
void createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void setReadOnly(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void unlinkFile(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createDirectory(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
uint32_t unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
private:
|
||||
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
|
||||
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> & lock) const;
|
||||
};
|
||||
|
||||
struct MetadataStorageFromDiskTransaction final : public IMetadataTransaction
|
||||
{
|
||||
private:
|
||||
const MetadataStorageFromDisk & metadata_storage;
|
||||
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
|
||||
|
||||
void addOperation(MetadataOperationPtr && operation);
|
||||
void rollback(size_t until_pos);
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromDiskTransaction(const MetadataStorageFromDisk & metadata_storage_)
|
||||
: metadata_storage(metadata_storage_)
|
||||
{}
|
||||
|
||||
const IMetadataStorage & getStorageForNonTransactionalReads() const override
|
||||
{
|
||||
return metadata_storage;
|
||||
}
|
||||
|
||||
void commit() override;
|
||||
|
||||
void writeMetadataToFile(const std::string & path, const std::string & data) override;
|
||||
|
||||
void createEmptyMetadataFile(const std::string & path) override;
|
||||
|
||||
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
||||
|
||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
||||
|
||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
|
||||
|
||||
void setReadOnly(const std::string & path) override;
|
||||
|
||||
void unlinkFile(const std::string & path) override;
|
||||
|
||||
void createDirectory(const std::string & path) override;
|
||||
|
||||
void createDicrectoryRecursive(const std::string & path) override;
|
||||
|
||||
void removeDirectory(const std::string & path) override;
|
||||
|
||||
void removeRecursive(const std::string & path) override;
|
||||
|
||||
void createHardLink(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void moveFile(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void replaceFile(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void unlinkMetadata(const std::string & path) override;
|
||||
|
||||
~MetadataStorageFromDiskTransaction() override = default;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
@ -8165,7 +8165,7 @@ public:
|
||||
writeString(table_shared_id, buffer);
|
||||
buffer.write("\n", 1);
|
||||
|
||||
metadata_storage->writeMetadataToFile(file_path, tx, buffer.str());
|
||||
tx->writeMetadataToFile(file_path, buffer.str());
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -8205,7 +8205,7 @@ public:
|
||||
if (metadata_storage->exists(fname))
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->unlinkFile(fname, tx);
|
||||
tx->unlinkFile(fname);
|
||||
tx->commit();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user