Add write settings, file cache settings

This commit is contained in:
kssenii 2022-03-14 20:15:07 +01:00
parent af3bb3b7af
commit 1d80262a2a
12 changed files with 118 additions and 53 deletions

View File

@ -3,6 +3,7 @@
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/hex.h> #include <Common/hex.h>
#include <Common/FileCacheSettings.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
@ -31,13 +32,11 @@ namespace
IFileCache::IFileCache( IFileCache::IFileCache(
const String & cache_base_path_, const String & cache_base_path_,
size_t max_size_, const FileCacheSettings & cache_settings_)
size_t max_element_size_,
size_t max_file_segment_size_)
: cache_base_path(cache_base_path_) : cache_base_path(cache_base_path_)
, max_size(max_size_) , max_size(cache_settings_.max_cache_size)
, max_element_size(max_element_size_) , max_element_size(cache_settings_.max_cache_elements)
, max_file_segment_size(max_file_segment_size_) , max_file_segment_size(cache_settings_.max_cache_elements)
{ {
} }
@ -71,8 +70,8 @@ void IFileCache::assertInitialized() const
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
} }
LRUFileCache::LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_, size_t max_file_segment_size_) LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
: IFileCache(cache_base_path_, max_size_, max_element_size_, max_file_segment_size_) : IFileCache(cache_base_path_, cache_settings_)
, log(&Poco::Logger::get("LRUFileCache")) , log(&Poco::Logger::get("LRUFileCache"))
{ {
} }
@ -364,7 +363,7 @@ bool LRUFileCache::tryReserve(
auto is_overflow = [&] auto is_overflow = [&]
{ {
return (current_size + size - removed_size > max_size) return (max_size != 0 && current_size + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size); || (max_element_size != 0 && queue_size > max_element_size);
}; };

View File

@ -32,9 +32,7 @@ public:
IFileCache( IFileCache(
const String & cache_base_path_, const String & cache_base_path_,
size_t max_size_, const FileCacheSettings & cache_settings_);
size_t max_element_size_,
size_t max_file_segment_size_);
virtual ~IFileCache() = default; virtual ~IFileCache() = default;
@ -111,9 +109,7 @@ class LRUFileCache final : public IFileCache
public: public:
LRUFileCache( LRUFileCache(
const String & cache_base_path_, const String & cache_base_path_,
size_t max_size_, const FileCacheSettings & cache_settings_);
size_t max_element_size_ = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS,
size_t max_file_segment_size_ = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override; FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;

View File

@ -24,18 +24,14 @@ FileCachePtr FileCacheFactory::getImpl(const std::string & cache_base_path, std:
} }
FileCachePtr FileCacheFactory::getOrCreate( FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_base_path, size_t max_size, size_t max_elements_size, size_t max_file_segment_size) const std::string & cache_base_path, const FileCacheSettings & file_cache_settings)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto cache = getImpl(cache_base_path, lock); auto cache = getImpl(cache_base_path, lock);
if (cache) if (cache)
{
if (cache->capacity() != max_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cache with path `{}` already exists, but has different max size", cache_base_path);
return cache; return cache;
}
cache = std::make_shared<LRUFileCache>(cache_base_path, max_size, max_elements_size, max_file_segment_size); cache = std::make_shared<LRUFileCache>(cache_base_path, file_cache_settings);
caches.emplace(cache_base_path, cache); caches.emplace(cache_base_path, cache);
return cache; return cache;
} }

View File

@ -17,7 +17,7 @@ class FileCacheFactory final : private boost::noncopyable
public: public:
static FileCacheFactory & instance(); static FileCacheFactory & instance();
FileCachePtr getOrCreate(const std::string & cache_base_path, size_t max_size, size_t max_elements_size, size_t max_file_segment_size); FileCachePtr getOrCreate(const std::string & cache_base_path, const FileCacheSettings & file_cache_settings);
private: private:
FileCachePtr getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &); FileCachePtr getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &);

View File

@ -0,0 +1,16 @@
#include "FileCacheSettings.h"
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
max_cache_size = config.getUInt64(config_prefix + ".data_cache_max_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE);
max_cache_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
cache_on_insert = config.getUInt64(config_prefix + ".cache_on_insert", false);
}
}

View File

@ -0,0 +1,18 @@
#include <Common/FileCache_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; }}
namespace DB
{
struct FileCacheSettings
{
size_t max_cache_size = 0;
size_t max_cache_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS;
size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
bool cache_on_insert = false;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
};
}

View File

@ -4,10 +4,13 @@
namespace DB namespace DB
{ {
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE = 1024 * 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024;
class IFileCache; class IFileCache;
using FileCachePtr = std::shared_ptr<IFileCache>; using FileCachePtr = std::shared_ptr<IFileCache>;
struct FileCacheSettings;
} }

View File

@ -2,6 +2,7 @@
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/FileCacheFactory.h> #include <Common/FileCacheFactory.h>
#include <Common/FileCache.h> #include <Common/FileCache.h>
#include <Common/FileCacheSettings.h>
namespace DB namespace DB
{ {
@ -64,18 +65,23 @@ FileCachePtr getCachePtrForDisk(
if (!fs::exists(cache_base_path)) if (!fs::exists(cache_base_path))
fs::create_directories(cache_base_path); fs::create_directories(cache_base_path);
LOG_INFO(&Poco::Logger::get("Disk(" + name + ")"), "Disk registered with cache path: {}", cache_base_path);
auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context); auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context);
if (metadata_path == cache_base_path) if (metadata_path == cache_base_path)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata path and cache base path must be different: {}", metadata_path); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata path and cache base path must be different: {}", metadata_path);
size_t max_cache_size = config.getUInt64(config_prefix + ".data_cache_max_size", 1024*1024*1024); FileCacheSettings file_cache_settings;
size_t max_cache_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS); file_cache_settings.loadFromConfig(config, config_prefix);
size_t max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, max_cache_size, max_cache_elements, max_file_segment_size); auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, file_cache_settings);
cache->initialize(); cache->initialize();
auto * log = &Poco::Logger::get("Disk(" + name + ")");
LOG_INFO(log, "Disk registered with cache path: {}. Cache size: {}, max cache elements size: {}, max_file_segment_size: {}",
cache_base_path,
max_cache_size ? toString(max_cache_size) : "UNLIMITED",
max_cache_elements ? toString(max_cache_elements) : "UNLIMITED",
max_file_segment_size);
return cache; return cache;
} }

View File

@ -2,17 +2,19 @@
#if USE_AWS_S3 #if USE_AWS_S3
# include <IO/WriteBufferFromS3.h> #include <base/logger_useful.h>
# include <IO/WriteHelpers.h> #include <Common/FileCache.h>
# include <aws/s3/S3Client.h> #include <IO/WriteBufferFromS3.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h> #include <IO/WriteHelpers.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/PutObjectRequest.h>
# include <aws/s3/model/UploadPartRequest.h>
# include <base/logger_useful.h>
# include <utility> #include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <utility>
namespace ProfileEvents namespace ProfileEvents
@ -59,7 +61,8 @@ WriteBufferFromS3::WriteBufferFromS3(
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_, std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_, size_t buffer_size_,
ScheduleFunc schedule_) ScheduleFunc schedule_,
FileCachePtr cache_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0) : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_) , bucket(bucket_)
, key(key_) , key(key_)
@ -70,6 +73,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, upload_part_size_multiply_threshold(upload_part_size_multiply_threshold_) , upload_part_size_multiply_threshold(upload_part_size_multiply_threshold_)
, max_single_part_upload_size(max_single_part_upload_size_) , max_single_part_upload_size(max_single_part_upload_size_)
, schedule(std::move(schedule_)) , schedule(std::move(schedule_))
, cache(cache_)
{ {
allocateBuffer(); allocateBuffer();
} }
@ -95,7 +99,6 @@ void WriteBufferFromS3::nextImpl()
if (!multipart_upload_id.empty() && last_part_size > upload_part_size) if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
{ {
writePart(); writePart();
allocateBuffer(); allocateBuffer();
@ -126,6 +129,21 @@ WriteBufferFromS3::~WriteBufferFromS3()
} }
} }
void WriteBufferFromS3::tryWriteToCacheIfNeeded()
{
if (!cache || IFileCache::shouldBypassCache())
return;
try
{
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
void WriteBufferFromS3::preFinalize() void WriteBufferFromS3::preFinalize()
{ {
next(); next();

View File

@ -4,16 +4,19 @@
#if USE_AWS_S3 #if USE_AWS_S3
# include <memory> #include <memory>
# include <vector> #include <vector>
# include <list> #include <list>
# include <base/logger_useful.h> #include <base/logger_useful.h>
# include <base/types.h> #include <base/types.h>
# include <IO/BufferWithOwnMemory.h> #include <Common/ThreadPool.h>
# include <IO/WriteBuffer.h> #include <Common/FileCache_fwd.h>
# include <aws/core/utils/memory/stl/AWSStringStream.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
namespace Aws::S3 namespace Aws::S3
{ {
@ -51,7 +54,8 @@ public:
size_t max_single_part_upload_size_, size_t max_single_part_upload_size_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt, std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ScheduleFunc schedule_ = {}); ScheduleFunc schedule_ = {},
FileCachePtr cache_ = nullptr);
~WriteBufferFromS3() override; ~WriteBufferFromS3() override;
@ -82,6 +86,8 @@ private:
void waitForReadyBackGroundTasks(); void waitForReadyBackGroundTasks();
void waitForAllBackGroundTasks(); void waitForAllBackGroundTasks();
void tryWriteToCacheIfNeeded();
String bucket; String bucket;
String key; String key;
std::optional<std::map<String, String>> object_metadata; std::optional<std::map<String, String>> object_metadata;
@ -113,6 +119,8 @@ private:
std::condition_variable bg_tasks_condvar; std::condition_variable bg_tasks_condvar;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3"); Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
FileCachePtr cache;
}; };
} }

View File

@ -122,7 +122,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync, bool sync,
const NamesAndTypesList * total_columns_list, const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums) MergeTreeData::DataPart::Checksums * additional_column_checksums,
const WriteSettings & write_settings)
{ {
/// Finish write and get checksums. /// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums; MergeTreeData::DataPart::Checksums checksums;
@ -156,7 +157,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync); auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
if (new_part->isStoredOnDisk()) if (new_part->isStoredOnDisk())
finalizer->written_files = finalizePartOnDisk(new_part, checksums); finalizer->written_files = finalizePartOnDisk(new_part, checksums, write_settings);
new_part->rows_count = rows_count; new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr); new_part->modification_time = time(nullptr);
@ -174,7 +175,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk( MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
const MergeTreeData::DataPartPtr & new_part, const MergeTreeData::DataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums) MergeTreeData::DataPart::Checksums & checksums,
const WriteSettings & write_settings)
{ {
WrittenFiles written_files; WrittenFiles written_files;
if (new_part->isProjectionPart()) if (new_part->isProjectionPart())

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/IMergedBlockOutputStream.h> #include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <IO/WriteSettings.h>
namespace DB namespace DB
@ -54,7 +55,8 @@ public:
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
bool sync, bool sync,
const NamesAndTypesList * total_columns_list = nullptr, const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr,
const WriteSettings & write_settings = {});
void finalizePart( void finalizePart(
MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::MutableDataPartPtr & new_part,
@ -71,7 +73,8 @@ private:
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>; using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
WrittenFiles finalizePartOnDisk( WrittenFiles finalizePartOnDisk(
const MergeTreeData::DataPartPtr & new_part, const MergeTreeData::DataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums); MergeTreeData::DataPart::Checksums & checksums,
const WriteSettings & write_settings);
NamesAndTypesList columns_list; NamesAndTypesList columns_list;
IMergeTreeDataPart::MinMaxIndex minmax_idx; IMergeTreeDataPart::MinMaxIndex minmax_idx;