mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge branch 'metadata_storage' of github.com:ClickHouse/ClickHouse into metadata_storage
This commit is contained in:
commit
5c3846f421
@ -175,7 +175,7 @@ bool DiskObjectStorage::isFile(const String & path) const
|
||||
void DiskObjectStorage::createFile(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createEmptyMetadataFile(path, tx);
|
||||
tx->createEmptyMetadataFile(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -205,7 +205,7 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
|
||||
}
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->moveFile(from_path, to_path, tx);
|
||||
tx->moveFile(from_path, to_path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_
|
||||
auto blobs = metadata_storage->getRemotePaths(to_path);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->replaceFile(from_path, to_path, tx);
|
||||
tx->replaceFile(from_path, to_path);
|
||||
tx->commit();
|
||||
|
||||
removeFromRemoteFS(blobs);
|
||||
@ -292,7 +292,7 @@ void DiskObjectStorage::createHardLink(const String & src_path, const String & d
|
||||
|
||||
/// Create FS hardlink to metadata file.
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createHardLink(src_path, dst_path, tx);
|
||||
tx->createHardLink(src_path, dst_path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -307,7 +307,7 @@ void DiskObjectStorage::setReadOnly(const String & path)
|
||||
/// We should store read only flag inside metadata file (instead of using FS flag),
|
||||
/// because we modify metadata file when create hard-links from it.
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->setReadOnly(path, tx);
|
||||
tx->setReadOnly(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -321,7 +321,7 @@ bool DiskObjectStorage::isDirectory(const String & path) const
|
||||
void DiskObjectStorage::createDirectory(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createDirectory(path, tx);
|
||||
tx->createDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -329,7 +329,7 @@ void DiskObjectStorage::createDirectory(const String & path)
|
||||
void DiskObjectStorage::createDirectories(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->createDicrectoryRecursive(path, tx);
|
||||
tx->createDicrectoryRecursive(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -345,7 +345,7 @@ void DiskObjectStorage::clearDirectory(const String & path)
|
||||
void DiskObjectStorage::removeDirectory(const String & path)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->removeDirectory(path, tx);
|
||||
tx->removeDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -366,7 +366,7 @@ void DiskObjectStorage::listFiles(const String & path, std::vector<String> & fil
|
||||
void DiskObjectStorage::setLastModified(const String & path, const Poco::Timestamp & timestamp)
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->setLastModified(path, timestamp, tx);
|
||||
tx->setLastModified(path, timestamp);
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -391,8 +391,9 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
|
||||
{
|
||||
auto remote_objects = metadata_storage->getRemotePaths(path);
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
uint32_t hardlink_count = metadata_storage->unlinkAndGetHardlinkCount(path, tx);
|
||||
tx->unlinkMetadata(path);
|
||||
tx->commit();
|
||||
uint32_t hardlink_count = metadata_storage->getHardlinkCount(path);
|
||||
|
||||
if (hardlink_count == 0)
|
||||
{
|
||||
@ -413,7 +414,7 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
|
||||
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", path);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->unlinkFile(path, tx);
|
||||
tx->unlinkFile(path);
|
||||
tx->commit();
|
||||
}
|
||||
else
|
||||
@ -436,7 +437,7 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
|
||||
removeMetadataRecursive(it->path(), paths_to_remove);
|
||||
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->removeDirectory(path, tx);
|
||||
tx->removeDirectory(path);
|
||||
tx->commit();
|
||||
}
|
||||
}
|
||||
@ -556,9 +557,10 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
if (mode == WriteMode::Rewrite)
|
||||
metadata_storage->createMetadataFile(path, blob_name, count, tx);
|
||||
tx->createMetadataFile(path, blob_name, count);
|
||||
else
|
||||
metadata_storage->addBlobToMetadata(path, blob_name, count, tx);
|
||||
tx->addBlobToMetadata(path, blob_name, count);
|
||||
|
||||
tx->commit();
|
||||
};
|
||||
|
||||
|
@ -223,7 +223,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
||||
restoreFileOperations(source_object_storage, information);
|
||||
|
||||
auto tx = disk->metadata_storage->createTransaction();
|
||||
disk->metadata_storage->unlinkFile(RESTORE_FILE_NAME, tx);
|
||||
tx->unlinkFile(RESTORE_FILE_NAME);
|
||||
tx->commit();
|
||||
|
||||
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
|
||||
@ -240,7 +240,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restore(const Poco::Util::Abs
|
||||
|
||||
void DiskObjectStorageRemoteMetadataRestoreHelper::readRestoreInformation(RestoreInformation & restore_information) /// NOLINT
|
||||
{
|
||||
auto metadata_str = disk->metadata_storage->readMetadataFileToString(RESTORE_FILE_NAME);
|
||||
auto metadata_str = disk->metadata_storage->readFileToString(RESTORE_FILE_NAME);
|
||||
ReadBufferFromString buffer(metadata_str);
|
||||
|
||||
try
|
||||
@ -424,7 +424,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(IObjectSt
|
||||
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage);
|
||||
|
||||
auto tx = disk->metadata_storage->createTransaction();
|
||||
disk->metadata_storage->addBlobToMetadata(path, relative_key, meta.size_bytes, tx);
|
||||
tx->addBlobToMetadata(path, relative_key, meta.size_bytes);
|
||||
tx->commit();
|
||||
|
||||
LOG_TRACE(disk->log, "Restored file {}", path);
|
||||
@ -438,7 +438,7 @@ void DiskObjectStorage::onFreeze(const String & path)
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
WriteBufferFromOwnString revision_file_buf ;
|
||||
writeIntText(metadata_helper->revision_counter.load(), revision_file_buf);
|
||||
metadata_storage->writeMetadataToFile(path + "revision.txt", tx, revision_file_buf.str());
|
||||
tx->writeStringToFile(path + "revision.txt", revision_file_buf.str());
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -558,10 +558,10 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|
||||
|
||||
/// to_path may exist and non-empty in case for example abrupt restart, so remove it before rename
|
||||
if (disk->metadata_storage->exists(to_path))
|
||||
disk->metadata_storage->removeRecursive(to_path, tx);
|
||||
tx->removeRecursive(to_path);
|
||||
|
||||
disk->createDirectories(directoryPath(to_path));
|
||||
disk->metadata_storage->moveDirectory(from_path, to_path, tx);
|
||||
tx->moveDirectory(from_path, to_path);
|
||||
}
|
||||
tx->commit();
|
||||
}
|
||||
|
@ -1,6 +0,0 @@
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
}
|
@ -25,12 +25,65 @@ struct IMetadataOperation
|
||||
|
||||
using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
|
||||
|
||||
class IMetadataStorage;
|
||||
|
||||
/// Tries to provide some "transactions" interface, which allow
|
||||
/// to execute (commit) operations simultaneously. We don't provide
|
||||
/// any snapshot isolation here, so no read operations in transactions
|
||||
/// interface. This transaction is more like "batch operation" than real "transaction".
|
||||
///
|
||||
/// But for better usability we can get MetadataStorage interface and use some read methods.
|
||||
struct IMetadataTransaction : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
virtual void addOperation(MetadataOperationPtr && operation) = 0;
|
||||
virtual void commit() = 0;
|
||||
virtual void rollback() = 0;
|
||||
|
||||
virtual const IMetadataStorage & getStorageForNonTransactionalReads() const = 0;
|
||||
|
||||
/// General purpose methods
|
||||
|
||||
/// Write metadata string to file
|
||||
virtual void writeStringToFile(const std::string & path, const std::string & data) = 0;
|
||||
|
||||
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
|
||||
|
||||
virtual void setReadOnly(const std::string & path) = 0;
|
||||
|
||||
virtual void unlinkFile(const std::string & path) = 0;
|
||||
|
||||
virtual void createDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual void createDicrectoryRecursive(const std::string & path) = 0;
|
||||
|
||||
virtual void removeDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual void removeRecursive(const std::string & path) = 0;
|
||||
|
||||
virtual void createHardLink(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void moveFile(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void moveDirectory(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
virtual void replaceFile(const std::string & path_from, const std::string & path_to) = 0;
|
||||
|
||||
/// Metadata related methods
|
||||
|
||||
/// Create empty file in metadata storage
|
||||
virtual void createEmptyMetadataFile(const std::string & path) = 0;
|
||||
|
||||
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
||||
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
|
||||
|
||||
/// Add to new blob to metadata file (way to implement appends)
|
||||
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) = 0;
|
||||
|
||||
/// Unlink metadata file and do something special if required
|
||||
/// By default just remove file (unlink file).
|
||||
virtual void unlinkMetadata(const std::string & path)
|
||||
{
|
||||
unlinkFile(path);
|
||||
}
|
||||
|
||||
virtual ~IMetadataTransaction() = default;
|
||||
};
|
||||
@ -40,57 +93,32 @@ using MetadataTransactionPtr = std::shared_ptr<IMetadataTransaction>;
|
||||
/// Metadata storage for remote disks like DiskObjectStorage.
|
||||
/// Support some subset of Disk operations, allow to read/write only
|
||||
/// small amounts of data (strings).
|
||||
/// Tries to provide some "transactions" interface, which allow
|
||||
/// to execute operations simultaneously.
|
||||
class IMetadataStorage : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
virtual MetadataTransactionPtr createTransaction() const = 0;
|
||||
|
||||
/// General purpose functions (similar to Disk)
|
||||
virtual const std::string & getPath() const = 0;
|
||||
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
|
||||
virtual bool isFile(const std::string & path) const = 0;
|
||||
|
||||
virtual bool isDirectory(const std::string & path) const = 0;
|
||||
|
||||
virtual uint64_t getFileSize(const std::string & path) const = 0;
|
||||
|
||||
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
|
||||
|
||||
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
|
||||
|
||||
virtual DirectoryIteratorPtr iterateDirectory(const std::string & path) = 0;
|
||||
|
||||
virtual uint32_t getHardlinkCount(const std::string & path) const = 0;
|
||||
|
||||
/// Create empty file in metadata storage
|
||||
virtual void createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Read metadata file to string from path
|
||||
virtual std::string readMetadataFileToString(const std::string & path) const = 0;
|
||||
|
||||
/// Read metadata string to file
|
||||
virtual void writeMetadataToFile(
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data) = 0;
|
||||
|
||||
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void setReadOnly(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void unlinkFile(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
virtual void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
|
||||
virtual std::string readFileToString(const std::string & path) const = 0;
|
||||
|
||||
virtual ~IMetadataStorage() = default;
|
||||
|
||||
@ -104,15 +132,6 @@ public:
|
||||
|
||||
/// Return [(remote_path, size_in_bytes), ...] for metadata path
|
||||
virtual BlobsPathToSize getBlobs(const std::string & path) const = 0;
|
||||
|
||||
/// Create metadata file on paths with content (blob_name, size_in_bytes)
|
||||
virtual void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Add to new blob to metadata file (way to implement appends)
|
||||
virtual void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) = 0;
|
||||
|
||||
/// Unlink file and return amount of hardlinks left
|
||||
virtual uint32_t unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction) = 0;
|
||||
};
|
||||
|
||||
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Poco/TemporaryFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -12,7 +13,6 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FS_METADATA_ERROR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
@ -26,8 +26,6 @@ std::string toString(MetadataFromDiskTransactionState state)
|
||||
return "FAILED";
|
||||
case MetadataFromDiskTransactionState::COMMITTED:
|
||||
return "COMMITTED";
|
||||
case MetadataFromDiskTransactionState::ROLLED_BACK:
|
||||
return "ROLLED_BACK";
|
||||
case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK:
|
||||
return "PARTIALLY_ROLLED_BACK";
|
||||
}
|
||||
@ -37,15 +35,9 @@ std::string toString(MetadataFromDiskTransactionState state)
|
||||
namespace
|
||||
{
|
||||
|
||||
std::string getTempFileName()
|
||||
std::string getTempFileName(const std::string & dir)
|
||||
{
|
||||
std::string temp_filepath;
|
||||
std::string dummy_prefix = "a/";
|
||||
temp_filepath = Poco::TemporaryFile::tempName(dummy_prefix);
|
||||
dummy_prefix += "tmp";
|
||||
assert(temp_filepath.starts_with(dummy_prefix));
|
||||
temp_filepath.replace(0, dummy_prefix.length(), "tmp");
|
||||
return temp_filepath;
|
||||
return fs::path(dir) / getRandomASCIIString();
|
||||
}
|
||||
|
||||
class SetLastModifiedOperation final : public IMetadataOperation
|
||||
@ -171,17 +163,7 @@ public:
|
||||
|
||||
void execute() override
|
||||
{
|
||||
try
|
||||
{
|
||||
disk.removeDirectory(path);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::vector<std::string> files;
|
||||
disk.listFiles(path, files);
|
||||
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "GOT FIlES {}", fmt::join(files, ", "));
|
||||
throw;
|
||||
}
|
||||
disk.removeDirectory(path);
|
||||
}
|
||||
|
||||
void undo() override
|
||||
@ -199,7 +181,7 @@ public:
|
||||
RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
|
||||
: path(path_)
|
||||
, disk(disk_)
|
||||
, temp_path(getTempFileName())
|
||||
, temp_path(getTempFileName(fs::path(path).parent_path()))
|
||||
{
|
||||
}
|
||||
|
||||
@ -207,7 +189,7 @@ public:
|
||||
{
|
||||
if (disk.isFile(path))
|
||||
disk.moveFile(path, temp_path);
|
||||
if (disk.isDirectory(path))
|
||||
else if (disk.isDirectory(path))
|
||||
disk.moveDirectory(path, temp_path);
|
||||
}
|
||||
|
||||
@ -215,7 +197,7 @@ public:
|
||||
{
|
||||
if (disk.isFile(temp_path))
|
||||
disk.moveFile(temp_path, path);
|
||||
if (disk.isDirectory(temp_path))
|
||||
else if (disk.isDirectory(temp_path))
|
||||
disk.moveDirectory(temp_path, path);
|
||||
}
|
||||
|
||||
@ -315,7 +297,7 @@ public:
|
||||
: path_from(path_from_)
|
||||
, path_to(path_to_)
|
||||
, disk(disk_)
|
||||
, temp_path_to(getTempFileName())
|
||||
, temp_path_to(getTempFileName(fs::path(path_to).parent_path()))
|
||||
{
|
||||
}
|
||||
|
||||
@ -381,12 +363,11 @@ public:
|
||||
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::writeMetadataToFile( /// NOLINT
|
||||
void MetadataStorageFromDiskTransaction::writeStringToFile( /// NOLINT
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<WriteFileOperation>(path, *disk, data));
|
||||
addOperation(std::make_unique<WriteFileOperation>(path, *metadata_storage.disk, data));
|
||||
}
|
||||
|
||||
|
||||
@ -406,7 +387,7 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
|
||||
|
||||
{
|
||||
std::unique_lock lock(commit_mutex);
|
||||
std::unique_lock lock(metadata_storage.metadata_mutex);
|
||||
for (size_t i = 0; i < operations.size(); ++i)
|
||||
{
|
||||
try
|
||||
@ -416,8 +397,8 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
catch (Exception & ex)
|
||||
{
|
||||
ex.addMessage(fmt::format("While committing operation #{}", i));
|
||||
failed_operation_index = i;
|
||||
state = MetadataFromDiskTransactionState::FAILED;
|
||||
rollback(i);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@ -439,19 +420,15 @@ void MetadataStorageFromDiskTransaction::commit()
|
||||
state = MetadataFromDiskTransactionState::COMMITTED;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDiskTransaction::rollback()
|
||||
void MetadataStorageFromDiskTransaction::rollback(size_t until_pos)
|
||||
{
|
||||
/// Otherwise everything is alright
|
||||
if (state == MetadataFromDiskTransactionState::FAILED)
|
||||
{
|
||||
if (!failed_operation_index.has_value())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction in failed state, but has not failed operations. It's a bug");
|
||||
|
||||
for (int64_t i = failed_operation_index.value(); i >= 0; --i)
|
||||
for (int64_t i = until_pos; i >= 0; --i)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::unique_lock lock(commit_mutex);
|
||||
operations[i]->undo();
|
||||
}
|
||||
catch (Exception & ex)
|
||||
@ -466,23 +443,6 @@ void MetadataStorageFromDiskTransaction::rollback()
|
||||
{
|
||||
/// Nothing to do, transaction committed or not even started to commit
|
||||
}
|
||||
|
||||
state = MetadataFromDiskTransactionState::ROLLED_BACK;
|
||||
}
|
||||
|
||||
MetadataStorageFromDiskTransaction::~MetadataStorageFromDiskTransaction()
|
||||
{
|
||||
if (state == MetadataFromDiskTransactionState::FAILED)
|
||||
{
|
||||
try
|
||||
{
|
||||
rollback();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const std::string & MetadataStorageFromDisk::getPath() const
|
||||
@ -530,7 +490,7 @@ DirectoryIteratorPtr MetadataStorageFromDisk::iterateDirectory(const std::string
|
||||
}
|
||||
|
||||
|
||||
std::string MetadataStorageFromDisk::readMetadataFileToString(const std::string & path) const
|
||||
std::string MetadataStorageFromDisk::readFileToString(const std::string & path) const
|
||||
{
|
||||
auto buf = disk->readFile(path);
|
||||
std::string result;
|
||||
@ -538,102 +498,102 @@ std::string MetadataStorageFromDisk::readMetadataFileToString(const std::string
|
||||
return result;
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createEmptyMetadataFile(const std::string & path)
|
||||
{
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
writeStringToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *disk));
|
||||
addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::unlinkFile(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::unlinkFile(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<UnlinkFileOperation>(path, *disk));
|
||||
addOperation(std::make_unique<UnlinkFileOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::removeRecursive(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::removeRecursive(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<RemoveRecursiveOperation>(path, *disk));
|
||||
addOperation(std::make_unique<RemoveRecursiveOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createDirectory(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createDirectory(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<CreateDirectoryOperation>(path, *disk));
|
||||
addOperation(std::make_unique<CreateDirectoryOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createDicrectoryRecursive(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *disk));
|
||||
addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::removeDirectory(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::removeDirectory(const std::string & path)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<RemoveDirectoryOperation>(path, *disk));
|
||||
addOperation(std::make_unique<RemoveDirectoryOperation>(path, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createHardLink(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
auto metadata = readMetadata(path_from);
|
||||
auto metadata = metadata_storage.readMetadata(path_from);
|
||||
|
||||
metadata->incrementRefCount();
|
||||
|
||||
writeMetadataToFile(path_from, transaction, metadata->serializeToString());
|
||||
writeStringToFile(path_from, metadata->serializeToString());
|
||||
|
||||
transaction->addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::moveFile(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::moveDirectory(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::replaceFile(const std::string & path_from, const std::string & path_to)
|
||||
{
|
||||
transaction->addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *disk));
|
||||
addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *metadata_storage.disk));
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::setReadOnly(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::setReadOnly(const std::string & path)
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
auto metadata = metadata_storage.readMetadata(path);
|
||||
metadata->setReadOnly();
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeStringToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
||||
{
|
||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
DiskObjectStorageMetadataPtr metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
metadata->addObject(blob_name, size_in_bytes);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeStringToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
void MetadataStorageFromDisk::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes)
|
||||
{
|
||||
DiskObjectStorageMetadataPtr metadata;
|
||||
if (exists(path))
|
||||
if (metadata_storage.exists(path))
|
||||
{
|
||||
metadata = readMetadata(path);
|
||||
metadata = metadata_storage.readMetadata(path);
|
||||
}
|
||||
else
|
||||
{
|
||||
metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
metadata = std::make_unique<DiskObjectStorageMetadata>(metadata_storage.disk->getPath(), metadata_storage.root_path_for_remote_metadata, path);
|
||||
}
|
||||
|
||||
metadata->addObject(blob_name, size_in_bytes);
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeStringToFile(path, metadata->serializeToString());
|
||||
}
|
||||
|
||||
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
|
||||
{
|
||||
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
|
||||
auto str = readMetadataFileToString(path);
|
||||
auto str = readFileToString(path);
|
||||
metadata->deserializeFromString(str);
|
||||
return metadata;
|
||||
}
|
||||
@ -661,6 +621,11 @@ std::unordered_map<String, String> MetadataStorageFromDisk::getSerializedMetadat
|
||||
return metadatas;
|
||||
}
|
||||
|
||||
MetadataTransactionPtr MetadataStorageFromDisk::createTransaction() const
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromDiskTransaction>(*this);
|
||||
}
|
||||
|
||||
std::vector<std::string> MetadataStorageFromDisk::getRemotePaths(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
@ -681,25 +646,22 @@ uint32_t MetadataStorageFromDisk::getHardlinkCount(const std::string & path) con
|
||||
return metadata->getRefCount();
|
||||
}
|
||||
|
||||
|
||||
BlobsPathToSize MetadataStorageFromDisk::getBlobs(const std::string & path) const
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
return metadata->getBlobs();
|
||||
}
|
||||
|
||||
|
||||
uint32_t MetadataStorageFromDisk::unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction)
|
||||
void MetadataStorageFromDiskTransaction::unlinkMetadata(const std::string & path)
|
||||
{
|
||||
auto metadata = readMetadata(path);
|
||||
auto metadata = metadata_storage.readMetadata(path);
|
||||
uint32_t ref_count = metadata->getRefCount();
|
||||
if (ref_count != 0)
|
||||
{
|
||||
metadata->decrementRefCount();
|
||||
writeMetadataToFile(path, transaction, metadata->serializeToString());
|
||||
writeStringToFile(path, metadata->serializeToString());
|
||||
}
|
||||
unlinkFile(path, transaction);
|
||||
return ref_count;
|
||||
unlinkFile(path);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,35 +13,16 @@ enum class MetadataFromDiskTransactionState
|
||||
PREPARING,
|
||||
FAILED,
|
||||
COMMITTED,
|
||||
ROLLED_BACK,
|
||||
PARTIALLY_ROLLED_BACK,
|
||||
};
|
||||
|
||||
std::string toString(MetadataFromDiskTransactionState state);
|
||||
|
||||
struct MetadataStorageFromDiskTransaction final : public IMetadataTransaction
|
||||
{
|
||||
private:
|
||||
std::optional<size_t> failed_operation_index;
|
||||
|
||||
std::shared_mutex & commit_mutex;
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
|
||||
public:
|
||||
explicit MetadataStorageFromDiskTransaction(std::shared_mutex & commit_mutex_)
|
||||
: commit_mutex(commit_mutex_)
|
||||
{}
|
||||
|
||||
void addOperation(MetadataOperationPtr && operation) override;
|
||||
void commit() override;
|
||||
void rollback() override;
|
||||
|
||||
~MetadataStorageFromDiskTransaction() override;
|
||||
};
|
||||
|
||||
class MetadataStorageFromDisk final : public IMetadataStorage
|
||||
{
|
||||
private:
|
||||
friend struct MetadataStorageFromDiskTransaction;
|
||||
|
||||
DiskPtr disk;
|
||||
std::string root_path_for_remote_metadata;
|
||||
mutable std::shared_mutex metadata_mutex;
|
||||
@ -53,10 +34,7 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
MetadataTransactionPtr createTransaction() const override
|
||||
{
|
||||
return std::make_shared<MetadataStorageFromDiskTransaction>(metadata_mutex);
|
||||
}
|
||||
MetadataTransactionPtr createTransaction() const override;
|
||||
|
||||
const std::string & getPath() const override;
|
||||
|
||||
@ -74,7 +52,7 @@ public:
|
||||
|
||||
DirectoryIteratorPtr iterateDirectory(const std::string & path) override;
|
||||
|
||||
std::string readMetadataFileToString(const std::string & path) const override;
|
||||
std::string readFileToString(const std::string & path) const override;
|
||||
|
||||
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
|
||||
|
||||
@ -84,45 +62,69 @@ public:
|
||||
|
||||
uint32_t getHardlinkCount(const std::string & path) const override;
|
||||
|
||||
void writeMetadataToFile(
|
||||
const std::string & path,
|
||||
MetadataTransactionPtr transaction,
|
||||
const std::string & data) override;
|
||||
|
||||
void createEmptyMetadataFile(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void setReadOnly(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void unlinkFile(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createDirectory(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
|
||||
|
||||
uint32_t unlinkAndGetHardlinkCount(const std::string & path, MetadataTransactionPtr transaction) override;
|
||||
|
||||
private:
|
||||
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
|
||||
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> & lock) const;
|
||||
};
|
||||
|
||||
struct MetadataStorageFromDiskTransaction final : public IMetadataTransaction
|
||||
{
|
||||
private:
|
||||
const MetadataStorageFromDisk & metadata_storage;
|
||||
|
||||
std::vector<MetadataOperationPtr> operations;
|
||||
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
|
||||
|
||||
void addOperation(MetadataOperationPtr && operation);
|
||||
void rollback(size_t until_pos);
|
||||
|
||||
public:
|
||||
explicit MetadataStorageFromDiskTransaction(const MetadataStorageFromDisk & metadata_storage_)
|
||||
: metadata_storage(metadata_storage_)
|
||||
{}
|
||||
|
||||
const IMetadataStorage & getStorageForNonTransactionalReads() const override
|
||||
{
|
||||
return metadata_storage;
|
||||
}
|
||||
|
||||
void commit() override;
|
||||
|
||||
void writeStringToFile(const std::string & path, const std::string & data) override;
|
||||
|
||||
void createEmptyMetadataFile(const std::string & path) override;
|
||||
|
||||
void createMetadataFile(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
||||
|
||||
void addBlobToMetadata(const std::string & path, const std::string & blob_name, uint64_t size_in_bytes) override;
|
||||
|
||||
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
|
||||
|
||||
void setReadOnly(const std::string & path) override;
|
||||
|
||||
void unlinkFile(const std::string & path) override;
|
||||
|
||||
void createDirectory(const std::string & path) override;
|
||||
|
||||
void createDicrectoryRecursive(const std::string & path) override;
|
||||
|
||||
void removeDirectory(const std::string & path) override;
|
||||
|
||||
void removeRecursive(const std::string & path) override;
|
||||
|
||||
void createHardLink(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void moveFile(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void moveDirectory(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void replaceFile(const std::string & path_from, const std::string & path_to) override;
|
||||
|
||||
void unlinkMetadata(const std::string & path) override;
|
||||
|
||||
~MetadataStorageFromDiskTransaction() override = default;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
@ -261,17 +261,18 @@ void MergeTreeWriteAheadLog::shutdown()
|
||||
{
|
||||
{
|
||||
std::unique_lock lock(write_mutex);
|
||||
if (shutted_down)
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
if (sync_scheduled)
|
||||
sync_cv.wait(lock, [this] { return !sync_scheduled; });
|
||||
|
||||
shutted_down = true;
|
||||
shutdown_called = true;
|
||||
out->finalize();
|
||||
out.reset();
|
||||
}
|
||||
|
||||
/// Do it without lock, otherwise inversion between pool lock and write_mutex is possible
|
||||
sync_task->deactivate();
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ private:
|
||||
|
||||
size_t bytes_at_last_sync = 0;
|
||||
bool sync_scheduled = false;
|
||||
bool shutted_down = false;
|
||||
bool shutdown_called = false;
|
||||
|
||||
mutable std::mutex write_mutex;
|
||||
|
||||
|
@ -8165,7 +8165,7 @@ public:
|
||||
writeString(table_shared_id, buffer);
|
||||
buffer.write("\n", 1);
|
||||
|
||||
metadata_storage->writeMetadataToFile(file_path, tx, buffer.str());
|
||||
tx->writeStringToFile(file_path, buffer.str());
|
||||
tx->commit();
|
||||
}
|
||||
|
||||
@ -8176,7 +8176,7 @@ public:
|
||||
|
||||
if (!metadata_storage->exists(file_path))
|
||||
return false;
|
||||
auto metadata_str = metadata_storage->readMetadataFileToString(file_path);
|
||||
auto metadata_str = metadata_storage->readFileToString(file_path);
|
||||
ReadBufferFromString buffer(metadata_str);
|
||||
readIntText(version, buffer);
|
||||
if (version != 1)
|
||||
@ -8205,7 +8205,7 @@ public:
|
||||
if (metadata_storage->exists(fname))
|
||||
{
|
||||
auto tx = metadata_storage->createTransaction();
|
||||
metadata_storage->unlinkFile(fname, tx);
|
||||
tx->unlinkFile(fname);
|
||||
tx->commit();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user