Apply disk settings on config reload.

Pavel Kovalenko 2021-04-29 23:32:19 +03:00
parent 838ede6c62
commit 0050dbb75b
14 changed files with 108 additions and 91 deletions

View File

@ -206,9 +206,9 @@ void DiskDecorator::startup()
delegate->startup();
}
void DiskDecorator::applyNewSettings(ContextConstPtr context)
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context)
{
delegate->applyNewSettings(context);
delegate->applyNewSettings(config, context);
}
}
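
For reference, the decorator simply forwards the reloaded configuration together with the context to the wrapped disk, so stacked disks stay transparent to settings reloads. A minimal sketch of this forwarding, using simplified stand-ins for Poco::Util::AbstractConfiguration and ContextConstPtr rather than the real ClickHouse types:

```cpp
#include <memory>
#include <utility>

// Simplified stand-ins for Poco::Util::AbstractConfiguration and ContextConstPtr.
struct Configuration {};
struct Context {};
using ContextConstPtr = std::shared_ptr<const Context>;

// Minimal disk interface: by default a disk has nothing to reconfigure at runtime.
struct IDisk
{
    virtual ~IDisk() = default;
    virtual void applyNewSettings(const Configuration & /*config*/, ContextConstPtr /*context*/) {}
};

// A decorator keeps no settings of its own and just forwards the reload to the delegate.
struct DiskDecorator : IDisk
{
    explicit DiskDecorator(std::shared_ptr<IDisk> delegate_) : delegate(std::move(delegate_)) {}

    void applyNewSettings(const Configuration & config, ContextConstPtr context) override
    {
        delegate->applyNewSettings(config, context);
    }

    std::shared_ptr<IDisk> delegate;
};
```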

View File

@ -69,9 +69,9 @@ public:
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
protected:
void applyNewSettings(ContextConstPtr context) override;
Executor & getExecutor() override;
DiskPtr delegate;

View File

@ -280,7 +280,7 @@ bool DiskRestartProxy::checkUniqueId(const String & id) const
return DiskDecorator::checkUniqueId(id);
}
void DiskRestartProxy::restart(ContextConstPtr context)
void DiskRestartProxy::restart()
{
/// Speed up processing of unhealthy requests.
DiskDecorator::shutdown();
@ -303,7 +303,6 @@ void DiskRestartProxy::restart(ContextConstPtr context)
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
DiskDecorator::applyNewSettings(context);
DiskDecorator::startup();
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());

View File

@ -65,7 +65,7 @@ public:
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override;
void restart(ContextConstPtr context);
void restart();
private:
friend class RestartAwareReadBuffer;

View File

@ -55,11 +55,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);
constexpr auto default_disk_name = "default";
std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->getDisksMap())
{
old_disks_minus_new_disks.insert(disk_name);
}
DisksMap old_disks_minus_new_disks (result->getDisksMap());
for (const auto & disk_name : keys)
{
@ -73,10 +69,11 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
}
else
{
old_disks_minus_new_disks.erase(disk_name);
auto disk = old_disks_minus_new_disks[disk_name];
/// TODO: Ideally ClickHouse should complain if a disk's configuration has changed, but
/// implementing that is not a trivial task.
disk->applyNewSettings(config, context);
old_disks_minus_new_disks.erase(disk_name);
}
}
@ -91,7 +88,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
writeString("Disks ", warning);
int index = 0;
for (const String & name : old_disks_minus_new_disks)
for (const auto & [name, _] : old_disks_minus_new_disks)
{
if (index++ > 0)
writeString(", ", warning);

View File

@ -81,9 +81,4 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
void IDisk::applyNewSettings(ContextConstPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Apply new settings is not implemented for disk of type {}", getType());
}
}

View File

@ -14,6 +14,7 @@
#include <boost/noncopyable.hpp>
#include <Poco/Path.h>
#include <Poco/Timestamp.h>
#include "Poco/Util/AbstractConfiguration.h"
namespace CurrentMetrics
@ -230,12 +231,12 @@ public:
/// Returns a guard that ensures synchronization of directory metadata with the storage device.
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for the disk at runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextConstPtr) { }
protected:
friend class DiskDecorator;
/// Applies new settings for the disk at runtime.
virtual void applyNewSettings(ContextConstPtr);
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }

View File

@ -569,15 +569,15 @@ DiskS3::DiskS3(
String bucket_,
String s3_root_path_,
String metadata_path_,
DiskS3Settings settings_,
GetDiskSettings settings_getter)
: IDisk(std::make_unique<AsyncExecutor>(settings_.thread_pool_size))
SettingsPtr settings_,
GetDiskSettings settings_getter_)
: IDisk(std::make_unique<AsyncExecutor>(settings_->thread_pool_size))
, name(std::move(name_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, settings(std::move(settings_))
, disk_settings_getter(settings_getter)
, current_settings(std::move(settings_))
, settings_getter(settings_getter_)
{
}
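
The constructor change reflects the new ownership model: instead of a plain DiskS3Settings member, the disk keeps its settings behind MultiVersion<DiskS3Settings>, so each operation works on an immutable snapshot while a reload publishes a new version. Below is a minimal sketch of that snapshot pattern; this MultiVersion is a simplified stand-in written for illustration, not ClickHouse's Common/MultiVersion.h:

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

// Simplified stand-in for MultiVersion<T>: readers get a cheap immutable snapshot,
// a writer installs a new version without invalidating snapshots already handed out.
template <typename T>
class MultiVersion
{
public:
    using Version = std::shared_ptr<const T>;

    Version get() const
    {
        std::lock_guard lock(mutex);
        return current;
    }

    void set(std::unique_ptr<const T> value)
    {
        Version next(std::move(value));
        std::lock_guard lock(mutex);
        current = std::move(next);
    }

private:
    mutable std::mutex mutex;
    Version current;
};

struct DiskS3Settings
{
    bool send_metadata = false;
    size_t thread_pool_size = 16;
    size_t list_object_keys_size = 1000;
    // client, upload sizes, retry counts ... omitted
};

struct DiskS3
{
    explicit DiskS3(std::unique_ptr<DiskS3Settings> settings)
    {
        current_settings.set(std::move(settings));   // seed the first version, as the ctor hunk does
    }

    size_t maxKeysPerListRequest() const
    {
        // Every operation takes one snapshot and reads all values from it, so a concurrent
        // config reload can never change settings in the middle of an operation.
        auto settings = current_settings.get();
        return settings->list_object_keys_size;
    }

    MultiVersion<DiskS3Settings> current_settings;
};
```
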
@ -641,11 +641,18 @@ void DiskS3::clearDirectory(const String & path)
}
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
auto settings = current_settings.get();
moveFile(from_path, to_path, settings->send_metadata);
}
void DiskS3::moveFile(const String & from_path, const String & to_path, bool send_metadata)
{
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
if (settings.send_metadata)
if (send_metadata)
{
auto revision = ++revision_counter;
const ObjectMetadata object_metadata {
@ -673,24 +680,26 @@ void DiskS3::replaceFile(const String & from_path, const String & to_path)
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings.client, bucket, metadata, settings.s3_max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings.min_bytes_for_seek);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto settings = current_settings.get();
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new S3 object.
auto s3_path = getRandomName();
std::optional<ObjectMetadata> object_metadata;
if (settings.send_metadata)
if (settings->send_metadata)
{
auto revision = ++revision_counter;
object_metadata = {
@ -703,11 +712,11 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), s3_root_path + s3_path);
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings.client,
settings->client,
bucket,
metadata.s3_root_path + s3_path,
settings.s3_min_upload_part_size,
settings.s3_max_single_part_upload_size,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size);
@ -781,6 +790,8 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
{
if (!keys.empty())
{
auto settings = current_settings.get();
for (const auto & chunk : keys)
{
Aws::S3::Model::Delete delkeys;
@ -790,7 +801,7 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = settings.client->DeleteObjects(request);
auto outcome = settings->client->DeleteObjects(request);
throwIfError(outcome);
}
}
@ -867,9 +878,16 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
auto settings = current_settings.get();
createHardLink(src_path, dst_path, settings->send_metadata);
}
void DiskS3::createHardLink(const String & src_path, const String & dst_path, bool send_metadata)
{
/// We don't need to record hardlinks created to shadow folder.
if (settings.send_metadata && !dst_path.starts_with("shadow/"))
if (send_metadata && !dst_path.starts_with("shadow/"))
{
auto revision = ++revision_counter;
const ObjectMetadata object_metadata {
@ -906,22 +924,24 @@ void DiskS3::setReadOnly(const String & path)
void DiskS3::shutdown()
{
auto settings = current_settings.get();
/// This call stops any further retry attempts for ongoing S3 requests.
/// If an S3 request has failed and the method below is executed, the S3 client immediately returns the last failed S3 request outcome.
/// If S3 is healthy, nothing bad happens and S3 requests are processed in the regular way without errors.
/// This should significantly speed up the shutdown process if S3 is unhealthy.
settings.client->DisableRequestProcessing();
settings->client->DisableRequestProcessing();
}
void DiskS3::createFileOperationObject(const String & operation_name, UInt64 revision, const DiskS3::ObjectMetadata & metadata)
{
auto settings = current_settings.get();
const String key = "operations/r" + revisionToString(revision) + "-" + operation_name;
WriteBufferFromS3 buffer(
settings.client,
settings->client,
bucket,
s3_root_path + key,
settings.s3_min_upload_part_size,
settings.s3_max_single_part_upload_size,
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size,
metadata);
buffer.write('0');
@ -930,7 +950,9 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev
void DiskS3::startup()
{
if (!settings.send_metadata)
auto settings = current_settings.get();
if (!settings->send_metadata)
return;
LOG_INFO(log, "Starting up disk {}", name);
@ -973,11 +995,12 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
auto settings = current_settings.get();
ReadBufferFromS3 buffer(
settings.client,
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
settings.s3_max_single_read_retries);
settings->s3_max_single_read_retries);
readIntText(version, buffer);
@ -986,12 +1009,14 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
void DiskS3::saveSchemaVersion(const int & version)
{
auto settings = current_settings.get();
WriteBufferFromS3 buffer(
settings.client,
settings->client,
bucket,
s3_root_path + SCHEMA_VERSION_OBJECT,
settings.s3_min_upload_part_size,
settings.s3_max_single_part_upload_size);
settings->s3_min_upload_part_size,
settings->s3_max_single_part_upload_size);
writeIntText(version, buffer);
buffer.finalize();
@ -999,6 +1024,7 @@ void DiskS3::saveSchemaVersion(const int & version)
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
@ -1006,7 +1032,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
request.SetMetadata(metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
auto outcome = settings.client->CopyObject(request);
auto outcome = settings->client->CopyObject(request);
throwIfError(outcome);
}
@ -1097,12 +1123,13 @@ void DiskS3::migrateToRestorableSchema()
bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix) const
{
auto settings = current_settings.get();
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(prefix);
request.SetMaxKeys(1);
auto outcome = settings.client->ListObjectsV2(request);
auto outcome = settings->client->ListObjectsV2(request);
throwIfError(outcome);
return !outcome.GetResult().GetContents().empty();
@ -1110,12 +1137,13 @@ bool DiskS3::checkObjectExists(const String & source_bucket, const String & pref
bool DiskS3::checkUniqueId(const String & id) const
{
auto settings = current_settings.get();
/// Check that we are talking to the right S3 and have access rights.
/// Actually interprets id as an S3 object name and checks if it exists.
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(id);
auto resp = settings.client->ListObjectsV2(request);
auto resp = settings->client->ListObjectsV2(request);
throwIfError(resp);
Aws::Vector<Aws::S3::Model::Object> object_list = resp.GetResult().GetContents();
@ -1127,11 +1155,12 @@ bool DiskS3::checkUniqueId(const String & id) const
Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key) const
{
auto settings = current_settings.get();
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(source_bucket);
request.SetKey(key);
auto outcome = settings.client->HeadObject(request);
auto outcome = settings->client->HeadObject(request);
throwIfError(outcome);
return outcome.GetResultWithOwnership();
@ -1139,15 +1168,16 @@ Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket
void DiskS3::listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const
{
auto settings = current_settings.get();
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(source_path);
request.SetMaxKeys(settings.list_object_keys_size);
request.SetMaxKeys(settings->list_object_keys_size);
Aws::S3::Model::ListObjectsV2Outcome outcome;
do
{
outcome = settings.client->ListObjectsV2(request);
outcome = settings->client->ListObjectsV2(request);
throwIfError(outcome);
bool should_continue = callback(outcome.GetResult());
@ -1161,12 +1191,13 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const
{
auto settings = current_settings.get();
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
auto outcome = settings.client->CopyObject(request);
auto outcome = settings->client->CopyObject(request);
throwIfError(outcome);
}
@ -1373,13 +1404,15 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
void DiskS3::restoreFileOperations(const RestoreInformation & restore_information)
{
auto settings = current_settings.get();
LOG_INFO(log, "Starting restore file operations for disk {}", name);
/// Enable recording of file operations if we restore to a different bucket / path.
settings.send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
bool send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
std::set<String> renames;
auto restore_file_operations = [this, &restore_information, &renames](auto list_result)
auto restore_file_operations = [this, &restore_information, &renames, &send_metadata](auto list_result)
{
const String rename = "rename";
const String hardlink = "hardlink";
@ -1401,7 +1434,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
return false;
/// Keep the original revision if restoring to a different bucket / path.
if (settings.send_metadata)
if (send_metadata)
revision_counter = revision - 1;
auto object_metadata = headObject(restore_information.source_bucket, key).GetMetadata();
@ -1411,7 +1444,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
auto to_path = object_metadata["to_path"];
if (exists(from_path))
{
moveFile(from_path, to_path);
moveFile(from_path, to_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (restore_information.detached && isDirectory(to_path))
@ -1438,7 +1471,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
if (exists(src_path))
{
createDirectories(directoryPath(dst_path));
createHardLink(src_path, dst_path);
createHardLink(src_path, dst_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}
@ -1475,8 +1508,6 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
}
}
settings.send_metadata = true;
LOG_INFO(log, "File operations restored for disk {}", name);
}
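
Note how the restore path changes: it used to flip settings.send_metadata on the shared settings object and reset it afterwards, which is impossible with immutable snapshots; the flag is now computed locally and threaded through the new moveFile/createHardLink overloads that take an explicit send_metadata argument. A small sketch of that pattern (the names follow the diff, everything else is simplified and hypothetical):

```cpp
#include <string>

struct DiskS3Settings { bool send_metadata = false; };

struct DiskS3
{
    DiskS3Settings settings;      // imagine current_settings.get() in the real code
    std::string bucket = "bucket";
    std::string s3_root_path = "data/";

    // Public entry point: reads the flag from the current settings snapshot.
    void moveFile(const std::string & from, const std::string & to)
    {
        moveFile(from, to, settings.send_metadata);
    }

    // Internal overload: the caller decides whether the operation is recorded, which lets
    // restore pass its own flag instead of mutating shared settings.
    void moveFile(const std::string & /*from*/, const std::string & /*to*/, bool send_metadata)
    {
        if (send_metadata)
        {
            // ... upload an "operations/r<revision>-rename" marker object ...
        }
        // ... rename the local metadata file ...
    }

    void restoreFileOperations(const std::string & source_bucket, const std::string & source_path)
    {
        // Record operations only when restoring into a different bucket / path;
        // the decision lives in a local variable, not in the settings object.
        bool send_metadata = bucket != source_bucket || s3_root_path != source_path;

        // ... for every rename operation found in the source bucket ...
        moveFile("from_path", "to_path", send_metadata);
    }
};
```
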
@ -1516,14 +1547,14 @@ void DiskS3::onFreeze(const String & path)
revision_file_buf.finalize();
}
void DiskS3::applyNewSettings(ContextConstPtr context)
void DiskS3::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context)
{
auto new_settings = disk_settings_getter(context->getConfigRef(), "storage_configuration.disks." + name, context);
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
settings = new_settings;
current_settings.set(std::move(new_settings));
if (AsyncExecutor * exec = dynamic_cast<AsyncExecutor*>(&getExecutor()))
exec->setMaxThreads(new_settings.thread_pool_size);
exec->setMaxThreads(current_settings.get()->thread_pool_size);
}
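
applyNewSettings itself is now just "rebuild and publish": the settings_getter captured at registration time re-reads the disk's config subtree (creating a fresh S3 client along the way), the result is swapped into MultiVersion, and only the async executor's thread count needs a separate follow-up because the executor is not part of the snapshot. A compact, self-contained sketch of that method with hypothetical stand-in types:

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>

struct Configuration {};                 // stand-in for Poco::Util::AbstractConfiguration
struct Context {};
using ContextConstPtr = std::shared_ptr<const Context>;

struct DiskS3Settings { size_t thread_pool_size = 16; /* client, sizes, retries ... */ };
using SettingsPtr = std::unique_ptr<DiskS3Settings>;
using GetDiskSettings = std::function<SettingsPtr(const Configuration &, const std::string &, ContextConstPtr)>;

struct AsyncExecutor { void setMaxThreads(size_t /*count*/) { /* resize the thread pool */ } };

// Holds the latest settings version; see the MultiVersion sketch above for get()/set() semantics.
struct SettingsHolder
{
    std::shared_ptr<const DiskS3Settings> current;
    void set(SettingsPtr s) { current = std::move(s); }
    std::shared_ptr<const DiskS3Settings> get() const { return current; }
};

struct DiskS3
{
    std::string name;
    SettingsHolder current_settings;
    GetDiskSettings settings_getter;     // captured when the disk is registered
    AsyncExecutor executor;

    void applyNewSettings(const Configuration & config, ContextConstPtr context)
    {
        // Rebuild the whole settings object from this disk's subtree of the reloaded
        // config, then publish it; readers holding old snapshots are unaffected.
        auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
        current_settings.set(std::move(new_settings));

        // The executor lives outside the snapshot, so its size is adjusted explicitly.
        executor.setMaxThreads(current_settings.get()->thread_pool_size);
    }
};
```
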
DiskS3Settings::DiskS3Settings(

View File

@ -2,6 +2,7 @@
#include <atomic>
#include <common/logger_useful.h>
#include <Common/MultiVersion.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
@ -49,7 +50,8 @@ class DiskS3 : public IDisk
public:
using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
using GetDiskSettings = std::function<DiskS3Settings(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;
using SettingsPtr = std::unique_ptr<DiskS3Settings>;
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;
friend class DiskS3Reservation;
@ -62,8 +64,8 @@ public:
String bucket_,
String s3_root_path_,
String metadata_path_,
DiskS3Settings settings_,
GetDiskSettings settings_getter);
SettingsPtr settings_,
GetDiskSettings settings_getter_);
const String & getName() const override { return name; }
@ -98,7 +100,7 @@ public:
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
void replaceFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
@ -125,6 +127,7 @@ public:
void removeSharedRecursive(const String & path, bool keep_s3) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
@ -151,8 +154,7 @@ public:
/// Dumps current revision counter into file 'revision.txt' at given path.
void onFreeze(const String & path) override;
protected:
void applyNewSettings(ContextConstPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
private:
bool tryReserve(UInt64 bytes);
@ -202,10 +204,10 @@ private:
const String bucket;
const String s3_root_path;
const String metadata_path;
DiskS3Settings settings;
MultiVersion<DiskS3Settings> current_settings;
/// Gets current disk settings from context.
GetDiskSettings disk_settings_getter;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;

View File

@ -145,13 +145,13 @@ getClient(const Poco::Util::AbstractConfiguration & config, const String & confi
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)));
}
DiskS3Settings getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextConstPtr context)
std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextConstPtr context)
{
return DiskS3Settings(
return std::make_unique<DiskS3Settings>(
getClient(config, config_prefix, context),
context->getSettingsRef().s3_max_single_read_retries,
context->getSettingsRef().s3_min_upload_part_size,
context->getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries),
config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size),
config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
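
getSettings now returns a heap-allocated DiskS3Settings and, more importantly, lets every value be overridden per disk under storage_configuration.disks.<name>, falling back to the profile-level setting from the context. A minimal sketch of that "per-disk key, else context default" lookup, using a toy key-value config instead of Poco::Util::AbstractConfiguration and placeholder default values:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Toy config with flattened keys such as "storage_configuration.disks.s3.s3_min_upload_part_size".
struct Configuration
{
    std::map<std::string, uint64_t> values;

    uint64_t getUInt64(const std::string & key, uint64_t default_value) const
    {
        auto it = values.find(key);
        return it == values.end() ? default_value : it->second;
    }
};

// Profile-level defaults that were previously used unconditionally (placeholder values).
struct ContextSettings
{
    uint64_t s3_max_single_read_retries = 4;
    uint64_t s3_min_upload_part_size = 16 * 1024 * 1024;
    uint64_t s3_max_single_part_upload_size = 32 * 1024 * 1024;
};

struct DiskS3Settings
{
    uint64_t s3_max_single_read_retries = 0;
    uint64_t s3_min_upload_part_size = 0;
    uint64_t s3_max_single_part_upload_size = 0;
    uint64_t min_bytes_for_seek = 0;
};

std::unique_ptr<DiskS3Settings> getSettings(
    const Configuration & config, const std::string & config_prefix, const ContextSettings & ctx)
{
    auto settings = std::make_unique<DiskS3Settings>();
    // A value inside the disk's own config subtree wins over the profile default.
    settings->s3_max_single_read_retries =
        config.getUInt64(config_prefix + ".s3_max_single_read_retries", ctx.s3_max_single_read_retries);
    settings->s3_min_upload_part_size =
        config.getUInt64(config_prefix + ".s3_min_upload_part_size", ctx.s3_min_upload_part_size);
    settings->s3_max_single_part_upload_size =
        config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", ctx.s3_max_single_part_upload_size);
    settings->min_bytes_for_seek = config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024);
    return settings;
}
```

This is also why the integration test further down drops the users.d/s3.xml profile override and places s3_max_single_part_upload_size directly into the disk definition in storage_conf.xml.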

View File

@ -648,7 +648,7 @@ void InterpreterSystemQuery::restartDisk(String & name)
auto disk = getContext()->getDisk(name);
if (DiskRestartProxy * restart_proxy = dynamic_cast<DiskRestartProxy*>(disk.get()))
restart_proxy->restart(getContext());
restart_proxy->restart();
else
throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -1,7 +0,0 @@
<yandex>
<profiles>
<default>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</default>
</profiles>
</yandex>

View File

@ -6,6 +6,7 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3>
<hdd>
<type>local</type>

View File

@ -33,7 +33,7 @@ class SafeThread(threading.Thread):
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/users.d/s3.xml')
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml')
def replace_config(old, new):
@ -52,8 +52,7 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"],
user_configs=["configs/config.d/s3.xml"], with_minio=True)
"configs/config.d/log_conf.xml"], with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -421,7 +420,6 @@ def test_s3_disk_apply_new_settings(cluster):
"<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>")
node.query("SYSTEM RELOAD CONFIG")
node.query("SYSTEM RESTART DISK s3")
s3_requests_before = get_s3_requests()
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))