Remove disk restart proxy and disk decorator (#44647)

* Remove disk restart proxy and disk decorator
* Automatic style fix
* Returned some trash back
* Fix build again
* Fix failing test

Co-authored-by: robot-clickhouse <robot-clickhouse@users.noreply.github.com>
This commit is contained in:
alesapin 2022-12-30 14:47:30 +01:00 committed by GitHub
parent 178417d1ef
commit 4948a8c17b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 61 additions and 1019 deletions

View File

@ -1,259 +0,0 @@
#include "DiskDecorator.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
namespace DB
{
DiskDecorator::DiskDecorator(const DiskPtr & delegate_)
: IDisk(/* name_= */ "<decorator>")
, delegate(delegate_)
{
}
DiskTransactionPtr DiskDecorator::createTransaction()
{
return delegate->createTransaction();
}
const String & DiskDecorator::getName() const
{
return delegate->getName();
}
ReservationPtr DiskDecorator::reserve(UInt64 bytes)
{
return delegate->reserve(bytes);
}
const String & DiskDecorator::getPath() const
{
return delegate->getPath();
}
UInt64 DiskDecorator::getTotalSpace() const
{
return delegate->getTotalSpace();
}
UInt64 DiskDecorator::getAvailableSpace() const
{
return delegate->getAvailableSpace();
}
UInt64 DiskDecorator::getUnreservedSpace() const
{
return delegate->getUnreservedSpace();
}
UInt64 DiskDecorator::getKeepingFreeSpace() const
{
return delegate->getKeepingFreeSpace();
}
bool DiskDecorator::exists(const String & path) const
{
return delegate->exists(path);
}
bool DiskDecorator::isFile(const String & path) const
{
return delegate->isFile(path);
}
bool DiskDecorator::isDirectory(const String & path) const
{
return delegate->isDirectory(path);
}
size_t DiskDecorator::getFileSize(const String & path) const
{
return delegate->getFileSize(path);
}
void DiskDecorator::createDirectory(const String & path)
{
delegate->createDirectory(path);
}
void DiskDecorator::createDirectories(const String & path)
{
delegate->createDirectories(path);
}
void DiskDecorator::clearDirectory(const String & path)
{
delegate->clearDirectory(path);
}
void DiskDecorator::moveDirectory(const String & from_path, const String & to_path)
{
delegate->moveDirectory(from_path, to_path);
}
DirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path) const
{
return delegate->iterateDirectory(path);
}
void DiskDecorator::createFile(const String & path)
{
delegate->createFile(path);
}
void DiskDecorator::moveFile(const String & from_path, const String & to_path)
{
delegate->moveFile(from_path, to_path);
}
void DiskDecorator::replaceFile(const String & from_path, const String & to_path)
{
delegate->replaceFile(from_path, to_path);
}
void DiskDecorator::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
delegate->copy(from_path, to_disk, to_path);
}
void DiskDecorator::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
delegate->copyDirectoryContent(from_dir, to_disk, to_dir);
}
void DiskDecorator::listFiles(const String & path, std::vector<String> & file_names) const
{
delegate->listFiles(path, file_names);
}
std::unique_ptr<ReadBufferFromFileBase>
DiskDecorator::readFile(
const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
{
return delegate->readFile(path, settings, read_hint, file_size);
}
std::unique_ptr<WriteBufferFromFileBase>
DiskDecorator::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings)
{
return delegate->writeFile(path, buf_size, mode, settings);
}
void DiskDecorator::removeFile(const String & path)
{
delegate->removeFile(path);
}
void DiskDecorator::removeFileIfExists(const String & path)
{
delegate->removeFileIfExists(path);
}
void DiskDecorator::removeDirectory(const String & path)
{
delegate->removeDirectory(path);
}
void DiskDecorator::removeRecursive(const String & path)
{
delegate->removeRecursive(path);
}
void DiskDecorator::removeSharedFile(const String & path, bool keep_s3)
{
delegate->removeSharedFile(path, keep_s3);
}
void DiskDecorator::removeSharedFileIfExists(const String & path, bool keep_s3)
{
delegate->removeSharedFileIfExists(path, keep_s3);
}
void DiskDecorator::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
delegate->removeSharedFiles(files, keep_all_batch_data, file_names_remove_metadata_only);
}
void DiskDecorator::removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
delegate->removeSharedRecursive(path, keep_all_batch_data, file_names_remove_metadata_only);
}
void DiskDecorator::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
delegate->setLastModified(path, timestamp);
}
Poco::Timestamp DiskDecorator::getLastModified(const String & path) const
{
return delegate->getLastModified(path);
}
time_t DiskDecorator::getLastChanged(const String & path) const
{
return delegate->getLastChanged(path);
}
void DiskDecorator::setReadOnly(const String & path)
{
delegate->setReadOnly(path);
}
void DiskDecorator::createHardLink(const String & src_path, const String & dst_path)
{
delegate->createHardLink(src_path, dst_path);
}
void DiskDecorator::truncateFile(const String & path, size_t size)
{
delegate->truncateFile(path, size);
}
Executor & DiskDecorator::getExecutor()
{
return delegate->getExecutor();
}
SyncGuardPtr DiskDecorator::getDirectorySyncGuard(const String & path) const
{
return delegate->getDirectorySyncGuard(path);
}
void DiskDecorator::onFreeze(const String & path)
{
delegate->onFreeze(path);
}
void DiskDecorator::shutdown()
{
delegate->shutdown();
}
void DiskDecorator::startupImpl(ContextPtr context)
{
delegate->startupImpl(context);
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)
{
delegate->applyNewSettings(config, context, config_prefix, map);
}
DiskObjectStoragePtr DiskDecorator::createDiskObjectStorage()
{
return delegate->createDiskObjectStorage();
}
ObjectStoragePtr DiskDecorator::getObjectStorage()
{
return delegate->getObjectStorage();
}
DiskPtr DiskDecorator::getNestedDisk() const
{
if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate.get()))
return decorator->getNestedDisk();
return delegate;
}
}

View File

@ -1,139 +0,0 @@
#pragma once
#include "Disks/IDisk.h"
namespace DB
{
/** Forwards all methods to another disk.
* Methods can be overridden by descendants.
*/
class DiskDecorator : public IDisk
{
public:
explicit DiskDecorator(const DiskPtr & delegate_);
DiskTransactionPtr createTransaction() override;
const String & getName() const override;
ReservationPtr reserve(UInt64 bytes) override;
~DiskDecorator() override = default;
const String & getPath() const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) const override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode,
const WriteSettings & settings) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeSharedFileIfExists(const String & path, bool keep_s3) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
time_t getLastChanged(const String & path) const override;
Poco::Timestamp getLastModified(const String & path) const override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
int open(const String & path, mode_t mode) const;
void close(int fd) const;
void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DataSourceDescription getDataSourceDescription() const override { return delegate->getDataSourceDescription(); }
bool isRemote() const override { return delegate->isRemote(); }
bool isReadOnly() const override { return delegate->isReadOnly(); }
bool isWriteOnce() const override { return delegate->isWriteOnce(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
bool supportParallelWrite() const override { return delegate->supportParallelWrite(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startupImpl(ContextPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
bool supportsCache() const override { return delegate->supportsCache(); }
const String & getCacheBasePath() const override { return delegate->getCacheBasePath(); }
StoredObjects getStorageObjects(const String & path) const override { return delegate->getStorageObjects(path); }
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithObjectStoragePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
DiskObjectStoragePtr createDiskObjectStorage() override;
ObjectStoragePtr getObjectStorage() override;
NameSet getCacheLayersNames() const override { return delegate->getCacheLayersNames(); }
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override { return delegate->getSerializedMetadata(file_paths); }
UInt32 getRefCount(const String & path) const override { return delegate->getRefCount(path); }
void syncRevision(UInt64 revision) override { delegate->syncRevision(revision); }
UInt64 getRevision() const override { return delegate->getRevision(); }
bool supportsStat() const override { return delegate->supportsStat(); }
struct stat stat(const String & path) const override { return delegate->stat(path); }
bool supportsChmod() const override { return delegate->supportsChmod(); }
void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); }
virtual DiskPtr getNestedDisk() const;
protected:
Executor & getExecutor() override;
DiskPtr delegate;
};
/// TODO: Current reservation mechanism leaks IDisk abstraction details.
/// This hack is needed to return proper disk pointer (wrapper instead of implementation) from reservation object.
class ReservationDelegate : public IReservation
{
public:
ReservationDelegate(ReservationPtr delegate_, DiskPtr wrapper_) : delegate(std::move(delegate_)), wrapper(wrapper_) { }
UInt64 getSize() const override { return delegate->getSize(); }
UInt64 getUnreservedSpace() const override { return delegate->getUnreservedSpace(); }
DiskPtr getDisk(size_t) const override { return wrapper; }
Disks getDisks() const override { return {wrapper}; }
void update(UInt64 new_size) override { delegate->update(new_size); }
private:
ReservationPtr delegate;
DiskPtr wrapper;
};
}

View File

@ -209,7 +209,8 @@ DiskEncrypted::DiskEncrypted(
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: DiskDecorator(settings_->wrapped_disk)
: IDisk(name_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)

View File

@ -4,7 +4,6 @@
#if USE_SSL
#include <Disks/IDisk.h>
#include <Disks/DiskDecorator.h>
#include <Common/MultiVersion.h>
#include <Disks/FakeDiskTransaction.h>
@ -27,7 +26,7 @@ struct DiskEncryptedSettings
/// Encrypted disk ciphers all written files on the fly and writes the encrypted files to an underlying (normal) disk.
/// And when we read files from an encrypted disk it deciphers them automatically,
/// so we can work with a encrypted disk like it's a normal disk.
class DiskEncrypted : public DiskDecorator
class DiskEncrypted : public IDisk
{
public:
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
@ -252,6 +251,32 @@ public:
return std::make_shared<FakeDiskTransaction>(*this);
}
UInt64 getTotalSpace() const override
{
return delegate->getTotalSpace();
}
UInt64 getAvailableSpace() const override
{
return delegate->getAvailableSpace();
}
UInt64 getUnreservedSpace() const override
{
return delegate->getUnreservedSpace();
}
bool supportZeroCopyReplication() const override
{
return delegate->supportZeroCopyReplication();
}
MetadataStoragePtr getMetadataStorage() override
{
return delegate->getMetadataStorage();
}
private:
String wrappedPath(const String & path) const
{
@ -261,6 +286,7 @@ private:
return disk_path + path;
}
DiskPtr delegate;
const String encrypted_name;
const String disk_path;
const String disk_absolute_path;

View File

@ -18,7 +18,6 @@
#include <sys/stat.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskRestartProxy.h>
#include <Common/randomSeed.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromTemporaryFile.h>
@ -775,7 +774,7 @@ void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk);
return disk;
};
factory.registerDiskType("local", creator);
}

View File

@ -1,378 +0,0 @@
#include "DiskRestartProxy.h"
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/WriteBufferFromFileDecorator.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DEADLOCK_AVOIDED;
}
using Millis = std::chrono::milliseconds;
using Seconds = std::chrono::seconds;
/// Holds restart read lock till buffer destruction.
class RestartAwareReadBuffer : public ReadBufferFromFileDecorator
{
public:
RestartAwareReadBuffer(const DiskRestartProxy & disk, std::unique_ptr<ReadBufferFromFileBase> impl_)
: ReadBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
void prefetch() override
{
swap(*impl);
impl->prefetch();
swap(*impl);
}
void setReadUntilPosition(size_t position) override
{
swap(*impl);
impl->setReadUntilPosition(position);
swap(*impl);
}
void setReadUntilEnd() override
{
swap(*impl);
impl->setReadUntilEnd();
swap(*impl);
}
String getInfoForLog() override { return impl->getInfoForLog(); }
private:
ReadLock lock;
};
/// Holds restart read lock till buffer finalize.
class RestartAwareWriteBuffer : public WriteBufferFromFileDecorator
{
public:
RestartAwareWriteBuffer(const DiskRestartProxy & disk, std::unique_ptr<WriteBuffer> impl_)
: WriteBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
~RestartAwareWriteBuffer() override
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void finalizeImpl() override
{
WriteBufferFromFileDecorator::finalizeImpl();
lock.unlock();
}
private:
ReadLock lock;
};
DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_)
: DiskDecorator(delegate_)
{}
ReservationPtr DiskRestartProxy::reserve(UInt64 bytes)
{
ReadLock lock (mutex);
auto ptr = DiskDecorator::reserve(bytes);
if (ptr)
{
auto disk_ptr = std::static_pointer_cast<DiskRestartProxy>(shared_from_this());
return std::make_unique<ReservationDelegate>(std::move(ptr), disk_ptr);
}
return ptr;
}
const String & DiskRestartProxy::getPath() const
{
ReadLock lock (mutex);
return DiskDecorator::getPath();
}
UInt64 DiskRestartProxy::getTotalSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getTotalSpace();
}
UInt64 DiskRestartProxy::getAvailableSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getAvailableSpace();
}
UInt64 DiskRestartProxy::getUnreservedSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getUnreservedSpace();
}
UInt64 DiskRestartProxy::getKeepingFreeSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getKeepingFreeSpace();
}
bool DiskRestartProxy::exists(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::exists(path);
}
bool DiskRestartProxy::isFile(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isFile(path);
}
bool DiskRestartProxy::isDirectory(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isDirectory(path);
}
size_t DiskRestartProxy::getFileSize(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getFileSize(path);
}
void DiskRestartProxy::createDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectory(path);
}
void DiskRestartProxy::createDirectories(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectories(path);
}
void DiskRestartProxy::clearDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::clearDirectory(path);
}
void DiskRestartProxy::moveDirectory(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveDirectory(from_path, to_path);
}
DirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::iterateDirectory(path);
}
void DiskRestartProxy::createFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createFile(path);
}
void DiskRestartProxy::moveFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveFile(from_path, to_path);
}
void DiskRestartProxy::replaceFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::replaceFile(from_path, to_path);
}
void DiskRestartProxy::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::copy(from_path, to_disk, to_path);
}
void DiskRestartProxy::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
ReadLock lock (mutex);
DiskDecorator::copyDirectoryContent(from_dir, to_disk, to_dir);
}
void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file_names) const
{
ReadLock lock (mutex);
DiskDecorator::listFiles(path, file_names);
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
{
ReadLock lock (mutex);
auto impl = DiskDecorator::readFile(path, settings, read_hint, file_size);
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
}
std::unique_ptr<WriteBufferFromFileBase> DiskRestartProxy::writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings)
{
ReadLock lock (mutex);
auto impl = DiskDecorator::writeFile(path, buf_size, mode, settings);
return std::make_unique<RestartAwareWriteBuffer>(*this, std::move(impl));
}
void DiskRestartProxy::removeFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFile(path);
}
void DiskRestartProxy::removeFileIfExists(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFileIfExists(path);
}
void DiskRestartProxy::removeDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeDirectory(path);
}
void DiskRestartProxy::removeRecursive(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeRecursive(path);
}
void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedFile(path, keep_s3);
}
void DiskRestartProxy::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedFiles(files, keep_all_batch_data, file_names_remove_metadata_only);
}
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedRecursive(path, keep_all_batch_data, file_names_remove_metadata_only);
}
void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
ReadLock lock (mutex);
DiskDecorator::setLastModified(path, timestamp);
}
Poco::Timestamp DiskRestartProxy::getLastModified(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getLastModified(path);
}
void DiskRestartProxy::setReadOnly(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::setReadOnly(path);
}
void DiskRestartProxy::createHardLink(const String & src_path, const String & dst_path)
{
ReadLock lock (mutex);
DiskDecorator::createHardLink(src_path, dst_path);
}
void DiskRestartProxy::truncateFile(const String & path, size_t size)
{
ReadLock lock (mutex);
DiskDecorator::truncateFile(path, size);
}
String DiskRestartProxy::getUniqueId(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getUniqueId(path);
}
bool DiskRestartProxy::checkUniqueId(const String & id) const
{
ReadLock lock (mutex);
return DiskDecorator::checkUniqueId(id);
}
const String & DiskRestartProxy::getCacheBasePath() const
{
ReadLock lock (mutex);
return DiskDecorator::getCacheBasePath();
}
StoredObjects DiskRestartProxy::getStorageObjects(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getStorageObjects(path);
}
void DiskRestartProxy::getRemotePathsRecursive(
const String & path, std::vector<LocalPathWithObjectStoragePaths> & paths_map)
{
ReadLock lock (mutex);
return DiskDecorator::getRemotePathsRecursive(path, paths_map);
}
DiskPtr DiskRestartProxy::getNestedDisk() const
{
DiskPtr delegate_copy;
{
ReadLock lock (mutex);
delegate_copy = delegate;
}
if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate_copy.get()))
return decorator->getNestedDisk();
return delegate_copy;
}
void DiskRestartProxy::restart(ContextPtr context)
{
/// Speed up processing unhealthy requests.
DiskDecorator::shutdown();
WriteLock lock (mutex, std::defer_lock);
LOG_INFO(log, "Acquiring lock to restart disk {}", DiskDecorator::getName());
auto start_time = std::chrono::steady_clock::now();
auto lock_timeout = Seconds(120);
do
{
/// Use a small timeout to not block read operations for a long time.
if (lock.try_lock_for(Millis(10)))
break;
} while (std::chrono::steady_clock::now() - start_time < lock_timeout);
if (!lock.owns_lock())
throw Exception("Failed to acquire restart lock within timeout. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
/// NOTE: access checking will cause deadlock here, so skip it.
DiskDecorator::startup(context, /* skip_access_check= */ true);
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());
}
}

View File

@ -1,86 +0,0 @@
#pragma once
#include "DiskDecorator.h"
#include <Common/logger_useful.h>
#include <shared_mutex>
namespace DB
{
using ReadLock = std::shared_lock<std::shared_timed_mutex>;
using WriteLock = std::unique_lock<std::shared_timed_mutex>;
class RestartAwareReadBuffer;
class RestartAwareWriteBuffer;
/**
* Gives possibility to change underlying disk settings at runtime calling 'restart' method.
* All disk methods are protected by read-lock. Read/Write buffers produced by disk holds read-lock till buffer is finalized/destructed.
* When 'restart' method is called write-lock is acquired to make sure that no operations are running on that disk.
*/
class DiskRestartProxy : public DiskDecorator
{
public:
explicit DiskRestartProxy(DiskPtr & delegate_);
ReservationPtr reserve(UInt64 bytes) override;
const String & getPath() const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) const override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const DiskPtr & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) const override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override;
const String & getCacheBasePath() const override;
StoredObjects getStorageObjects(const String & path) const override;
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithObjectStoragePaths> & paths_map) override;
void restart(ContextPtr context);
DiskPtr getNestedDisk() const override;
private:
friend class RestartAwareReadBuffer;
friend class RestartAwareWriteBuffer;
/// Mutex to protect RW access.
mutable std::shared_timed_mutex mutex;
Poco::Logger * log = &Poco::Logger::get("DiskRestartProxy");
};
}

View File

@ -4,8 +4,6 @@
#if USE_AZURE_BLOB_STORAGE
#include <Disks/DiskRestartProxy.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
@ -51,7 +49,7 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
azure_blob_storage_disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(azure_blob_storage_disk);
return azure_blob_storage_disk;
};
factory.registerDiskType("azure_blob_storage", creator);

View File

@ -3,7 +3,6 @@
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskRestartProxy.h>
#include <Storages/HDFS/HDFSCommon.h>
namespace DB
@ -55,7 +54,7 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
copy_thread_pool_size);
disk->startup(context, skip_access_check);
return std::make_shared<DiskRestartProxy>(disk);
return disk;
};
factory.registerDiskType("hdfs", creator);

View File

@ -20,7 +20,6 @@
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskLocal.h>
namespace DB

View File

@ -11,7 +11,6 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <base/getFQDNOrHostName.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
@ -166,9 +165,7 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
s3disk->startup(context, skip_access_check);
std::shared_ptr<IDisk> disk_result = s3disk;
return std::make_shared<DiskRestartProxy>(disk_result);
return s3disk;
};
factory.registerDiskType("s3", creator);
factory.registerDiskType("s3_plain", creator);

View File

@ -31,7 +31,6 @@
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskDecorator.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -826,8 +825,6 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
/// Check that underlying disk is local (can be wrapped in decorator)
DiskPtr disk_ptr = disk;
if (const auto * disk_decorator = dynamic_cast<const DiskDecorator *>(disk_ptr.get()))
disk_ptr = disk_decorator->getNestedDisk();
if (dynamic_cast<const DiskLocal *>(disk_ptr.get()) == nullptr)
{

View File

@ -42,7 +42,6 @@
#include <Access/Common/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseReplicated.h>
#include <Disks/DiskRestartProxy.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
@ -62,6 +61,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -509,7 +509,6 @@ BlockIO InterpreterSystemQuery::execute()
break;
case Type::RESTART_DISK:
restartDisk(query.disk);
break;
case Type::FLUSH_LOGS:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
@ -912,16 +911,10 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
void InterpreterSystemQuery::restartDisk(String & name)
[[noreturn]] void InterpreterSystemQuery::restartDisk(String &)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_DISK);
auto disk = getContext()->getDisk(name);
if (DiskRestartProxy * restart_proxy = dynamic_cast<DiskRestartProxy*>(disk.get()))
restart_proxy->restart(getContext());
else
throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS);
throw Exception("SYSTEM RESTART DISK is not supported", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -68,7 +68,7 @@ private:
bool dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table);
void dropDatabaseReplica(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
void restartDisk(String & name);
[[noreturn]] void restartDisk(String & name);
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);

View File

@ -68,7 +68,7 @@ enum class SystemQueryTargetType
{
Model,
Function,
Disk
Disk,
};
[[nodiscard]] static bool parseQueryWithOnClusterAndTarget(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos, Expected & expected, SystemQueryTargetType target_type)
@ -269,14 +269,12 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
return false;
break;
}
case Type::RESTART_DISK:
{
if (!parseQueryWithOnClusterAndTarget(res, pos, expected, SystemQueryTargetType::Disk))
return false;
break;
}
/// FLUSH DISTRIBUTED requires table
/// START/STOP DISTRIBUTED SENDS does not require table
case Type::STOP_DISTRIBUTED_SENDS:

View File

@ -97,8 +97,8 @@ def test_jbod_ha(start_cluster):
privileged=True,
user="root",
)
node1.query("system restart disk jbod1")
node1.restart_clickhouse()
time.sleep(5)
assert (

View File

@ -514,61 +514,6 @@ def test_apply_new_settings(cluster):
)
# NOTE: this test takes a couple of minutes when run together with other tests
@pytest.mark.long_run
def test_restart_during_load(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
)
# Force multi-part upload mode.
replace_config(
config_path, "<container_already_exists>false</container_already_exists>", ""
)
azure_query(
node, f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}"
)
azure_query(
node,
f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-05', 4096, -1)}",
)
def read():
for ii in range(0, 5):
logging.info(f"Executing {ii} query")
assert (
azure_query(node, f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values")
== "(0)"
)
logging.info(f"Query {ii} executed")
time.sleep(0.2)
def restart_disk():
for iii in range(0, 2):
logging.info(f"Restarting disk, attempt {iii}")
node.query(f"SYSTEM RESTART DISK {AZURE_BLOB_STORAGE_DISK}")
logging.info(f"Disk restarted, attempt {iii}")
time.sleep(0.5)
threads = []
for _ in range(0, 4):
threads.append(SafeThread(target=read))
threads.append(SafeThread(target=restart_disk))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
def test_big_insert(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)

View File

@ -650,49 +650,6 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
assert get_s3_requests() - s3_requests_before == s3_requests_to_write_partition * 3
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_disk_restart_during_load(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
node.query(
"INSERT INTO s3_test VALUES {}".format(
generate_values("2020-01-04", 1024 * 1024)
)
)
node.query(
"INSERT INTO s3_test VALUES {}".format(
generate_values("2020-01-05", 1024 * 1024, -1)
)
)
def read():
for ii in range(0, 20):
logging.info("Executing %d query", ii)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
logging.info("Query %d executed", ii)
time.sleep(0.2)
def restart_disk():
for iii in range(0, 5):
logging.info("Restarting disk, attempt %d", iii)
node.query("SYSTEM RESTART DISK s3")
logging.info("Disk restarted, attempt %d", iii)
time.sleep(0.5)
threads = []
for i in range(0, 4):
threads.append(SafeThread(target=read))
threads.append(SafeThread(target=restart_disk))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_no_delete_objects(cluster, node_name):
node = cluster.instances[node_name]

View File

@ -242,8 +242,7 @@ def test_full_restore(cluster, replicated, db_atomic):
node.query("DETACH TABLE s3.test")
drop_s3_metadata(node)
create_restore_file(node)
node.query("SYSTEM RESTART DISK s3")
node.query("ATTACH TABLE s3.test")
node.restart_clickhouse()
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(
4096 * 4
@ -283,7 +282,7 @@ def test_restore_another_bucket_path(cluster, db_atomic):
node_another_bucket = cluster.instances["node_another_bucket"]
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid
)
@ -298,7 +297,7 @@ def test_restore_another_bucket_path(cluster, db_atomic):
node_another_bucket_path = cluster.instances["node_another_bucket_path"]
create_restore_file(node_another_bucket_path, bucket="root2", path="data")
node_another_bucket_path.query("SYSTEM RESTART DISK s3")
node_another_bucket_path.restart_clickhouse()
create_table(
node_another_bucket_path, "test", attach=True, db_atomic=db_atomic, uuid=uuid
)
@ -357,7 +356,7 @@ def test_restore_different_revisions(cluster, db_atomic):
# Restore to revision 1 (2 parts).
create_restore_file(node_another_bucket, revision=revision1, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid
)
@ -378,8 +377,7 @@ def test_restore_different_revisions(cluster, db_atomic):
# Restore to revision 2 (4 parts).
node_another_bucket.query("DETACH TABLE s3.test")
create_restore_file(node_another_bucket, revision=revision2, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
node_another_bucket.restart_clickhouse()
assert node_another_bucket.query(
"SELECT count(*) FROM s3.test FORMAT Values"
@ -397,8 +395,7 @@ def test_restore_different_revisions(cluster, db_atomic):
# Restore to revision 3 (4 parts + 1 merged).
node_another_bucket.query("DETACH TABLE s3.test")
create_restore_file(node_another_bucket, revision=revision3, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
node_another_bucket.restart_clickhouse()
assert node_another_bucket.query(
"SELECT count(*) FROM s3.test FORMAT Values"
@ -444,7 +441,7 @@ def test_restore_mutations(cluster, db_atomic):
create_restore_file(
node_another_bucket, revision=revision_before_mutation, bucket="root"
)
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid
)
@ -464,8 +461,7 @@ def test_restore_mutations(cluster, db_atomic):
create_restore_file(
node_another_bucket, revision=revision_after_mutation, bucket="root"
)
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
node_another_bucket.restart_clickhouse()
assert node_another_bucket.query(
"SELECT count(*) FROM s3.test FORMAT Values"
@ -485,8 +481,7 @@ def test_restore_mutations(cluster, db_atomic):
node_another_bucket.query("DETACH TABLE s3.test")
revision = (revision_before_mutation + revision_after_mutation) // 2
create_restore_file(node_another_bucket, revision=revision, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
node_another_bucket.restart_clickhouse()
# Wait for unfinished mutation completion.
time.sleep(3)
@ -556,7 +551,7 @@ def test_migrate_to_restorable_schema(cluster):
create_restore_file(
node_another_bucket, revision=revision, bucket="root", path="another_data"
)
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid
)
@ -615,7 +610,7 @@ def test_restore_to_detached(cluster, replicated, db_atomic):
path="data",
detached=True,
)
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket,
"test",
@ -681,7 +676,7 @@ def test_restore_without_detached(cluster, replicated, db_atomic):
path="data",
detached=True,
)
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket,
"test",

View File

@ -249,7 +249,7 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy):
node_another_bucket = cluster.instances["node_another_bucket"]
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.restart_clickhouse()
create_table(
node_another_bucket, "test", schema, attach=True, db_atomic=db_atomic, uuid=uuid
)