From e8b4dd1e4b1d766764f9430f98531fbab12da09c Mon Sep 17 00:00:00 2001 From: Pavel Kovalenko Date: Fri, 16 Apr 2021 16:15:36 +0300 Subject: [PATCH] Introduce DiskRestartProxy. --- src/Disks/DiskRestartProxy.cpp | 224 ++++++++++++++++++++++++++++++++ src/Disks/DiskRestartProxy.h | 77 +++++++++++ src/Disks/S3/DiskS3.cpp | 2 + src/Disks/S3/DiskS3.h | 5 +- src/Disks/S3/registerDiskS3.cpp | 3 +- 5 files changed, 307 insertions(+), 4 deletions(-) create mode 100644 src/Disks/DiskRestartProxy.cpp create mode 100644 src/Disks/DiskRestartProxy.h diff --git a/src/Disks/DiskRestartProxy.cpp b/src/Disks/DiskRestartProxy.cpp new file mode 100644 index 00000000000..9ced50d3f15 --- /dev/null +++ b/src/Disks/DiskRestartProxy.cpp @@ -0,0 +1,224 @@ +#include "DiskRestartProxy.h" + +namespace DB +{ + +DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_) + : DiskDecorator(delegate_) { } + +ReservationPtr DiskRestartProxy::reserve(UInt64 bytes) +{ + ReadLock lock (mutex); + return DiskDecorator::reserve(bytes); +} + +const String & DiskRestartProxy::getPath() const +{ + ReadLock lock (mutex); + return DiskDecorator::getPath(); +} + +UInt64 DiskRestartProxy::getTotalSpace() const +{ + ReadLock lock (mutex); + return DiskDecorator::getTotalSpace(); +} +UInt64 DiskRestartProxy::getAvailableSpace() const +{ + ReadLock lock (mutex); + return DiskDecorator::getAvailableSpace(); +} +UInt64 DiskRestartProxy::getUnreservedSpace() const +{ + ReadLock lock (mutex); + return DiskDecorator::getUnreservedSpace(); +} +UInt64 DiskRestartProxy::getKeepingFreeSpace() const +{ + ReadLock lock (mutex); + return DiskDecorator::getKeepingFreeSpace(); +} +bool DiskRestartProxy::exists(const String & path) const +{ + ReadLock lock (mutex); + return DiskDecorator::exists(path); +} +bool DiskRestartProxy::isFile(const String & path) const +{ + ReadLock lock (mutex); + return DiskDecorator::isFile(path); +} +bool DiskRestartProxy::isDirectory(const String & path) const +{ + ReadLock lock (mutex); + return DiskDecorator::isDirectory(path); +} +size_t DiskRestartProxy::getFileSize(const String & path) const +{ + ReadLock lock (mutex); + return DiskDecorator::getFileSize(path); +} +void DiskRestartProxy::createDirectory(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::createDirectory(path); +} +void DiskRestartProxy::createDirectories(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::createDirectories(path); +} +void DiskRestartProxy::clearDirectory(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::clearDirectory(path); +} +void DiskRestartProxy::moveDirectory(const String & from_path, const String & to_path) +{ + ReadLock lock (mutex); + DiskDecorator::moveDirectory(from_path, to_path); +} + +DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path) +{ + ReadLock lock (mutex); + return DiskDecorator::iterateDirectory(path); +} + +void DiskRestartProxy::createFile(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::createFile(path); +} + +void DiskRestartProxy::moveFile(const String & from_path, const String & to_path) +{ + ReadLock lock (mutex); + DiskDecorator::moveFile(from_path, to_path); +} + +void DiskRestartProxy::replaceFile(const String & from_path, const String & to_path) +{ + ReadLock lock (mutex); + DiskDecorator::replaceFile(from_path, to_path); +} + +void DiskRestartProxy::copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path) +{ + ReadLock lock (mutex); + DiskDecorator::copy(from_path, to_disk, to_path); +} + +void DiskRestartProxy::listFiles(const String & path, std::vector & file_names) +{ + ReadLock lock (mutex); + DiskDecorator::listFiles(path, file_names); +} + +std::unique_ptr DiskRestartProxy::readFile( + const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) + const +{ + ReadLock lock (mutex); + return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache); +} + +std::unique_ptr DiskRestartProxy::writeFile(const String & path, size_t buf_size, WriteMode mode) +{ + ReadLock lock (mutex); + return DiskDecorator::writeFile(path, buf_size, mode); +} + +void DiskRestartProxy::removeFile(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::removeFile(path); +} + +void DiskRestartProxy::removeFileIfExists(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::removeFileIfExists(path); +} + +void DiskRestartProxy::removeDirectory(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::removeDirectory(path); +} + +void DiskRestartProxy::removeRecursive(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::removeRecursive(path); +} + +void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3) +{ + ReadLock lock (mutex); + DiskDecorator::removeSharedFile(path, keep_s3); +} + +void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_s3) +{ + ReadLock lock (mutex); + DiskDecorator::removeSharedRecursive(path, keep_s3); +} + +void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp) +{ + ReadLock lock (mutex); + DiskDecorator::setLastModified(path, timestamp); +} + +Poco::Timestamp DiskRestartProxy::getLastModified(const String & path) +{ + ReadLock lock (mutex); + return DiskDecorator::getLastModified(path); +} + +void DiskRestartProxy::setReadOnly(const String & path) +{ + ReadLock lock (mutex); + DiskDecorator::setReadOnly(path); +} + +void DiskRestartProxy::createHardLink(const String & src_path, const String & dst_path) +{ + ReadLock lock (mutex); + DiskDecorator::createHardLink(src_path, dst_path); +} + +void DiskRestartProxy::truncateFile(const String & path, size_t size) +{ + ReadLock lock (mutex); + DiskDecorator::truncateFile(path, size); +} + +String DiskRestartProxy::getUniqueId(const String & path) const +{ + ReadLock lock (mutex); + return DiskDecorator::getUniqueId(path); +} + +bool DiskRestartProxy::checkUniqueId(const String & id) const +{ + ReadLock lock (mutex); + return DiskDecorator::checkUniqueId(id); +} + +void DiskRestartProxy::restart(ContextConstPtr context) +{ + /// Speed up processing unhealthy requests. + DiskDecorator::shutdown(); + + WriteLock lock (mutex, std::defer_lock); + + LOG_INFO(log, "Acquiring lock to restart disk {}", DiskDecorator::getName()); + + lock.lock(); + + LOG_INFO(log, "Restart disk {}", DiskDecorator::getName()); +} + +} diff --git a/src/Disks/DiskRestartProxy.h b/src/Disks/DiskRestartProxy.h new file mode 100644 index 00000000000..9dfdfdd801b --- /dev/null +++ b/src/Disks/DiskRestartProxy.h @@ -0,0 +1,77 @@ +#pragma once + +#include "DiskDecorator.h" + +#include +#include + +namespace DB +{ + +class RestartAwareReadBuffer; +class RestartAwareWriteBuffer; + +class DiskRestartProxy : public DiskDecorator +{ +public: + using ReadLock = std::shared_lock; + using WriteLock = std::unique_lock; + + + explicit DiskRestartProxy(DiskPtr & delegate_); + + ReservationPtr reserve(UInt64 bytes) override; + const String & getPath() const override; + UInt64 getTotalSpace() const override; + UInt64 getAvailableSpace() const override; + UInt64 getUnreservedSpace() const override; + UInt64 getKeepingFreeSpace() const override; + bool exists(const String & path) const override; + bool isFile(const String & path) const override; + bool isDirectory(const String & path) const override; + size_t getFileSize(const String & path) const override; + void createDirectory(const String & path) override; + void createDirectories(const String & path) override; + void clearDirectory(const String & path) override; + void moveDirectory(const String & from_path, const String & to_path) override; + DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + void createFile(const String & path) override; + void moveFile(const String & from_path, const String & to_path) override; + void replaceFile(const String & from_path, const String & to_path) override; + void copy(const String & from_path, const DiskPtr & to_disk, const String & to_path) override; + void listFiles(const String & path, std::vector & file_names) override; + std::unique_ptr readFile( + const String & path, + size_t buf_size, + size_t estimated_size, + size_t aio_threshold, + size_t mmap_threshold, + MMappedFileCache * mmap_cache) const override; + std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + void removeFile(const String & path) override; + void removeFileIfExists(const String & path) override; + void removeDirectory(const String & path) override; + void removeRecursive(const String & path) override; + void removeSharedFile(const String & path, bool keep_s3) override; + void removeSharedRecursive(const String & path, bool keep_s3) override; + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; + Poco::Timestamp getLastModified(const String & path) override; + void setReadOnly(const String & path) override; + void createHardLink(const String & src_path, const String & dst_path) override; + void truncateFile(const String & path, size_t size) override; + String getUniqueId(const String & path) const override; + bool checkUniqueId(const String & id) const override; + + void restart(ContextConstPtr context); + +private: + friend class RestartAwareReadBuffer; + friend class RestartAwareWriteBuffer; + + /// Mutex to protect RW access. + mutable std::shared_timed_mutex mutex; + + Poco::Logger * log = &Poco::Logger::get("DiskRestartProxy"); +}; + +} diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 14d3e1e5d5e..53de3f15f28 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -950,6 +950,8 @@ void DiskS3::startup() LOG_INFO(log, "Starting up disk {}", name); + restore(); + if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION) migrateToRestorableSchema(); diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index 758d4055a3e..3d85fdb74f9 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -133,9 +133,6 @@ public: /// Actions performed after disk creation. void startup(); - /// Restore S3 metadata files on file system. - void restore(); - /// Dumps current revision counter into file 'revision.txt' at given path. void onFreeze(const String & path) override; @@ -167,6 +164,8 @@ private: void listObjects(const String & source_bucket, const String & source_path, std::function callback); void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key); + /// Restore S3 metadata files on file system. + void restore(); void readRestoreInformation(RestoreInformation & restore_information); void restoreFiles(const RestoreInformation & restore_information); void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector keys); diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 352f7467ba0..32d2bff86d8 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -19,6 +19,8 @@ #include "ProxyConfiguration.h" #include "ProxyListConfiguration.h" #include "ProxyResolverConfiguration.h" +#include "Disks/DiskRestartProxy.h" +#include "Disks/DiskRestartProxy.cpp" namespace DB @@ -177,7 +179,6 @@ void registerDiskS3(DiskFactory & factory) checkRemoveAccess(*s3disk); } - s3disk->restore(); s3disk->startup(); bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);