Introduce DiskRestartProxy.

This commit is contained in:
Pavel Kovalenko 2021-04-16 16:15:36 +03:00
parent 4c1022ac03
commit e8b4dd1e4b
5 changed files with 307 additions and 4 deletions

View File

@ -0,0 +1,224 @@
#include "DiskRestartProxy.h"
namespace DB
{
DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_)
: DiskDecorator(delegate_) { }
ReservationPtr DiskRestartProxy::reserve(UInt64 bytes)
{
ReadLock lock (mutex);
return DiskDecorator::reserve(bytes);
}
const String & DiskRestartProxy::getPath() const
{
ReadLock lock (mutex);
return DiskDecorator::getPath();
}
UInt64 DiskRestartProxy::getTotalSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getTotalSpace();
}
UInt64 DiskRestartProxy::getAvailableSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getAvailableSpace();
}
UInt64 DiskRestartProxy::getUnreservedSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getUnreservedSpace();
}
UInt64 DiskRestartProxy::getKeepingFreeSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getKeepingFreeSpace();
}
bool DiskRestartProxy::exists(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::exists(path);
}
bool DiskRestartProxy::isFile(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isFile(path);
}
bool DiskRestartProxy::isDirectory(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isDirectory(path);
}
size_t DiskRestartProxy::getFileSize(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getFileSize(path);
}
void DiskRestartProxy::createDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectory(path);
}
void DiskRestartProxy::createDirectories(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectories(path);
}
void DiskRestartProxy::clearDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::clearDirectory(path);
}
void DiskRestartProxy::moveDirectory(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::iterateDirectory(path);
}
void DiskRestartProxy::createFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createFile(path);
}
void DiskRestartProxy::moveFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveFile(from_path, to_path);
}
void DiskRestartProxy::replaceFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::replaceFile(from_path, to_path);
}
void DiskRestartProxy::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::copy(from_path, to_disk, to_path);
}
void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file_names)
{
ReadLock lock (mutex);
DiskDecorator::listFiles(path, file_names);
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const
{
ReadLock lock (mutex);
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
}
std::unique_ptr<WriteBufferFromFileBase> DiskRestartProxy::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
ReadLock lock (mutex);
return DiskDecorator::writeFile(path, buf_size, mode);
}
void DiskRestartProxy::removeFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFile(path);
}
void DiskRestartProxy::removeFileIfExists(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFileIfExists(path);
}
void DiskRestartProxy::removeDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeDirectory(path);
}
void DiskRestartProxy::removeRecursive(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeRecursive(path);
}
void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedFile(path, keep_s3);
}
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_s3)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedRecursive(path, keep_s3);
}
void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
ReadLock lock (mutex);
DiskDecorator::setLastModified(path, timestamp);
}
Poco::Timestamp DiskRestartProxy::getLastModified(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::getLastModified(path);
}
void DiskRestartProxy::setReadOnly(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::setReadOnly(path);
}
void DiskRestartProxy::createHardLink(const String & src_path, const String & dst_path)
{
ReadLock lock (mutex);
DiskDecorator::createHardLink(src_path, dst_path);
}
void DiskRestartProxy::truncateFile(const String & path, size_t size)
{
ReadLock lock (mutex);
DiskDecorator::truncateFile(path, size);
}
String DiskRestartProxy::getUniqueId(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getUniqueId(path);
}
bool DiskRestartProxy::checkUniqueId(const String & id) const
{
ReadLock lock (mutex);
return DiskDecorator::checkUniqueId(id);
}
void DiskRestartProxy::restart(ContextConstPtr context)
{
/// Speed up processing unhealthy requests.
DiskDecorator::shutdown();
WriteLock lock (mutex, std::defer_lock);
LOG_INFO(log, "Acquiring lock to restart disk {}", DiskDecorator::getName());
lock.lock();
LOG_INFO(log, "Restart disk {}", DiskDecorator::getName());
}
}

View File

@ -0,0 +1,77 @@
#pragma once
#include "DiskDecorator.h"
#include <common/logger_useful.h>
#include <shared_mutex>
namespace DB
{
class RestartAwareReadBuffer;
class RestartAwareWriteBuffer;
class DiskRestartProxy : public DiskDecorator
{
public:
using ReadLock = std::shared_lock<std::shared_timed_mutex>;
using WriteLock = std::unique_lock<std::shared_timed_mutex>;
explicit DiskRestartProxy(DiskPtr & delegate_);
ReservationPtr reserve(UInt64 bytes) override;
const String & getPath() const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const DiskPtr & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override;
void restart(ContextConstPtr context);
private:
friend class RestartAwareReadBuffer;
friend class RestartAwareWriteBuffer;
/// Mutex to protect RW access.
mutable std::shared_timed_mutex mutex;
Poco::Logger * log = &Poco::Logger::get("DiskRestartProxy");
};
}

View File

@ -950,6 +950,8 @@ void DiskS3::startup()
LOG_INFO(log, "Starting up disk {}", name);
restore();
if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();

View File

@ -133,9 +133,6 @@ public:
/// Actions performed after disk creation.
void startup();
/// Restore S3 metadata files on file system.
void restore();
/// Dumps current revision counter into file 'revision.txt' at given path.
void onFreeze(const String & path) override;
@ -167,6 +164,8 @@ private:
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
/// Restore S3 metadata files on file system.
void restore();
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(const RestoreInformation & restore_information);
void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector<String> keys);

View File

@ -19,6 +19,8 @@
#include "ProxyConfiguration.h"
#include "ProxyListConfiguration.h"
#include "ProxyResolverConfiguration.h"
#include "Disks/DiskRestartProxy.h"
#include "Disks/DiskRestartProxy.cpp"
namespace DB
@ -177,7 +179,6 @@ void registerDiskS3(DiskFactory & factory)
checkRemoveAccess(*s3disk);
}
s3disk->restore();
s3disk->startup();
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);