From 1d80262a2a4d150098e267b52660d768807bbe08 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 14 Mar 2022 20:15:07 +0100 Subject: [PATCH] Add write settings, file cache settings --- src/Common/FileCache.cpp | 17 ++++---- src/Common/FileCache.h | 8 +--- src/Common/FileCacheFactory.cpp | 8 +--- src/Common/FileCacheFactory.h | 2 +- src/Common/FileCacheSettings.cpp | 16 ++++++++ src/Common/FileCacheSettings.h | 18 +++++++++ src/Common/FileCache_fwd.h | 3 ++ src/Disks/RemoteDisksCommon.cpp | 18 ++++++--- src/IO/WriteBufferFromS3.cpp | 40 ++++++++++++++----- src/IO/WriteBufferFromS3.h | 26 +++++++----- .../MergeTree/MergedBlockOutputStream.cpp | 8 ++-- .../MergeTree/MergedBlockOutputStream.h | 7 +++- 12 files changed, 118 insertions(+), 53 deletions(-) create mode 100644 src/Common/FileCacheSettings.cpp create mode 100644 src/Common/FileCacheSettings.h diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index dffa4fac44d..5cc2e707667 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -31,13 +32,11 @@ namespace IFileCache::IFileCache( const String & cache_base_path_, - size_t max_size_, - size_t max_element_size_, - size_t max_file_segment_size_) + const FileCacheSettings & cache_settings_) : cache_base_path(cache_base_path_) - , max_size(max_size_) - , max_element_size(max_element_size_) - , max_file_segment_size(max_file_segment_size_) + , max_size(cache_settings_.max_cache_size) + , max_element_size(cache_settings_.max_cache_elements) + , max_file_segment_size(cache_settings_.max_file_segment_size) { } @@ -71,8 +70,8 @@ void IFileCache::assertInitialized() const throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); } -LRUFileCache::LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_, size_t max_file_segment_size_) - : IFileCache(cache_base_path_, max_size_, max_element_size_, 
max_file_segment_size_) +LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_) + : IFileCache(cache_base_path_, cache_settings_) , log(&Poco::Logger::get("LRUFileCache")) { } @@ -364,7 +363,7 @@ bool LRUFileCache::tryReserve( auto is_overflow = [&] { - return (current_size + size - removed_size > max_size) + return (max_size != 0 && current_size + size - removed_size > max_size) || (max_element_size != 0 && queue_size > max_element_size); }; diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index d51dfe7a9ff..dd585369853 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -32,9 +32,7 @@ public: IFileCache( const String & cache_base_path_, - size_t max_size_, - size_t max_element_size_, - size_t max_file_segment_size_); + const FileCacheSettings & cache_settings_); virtual ~IFileCache() = default; @@ -111,9 +109,7 @@ class LRUFileCache final : public IFileCache public: LRUFileCache( const String & cache_base_path_, - size_t max_size_, - size_t max_element_size_ = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS, - size_t max_file_segment_size_ = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE); + const FileCacheSettings & cache_settings_); FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override; diff --git a/src/Common/FileCacheFactory.cpp b/src/Common/FileCacheFactory.cpp index fc8dff0b26c..567d091fdeb 100644 --- a/src/Common/FileCacheFactory.cpp +++ b/src/Common/FileCacheFactory.cpp @@ -24,18 +24,14 @@ FileCachePtr FileCacheFactory::getImpl(const std::string & cache_base_path, std: } FileCachePtr FileCacheFactory::getOrCreate( - const std::string & cache_base_path, size_t max_size, size_t max_elements_size, size_t max_file_segment_size) + const std::string & cache_base_path, const FileCacheSettings & file_cache_settings) { std::lock_guard lock(mutex); auto cache = getImpl(cache_base_path, lock); if (cache) - { - if (cache->capacity() != max_size) - throw 
Exception(ErrorCodes::BAD_ARGUMENTS, "Cache with path `{}` already exists, but has different max size", cache_base_path); return cache; - } - cache = std::make_shared(cache_base_path, max_size, max_elements_size, max_file_segment_size); + cache = std::make_shared(cache_base_path, file_cache_settings); caches.emplace(cache_base_path, cache); return cache; } diff --git a/src/Common/FileCacheFactory.h b/src/Common/FileCacheFactory.h index f2432f03cae..176b96a862e 100644 --- a/src/Common/FileCacheFactory.h +++ b/src/Common/FileCacheFactory.h @@ -17,7 +17,7 @@ class FileCacheFactory final : private boost::noncopyable public: static FileCacheFactory & instance(); - FileCachePtr getOrCreate(const std::string & cache_base_path, size_t max_size, size_t max_elements_size, size_t max_file_segment_size); + FileCachePtr getOrCreate(const std::string & cache_base_path, const FileCacheSettings & file_cache_settings); private: FileCachePtr getImpl(const std::string & cache_base_path, std::lock_guard &); diff --git a/src/Common/FileCacheSettings.cpp b/src/Common/FileCacheSettings.cpp new file mode 100644 index 00000000000..39a6a2ec723 --- /dev/null +++ b/src/Common/FileCacheSettings.cpp @@ -0,0 +1,16 @@ +#include "FileCacheSettings.h" + +#include + +namespace DB +{ + +void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) +{ + max_cache_size = config.getUInt64(config_prefix + ".data_cache_max_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE); + max_cache_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS); + max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE); + cache_on_insert = config.getUInt64(config_prefix + ".cache_on_insert", false); +} + +} diff --git a/src/Common/FileCacheSettings.h b/src/Common/FileCacheSettings.h new file mode 100644 index 
00000000000..8aa2bfb9214 --- /dev/null +++ b/src/Common/FileCacheSettings.h @@ -0,0 +1,18 @@ +#include + +namespace Poco { namespace Util { class AbstractConfiguration; }} + +namespace DB +{ + +struct FileCacheSettings +{ + size_t max_cache_size = 0; + size_t max_cache_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS; + size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE; + bool cache_on_insert = false; + + void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); +}; + +} diff --git a/src/Common/FileCache_fwd.h b/src/Common/FileCache_fwd.h index cab1525600b..7448f0c8c89 100644 --- a/src/Common/FileCache_fwd.h +++ b/src/Common/FileCache_fwd.h @@ -4,10 +4,13 @@ namespace DB { +static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE = 1024 * 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024; class IFileCache; using FileCachePtr = std::shared_ptr; +struct FileCacheSettings; + } diff --git a/src/Disks/RemoteDisksCommon.cpp b/src/Disks/RemoteDisksCommon.cpp index 36f2aed3e7c..4805434e5ee 100644 --- a/src/Disks/RemoteDisksCommon.cpp +++ b/src/Disks/RemoteDisksCommon.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -64,18 +65,23 @@ FileCachePtr getCachePtrForDisk( if (!fs::exists(cache_base_path)) fs::create_directories(cache_base_path); - LOG_INFO(&Poco::Logger::get("Disk(" + name + ")"), "Disk registered with cache path: {}", cache_base_path); - auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context); if (metadata_path == cache_base_path) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata path and cache base path must be different: {}", metadata_path); - size_t max_cache_size = config.getUInt64(config_prefix + ".data_cache_max_size", 1024*1024*1024); - size_t max_cache_elements = 
config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS); - size_t max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE); + FileCacheSettings file_cache_settings; + file_cache_settings.loadFromConfig(config, config_prefix); - auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, max_cache_size, max_cache_elements, max_file_segment_size); + auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, file_cache_settings); cache->initialize(); + + auto * log = &Poco::Logger::get("Disk(" + name + ")"); + LOG_INFO(log, "Disk registered with cache path: {}. Cache size: {}, max cache elements size: {}, max_file_segment_size: {}", + cache_base_path, + file_cache_settings.max_cache_size ? toString(file_cache_settings.max_cache_size) : "UNLIMITED", + file_cache_settings.max_cache_elements ? toString(file_cache_settings.max_cache_elements) : "UNLIMITED", + file_cache_settings.max_file_segment_size); + return cache; } diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index eda7bb6f8ae..4f1016d43bd 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -2,17 +2,19 @@ #if USE_AWS_S3 -# include -# include +#include +#include -# include -# include -# include -# include -# include -# include +#include +#include -# include +#include +#include +#include +#include +#include + +#include namespace ProfileEvents
+99,6 @@ void WriteBufferFromS3::nextImpl() if (!multipart_upload_id.empty() && last_part_size > upload_part_size) { - writePart(); allocateBuffer(); @@ -126,6 +129,21 @@ WriteBufferFromS3::~WriteBufferFromS3() } } +void WriteBufferFromS3::tryWriteToCacheIfNeeded() +{ + if (!cache || IFileCache::shouldBypassCache()) + return; + + try + { + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; + } +} + void WriteBufferFromS3::preFinalize() { next(); diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index a4fbcbcdeeb..595a7a929c1 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -4,16 +4,19 @@ #if USE_AWS_S3 -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include -# include -# include +#include +#include -# include +#include +#include + +#include namespace Aws::S3 { @@ -51,7 +54,8 @@ public: size_t max_single_part_upload_size_, std::optional> object_metadata_ = std::nullopt, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, - ScheduleFunc schedule_ = {}); + ScheduleFunc schedule_ = {}, + FileCachePtr cache_ = nullptr); ~WriteBufferFromS3() override; @@ -82,6 +86,8 @@ private: void waitForReadyBackGroundTasks(); void waitForAllBackGroundTasks(); + void tryWriteToCacheIfNeeded(); + String bucket; String key; std::optional> object_metadata; @@ -113,6 +119,8 @@ private: std::condition_variable bg_tasks_condvar; Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3"); + + FileCachePtr cache; }; } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index f94c89e20bd..5bd9217226d 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -122,7 +122,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( MergeTreeData::MutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * 
total_columns_list, - MergeTreeData::DataPart::Checksums * additional_column_checksums) + MergeTreeData::DataPart::Checksums * additional_column_checksums, + const WriteSettings & write_settings) { /// Finish write and get checksums. MergeTreeData::DataPart::Checksums checksums; @@ -156,7 +157,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( auto finalizer = std::make_unique(*writer, new_part, files_to_remove_after_sync, sync); if (new_part->isStoredOnDisk()) - finalizer->written_files = finalizePartOnDisk(new_part, checksums); + finalizer->written_files = finalizePartOnDisk(new_part, checksums, write_settings); new_part->rows_count = rows_count; new_part->modification_time = time(nullptr); @@ -174,7 +175,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync( MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk( const MergeTreeData::DataPartPtr & new_part, - MergeTreeData::DataPart::Checksums & checksums) + MergeTreeData::DataPart::Checksums & checksums, + const WriteSettings & write_settings) { WrittenFiles written_files; if (new_part->isProjectionPart()) diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index c17cfd22cd8..05f70239517 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -54,7 +55,8 @@ public: MergeTreeData::MutableDataPartPtr & new_part, bool sync, const NamesAndTypesList * total_columns_list = nullptr, - MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); + MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr, + const WriteSettings & write_settings = {}); void finalizePart( MergeTreeData::MutableDataPartPtr & new_part, @@ -71,7 +73,8 @@ private: using WrittenFiles = std::vector>; WrittenFiles finalizePartOnDisk( const 
MergeTreeData::DataPartPtr & new_part, - MergeTreeData::DataPart::Checksums & checksums); + MergeTreeData::DataPart::Checksums & checksums, + const WriteSettings & write_settings); NamesAndTypesList columns_list; IMergeTreeDataPart::MinMaxIndex minmax_idx;