Rename and move setting to ObjectStorage classes

This commit is contained in:
Кирилл Гарбар 2024-07-30 16:09:50 +03:00
parent 5dfbcabcf1
commit 66489ef6d6
12 changed files with 45 additions and 70 deletions

View File

@ -121,4 +121,6 @@ static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 1000000000;
static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 0;
#endif
static constexpr auto DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT = 1000uz;
}

View File

@ -891,7 +891,7 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(UInt64, object_storage_remove_recursive_batch_size, S3::DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE, "Max number of files to collect for removal in one transaction. Used to reduce memory usage.", 0) \
M(UInt64, object_storage_remove_recursive_file_limit, DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT, "Max number of files to store in memory during remove. Zero value means unlimited. Used to reduce memory usage.", 0) \
\
\
/* ###################################### */ \

View File

@ -57,7 +57,8 @@ String ClickHouseVersion::toString() const
/// Note: please check if the key already exists to prevent duplicate entries.
static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
{
{"24.8", {{"object_storage_remove_recursive_batch_size", UINT64_MAX, 1000, "Added new setting to limit number of files stored in memory while removing from object storage."}}},
{"24.8", {{"object_storage_remove_recursive_file_limit", 0, 1000, "Added new setting to limit number of files stored in memory while removing from object storage. Zero value means unlimited."}
}},
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
{"output_format_binary_encode_types_in_binary_format", false, false, "Added new setting to allow to write type names in binary format in RowBinaryWithNamesAndTypes output format"},
{"input_format_binary_decode_types_in_binary_format", false, false, "Added new setting to allow to read type names in binary format in RowBinaryWithNamesAndTypes input format"},
@ -81,7 +82,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}}},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},

View File

@ -251,7 +251,6 @@ catch (Exception & e)
void IDisk::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr /*context*/, const String & config_prefix, const DisksMap & /*map*/)
{
copying_thread_pool.setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
remove_shared_recursive_batch_size = config.getUInt64(config_prefix + ".remove_shared_recursive_batch_size", S3::DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE);
}
}

View File

@ -8,7 +8,6 @@
#include <Common/Exception.h>
#include <Disks/DiskType.h>
#include <IO/ReadSettings.h>
#include <IO/S3Defines.h>
#include <IO/WriteSettings.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/WriteMode.h>
@ -116,7 +115,6 @@ public:
/// Default constructor.
IDisk(const String & name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: name(name_)
, remove_shared_recursive_batch_size(config.getUInt64(config_prefix + ".remove_shared_recursive_batch_size", S3::DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE))
, copying_thread_pool(
CurrentMetrics::IDiskCopierThreads,
CurrentMetrics::IDiskCopierThreadsActive,
@ -127,7 +125,6 @@ public:
explicit IDisk(const String & name_)
: name(name_)
, remove_shared_recursive_batch_size(S3::DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE)
, copying_thread_pool(
CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, CurrentMetrics::IDiskCopierThreadsScheduled, 16)
{
@ -505,8 +502,6 @@ protected:
virtual void checkAccessImpl(const String & path);
UInt64 remove_shared_recursive_batch_size;
private:
ThreadPool copying_thread_pool;
bool is_custom_disk = false;

View File

@ -32,10 +32,6 @@ using RemoveBatchRequest = std::vector<RemoveRequest>;
struct IDiskTransaction : private boost::noncopyable
{
public:
IDiskTransaction(): remove_shared_recursive_batch_size(S3::DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE) {}
explicit IDiskTransaction(UInt64 remove_shared_recursive_batch_size_): remove_shared_recursive_batch_size(remove_shared_recursive_batch_size_) {}
/// Tries to commit all accumulated operations simultaneously.
/// If something fails rollback and throw exception.
virtual void commit() = 0;
@ -135,9 +131,6 @@ public:
/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
protected:
UInt64 remove_shared_recursive_batch_size;
};
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;

View File

@ -42,7 +42,7 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
*object_storage,
*metadata_storage,
send_metadata ? metadata_helper.get() : nullptr,
remove_shared_recursive_batch_size);
remove_shared_recursive_file_limit);
}
DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDisk(DiskObjectStorage& to_disk)
@ -52,7 +52,8 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransactionToAnotherDis
*metadata_storage,
*to_disk.getObjectStorage(),
*to_disk.getMetadataStorage(),
send_metadata ? metadata_helper.get() : nullptr);
send_metadata ? metadata_helper.get() : nullptr,
remove_shared_recursive_file_limit);
}
@ -72,6 +73,7 @@ DiskObjectStorage::DiskObjectStorage(
, read_resource_name(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}, WriteSettings{}))
, remove_shared_recursive_file_limit(config.getUInt64(config_prefix + ".remove_shared_recursive_file_limit", DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT))
{
data_source_description = DataSourceDescription{
.type = DataSourceType::ObjectStorage,
@ -374,6 +376,8 @@ void DiskObjectStorage::removeSharedFileIfExists(const String & path, bool delet
void DiskObjectStorage::removeSharedRecursive(
const String & path, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only)
{
/// At most remove_shared_recursive_file_limit files are removed in one transaction
/// Retry until all of them are removed
while (exists(path))
{
auto transaction = createObjectStorageTransaction();
@ -558,6 +562,8 @@ void DiskObjectStorage::applyNewSettings(
write_resource_name = new_write_resource_name;
}
remove_shared_recursive_file_limit = config.getUInt64(config_prefix + ".remove_shared_recursive_file_limit", DEFAULT_REMOVE_SHARED_RECURSIVE_FILE_LIMIT);
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}

View File

@ -246,6 +246,8 @@ private:
String write_resource_name;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
UInt64 remove_shared_recursive_file_limit;
};
using DiskObjectStoragePtr = std::shared_ptr<DiskObjectStorage>;

View File

@ -30,34 +30,13 @@ namespace ErrorCodes
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
UInt64 remove_shared_recursive_file_limit_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_storage.createTransaction())
, metadata_helper(metadata_helper_)
{}
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
UInt64 remove_shared_recursive_batch_size_)
: IDiskTransaction(remove_shared_recursive_batch_size_)
, object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_storage.createTransaction())
, metadata_helper(metadata_helper_)
{}
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_transaction_)
, metadata_helper(metadata_helper_)
, remove_shared_recursive_file_limit(remove_shared_recursive_file_limit_)
{}
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
@ -65,12 +44,12 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_,
UInt64 remove_shared_recursive_batch_size_)
: IDiskTransaction(remove_shared_recursive_batch_size_)
, object_storage(object_storage_)
UInt64 remove_shared_recursive_file_limit_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_transaction_)
, metadata_helper(metadata_helper_)
, remove_shared_recursive_file_limit(remove_shared_recursive_file_limit_)
{}
MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
@ -78,8 +57,9 @@ MultipleDisksObjectStorageTransaction::MultipleDisksObjectStorageTransaction(
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage_,
IMetadataStorage& destination_metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction())
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
UInt64 remove_shared_recursive_file_limit_)
: DiskObjectStorageTransaction(object_storage_, metadata_storage_, metadata_helper_, destination_metadata_storage_.createTransaction(), remove_shared_recursive_file_limit_)
, destination_object_storage(destination_object_storage_)
, destination_metadata_storage(destination_metadata_storage_)
{}
@ -316,7 +296,7 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
const bool keep_all_batch_data;
/// paths inside the 'this->path'
const NameSet file_names_remove_metadata_only;
const UInt64 batch_size;
const UInt64 limit;
/// map from local_path to its remote objects with hardlinks counter
/// local_path is the path inside 'this->path'
@ -328,12 +308,12 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
const std::string & path_,
bool keep_all_batch_data_,
const NameSet & file_names_remove_metadata_only_,
UInt64 batch_size_)
UInt64 limit_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, keep_all_batch_data(keep_all_batch_data_)
, file_names_remove_metadata_only(file_names_remove_metadata_only_)
, batch_size(batch_size_)
, limit(limit_)
{}
std::string getInfoForLog() const override
@ -341,10 +321,15 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
return fmt::format("RemoveRecursiveObjectStorageOperation (path: {})", path);
}
bool checkLimitReached() const
{
return limit > 0 && objects_to_remove_by_path.size() == limit;
}
void removeMetadataRecursive(MetadataTransactionPtr tx, const std::string & path_to_remove)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (objects_to_remove_by_path.size() == batch_size)
if (checkLimitReached())
return;
if (metadata_storage.isFile(path_to_remove))
@ -388,12 +373,12 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
{
for (auto it = metadata_storage.iterateDirectory(path_to_remove); it->isValid(); it->next())
{
if (objects_to_remove_by_path.size() == batch_size)
if (checkLimitReached())
return;
removeMetadataRecursive(tx, it->path());
}
/// Do not delete in case directory contains >= batch_size files
if (objects_to_remove_by_path.size() < batch_size)
/// Do not delete in case directory contains >= limit files
if (objects_to_remove_by_path.size() < limit)
tx->removeDirectory(path_to_remove);
}
}
@ -717,7 +702,7 @@ void DiskObjectStorageTransaction::removeSharedRecursive(
const std::string & path, bool keep_all_shared_data, const NameSet & file_names_remove_metadata_only)
{
auto operation = std::make_unique<RemoveRecursiveObjectStorageOperation>(
object_storage, metadata_storage, path, keep_all_shared_data, file_names_remove_metadata_only, remove_shared_recursive_batch_size);
object_storage, metadata_storage, path, keep_all_shared_data, file_names_remove_metadata_only, remove_shared_recursive_file_limit);
operations_to_execute.emplace_back(std::move(operation));
}

View File

@ -63,30 +63,21 @@ protected:
DiskObjectStorageOperations operations_to_execute;
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_);
UInt64 remove_shared_recursive_file_limit;
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
MetadataTransactionPtr metadata_transaction_,
UInt64 remove_shared_recursive_batch_size);
UInt64 remove_shared_recursive_file_limit);
public:
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
UInt64 remove_shared_recursive_batch_size);
UInt64 remove_shared_recursive_file_limit);
void commit() override;
void undo() override;
@ -149,7 +140,8 @@ struct MultipleDisksObjectStorageTransaction final : public DiskObjectStorageTra
IMetadataStorage & metadata_storage_,
IObjectStorage& destination_object_storage,
IMetadataStorage& destination_metadata_storage,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_,
UInt64 remove_shared_recursive_file_limit_);
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
};

View File

@ -34,7 +34,6 @@ inline static constexpr uint64_t DEFAULT_MAX_SINGLE_READ_TRIES = 4;
inline static constexpr uint64_t DEFAULT_MAX_UNEXPECTED_WRITE_ERROR_RETRIES = 4;
inline static constexpr uint64_t DEFAULT_MAX_REDIRECTS = 10;
inline static constexpr uint64_t DEFAULT_RETRY_ATTEMPTS = 100;
inline static constexpr uint64_t DEFAULT_REMOVE_SHARED_RECURSIVE_BATCH_SIZE = 1000;
inline static constexpr bool DEFAULT_ALLOW_NATIVE_COPY = true;
inline static constexpr bool DEFAULT_CHECK_OBJECTS_AFTER_UPLOAD = false;

View File

@ -15,7 +15,7 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<remove_shared_recursive_batch_size>3</remove_shared_recursive_batch_size>
<remove_shared_recursive_file_limit>3</remove_shared_recursive_file_limit>
</test3>
</disks>
<policies>