Fix race condition on hardlink/erase/read metadata

This commit is contained in:
alesapin 2022-02-02 19:40:21 +03:00
parent e87ed0ff02
commit b9c118524f
14 changed files with 247 additions and 273 deletions

View File

@ -66,7 +66,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskAzureBlobStorage::readFile(
std::optional<size_t>) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log, "Read from file by path: {}", backQuote(metadata_disk->getPath() + path));
@ -94,7 +94,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
size_t buf_size,
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = path + "_" + getRandomASCIIString(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. AzureBlob Storage path: {}",
@ -106,7 +105,12 @@ std::unique_ptr<WriteBufferFromFileBase> DiskAzureBlobStorage::writeFile(
current_settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
auto create_metadata_callback = [this, path, mode, blob_path] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_path, count] (Metadata & metadata) { metadata.addObject(blob_path, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromAzureBlobStorage>>(std::move(buffer), std::move(create_metadata_callback), path);
}

View File

@ -71,17 +71,7 @@ public:
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const override { return delegate->readMetaFile(path, settings, size); }
std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode) override { return delegate->writeMetaFile(path, buf_size, mode); }
void removeMetaFileIfExists(const String & path) override { delegate->removeMetaFileIfExists(path); }
DiskPtr getMetadataDiskIfExistsOrSelf() override { return delegate->getMetadataDiskIfExistsOrSelf(); }
UInt32 getRefCount(const String & path) const override { return delegate->getRefCount(path); }

View File

@ -73,7 +73,7 @@ DiskHDFS::DiskHDFS(
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
{
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log,
"Read from file by path: {}. Existing HDFS objects: {}",
@ -87,8 +87,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new HDFS object.
auto file_name = getRandomName();
auto hdfs_path = remote_fs_root_path + file_name;
@ -100,10 +98,13 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
auto create_metadata_callback = [this, path, mode, hdfs_path] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [hdfs_path, count] (Metadata & metadata) { metadata.addObject(hdfs_path, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
std::move(metadata),
file_name);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(
std::move(hdfs_buffer), std::move(create_metadata_callback), path);
}

View File

@ -86,28 +86,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
std::unique_ptr<ReadBufferFromFileBase> IDisk::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Read local metafile: {}", path);
return readFile(path, settings, size);
}
std::unique_ptr<WriteBufferFromFileBase> IDisk::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Write local metafile: {}", path);
return writeFile(path, buf_size, mode);
}
void IDisk::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(&Poco::Logger::get("IDisk"), "Remove local metafile: {}", path);
removeFileIfExists(path);
}
}

View File

@ -251,26 +251,19 @@ public:
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
/// Open the local file for read and return ReadBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const;
/// Open the local file for write and return WriteBufferFromFileBase object.
/// Overridden in IDiskRemote.
/// Used for work with custom metadata.
virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode);
virtual void removeMetaFileIfExists(const String & path);
virtual std::shared_ptr<IDisk> getMetadataDiskIfExistsOrSelf() { return std::static_pointer_cast<IDisk>(shared_from_this()); }
/// Return reference count for remote FS.
/// Overridden in IDiskRemote.
/// Why zero, and what does it mean? For some unknown reason the decision
/// was made to treat 0 as "no extra references exist, only the file itself is left".
/// With a normal file system we would get 1 in this case:
/// $ stat clickhouse
/// File: clickhouse
/// Size: 3014014920 Blocks: 5886760 IO Block: 4096 regular file
/// Device: 10301h/66305d Inode: 3109907 Links: 1
/// Why is it always zero by default? Because a normal filesystem
/// manages hardlinks by itself, so you can always remove a hardlink and all
/// other alive hardlinks will not be removed.
virtual UInt32 getRefCount(const String &) const { return 0; }
protected:

View File

@ -28,19 +28,59 @@ namespace ErrorCodes
}
/// Load metadata by path or create empty if `create` flag is set.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_,
bool create)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
IDiskRemote::Metadata IDiskRemote::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
{
if (create)
return;
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
return result;
}
/// Construct fresh metadata for `metadata_file_path_` (nothing is loaded from disk)
/// and write it to the metadata disk right away. `sync` is forwarded to save().
/// Returns the metadata object that was stored.
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
{
    Metadata fresh_metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);

    fresh_metadata.save(sync);

    return fresh_metadata;
}
/// Load the metadata stored at `metadata_file_path_`, let `updater` modify it and
/// write it back only when `updater` returns true. `sync` is forwarded to save().
/// Returns the (possibly modified) metadata object.
IDiskRemote::Metadata IDiskRemote::Metadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
    Metadata loaded_metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
    loaded_metadata.load();

    /// The updater decides whether the change has to be persisted.
    const bool need_to_store = updater(loaded_metadata);
    if (need_to_store)
        loaded_metadata.save(sync);

    return loaded_metadata;
}
/// Construct fresh metadata for `metadata_file_path_` (nothing is loaded from disk),
/// apply `updater` to it and write the result to the metadata disk.
/// Unlike readUpdateAndStoreMetadata, the return value of `updater` is ignored here:
/// the metadata is always saved. `sync` is forwarded to save().
IDiskRemote::Metadata IDiskRemote::Metadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, IDiskRemote::MetadataUpdater updater)
{
    Metadata fresh_metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);

    updater(fresh_metadata);
    fresh_metadata.save(sync);

    return fresh_metadata;
}
/// Return the metadata for `metadata_file_path_`, creating and storing a fresh one
/// when `overwrite` is set or the metadata file does not exist yet.
/// If an existing file is reused and it is marked read-only, throws PATH_ACCESS_DENIED.
IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
{
    /// The existence check is only performed when we are not asked to overwrite.
    const bool reuse_existing = !overwrite && metadata_disk_->exists(metadata_file_path_);

    if (reuse_existing)
    {
        auto existing_metadata = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
        if (existing_metadata.read_only)
            throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
        return existing_metadata;
    }

    return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
}
void IDiskRemote::Metadata::load()
{
try
{
const ReadSettings read_settings;
@ -102,6 +142,17 @@ IDiskRemote::Metadata::Metadata(
}
}
/// Construct in-memory metadata bound to `metadata_file_path_` on `metadata_disk_`.
/// The constructor body is empty: it only initializes the fields (total_size and
/// ref_count start at 0) and neither reads nor writes the metadata file.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_)
: RemoteMetadata(remote_fs_root_path_, metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
{
}
void IDiskRemote::Metadata::addObject(const String & path, size_t size)
{
total_size += size;
@ -111,6 +162,7 @@ void IDiskRemote::Metadata::addObject(const String & path, size_t size)
/// Fsync metadata file if 'sync' flag is set.
void IDiskRemote::Metadata::save(bool sync)
{
namespace fs = std::filesystem;
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
writeIntText(VERSION_RELATIVE_PATHS, *buf);
@ -140,42 +192,43 @@ void IDiskRemote::Metadata::save(bool sync)
buf->sync();
}
IDiskRemote::Metadata IDiskRemote::readOrCreateMetaForWriting(const String & path, WriteMode mode)
IDiskRemote::Metadata IDiskRemote::readMetadata(const String & path) const
{
bool exist = exists(path);
if (exist)
std::shared_lock lock(metadata_mutex);
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
}
/// Read the metadata stored at `path`, apply `updater` and store the result if the
/// updater asks for it. The whole read-modify-write runs while holding
/// `metadata_mutex` through a std::unique_lock.
IDiskRemote::Metadata IDiskRemote::readUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
    std::unique_lock metadata_guard(metadata_mutex);

    return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
IDiskRemote::Metadata IDiskRemote::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, IDiskRemote::MetadataUpdater updater)
{
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
{
auto metadata = readMeta(path);
if (metadata.read_only)
throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
if (mode == WriteMode::Rewrite)
removeFile(path); /// Remove for re-write.
else
return metadata;
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
else
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
auto metadata = createMeta(path);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
return metadata;
}
IDiskRemote::Metadata IDiskRemote::readMeta(const String & path) const
IDiskRemote::Metadata IDiskRemote::createAndStoreMetadata(const String & path, bool sync)
{
return Metadata(remote_fs_root_path, metadata_disk, path);
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
}
IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
IDiskRemote::Metadata IDiskRemote::createUpdateAndStoreMetadata(const String & path, bool sync, IDiskRemote::MetadataUpdater updater)
{
return Metadata(remote_fs_root_path, metadata_disk, path, true);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
}
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
@ -184,21 +237,25 @@ void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths
try
{
auto metadata = readMeta(path);
auto metadata_updater = [fs_paths_keeper, this] (Metadata & metadata)
{
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
return false;
}
else /// In other case decrement number of references, save metadata and delete hardlink.
{
--metadata.ref_count;
}
return true;
};
readUpdateAndStoreMetadata(path, false, metadata_updater);
metadata_disk->removeFile(path);
/// If there is no references - delete content from remote FS.
if (metadata.ref_count == 0)
{
metadata_disk->removeFile(path);
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
}
else /// In other case decrement number of references, save metadata and delete file.
{
--metadata.ref_count;
metadata.save();
metadata_disk->removeFile(path);
}
}
catch (const Exception & e)
{
@ -216,18 +273,19 @@ void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths
}
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (metadata_disk->isFile(path))
{
removeMeta(path, fs_paths_keeper);
removeMetadata(path, fs_paths_keeper);
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeMetaRecursive(it->path(), fs_paths_keeper);
for (auto it = iterateDirectory(path); it->isValid(); it->next())
removeMetadataRecursive(it->path(), fs_paths_keeper);
metadata_disk->removeDirectory(path);
}
}
@ -305,16 +363,13 @@ bool IDiskRemote::isFile(const String & path) const
void IDiskRemote::createFile(const String & path)
{
/// Create empty metadata file.
auto metadata = createMeta(path);
metadata.save();
createAndStoreMetadata(path, false);
}
size_t IDiskRemote::getFileSize(const String & path) const
{
auto metadata = readMeta(path);
return metadata.total_size;
return readMetadata(path).total_size;
}
@ -341,32 +396,32 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
}
void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMeta(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadata(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
void IDiskRemote::removeSharedFileIfExists(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
if (metadata_disk->exists(path))
{
removeMeta(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadata(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
}
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMetaRecursive(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeMetadataRecursive(path, fs_paths_keeper);
if (!delete_metadata_only)
removeFromRemoteFS(fs_paths_keeper);
}
@ -375,9 +430,7 @@ void IDiskRemote::setReadOnly(const String & path)
{
/// We should store read only flag inside metadata file (instead of using FS flag),
/// because we modify metadata file when create hard-links from it.
auto metadata = readMeta(path);
metadata.read_only = true;
metadata.save();
readUpdateAndStoreMetadata(path, false, [] (Metadata & metadata) { metadata.read_only = true; return true; });
}
@ -401,7 +454,7 @@ void IDiskRemote::createDirectories(const String & path)
void IDiskRemote::clearDirectory(const String & path)
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (isFile(it->path()))
removeFile(it->path());
}
@ -440,10 +493,7 @@ Poco::Timestamp IDiskRemote::getLastModified(const String & path)
void IDiskRemote::createHardLink(const String & src_path, const String & dst_path)
{
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
src.save();
readUpdateAndStoreMetadata(src_path, false, [] (Metadata & metadata) { metadata.ref_count++; return true; });
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
@ -485,7 +535,7 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
String IDiskRemote::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", remote_fs_root_path, path);
Metadata metadata(remote_fs_root_path, metadata_disk, path);
auto metadata = readMetadata(path);
String id;
if (!metadata.remote_fs_objects.empty())
id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first;
@ -501,34 +551,9 @@ AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
return reader;
}
std::unique_ptr<ReadBufferFromFileBase> IDiskRemote::readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const
{
LOG_TRACE(log, "Read metafile: {}", path);
return metadata_disk->readFile(path, settings, size);
}
std::unique_ptr<WriteBufferFromFileBase> IDiskRemote::writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode)
{
LOG_TRACE(log, "Write metafile: {}", path);
return metadata_disk->writeFile(path, buf_size, mode);
}
void IDiskRemote::removeMetaFileIfExists(const String & path)
{
LOG_TRACE(log, "Remove metafile: {}", path);
return metadata_disk->removeFileIfExists(path);
}
UInt32 IDiskRemote::getRefCount(const String & path) const
{
auto meta = readMeta(path);
return meta.ref_count;
return readMetadata(path).ref_count;
}
}

View File

@ -57,16 +57,18 @@ public:
size_t thread_pool_size);
struct Metadata;
using MetadataUpdater = std::function<bool(Metadata & metadata)>;
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_disk->getPath(); }
Metadata readMeta(const String & path) const;
Metadata readMetadata(const String & path) const;
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
Metadata createMeta(const String & path) const;
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
Metadata createAndStoreMetadata(const String & path, bool sync);
Metadata createUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
@ -94,11 +96,11 @@ public:
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
void removeSharedFile(const String & path, bool delete_metadata_only) override;
void removeSharedFileIfExists(const String & path, bool keep_in_remote_fs) override;
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
void removeSharedRecursive(const String & path, bool delete_metadata_only) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
@ -136,18 +138,7 @@ public:
static AsynchronousReaderPtr getThreadPoolReader();
virtual std::unique_ptr<ReadBufferFromFileBase> readMetaFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> size) const override;
virtual std::unique_ptr<WriteBufferFromFileBase> writeMetaFile(
const String & path,
size_t buf_size,
WriteMode mode) override;
virtual void removeMetaFileIfExists(
const String & path) override;
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
UInt32 getRefCount(const String & path) const override;
@ -159,15 +150,16 @@ protected:
DiskPtr metadata_disk;
private:
void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
bool tryReserve(UInt64 bytes);
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
mutable std::shared_mutex metadata_mutex;
};
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
@ -197,6 +189,7 @@ struct RemoteMetadata
struct IDiskRemote::Metadata : RemoteMetadata
{
using Updater = std::function<bool(IDiskRemote::Metadata & metadata)>;
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
@ -213,17 +206,23 @@ struct IDiskRemote::Metadata : RemoteMetadata
/// Flag indicates that file is read only.
bool read_only = false;
/// Load metadata by path or create empty if `create` flag is set.
Metadata(const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
const String & metadata_file_path_,
bool create = false);
const String & metadata_file_path_);
void addObject(const String & path, size_t size);
static Metadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static Metadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static Metadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static Metadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
private:
/// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false);
void load();
};
class DiskRemoteReservation final : public IReservation

View File

@ -12,15 +12,14 @@ namespace DB
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_)
CreateMetadataCallback && create_callback_,
const String & metadata_file_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, metadata(std::move(metadata_))
, remote_fs_path(remote_fs_path_)
, create_metadata_callback(std::move(create_callback_))
, metadata_file_path(metadata_file_path_)
{
}
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
{
@ -34,25 +33,13 @@ WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
}
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();
metadata.addObject(remote_fs_path, count());
metadata.save();
create_metadata_callback(count());
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::sync()
{
if (finalized)
metadata.save(true);
}
#if USE_AWS_S3
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;

View File

@ -9,6 +9,8 @@
namespace DB
{
using CreateMetadataCallback = std::function<void(size_t bytes_count)>;
/// Stores data in S3/HDFS and adds the object path and object size to metadata file on local FS.
template <typename T>
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
@ -16,21 +18,18 @@ class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorato
public:
WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_);
CreateMetadataCallback && create_callback_,
const String & metadata_file_path_);
virtual ~WriteIndirectBufferFromRemoteFS() override;
~WriteIndirectBufferFromRemoteFS() override;
void sync() override;
String getFileName() const override { return metadata.metadata_file_path; }
String getFileName() const override { return metadata_file_path; }
private:
void finalizeImpl() override;
IDiskRemote::Metadata metadata;
String remote_fs_path;
CreateMetadataCallback create_metadata_callback;
String metadata_file_path;
};
}

View File

@ -217,7 +217,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, std::optional<size_t>, std::optional<size_t>) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
auto metadata = readMetadata(path);
LOG_TEST(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_disk->getPath() + path), metadata.remote_fs_objects.size());
@ -244,7 +244,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto settings = current_settings.get();
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new S3 object.
auto s3_path = getRandomASCIIString();
@ -265,13 +264,18 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
bucket,
metadata.remote_fs_root_path + s3_path,
remote_fs_root_path + s3_path,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
auto create_metadata_callback = [this, path, s3_path, mode] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false, [s3_path, count] (Metadata & metadata) { metadata.addObject(s3_path, count); return true; });
};
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(create_metadata_callback), path);
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
@ -293,13 +297,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path, bo
createFileOperationObject("hardlink", revision, object_metadata);
}
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
src.save();
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
IDiskRemote::createHardLink(src_path, dst_path);
}
void DiskS3::shutdown()
@ -415,7 +413,7 @@ void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_TRACE(log, "Migrate file {} to restorable schema", metadata_disk->getPath() + path);
auto meta = readMeta(path);
auto meta = readMetadata(path);
for (const auto & [key, _] : meta.remote_fs_objects)
{
@ -871,15 +869,19 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
const auto & path = path_entry->second;
createDirectories(directoryPath(path));
auto metadata = createMeta(path);
auto relative_key = shrinkKey(source_path, key);
/// Copy object if we restore to different bucket / path.
if (bucket != source_bucket || remote_fs_root_path != source_path)
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key, head_result);
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
auto updater = [relative_key, head_result] (Metadata & metadata)
{
metadata.addObject(relative_key, head_result.GetContentLength());
return true;
};
createUpdateAndStoreMetadata(path, false, updater);
LOG_TRACE(log, "Restored file {}", path);
}

View File

@ -1633,17 +1633,9 @@ String IMergeTreeDataPart::getUniqueId() const
if (!disk->supportZeroCopyReplication())
throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);
String id = disk->getUniqueId(fs::path(getFullRelativePath()) / "checksums.txt");
return id;
return disk->getUniqueId(fs::path(getFullRelativePath()) / FILE_FOR_REFERENCES_CHECK);
}
UInt32 IMergeTreeDataPart::getNumberOfRefereneces() const
{
return volume->getDisk()->getRefCount(fs::path(getFullRelativePath()) / "checksums.txt");
}
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
if (info.level != 0)

View File

@ -406,6 +406,18 @@ public:
/// (number of rows, number of rows with default values, etc).
static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.json";
/// One of the part files, used to check how many references (I'd like
/// to say hardlinks, but that would confuse even more) we have for the part
/// for zero copy replication. Sadly it's very complex.
///
/// NOTE: it's not a random "metadata" file for a part like 'columns.txt'. If
/// two related parts (for example all_1_1_0 and all_1_1_0_100) have equal
/// checksums.txt it means that one part was obtained by a FREEZE operation or
/// by a mutation that did not change the source part. In this case we
/// really don't need to remove data from remote FS and only need to decrement
/// the reference counter locally.
static inline constexpr auto FILE_FOR_REFERENCES_CHECK = "checksums.txt";
/// Checks that all TTLs (table min/max, column ttls, so on) for part
/// calculated. Part without calculated TTL may exist if TTL was added after
/// part creation (using alter query with materialize_ttl setting).
@ -415,10 +427,6 @@ public:
/// Required for distinguish different copies of the same part on remote FS.
String getUniqueId() const;
/// Return hardlink count for part.
/// Required for keep data on remote FS when part has shadow copies.
UInt32 getNumberOfRefereneces() const;
protected:
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk

View File

@ -7182,15 +7182,11 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
if (!disk || !disk->supportZeroCopyReplication())
return true;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return true;
auto ref_count = part.getNumberOfRefereneces();
auto ref_count = disk->getRefCount(fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
if (ref_count > 0) /// Keep part shard info for frozen backups
return false;
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, zookeeper, *getSettings(), log,
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, getZooKeeper(), *getSettings(), log,
zookeeper_path);
}
@ -7203,7 +7199,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
bool res = true;
bool part_has_no_more_locks = true;
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
@ -7223,7 +7219,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
if (!children.empty())
{
LOG_TRACE(logger, "Found zookeper locks for {}", zookeeper_part_uniq_node);
res = false;
part_has_no_more_locks = false;
continue;
}
@ -7252,7 +7248,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
}
}
return res;
return part_has_no_more_locks;
}
@ -7660,10 +7656,12 @@ public:
table_shared_id = storage.getTableSharedID();
}
void save(DiskPtr disk, const String & path) const
void save(DiskPtr data_disk, const String & path) const
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto file_path = getFileName(path);
auto buffer = disk->writeMetaFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
auto buffer = metadata_disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
writeIntText(version, *buffer);
buffer->write("\n", 1);
writeBoolText(is_replicated, *buffer);
@ -7678,12 +7676,14 @@ public:
buffer->write("\n", 1);
}
bool load(DiskPtr disk, const String & path)
bool load(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto file_path = getFileName(path);
if (!disk->exists(file_path))
if (!metadata_disk->exists(file_path))
return false;
auto buffer = disk->readMetaFile(file_path, ReadSettings(), {});
auto buffer = metadata_disk->readFile(file_path, ReadSettings(), {});
readIntText(version, *buffer);
if (version != 1)
{
@ -7704,9 +7704,10 @@ public:
return true;
}
static void clean(DiskPtr disk, const String & path)
static void clean(DiskPtr data_disk, const String & path)
{
disk->removeMetaFileIfExists(getFileName(path));
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
metadata_disk->removeFileIfExists(getFileName(path));
}
private:
@ -7760,22 +7761,18 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (zookeeper)
fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(checksums))
{
fs::path checksums = fs::path(path) / "checksums.txt";
if (disk->exists(checksums))
if (disk->getRefCount(checksums) == 0)
{
auto ref_count = disk->getRefCount(checksums);
if (ref_count == 0)
{
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
}
else
keep_shared = true;
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
}
else
keep_shared = true;
}
disk->removeSharedRecursive(path, keep_shared);

View File

@ -329,6 +329,7 @@ def test_s3_zero_copy_unfreeze(cluster):
check_objects_exisis(cluster, objects01)
node1.query("TRUNCATE TABLE unfreeze_test")
node2.query("SYSTEM SYNC REPLICA unfreeze_test")
objects11 = node1.get_backuped_s3_objects("s31", "freeze_backup1")
objects12 = node2.get_backuped_s3_objects("s31", "freeze_backup2")