Merge pull request #23429 from Jokser/disk-s3-restart-possibility

commit 3b7c68196b by Vladimir, 2021-05-01 15:28:14 +03:00 (committed by GitHub)
27 changed files with 1135 additions and 417 deletions


@ -133,6 +133,7 @@ enum class AccessType
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\
M(SYSTEM_RELOAD, "", GROUP, SYSTEM) \
M(SYSTEM_RESTART_DISK, "SYSTEM RESTART DISK", GLOBAL, SYSTEM) \
M(SYSTEM_MERGES, "SYSTEM STOP MERGES, SYSTEM START MERGES, STOP_MERGES, START MERGES", TABLE, SYSTEM) \
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
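The new SYSTEM_RESTART_DISK entry gates the SYSTEM RESTART DISK statement that this PR introduces. The interpreter changes are not among the files shown on this page, so the following is only a hedged sketch of how a handler might use the access type; the function name and exact call sites are assumptions, not quotes from the diff.
/// Hypothetical handler sketch (not taken from this commit).
void executeRestartDisk(ContextPtr context, const String & disk_name)
{
    context->checkAccess(AccessType::SYSTEM_RESTART_DISK);       /// rejects callers without the new grant
    auto disk = context->getDisk(disk_name);                      /// outermost wrapper is DiskRestartProxy, see registerDiskS3 further down
    if (auto * restart_proxy = dynamic_cast<DiskRestartProxy *>(disk.get()))
        restart_proxy->restart();                                 /// shuts the disk down and starts it up again under an exclusive lock
}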


@ -1,20 +1,22 @@
#include "DiskCacheWrapper.h"
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/WriteBufferFromFileDecorator.h>
#include <Common/quoteString.h>
#include <condition_variable>
namespace DB
{
/**
* Write buffer with possibility to set and invoke callback when buffer is finalized.
* Write buffer with possibility to set and invoke callback after 'finalize' call.
*/
class CompletionAwareWriteBuffer : public WriteBufferFromFileBase
class CompletionAwareWriteBuffer : public WriteBufferFromFileDecorator
{
public:
CompletionAwareWriteBuffer(std::unique_ptr<WriteBufferFromFileBase> impl_, std::function<void()> completion_callback_, size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0), impl(std::move(impl_)), completion_callback(completion_callback_) { }
CompletionAwareWriteBuffer(std::unique_ptr<WriteBufferFromFileBase> impl_, std::function<void()> completion_callback_)
: WriteBufferFromFileDecorator(std::move(impl_)), completion_callback(completion_callback_) { }
~CompletionAwareWriteBuffer() override
virtual ~CompletionAwareWriteBuffer() override
{
try
{
@ -31,31 +33,13 @@ public:
if (finalized)
return;
next();
impl->finalize();
finalized = true;
WriteBufferFromFileDecorator::finalize();
completion_callback();
}
void sync() override { impl->sync(); }
std::string getFileName() const override { return impl->getFileName(); }
private:
void nextImpl() override
{
impl->swap(*this);
impl->next();
impl->swap(*this);
}
/// Actual write buffer.
std::unique_ptr<WriteBufferFromFileBase> impl;
/// Callback is invoked when finalize is completed.
const std::function<void()> completion_callback;
bool finalized = false;
};
enum FileDownloadStatus
@ -200,8 +184,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();
},
buf_size);
});
}
void DiskCacheWrapper::clearDirectory(const String & path)
@ -316,22 +299,6 @@ void DiskCacheWrapper::createDirectories(const String & path)
DiskDecorator::createDirectories(path);
}
/// TODO: Current reservation mechanism leaks IDisk abstraction details.
/// This hack is needed to return proper disk pointer (wrapper instead of implementation) from reservation object.
class ReservationDelegate : public IReservation
{
public:
ReservationDelegate(ReservationPtr delegate_, DiskPtr wrapper_) : delegate(std::move(delegate_)), wrapper(wrapper_) { }
UInt64 getSize() const override { return delegate->getSize(); }
DiskPtr getDisk(size_t) const override { return wrapper; }
Disks getDisks() const override { return {wrapper}; }
void update(UInt64 new_size) override { delegate->update(new_size); }
private:
ReservationPtr delegate;
DiskPtr wrapper;
};
ReservationPtr DiskCacheWrapper::reserve(UInt64 bytes)
{
auto ptr = DiskDecorator::reserve(bytes);


@ -196,4 +196,19 @@ void DiskDecorator::onFreeze(const String & path)
delegate->onFreeze(path);
}
void DiskDecorator::shutdown()
{
delegate->shutdown();
}
void DiskDecorator::startup()
{
delegate->startup();
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context)
{
delegate->applyNewSettings(config, context);
}
}


@ -65,12 +65,33 @@ public:
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType::Type getType() const override { return delegate->getType(); }
Executor & getExecutor() override;
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
protected:
Executor & getExecutor() override;
DiskPtr delegate;
};
/// TODO: Current reservation mechanism leaks IDisk abstraction details.
/// This hack is needed to return proper disk pointer (wrapper instead of implementation) from reservation object.
class ReservationDelegate : public IReservation
{
public:
ReservationDelegate(ReservationPtr delegate_, DiskPtr wrapper_) : delegate(std::move(delegate_)), wrapper(wrapper_) { }
UInt64 getSize() const override { return delegate->getSize(); }
DiskPtr getDisk(size_t) const override { return wrapper; }
Disks getDisks() const override { return {wrapper}; }
void update(UInt64 new_size) override { delegate->update(new_size); }
private:
ReservationPtr delegate;
DiskPtr wrapper;
};
}


@ -0,0 +1,311 @@
#include "DiskRestartProxy.h"
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/WriteBufferFromFileDecorator.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DEADLOCK_AVOIDED;
}
using Millis = std::chrono::milliseconds;
using Seconds = std::chrono::seconds;
/// Holds restart read lock till buffer destruction.
class RestartAwareReadBuffer : public ReadBufferFromFileDecorator
{
public:
RestartAwareReadBuffer(const DiskRestartProxy & disk, std::unique_ptr<ReadBufferFromFileBase> impl_)
: ReadBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
private:
ReadLock lock;
};
/// Holds restart read lock till buffer finalize.
class RestartAwareWriteBuffer : public WriteBufferFromFileDecorator
{
public:
RestartAwareWriteBuffer(const DiskRestartProxy & disk, std::unique_ptr<WriteBuffer> impl_)
: WriteBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
virtual ~RestartAwareWriteBuffer() override
{
try
{
RestartAwareWriteBuffer::finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void finalize() override
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
lock.unlock();
}
private:
ReadLock lock;
};
DiskRestartProxy::DiskRestartProxy(DiskPtr & delegate_)
: DiskDecorator(delegate_) { }
ReservationPtr DiskRestartProxy::reserve(UInt64 bytes)
{
ReadLock lock (mutex);
auto ptr = DiskDecorator::reserve(bytes);
if (ptr)
{
auto disk_ptr = std::static_pointer_cast<DiskRestartProxy>(shared_from_this());
return std::make_unique<ReservationDelegate>(std::move(ptr), disk_ptr);
}
return ptr;
}
const String & DiskRestartProxy::getPath() const
{
ReadLock lock (mutex);
return DiskDecorator::getPath();
}
UInt64 DiskRestartProxy::getTotalSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getTotalSpace();
}
UInt64 DiskRestartProxy::getAvailableSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getAvailableSpace();
}
UInt64 DiskRestartProxy::getUnreservedSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getUnreservedSpace();
}
UInt64 DiskRestartProxy::getKeepingFreeSpace() const
{
ReadLock lock (mutex);
return DiskDecorator::getKeepingFreeSpace();
}
bool DiskRestartProxy::exists(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::exists(path);
}
bool DiskRestartProxy::isFile(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isFile(path);
}
bool DiskRestartProxy::isDirectory(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::isDirectory(path);
}
size_t DiskRestartProxy::getFileSize(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getFileSize(path);
}
void DiskRestartProxy::createDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectory(path);
}
void DiskRestartProxy::createDirectories(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createDirectories(path);
}
void DiskRestartProxy::clearDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::clearDirectory(path);
}
void DiskRestartProxy::moveDirectory(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::iterateDirectory(path);
}
void DiskRestartProxy::createFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::createFile(path);
}
void DiskRestartProxy::moveFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::moveFile(from_path, to_path);
}
void DiskRestartProxy::replaceFile(const String & from_path, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::replaceFile(from_path, to_path);
}
void DiskRestartProxy::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
ReadLock lock (mutex);
DiskDecorator::copy(from_path, to_disk, to_path);
}
void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file_names)
{
ReadLock lock (mutex);
DiskDecorator::listFiles(path, file_names);
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const
{
ReadLock lock (mutex);
auto impl = DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
}
std::unique_ptr<WriteBufferFromFileBase> DiskRestartProxy::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
ReadLock lock (mutex);
auto impl = DiskDecorator::writeFile(path, buf_size, mode);
return std::make_unique<RestartAwareWriteBuffer>(*this, std::move(impl));
}
void DiskRestartProxy::removeFile(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFile(path);
}
void DiskRestartProxy::removeFileIfExists(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeFileIfExists(path);
}
void DiskRestartProxy::removeDirectory(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeDirectory(path);
}
void DiskRestartProxy::removeRecursive(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::removeRecursive(path);
}
void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedFile(path, keep_s3);
}
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_s3)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedRecursive(path, keep_s3);
}
void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
ReadLock lock (mutex);
DiskDecorator::setLastModified(path, timestamp);
}
Poco::Timestamp DiskRestartProxy::getLastModified(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::getLastModified(path);
}
void DiskRestartProxy::setReadOnly(const String & path)
{
ReadLock lock (mutex);
DiskDecorator::setReadOnly(path);
}
void DiskRestartProxy::createHardLink(const String & src_path, const String & dst_path)
{
ReadLock lock (mutex);
DiskDecorator::createHardLink(src_path, dst_path);
}
void DiskRestartProxy::truncateFile(const String & path, size_t size)
{
ReadLock lock (mutex);
DiskDecorator::truncateFile(path, size);
}
String DiskRestartProxy::getUniqueId(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getUniqueId(path);
}
bool DiskRestartProxy::checkUniqueId(const String & id) const
{
ReadLock lock (mutex);
return DiskDecorator::checkUniqueId(id);
}
void DiskRestartProxy::restart()
{
/// Speed up processing unhealthy requests.
DiskDecorator::shutdown();
WriteLock lock (mutex, std::defer_lock);
LOG_INFO(log, "Acquiring lock to restart disk {}", DiskDecorator::getName());
auto start_time = std::chrono::steady_clock::now();
auto lock_timeout = Seconds(120);
do
{
/// Use a small timeout to not block read operations for a long time.
if (lock.try_lock_for(Millis(10)))
break;
} while (std::chrono::steady_clock::now() - start_time < lock_timeout);
if (!lock.owns_lock())
throw Exception("Failed to acquire restart lock within timeout. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
DiskDecorator::startup();
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());
}
}
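DiskRestartProxy.cpp above wraps every delegated call in a shared (read) lock and lets restart() take the exclusive (write) lock in short try_lock_for slices, so it never parks behind the lock indefinitely. Below is a minimal standalone sketch of that pattern, with illustrative names that are not from the PR.
#include <chrono>
#include <shared_mutex>
#include <stdexcept>

struct RestartableResource
{
    std::shared_timed_mutex mutex;

    void doOperation()
    {
        std::shared_lock lock(mutex);   /// many operations may hold the read lock concurrently
        /// ... work with the underlying resource ...
    }

    void restart()
    {
        std::unique_lock lock(mutex, std::defer_lock);
        auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(120);
        while (!lock.try_lock_for(std::chrono::milliseconds(10)))   /// short slices instead of one long blocking wait
        {
            if (std::chrono::steady_clock::now() >= deadline)
                throw std::runtime_error("Failed to acquire restart lock within timeout");
        }
        /// ... exclusive access: safely reinitialise the underlying resource ...
    }
};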


@ -0,0 +1,80 @@
#pragma once
#include "DiskDecorator.h"
#include <common/logger_useful.h>
#include <shared_mutex>
namespace DB
{
using ReadLock = std::shared_lock<std::shared_timed_mutex>;
using WriteLock = std::unique_lock<std::shared_timed_mutex>;
class RestartAwareReadBuffer;
class RestartAwareWriteBuffer;
/**
* Allows changing the underlying disk settings at runtime via the 'restart' method.
* All disk methods are protected by a read lock. Read/write buffers produced by the disk hold the read lock until they are finalized/destructed.
* When 'restart' is called, a write lock is acquired to make sure that no operations are running on the disk.
*/
class DiskRestartProxy : public DiskDecorator
{
public:
explicit DiskRestartProxy(DiskPtr & delegate_);
ReservationPtr reserve(UInt64 bytes) override;
const String & getPath() const override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const DiskPtr & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override;
void restart();
private:
friend class RestartAwareReadBuffer;
friend class RestartAwareWriteBuffer;
/// Mutex to protect RW access.
mutable std::shared_timed_mutex mutex;
Poco::Logger * log = &Poco::Logger::get("DiskRestartProxy");
};
}


@ -55,11 +55,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->getDisksMap())
{
old_disks_minus_new_disks.insert(disk_name);
}
DisksMap old_disks_minus_new_disks (result->getDisksMap());
for (const auto & disk_name : keys)
{
@ -73,10 +69,11 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
}
else
{
old_disks_minus_new_disks.erase(disk_name);
auto disk = old_disks_minus_new_disks[disk_name];
/// TODO: Ideally ClickHouse should complain if the disk configuration has changed, but
/// implementing that is not a trivial task.
disk->applyNewSettings(config, context);
old_disks_minus_new_disks.erase(disk_name);
}
}
@ -91,7 +88,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
writeString("Disks ", warning);
int index = 0;
for (const String & name : old_disks_minus_new_disks)
for (const auto & [name, _] : old_disks_minus_new_disks)
{
if (index++ > 0)
writeString(", ", warning);


@ -1,5 +1,6 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Core/Defines.h>
#include <common/types.h>
#include <Common/CurrentMetrics.h>
@ -13,6 +14,7 @@
#include <boost/noncopyable.hpp>
#include <Poco/Path.h>
#include <Poco/Timestamp.h>
#include "Poco/Util/AbstractConfiguration.h"
namespace CurrentMetrics
@ -211,24 +213,33 @@ public:
/// Invoked when Global Context is shutdown.
virtual void shutdown() { }
/// Performs action on disk startup.
virtual void startup() { }
/// Return some uniq string for file, overrode for S3
/// Required for distinguish different copies of the same part on S3
virtual String getUniqueId(const String & path) const { return path; }
/// Check file exists and ClickHouse has an access to it
/// Overrode in DiskS3
/// Required for S3 to ensure that replica has access to data wroten by other node
/// Required for S3 to ensure that replica has access to data written by other node
virtual bool checkUniqueId(const String & id) const { return exists(id); }
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
/// Invoked on partitions freeze query.
virtual void onFreeze(const String &) { }
/// Returns guard, that insures synchronization of directory metadata with storage device.
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings to the disk at runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextConstPtr) { }
protected:
friend class DiskDecorator;
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
private:
std::unique_ptr<Executor> executor;
};
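The IDisk hunk above introduces two lifecycle hooks, startup() and applyNewSettings(), and moves getExecutor() into the protected section so that only decorators reach it. A rough sketch of the lifecycle these hooks imply follows; the factory and reload call sites are assumptions inferred from the rest of this diff, not quotes from it.
/// Lifecycle sketch only.
auto disk = disk_factory.create(disk_name, config, "storage_configuration.disks." + disk_name, context);
disk->startup();                          /// e.g. DiskS3 runs restore() and schema migration here
/// ... queries run; a configuration reload reaches the disk via DiskSelector::updateFromConfig(), which calls ...
disk->applyNewSettings(config, context);  /// e.g. DiskS3 publishes a fresh DiskS3Settings snapshot
/// ... and on server shutdown ...
disk->shutdown();                         /// e.g. DiskS3 stops further S3 request processing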


@ -7,11 +7,13 @@
#include <optional>
#include <utility>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -235,6 +237,28 @@ struct DiskS3::Metadata
}
};
DiskS3::Metadata DiskS3::readOrCreateMetaForWriting(const String & path, WriteMode mode)
{
bool exist = exists(path);
if (exist)
{
auto metadata = readMeta(path);
if (metadata.read_only)
throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
if (mode == WriteMode::Rewrite)
removeFile(path); /// Remove for re-write.
else
return metadata;
}
auto metadata = createMeta(path);
/// Save empty metadata to disk so that the file size can be obtained while the buffer is not yet finalized.
metadata.save();
return metadata;
}
DiskS3::Metadata DiskS3::readMeta(const String & path) const
{
return Metadata(s3_root_path, metadata_path, path);
@ -250,7 +274,11 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, UInt64 s3_max_single_read_retries_, size_t buf_size_)
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
DiskS3::Metadata metadata_,
size_t s3_max_single_read_retries_,
size_t buf_size_)
: client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
@ -351,7 +379,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String & bucket;
DiskS3::Metadata metadata;
UInt64 s3_max_single_read_retries;
size_t s3_max_single_read_retries;
size_t buf_size;
size_t absolute_position = 0;
@ -360,30 +388,22 @@ private:
};
/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS.
class WriteIndirectBufferFromS3 final : public WriteBufferFromFileBase
class WriteIndirectBufferFromS3 final : public WriteBufferFromFileDecorator
{
public:
WriteIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
const String & bucket_,
std::unique_ptr<WriteBufferFromS3> impl_,
DiskS3::Metadata metadata_,
const String & s3_path_,
std::optional<DiskS3::ObjectMetadata> object_metadata_,
size_t min_upload_part_size,
size_t max_single_part_upload_size,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, max_single_part_upload_size,std::move(object_metadata_), buf_size_))
String & s3_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
}
, s3_path(s3_path_) { }
~WriteIndirectBufferFromS3() override
virtual ~WriteIndirectBufferFromS3() override
{
try
{
finalize();
WriteIndirectBufferFromS3::finalize();
}
catch (...)
{
@ -396,13 +416,10 @@ public:
if (finalized)
return;
next();
impl.finalize();
WriteBufferFromFileDecorator::finalize();
metadata.addObject(s3_path, count());
metadata.save();
finalized = true;
}
void sync() override
@ -414,20 +431,6 @@ public:
std::string getFileName() const override { return metadata.metadata_file_path; }
private:
void nextImpl() override
{
/// Transfer current working buffer to WriteBufferFromS3.
impl.swap(*this);
/// Write actual data to S3.
impl.next();
/// Return back working buffer.
impl.swap(*this);
}
WriteBufferFromS3 impl;
bool finalized = false;
DiskS3::Metadata metadata;
String s3_path;
};
@ -530,7 +533,6 @@ public:
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
pool.scheduleOrThrowOnError(
[promise, task]()
{
@ -553,6 +555,10 @@ public:
return promise->get_future();
}
void setMaxThreads(size_t threads)
{
pool.setMaxThreads(threads);
}
private:
ThreadPool pool;
};
@ -560,32 +566,18 @@ private:
DiskS3::DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 s3_max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_)
: IDisk(std::make_unique<AsyncExecutor>(thread_pool_size_))
SettingsPtr settings_,
GetDiskSettings settings_getter_)
: IDisk(std::make_unique<AsyncExecutor>(settings_->thread_pool_size))
, name(std::move(name_))
, client(std::move(client_))
, proxy_configuration(std::move(proxy_configuration_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, s3_max_single_read_retries(s3_max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, send_metadata(send_metadata_)
, revision_counter(0)
, list_object_keys_size(list_object_keys_size_)
, current_settings(std::move(settings_))
, settings_getter(settings_getter_)
{
}
@ -649,6 +641,13 @@ void DiskS3::clearDirectory(const String & path)
}
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
auto settings = current_settings.get();
moveFile(from_path, to_path, settings->send_metadata);
}
void DiskS3::moveFile(const String & from_path, const String & to_path, bool send_metadata)
{
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
@ -681,26 +680,26 @@ void DiskS3::replaceFile(const String & from_path, const String & to_path)
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, s3_max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
bool exist = exists(path);
if (exist && readMeta(path).read_only)
throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
auto settings = current_settings.get();
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new S3 object.
auto s3_path = getRandomName();
std::optional<ObjectMetadata> object_metadata;
if (send_metadata)
if (settings->send_metadata)
{
auto revision = ++revision_counter;
object_metadata = {
@ -709,31 +708,19 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
s3_path = "r" + revisionToString(revision) + "-file-" + s3_path;
}
if (!exist || mode == WriteMode::Rewrite)
{
/// If metadata file exists - remove and create new.
if (exist)
removeFile(path);
LOG_DEBUG(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), s3_root_path + s3_path);
auto metadata = createMeta(path);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
bucket,
metadata.s3_root_path + s3_path,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size);
LOG_DEBUG(log, "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size);
}
else
{
auto metadata = readMeta(path);
LOG_DEBUG(log, "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size);
}
return std::make_unique<WriteIndirectBufferFromS3>(std::move(s3_buffer), std::move(metadata), s3_path);
}
void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
@ -803,6 +790,8 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
{
if (!keys.empty())
{
auto settings = current_settings.get();
for (const auto & chunk : keys)
{
Aws::S3::Model::Delete delkeys;
@ -812,7 +801,7 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client->DeleteObjects(request);
auto outcome = settings->client->DeleteObjects(request);
throwIfError(outcome);
}
}
@ -889,6 +878,13 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
auto settings = current_settings.get();
createHardLink(src_path, dst_path, settings->send_metadata);
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path, bool send_metadata)
{
/// We don't need to record hardlinks created to shadow folder.
if (send_metadata && !dst_path.starts_with("shadow/"))
@ -928,28 +924,41 @@ void DiskS3::setReadOnly(const String & path)
void DiskS3::shutdown()
{
auto settings = current_settings.get();
/// This call stops any next retry attempts for ongoing S3 requests.
/// If S3 request is failed and the method below is executed S3 client immediately returns the last failed S3 request outcome.
/// If S3 is healthy nothing wrong will be happened and S3 requests will be processed in a regular way without errors.
/// This should significantly speed up shutdown process if S3 is unhealthy.
client->DisableRequestProcessing();
settings->client->DisableRequestProcessing();
}
void DiskS3::createFileOperationObject(const String & operation_name, UInt64 revision, const DiskS3::ObjectMetadata & metadata)
{
auto settings = current_settings.get();
const String key = "operations/r" + revisionToString(revision) + "-" + operation_name;
WriteBufferFromS3 buffer(client, bucket, s3_root_path + key, min_upload_part_size, max_single_part_upload_size, metadata);
WriteBufferFromS3 buffer(
settings->client,
bucket,
s3_root_path + key,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size,
metadata);
buffer.write('0');
buffer.finalize();
}
void DiskS3::startup()
{
if (!send_metadata)
auto settings = current_settings.get();
if (!settings->send_metadata)
return;
LOG_INFO(log, "Starting up disk {}", name);
restore();
if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();
@ -986,7 +995,13 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
ReadBufferFromS3 buffer(client, source_bucket, source_path + SCHEMA_VERSION_OBJECT, s3_max_single_read_retries);
auto settings = current_settings.get();
ReadBufferFromS3 buffer(
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
settings->s3_max_single_read_retries);
readIntText(version, buffer);
return version;
@ -994,13 +1009,22 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
void DiskS3::saveSchemaVersion(const int & version)
{
WriteBufferFromS3 buffer (client, bucket, s3_root_path + SCHEMA_VERSION_OBJECT, min_upload_part_size, max_single_part_upload_size);
auto settings = current_settings.get();
WriteBufferFromS3 buffer(
settings->client,
bucket,
s3_root_path + SCHEMA_VERSION_OBJECT,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size);
writeIntText(version, buffer);
buffer.finalize();
}
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
@ -1008,7 +1032,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
request.SetMetadata(metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
auto outcome = client->CopyObject(request);
auto outcome = settings->client->CopyObject(request);
throwIfError(outcome);
}
@ -1097,14 +1121,15 @@ void DiskS3::migrateToRestorableSchema()
}
}
bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix)
bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix) const
{
auto settings = current_settings.get();
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(prefix);
request.SetMaxKeys(1);
auto outcome = client->ListObjectsV2(request);
auto outcome = settings->client->ListObjectsV2(request);
throwIfError(outcome);
return !outcome.GetResult().GetContents().empty();
@ -1112,12 +1137,13 @@ bool DiskS3::checkObjectExists(const String & source_bucket, const String & pref
bool DiskS3::checkUniqueId(const String & id) const
{
auto settings = current_settings.get();
/// Check that we have right s3 and have access rights
/// Actually interprets id as s3 object name and checks if it exists
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(id);
auto resp = client->ListObjectsV2(request);
auto resp = settings->client->ListObjectsV2(request);
throwIfError(resp);
Aws::Vector<Aws::S3::Model::Object> object_list = resp.GetResult().GetContents();
@ -1127,29 +1153,31 @@ bool DiskS3::checkUniqueId(const String & id) const
return false;
}
Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key)
Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key) const
{
auto settings = current_settings.get();
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(source_bucket);
request.SetKey(key);
auto outcome = client->HeadObject(request);
auto outcome = settings->client->HeadObject(request);
throwIfError(outcome);
return outcome.GetResultWithOwnership();
}
void DiskS3::listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback)
void DiskS3::listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const
{
auto settings = current_settings.get();
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(source_path);
request.SetMaxKeys(list_object_keys_size);
request.SetMaxKeys(settings->list_object_keys_size);
Aws::S3::Model::ListObjectsV2Outcome outcome;
do
{
outcome = client->ListObjectsV2(request);
outcome = settings->client->ListObjectsV2(request);
throwIfError(outcome);
bool should_continue = callback(outcome.GetResult());
@ -1161,14 +1189,15 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
} while (outcome.GetResult().GetIsTruncated());
}
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key)
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
auto outcome = client->CopyObject(request);
auto outcome = settings->client->CopyObject(request);
throwIfError(outcome);
}
@ -1375,13 +1404,15 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
void DiskS3::restoreFileOperations(const RestoreInformation & restore_information)
{
auto settings = current_settings.get();
LOG_INFO(log, "Starting restore file operations for disk {}", name);
/// Enable recording file operations if we restore to different bucket / path.
send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
bool send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
std::set<String> renames;
auto restore_file_operations = [this, &restore_information, &renames](auto list_result)
auto restore_file_operations = [this, &restore_information, &renames, &send_metadata](auto list_result)
{
const String rename = "rename";
const String hardlink = "hardlink";
@ -1413,7 +1444,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
auto to_path = object_metadata["to_path"];
if (exists(from_path))
{
moveFile(from_path, to_path);
moveFile(from_path, to_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (restore_information.detached && isDirectory(to_path))
@ -1440,7 +1471,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
if (exists(src_path))
{
createDirectories(directoryPath(dst_path));
createHardLink(src_path, dst_path);
createHardLink(src_path, dst_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}
@ -1477,8 +1508,6 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
}
}
send_metadata = true;
LOG_INFO(log, "File operations restored for disk {}", name);
}
@ -1518,4 +1547,34 @@ void DiskS3::onFreeze(const String & path)
revision_file_buf.finalize();
}
void DiskS3::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context)
{
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
current_settings.set(std::move(new_settings));
if (AsyncExecutor * exec = dynamic_cast<AsyncExecutor*>(&getExecutor()))
exec->setMaxThreads(current_settings.get()->thread_pool_size);
}
DiskS3Settings::DiskS3Settings(
const std::shared_ptr<Aws::S3::S3Client> & client_,
size_t s3_max_single_read_retries_,
size_t s3_min_upload_part_size_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_)
: client(client_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, s3_min_upload_part_size(s3_min_upload_part_size_)
, s3_max_single_part_upload_size(s3_max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, send_metadata(send_metadata_)
, thread_pool_size(thread_pool_size_)
, list_object_keys_size(list_object_keys_size_)
{
}
}
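DiskS3 now reads every tunable through current_settings.get() at the start of an operation and replaces the whole DiskS3Settings object in applyNewSettings(). MultiVersion is ClickHouse-internal; the sketch below shows the same copy-on-write idea with plain standard-library pieces, purely as an illustration.
#include <atomic>
#include <memory>

struct Settings { size_t s3_min_upload_part_size = 0; /* ... */ };

class SettingsHolder
{
public:
    /// Readers take an immutable snapshot once per operation.
    std::shared_ptr<const Settings> get() const { return std::atomic_load(&current); }
    /// applyNewSettings()-style writers publish a whole new object atomically.
    void set(std::shared_ptr<const Settings> next) { std::atomic_store(&current, std::move(next)); }

private:
    std::shared_ptr<const Settings> current = std::make_shared<Settings>();
};

void someDiskOperation(const SettingsHolder & holder)
{
    auto settings = holder.get();   /// one snapshot, so a concurrent set() cannot change values mid-request
    /// ... use settings->s3_min_upload_part_size consistently for this whole call ...
}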


@ -2,9 +2,9 @@
#include <atomic>
#include <common/logger_useful.h>
#include <Common/MultiVersion.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
#include "ProxyConfiguration.h"
#include <aws/s3/S3Client.h>
#include <aws/s3/model/HeadObjectResult.h>
@ -17,6 +17,29 @@
namespace DB
{
/// Settings for DiskS3 that can be changed at runtime.
struct DiskS3Settings
{
DiskS3Settings(
const std::shared_ptr<Aws::S3::S3Client> & client_,
size_t s3_max_single_read_retries_,
size_t s3_min_upload_part_size_,
size_t s3_max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_);
std::shared_ptr<Aws::S3::S3Client> client;
size_t s3_max_single_read_retries;
size_t s3_min_upload_part_size;
size_t s3_max_single_part_upload_size;
size_t min_bytes_for_seek;
bool send_metadata;
int thread_pool_size;
int list_object_keys_size;
};
/**
* Storage for persisting data in S3 and metadata on the local disk.
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
@ -27,6 +50,8 @@ class DiskS3 : public IDisk
public:
using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
using SettingsPtr = std::unique_ptr<DiskS3Settings>;
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;
friend class DiskS3Reservation;
@ -36,18 +61,11 @@ public:
DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 s3_max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_);
SettingsPtr settings_,
GetDiskSettings settings_getter_);
const String & getName() const override { return name; }
@ -82,7 +100,7 @@ public:
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
void replaceFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
@ -109,6 +127,7 @@ public:
void removeSharedRecursive(const String & path, bool keep_s3) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
@ -122,6 +141,8 @@ public:
void shutdown() override;
void startup() override;
/// Return some uniq string for file
/// Required for distinguish different copies of the same part on S3
String getUniqueId(const String & path) const override;
@ -130,15 +151,11 @@ public:
/// Required for S3 to ensure that replica has access to data wroten by other node
bool checkUniqueId(const String & id) const override;
/// Actions performed after disk creation.
void startup();
/// Restore S3 metadata files on file system.
void restore();
/// Dumps current revision counter into file 'revision.txt' at given path.
void onFreeze(const String & path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
private:
bool tryReserve(UInt64 bytes);
@ -146,6 +163,7 @@ private:
void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
void removeAws(const AwsS3KeyKeeper & keys);
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
Metadata readMeta(const String & path) const;
Metadata createMeta(const String & path) const;
@ -153,7 +171,7 @@ private:
/// Converts revision to binary string with leading zeroes (64 bit).
static String revisionToString(UInt64 revision);
bool checkObjectExists(const String & source_bucket, const String & prefix);
bool checkObjectExists(const String & source_bucket, const String & prefix) const;
void findLastRevision();
int readSchemaVersion(const String & source_bucket, const String & source_path);
@ -163,10 +181,12 @@ private:
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchema();
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key) const;
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const;
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const;
/// Restore S3 metadata files on file system.
void restore();
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(const RestoreInformation & restore_information);
void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector<String> keys);
@ -181,29 +201,24 @@ private:
static String pathToDetached(const String & source_path);
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
String metadata_path;
UInt64 s3_max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
bool send_metadata;
const String metadata_path;
MultiVersion<DiskS3Settings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter;
std::atomic<UInt64> revision_counter = 0;
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0;
/// File at path {metadata_path}/restore contains metadata restore information
inline static const String RESTORE_FILE_NAME = "restore";
/// The number of keys listed in one request (1000 is max value)
int list_object_keys_size;
/// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};


@ -19,6 +19,7 @@
#include "ProxyConfiguration.h"
#include "ProxyListConfiguration.h"
#include "ProxyResolverConfiguration.h"
#include "Disks/DiskRestartProxy.h"
namespace DB
@ -31,84 +32,132 @@ namespace ErrorCodes
namespace
{
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk) { disk.removeFile("test_acl"); }
void checkRemoveAccess(IDisk & disk) { disk.removeFile("test_acl"); }
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_resolver_config)
{
auto endpoint = Poco::URI(proxy_resolver_config.getString(prefix + ".endpoint"));
auto proxy_scheme = proxy_resolver_config.getString(prefix + ".proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_resolver_config)
{
auto endpoint = Poco::URI(proxy_resolver_config.getString(prefix + ".endpoint"));
auto proxy_scheme = proxy_resolver_config.getString(prefix + ".proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}",
endpoint.toString(), proxy_scheme, proxy_port);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}",
endpoint.toString(), proxy_scheme, proxy_port);
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
}
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
}
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_config)
{
std::vector<String> keys;
proxy_config.keys(prefix, keys);
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_config)
{
std::vector<String> keys;
proxy_config.keys(prefix, keys);
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(proxy_config.getString(prefix + "." + key));
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
proxies.push_back(proxy_uri);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString());
}
if (!proxies.empty())
return std::make_shared<S3::ProxyListConfiguration>(proxies);
return nullptr;
}
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & prefix, const Poco::Util::AbstractConfiguration & config)
{
if (!config.has(prefix + ".proxy"))
return nullptr;
std::vector<String> config_keys;
config.keys(prefix + ".proxy", config_keys);
if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver"))
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
if (resolver_configs > 1)
throw Exception("Multiple proxy resolver configurations aren't allowed", ErrorCodes::BAD_ARGUMENTS);
Poco::URI proxy_uri(proxy_config.getString(prefix + "." + key));
return getProxyResolverConfiguration(prefix + ".proxy.resolver", config);
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
proxies.push_back(proxy_uri);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString());
}
return getProxyListConfiguration(prefix + ".proxy", config);
if (!proxies.empty())
return std::make_shared<S3::ProxyListConfiguration>(proxies);
return nullptr;
}
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & prefix, const Poco::Util::AbstractConfiguration & config)
{
if (!config.has(prefix + ".proxy"))
return nullptr;
std::vector<String> config_keys;
config.keys(prefix + ".proxy", config_keys);
if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver"))
{
if (resolver_configs > 1)
throw Exception("Multiple proxy resolver configurations aren't allowed", ErrorCodes::BAD_ARGUMENTS);
return getProxyResolverConfiguration(prefix + ".proxy.resolver", config);
}
return getProxyListConfiguration(prefix + ".proxy", config);
}
std::shared_ptr<Aws::S3::S3Client>
getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextConstPtr context)
{
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
context->getRemoteHostFilter(), context->getGlobalContext()->getSettingsRef().s3_max_redirects);
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;
auto proxy_config = getProxyConfiguration(config_prefix, config);
if (proxy_config)
client_configuration.perRequestConfiguration
= [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
return S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
{},
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)));
}
std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextConstPtr context)
{
return std::make_unique<DiskS3Settings>(
getClient(config, config_prefix, context),
config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries),
config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size),
config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".list_object_keys_size", 1000));
}
}
@ -118,56 +167,20 @@ void registerDiskS3(DiskFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextConstPtr context) -> DiskPtr {
Poco::File disk{context->getPath() + "disks/" + name};
disk.createDirectories();
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
context->getRemoteHostFilter(),
context->getGlobalContext()->getSettingsRef().s3_max_redirects);
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;
auto proxy_config = getProxyConfiguration(config_prefix, config);
if (proxy_config)
client_configuration.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
config.getUInt(config_prefix + ".retry_attempts", 10));
auto client = S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
{},
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false))
);
String metadata_path = config.getString(config_prefix + ".metadata_path", context->getPath() + "disks/" + name + "/");
Poco::File (metadata_path).createDirectories();
auto s3disk = std::make_shared<DiskS3>(
std::shared_ptr<IDisk> s3disk = std::make_shared<DiskS3>(
name,
client,
proxy_config,
uri.bucket,
uri.key,
metadata_path,
context->getSettingsRef().s3_max_single_read_retries,
context->getSettingsRef().s3_min_upload_part_size,
context->getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".list_object_keys_size", 1000));
getSettings(config, config_prefix, context),
getSettings);
/// This code is used only to check access to the corresponding disk.
if (!config.getBool(config_prefix + ".skip_access_check", false))
@ -177,7 +190,6 @@ void registerDiskS3(DiskFactory & factory)
checkRemoveAccess(*s3disk);
}
s3disk->restore();
s3disk->startup();
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
@ -197,10 +209,10 @@ void registerDiskS3(DiskFactory & factory)
|| path.ends_with("txt") || path.ends_with("dat");
};
return std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
}
return s3disk;
return std::make_shared<DiskRestartProxy>(s3disk);
};
factory.registerDiskType("s3", creator);
}
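Note: the creator above now hands DiskS3 both a ready-made settings object (getSettings(config, config_prefix, context)) and the getSettings factory itself, and the final disk is returned wrapped in DiskRestartProxy. Keeping the factory around is what lets SYSTEM RESTART DISK and SYSTEM RELOAD CONFIG rebuild the settings from the current configuration without recreating the disk object. A minimal sketch of that settings-factory idea, with hypothetical names (HypotheticalS3Settings and HypotheticalS3Disk are illustrations, not the ClickHouse classes):

#include <functional>
#include <iostream>
#include <memory>

struct HypotheticalS3Settings                  // stand-in for DiskS3Settings
{
    size_t max_single_part_upload_size = 32 * 1024 * 1024;
};

class HypotheticalS3Disk                       // stand-in for DiskS3 behind DiskRestartProxy
{
public:
    using SettingsFactory = std::function<std::unique_ptr<HypotheticalS3Settings>()>;

    HypotheticalS3Disk(std::unique_ptr<HypotheticalS3Settings> settings_, SettingsFactory factory_)
        : settings(std::move(settings_)), factory(std::move(factory_)) {}

    /// On restart, re-read configuration through the factory instead of keeping stale values.
    void restart() { settings = factory(); }

    size_t uploadThreshold() const { return settings->max_single_part_upload_size; }

private:
    std::unique_ptr<HypotheticalS3Settings> settings;
    SettingsFactory factory;
};

int main()
{
    auto make_settings = [] { return std::make_unique<HypotheticalS3Settings>(); };
    HypotheticalS3Disk disk(make_settings(), make_settings);
    disk.restart();                            // picks up whatever the factory produces now
    std::cout << disk.uploadThreshold() << '\n';
}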

View File

@ -0,0 +1,46 @@
#include <IO/ReadBufferFromFileDecorator.h>
namespace DB
{
ReadBufferFromFileDecorator::ReadBufferFromFileDecorator(std::unique_ptr<ReadBufferFromFileBase> impl_)
: impl(std::move(impl_))
{
swap(*impl);
}
std::string ReadBufferFromFileDecorator::getFileName() const
{
return impl->getFileName();
}
off_t ReadBufferFromFileDecorator::getPosition()
{
swap(*impl);
auto position = impl->getPosition();
swap(*impl);
return position;
}
off_t ReadBufferFromFileDecorator::seek(off_t off, int whence)
{
swap(*impl);
auto result = impl->seek(off, whence);
swap(*impl);
return result;
}
bool ReadBufferFromFileDecorator::nextImpl()
{
swap(*impl);
auto result = impl->next();
swap(*impl);
return result;
}
}
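The decorator deliberately owns no buffer memory: the constructor swaps its empty working buffer with the underlying buffer's, and each delegated call swaps back for the duration of the call, so the caller always reads the bytes the inner buffer produced. A minimal sketch of a subclass built on this pattern, assuming the decorator behaves as defined in this commit (the byte-accounting subclass itself is hypothetical):

#include <IO/ReadBufferFromFileDecorator.h>

namespace DB
{

/// Hypothetical example: counts how many next() calls reached the underlying buffer.
class CountingReadBuffer : public ReadBufferFromFileDecorator
{
public:
    explicit CountingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> impl_)
        : ReadBufferFromFileDecorator(std::move(impl_)) {}

    size_t nextCalls() const { return next_calls; }

private:
    bool nextImpl() override
    {
        ++next_calls;
        /// Delegation still goes through the base class, which performs the swap dance.
        return ReadBufferFromFileDecorator::nextImpl();
    }

    size_t next_calls = 0;
};

}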

View File

@ -0,0 +1,27 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
namespace DB
{
/// Delegates all reads to the underlying buffer. Doesn't have its own memory.
class ReadBufferFromFileDecorator : public ReadBufferFromFileBase
{
public:
explicit ReadBufferFromFileDecorator(std::unique_ptr<ReadBufferFromFileBase> impl_);
std::string getFileName() const override;
off_t getPosition() override;
off_t seek(off_t off, int whence) override;
bool nextImpl() override;
protected:
std::unique_ptr<ReadBufferFromFileBase> impl;
};
}

View File

@ -4,28 +4,10 @@
namespace DB
{
SeekAvoidingReadBuffer::SeekAvoidingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> nested_, UInt64 min_bytes_for_seek_)
: nested(std::move(nested_))
SeekAvoidingReadBuffer::SeekAvoidingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> impl_, UInt64 min_bytes_for_seek_)
: ReadBufferFromFileDecorator(std::move(impl_))
, min_bytes_for_seek(min_bytes_for_seek_)
{
swap(*nested);
}
std::string SeekAvoidingReadBuffer::getFileName() const
{
return nested->getFileName();
}
off_t SeekAvoidingReadBuffer::getPosition()
{
swap(*nested);
off_t position = nested->getPosition();
swap(*nested);
return position;
}
{ }
off_t SeekAvoidingReadBuffer::seek(off_t off, int whence)
{
@ -39,28 +21,13 @@ off_t SeekAvoidingReadBuffer::seek(off_t off, int whence)
if (whence == SEEK_SET && off >= position && off < position + static_cast<off_t>(min_bytes_for_seek))
{
swap(*nested);
nested->ignore(off - position);
swap(*nested);
position = off;
}
else
{
swap(*nested);
position = nested->seek(off, whence);
swap(*nested);
swap(*impl);
impl->ignore(off - position);
swap(*impl);
return off;
}
return position;
}
bool SeekAvoidingReadBuffer::nextImpl()
{
swap(*nested);
bool nested_result = nested->next();
swap(*nested);
return nested_result;
return ReadBufferFromFileDecorator::seek(off, whence);
}
}
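With the decorator in place, SeekAvoidingReadBuffer only keeps the seek-avoidance logic: a forward jump shorter than min_bytes_for_seek is served by impl->ignore() (read and discard), which stays on the already-open stream, while everything else falls back to ReadBufferFromFileDecorator::seek(). For example, with a 1 MiB threshold and the position at 0, seeking to 512 KiB discards 512 KiB of data instead of issuing a real seek, whereas seeking to 16 MiB delegates to the underlying buffer. A short sketch under those assumptions (the file name and threshold are illustrative; for the S3 disk the threshold comes from min_bytes_for_seek above):

#include <IO/ReadBufferFromFile.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <cstdio>
#include <memory>

int main()
{
    using namespace DB;

    auto nested = std::make_unique<ReadBufferFromFile>("/tmp/example.bin");
    SeekAvoidingReadBuffer buf(std::move(nested), 1024 * 1024);   /// 1 MiB threshold

    buf.seek(512 * 1024, SEEK_SET);        /// short forward jump: impl->ignore(), no real seek
    buf.seek(16 * 1024 * 1024, SEEK_SET);  /// past the threshold: delegated to impl->seek()
    return 0;
}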

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFileDecorator.h>
namespace DB
@ -10,22 +10,15 @@ namespace DB
/// It is useful for network and spinning-disk storage media, where seek is a relatively expensive
/// operation.
/// See also: `merge_tree_min_rows_for_seek`.
class SeekAvoidingReadBuffer : public ReadBufferFromFileBase
class SeekAvoidingReadBuffer : public ReadBufferFromFileDecorator
{
std::unique_ptr<ReadBufferFromFileBase> nested;
UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation.
public:
SeekAvoidingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> nested_, UInt64 min_bytes_for_seek_);
std::string getFileName() const override;
off_t getPosition() override;
SeekAvoidingReadBuffer(std::unique_ptr<ReadBufferFromFileBase> impl_, UInt64 min_bytes_for_seek_);
off_t seek(off_t off, int whence) override;
bool nextImpl() override;
private:
UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation.
};
}

View File

@ -0,0 +1,56 @@
#include "WriteBufferFromFileDecorator.h"
#include <IO/WriteBuffer.h>
namespace DB
{
WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptr<WriteBuffer> impl_)
: WriteBufferFromFileBase(0, nullptr, 0), impl(std::move(impl_))
{
swap(*impl);
}
void WriteBufferFromFileDecorator::finalize()
{
if (finalized)
return;
next();
impl->finalize();
finalized = true;
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
try
{
WriteBufferFromFileDecorator::finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromFileDecorator::sync()
{
impl->sync();
}
std::string WriteBufferFromFileDecorator::getFileName() const
{
if (WriteBufferFromFileBase * buffer = dynamic_cast<WriteBufferFromFileBase*>(impl.get()))
return buffer->getFileName();
return std::string();
}
void WriteBufferFromFileDecorator::nextImpl()
{
swap(*impl);
impl->next();
swap(*impl);
}
}
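finalize() here first flushes the decorator's pending bytes via next() and only then finalizes the inner buffer; the finalized flag makes it idempotent, so the destructor can call it safely and getFileName() falls back to an empty string when the wrapped buffer is not file-based. Since the class is concrete, it can also be used directly to forward writes; a minimal caller-side sketch, with an illustrative file name:

#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
#include <IO/WriteHelpers.h>
#include <memory>

int main()
{
    using namespace DB;

    WriteBufferFromFileDecorator out(std::make_unique<WriteBufferFromFile>("/tmp/example.txt"));

    writeString("hello", out);  /// bytes accumulate in the swapped-in working buffer of the inner file buffer
    out.finalize();             /// flushes via next(), then finalizes the inner buffer; a second call is a no-op
    return 0;
}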

View File

@ -0,0 +1,30 @@
#pragma once
#include <IO/WriteBufferFromFileBase.h>
namespace DB
{
/// Delegates all writes to the underlying buffer. Doesn't have its own memory.
class WriteBufferFromFileDecorator : public WriteBufferFromFileBase
{
public:
explicit WriteBufferFromFileDecorator(std::unique_ptr<WriteBuffer> impl_);
~WriteBufferFromFileDecorator() override;
void finalize() override;
void sync() override;
std::string getFileName() const override;
protected:
std::unique_ptr<WriteBuffer> impl;
bool finalized = false;
private:
void nextImpl() override;
};
}

View File

@ -29,6 +29,7 @@
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
#include <Disks/DiskRestartProxy.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageFactory.h>
@ -394,6 +395,9 @@ BlockIO InterpreterSystemQuery::execute()
throw Exception("There is no " + query.database + "." + query.table + " replicated table",
ErrorCodes::BAD_ARGUMENTS);
break;
case Type::RESTART_DISK:
restartDisk(query.disk);
break;
case Type::FLUSH_LOGS:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
@ -637,6 +641,18 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
void InterpreterSystemQuery::restartDisk(String & name)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_DISK);
auto disk = getContext()->getDisk(name);
if (DiskRestartProxy * restart_proxy = dynamic_cast<DiskRestartProxy*>(disk.get()))
restart_proxy->restart();
else
throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS);
}
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
@ -779,6 +795,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_FLUSH_LOGS);
break;
}
case Type::RESTART_DISK:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_DISK);
break;
}
case Type::STOP_LISTEN_QUERIES: break;
case Type::START_LISTEN_QUERIES: break;
case Type::UNKNOWN: break;
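Taken together with the parser and AST changes below, the flow in this commit is: SYSTEM RESTART DISK <disk_name> requires the new global SYSTEM_RESTART_DISK grant, the disk name is parsed as an identifier into query.disk, and the interpreter restarts the disk only if it is wrapped in DiskRestartProxy (in this commit, the S3 disk); any other disk yields BAD_ARGUMENTS. The S3 restore tests further down use this statement to pick up a freshly written restore file without restarting the whole server.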

View File

@ -52,6 +52,7 @@ private:
void dropReplica(ASTSystemQuery & query);
bool dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table);
void flushDistributed(ASTSystemQuery & query);
void restartDisk(String & name);
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);

View File

@ -94,6 +94,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "START DISTRIBUTED SENDS";
case Type::FLUSH_LOGS:
return "FLUSH LOGS";
case Type::RESTART_DISK:
return "RESTART DISK";
default:
throw Exception("Unknown SYSTEM query command", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -41,6 +41,7 @@ public:
RELOAD_EMBEDDED_DICTIONARIES,
RELOAD_CONFIG,
RELOAD_SYMBOLS,
RESTART_DISK,
STOP_MERGES,
START_MERGES,
STOP_TTL_MERGES,
@ -73,6 +74,7 @@ public:
bool is_drop_whole_replica{};
String storage_policy;
String volume;
String disk;
UInt64 seconds{};
String getID(char) const override { return "SYSTEM query"; }

View File

@ -134,6 +134,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
return false;
break;
case Type::RESTART_DISK:
{
ASTPtr ast;
if (ParserIdentifier{}.parse(pos, ast, expected))
res->disk = ast->as<ASTIdentifier &>().name();
else
return false;
break;
}
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
case Type::FLUSH_DISTRIBUTED:

View File

@ -6,6 +6,7 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3>
<hdd>
<type>local</type>

View File

@ -1,5 +0,0 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

View File

@ -2,6 +2,8 @@ import logging
import random
import string
import time
import threading
import os
import pytest
from helpers.cluster import ClickHouseCluster
@ -10,13 +12,47 @@ logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed, they are only printed to stderr).
#
# Wrap threading.Thread and re-throw the exception on join().
class SafeThread(threading.Thread):
def __init__(self, target):
super().__init__()
self.target = target
self.exception = None
def run(self):
try:
self.target()
except Exception as e: # pylint: disable=broad-except
self.exception = e
def join(self, timeout=None):
super().join(timeout)
if self.exception:
raise self.exception
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml')
def replace_config(old, new):
    with open(CONFIG_PATH, 'r') as config:
        config_lines = config.readlines()
    config_lines = [line.replace(old, new) for line in config_lines]
    with open(CONFIG_PATH, 'w') as config:
        config.writelines(config_lines)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], with_minio=True)
"configs/config.d/log_conf.xml"], with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -365,3 +401,63 @@ def test_freeze_unfreeze(cluster):
# Data should be removed from S3.
assert len(
list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD
def test_s3_disk_apply_new_settings(cluster):
create_table(cluster, "s3_test")
node = cluster.instances["node"]
def get_s3_requests():
node.query("SYSTEM FLUSH LOGS")
return int(node.query("SELECT value FROM system.events WHERE event='S3WriteRequestsCount'"))
s3_requests_before = get_s3_requests()
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
s3_requests_to_write_partition = get_s3_requests() - s3_requests_before
# Force multi-part upload mode.
replace_config("<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>",
"<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>")
node.query("SYSTEM RELOAD CONFIG")
s3_requests_before = get_s3_requests()
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
# There should be 3 times more S3 requests because multi-part upload mode uses 3 requests to upload an object.
assert get_s3_requests() - s3_requests_before == s3_requests_to_write_partition * 3
def test_s3_disk_restart_during_load(cluster):
create_table(cluster, "s3_test")
node = cluster.instances["node"]
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 1024 * 1024)))
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-05', 1024 * 1024, -1)))
def read():
for ii in range(0, 20):
logging.info("Executing %d query", ii)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
logging.info("Query %d executed", ii)
time.sleep(0.2)
def restart_disk():
for iii in range(0, 5):
logging.info("Restarting disk, attempt %d", iii)
node.query("SYSTEM RESTART DISK s3")
logging.info("Disk restarted, attempt %d", iii)
time.sleep(0.5)
threads = []
for i in range(0, 4):
threads.append(SafeThread(target=read))
threads.append(SafeThread(target=restart_disk))
for thread in threads:
thread.start()
for thread in threads:
thread.join()

View File

@ -66,11 +66,11 @@ def generate_values(date_str, count, sign=1):
return ",".join(["('{}',{},'{}',{})".format(x, y, z, 0) for x, y, z in data])
def create_table(node, table_name, replicated=False):
def create_table(node, table_name, attach=False, replicated=False):
node.query("CREATE DATABASE IF NOT EXISTS s3 ENGINE = Ordinary")
create_table_statement = """
CREATE TABLE s3.{table_name} {on_cluster} (
{create} TABLE s3.{table_name} {on_cluster} (
dt Date,
id Int64,
data String,
@ -83,7 +83,8 @@ def create_table(node, table_name, replicated=False):
storage_policy='s3',
old_parts_lifetime=600,
index_granularity=512
""".format(table_name=table_name,
""".format(create="ATTACH" if attach else "CREATE",
table_name=table_name,
on_cluster="ON CLUSTER '{}'".format(node.name) if replicated else "",
engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/test', '{replica}')" if replicated else "MergeTree()")
@ -107,6 +108,7 @@ def drop_shadow_information(node):
def create_restore_file(node, revision=None, bucket=None, path=None, detached=None):
node.exec_in_container(['bash', '-c', 'mkdir -p /var/lib/clickhouse/disks/s3/'], user='root')
node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root')
add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore'
@ -150,17 +152,18 @@ def drop_table(cluster):
def test_full_restore(cluster, replicated):
node = cluster.instances["node"]
create_table(node, "test", replicated)
create_table(node, "test", attach=False, replicated=replicated)
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
node.stop_clickhouse()
node.query("DETACH TABLE s3.test")
drop_s3_metadata(node)
create_restore_file(node)
node.start_clickhouse()
node.query("SYSTEM RESTART DISK s3")
node.query("ATTACH TABLE s3.test")
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -184,22 +187,18 @@ def test_restore_another_bucket_path(cluster):
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
node_another_bucket.stop_clickhouse()
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket, "test", attach=True)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
node_another_bucket_path = cluster.instances["node_another_bucket_path"]
create_table(node_another_bucket_path, "test")
node_another_bucket_path.stop_clickhouse()
create_restore_file(node_another_bucket_path, bucket="root2", path="data")
node_another_bucket_path.start_clickhouse()
node_another_bucket_path.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket_path, "test", attach=True)
assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -234,36 +233,30 @@ def test_restore_different_revisions(cluster):
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
# Restore to revision 1 (2 parts).
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision1, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket, "test", attach=True)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '2\n'
# Restore to revision 2 (4 parts).
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
node_another_bucket.query("DETACH TABLE s3.test")
create_restore_file(node_another_bucket, revision=revision2, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '4\n'
# Restore to revision 3 (4 parts + 1 merged).
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
node_another_bucket.query("DETACH TABLE s3.test")
create_restore_file(node_another_bucket, revision=revision3, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -288,25 +281,20 @@ def test_restore_mutations(cluster):
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
# Restore to revision before mutation.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket, "test", attach=True)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(0)
# Restore to revision after mutation.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
node_another_bucket.query("DETACH TABLE s3.test")
create_restore_file(node_another_bucket, revision=revision_after_mutation, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -315,12 +303,11 @@ def test_restore_mutations(cluster):
# Restore to revision in the middle of mutation.
# Unfinished mutation should be completed after table startup.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
node_another_bucket.query("DETACH TABLE s3.test")
revision = (revision_before_mutation + revision_after_mutation) // 2
create_restore_file(node_another_bucket, revision=revision, bucket="root")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
node_another_bucket.query("ATTACH TABLE s3.test")
# Wait for unfinished mutation completion.
time.sleep(3)
@ -342,7 +329,6 @@ def test_migrate_to_restorable_schema(cluster):
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
replace_config("<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>")
node.restart_clickhouse()
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096)))
@ -355,14 +341,10 @@ def test_migrate_to_restorable_schema(cluster):
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
# Restore to revision before mutation.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket, "test", attach=True)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -374,7 +356,7 @@ def test_migrate_to_restorable_schema(cluster):
def test_restore_to_detached(cluster, replicated):
node = cluster.instances["node"]
create_table(node, "test", replicated)
create_table(node, "test", attach=False, replicated=replicated)
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@ -393,11 +375,9 @@ def test_restore_to_detached(cluster, replicated):
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test", replicated)
node_another_bucket.stop_clickhouse()
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True)
node_another_bucket.start_clickhouse()
node_another_bucket.query("SYSTEM RESTART DISK s3")
create_table(node_another_bucket, "test", replicated=replicated)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)

View File

@ -85,6 +85,7 @@ SYSTEM RELOAD DICTIONARY ['SYSTEM RELOAD DICTIONARIES','RELOAD DICTIONARY','RELO
SYSTEM RELOAD MODEL ['SYSTEM RELOAD MODELS','RELOAD MODEL','RELOAD MODELS'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD EMBEDDED DICTIONARIES ['RELOAD EMBEDDED DICTIONARIES'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD [] \N SYSTEM
SYSTEM RESTART DISK ['SYSTEM RESTART DISK'] GLOBAL SYSTEM
SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP_MERGES','START MERGES'] TABLE SYSTEM
SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM
SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM