commit afd0c64a1a
parent eabbce69a7
Author: kssenii
Date: 2022-03-21 14:56:38 +01:00

16 changed files with 75 additions and 49 deletions

View File

@@ -359,7 +359,6 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
keyToStr(key), offset);
auto file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
return FileSegmentsHolder(std::move(file_segments));
}
@@ -381,6 +380,7 @@ bool LRUFileCache::tryReserve(
auto is_overflow = [&]
{
+ /// max_size == 0 means unlimited cache size, max_element_size == 0 means unlimited number of cache elements.
return (max_size != 0 && current_size + size - removed_size > max_size)
|| (max_element_size != 0 && queue_size > max_element_size);
};
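The added comment pins down the sentinel semantics: a limit configured as zero disables that limit entirely. A minimal, self-contained restatement of the check follows; the free function and the numbers are illustrative stand-ins, not the cache's real fields.

#include <cassert>
#include <cstddef>

/// Sketch of the reservation check: a limit of 0 disables that limit.
bool is_overflow(size_t max_size, size_t max_element_size,
                 size_t current_size, size_t size,
                 size_t removed_size, size_t queue_size)
{
    return (max_size != 0 && current_size + size - removed_size > max_size)
        || (max_element_size != 0 && queue_size > max_element_size);
}

int main()
{
    assert(!is_overflow(0, 0, 1000, 500, 0, 99));       /// both limits disabled
    assert(is_overflow(1024, 0, 1000, 500, 0, 99));     /// byte limit would be exceeded
    assert(!is_overflow(1024, 0, 1000, 500, 500, 99));  /// planned eviction frees enough
    assert(is_overflow(0, 64, 1000, 500, 0, 65));       /// element limit exceeded
}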

View File

@@ -21,20 +21,31 @@ FileCacheFactory::CacheByBasePath FileCacheFactory::getAll()
return caches;
}
- FileCachePtr FileCacheFactory::getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &)
+ const FileCacheSettings & FileCacheFactory::getSettings(const std::string & cache_base_path)
+ {
+ std::lock_guard lock(mutex);
+ auto * cache_data = getImpl(cache_base_path, lock);
+ if (cache_data)
+ return cache_data->settings;
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path);
+ }
+ FileCacheFactory::CacheData * FileCacheFactory::getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &)
{
auto it = caches.find(cache_base_path);
if (it == caches.end())
return nullptr;
- return it->second;
+ return &it->second;
}
FileCachePtr FileCacheFactory::get(const std::string & cache_base_path)
{
std::lock_guard lock(mutex);
- auto cache = getImpl(cache_base_path, lock);
- if (cache)
- return cache;
+ auto * cache_data = getImpl(cache_base_path, lock);
+ if (cache_data)
+ return cache_data->cache;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path);
}
@@ -43,12 +54,12 @@ FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_base_path, const FileCacheSettings & file_cache_settings)
{
std::lock_guard lock(mutex);
- auto cache = getImpl(cache_base_path, lock);
- if (cache)
- return cache;
+ auto * cache_data = getImpl(cache_base_path, lock);
+ if (cache_data)
+ return cache_data->cache;
- cache = std::make_shared<LRUFileCache>(cache_base_path, file_cache_settings);
- caches.emplace(cache_base_path, cache);
+ auto cache = std::make_shared<LRUFileCache>(cache_base_path, file_cache_settings);
+ caches.emplace(cache_base_path, CacheData(cache, file_cache_settings));
return cache;
}
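Taken together, the factory now maps a base path to a CacheData record, so the settings a cache was created with can be looked up later (DiskS3 below relies on this). A condensed, self-contained sketch of the new shape, with IFileCache and FileCacheSettings reduced to stubs and the singleton plumbing omitted:

#include <cstddef>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct FileCacheSettings { size_t max_size = 0; bool cache_on_insert = false; };  /// stub
struct IFileCache {};                                                             /// stub
using FileCachePtr = std::shared_ptr<IFileCache>;

class FileCacheFactory
{
    struct CacheData
    {
        FileCachePtr cache;
        FileCacheSettings settings;
    };

    std::mutex mutex;
    std::unordered_map<std::string, CacheData> caches;

    /// Returns nullptr rather than throwing, so get(), getSettings()
    /// and getOrCreate() can share the lookup.
    CacheData * getImpl(const std::string & path, std::lock_guard<std::mutex> &)
    {
        auto it = caches.find(path);
        return it == caches.end() ? nullptr : &it->second;
    }

public:
    const FileCacheSettings & getSettings(const std::string & path)
    {
        std::lock_guard lock(mutex);
        if (auto * data = getImpl(path, lock))
            return data->settings;
        throw std::runtime_error("No cache found by path: " + path);
    }

    FileCachePtr getOrCreate(const std::string & path, const FileCacheSettings & settings)
    {
        std::lock_guard lock(mutex);
        if (auto * data = getImpl(path, lock))
            return data->cache;
        auto cache = std::make_shared<IFileCache>();
        caches.emplace(path, CacheData{cache, settings});
        return cache;
    }
};

int main()
{
    FileCacheFactory factory;  /// the real class is a singleton behind instance()
    FileCacheSettings settings;
    settings.cache_on_insert = true;
    factory.getOrCreate("./disks/s3/data_cache/", settings);
    return factory.getSettings("./disks/s3/data_cache/").cache_on_insert ? 0 : 1;
}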

View File

@@ -1,6 +1,7 @@
#pragma once
#include <Common/FileCache_fwd.h>
+ #include <Common/FileCacheSettings.h>
#include <boost/noncopyable.hpp>
#include <unordered_map>
@@ -14,7 +15,15 @@
*/
class FileCacheFactory final : private boost::noncopyable
{
- using CacheByBasePath = std::unordered_map<std::string, FileCachePtr>;
+ struct CacheData
+ {
+ FileCachePtr cache;
+ FileCacheSettings settings;
+ CacheData(FileCachePtr cache_, const FileCacheSettings & settings_) : cache(cache_), settings(settings_) {}
+ };
+ using CacheByBasePath = std::unordered_map<std::string, CacheData>;
public:
static FileCacheFactory & instance();
@@ -25,8 +34,10 @@
CacheByBasePath getAll();
+ const FileCacheSettings & getSettings(const std::string & cache_base_path);
private:
- FileCachePtr getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &);
+ CacheData * getImpl(const std::string & cache_base_path, std::lock_guard<std::mutex> &);
std::mutex mutex;
CacheByBasePath caches;

View File

@@ -1,3 +1,5 @@
#pragma once
+ #include <Common/FileCache_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; }}

View File

@@ -18,6 +18,7 @@
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Common/getRandomASCIIString.h>
+ #include <Common/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromS3.h>
@@ -290,6 +291,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
});
};
+ bool cache_on_insert = write_settings.remote_fs_cache_on_insert || FileCacheFactory::instance().getSettings(getCachePath()).cache_on_insert;
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
bucket,
@@ -299,7 +302,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
- buf_size, std::move(schedule), write_settings.remote_fs_cache_on_insert ? cache : nullptr);
+ buf_size, std::move(schedule), cache_on_insert ? cache : nullptr);
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{
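The new cache_on_insert flag combines two sources: the query-level write setting and the per-cache default fetched through the new getSettings(). Either one can enable write-through caching. A tiny sketch of just that decision; the function name is a stand-in:

#include <cassert>

/// Stand-in for the decision in DiskS3::writeFile(): the query setting
/// (SETTINGS remote_fs_cache_on_insert=1) or the cache's config flag
/// (cache_on_insert in the disk definition) may each enable caching.
bool effective_cache_on_insert(bool query_setting, bool config_setting)
{
    return query_setting || config_setting;
}

int main()
{
    assert(!effective_cache_on_insert(false, false));
    assert(effective_cache_on_insert(true, false));
    assert(effective_cache_on_insert(false, true));  /// config default wins even
                                                     /// when the query disables it
}

This is also why the last INSERT in the test at the end of this commit still populates the cache despite remote_fs_cache_on_insert=0: the s3_cache disk's config-level default takes over.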

View File

@@ -92,13 +92,6 @@ void WriteBufferFromS3::nextImpl()
if (cacheEnabled())
{
- std::cerr << "\n\n\n\n\n\n\nCache is enabled!\n\n\n\n\n";
- /// Use max_single_part_upload_size as file segment size. Space reservation is incremental,
- /// so this size does not really mean anything apart from the final file segment size limit.
- /// If single part is uploaded with the smaller size, just resize file segment.
- // size_t max_file_segment_size = max_single_part_upload_size;
auto cache_key = cache->hash(key);
auto file_segments_holder = cache->setDownloading(cache_key, current_download_offset, size);
@@ -117,7 +110,6 @@ void WriteBufferFromS3::nextImpl()
}
else
{
- /// TODO: add try catch, add complete()
break;
}
}
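For orientation, the surviving path above hashes the S3 object key and pre-registers the byte range being uploaded as DOWNLOADING segments before the data is written. A schematic of that flow, with the cache API reduced to stubs (the real signatures live in FileCache.h; treat the types here as assumptions):

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>

using Key = size_t;
struct FileSegmentsHolder { Key key = 0; size_t offset = 0; size_t size = 0; };

/// Stub of the two cache calls used in WriteBufferFromS3::nextImpl().
struct Cache
{
    Key hash(const std::string & remote_key) const { return std::hash<std::string>{}(remote_key); }

    FileSegmentsHolder setDownloading(Key key, size_t offset, size_t size)
    {
        /// The real cache splits [offset, offset + size) into file segments
        /// in DOWNLOADING state so concurrent readers can wait on them.
        return {key, offset, size};
    }
};

int main()
{
    Cache cache;
    std::string s3_key = "data/mergetree/all_1_1_0/data.bin";  /// illustrative key
    size_t current_download_offset = 0;
    size_t size = 746;  /// matches the segment size in the test references below

    auto cache_key = cache.hash(s3_key);
    auto holder = cache.setDownloading(cache_key, current_download_offset, size);
    std::cout << "reserved [" << holder.offset << ", " << holder.offset + holder.size << ")\n";
}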

View File

@@ -303,8 +303,8 @@ BlockIO InterpreterSystemQuery::execute()
if (query.remote_filesystem_cache_path.empty())
{
auto caches = FileCacheFactory::instance().getAll();
- for (const auto & [_, cache] : caches)
- cache->tryRemoveAll();
+ for (const auto & [_, cache_data] : caches)
+ cache_data.cache->tryRemoveAll();
}
else
{

View File

@@ -47,15 +47,16 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
const std::string & marks_path_,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec_,
- size_t max_compress_block_size_) :
+ size_t max_compress_block_size_,
+ const WriteSettings & query_write_settings) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
- plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite)),
+ plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite, query_write_settings)),
plain_hashing(*plain_file),
compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
compressed(compressed_buf),
- marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
+ marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite, query_write_settings)), marks(*marks_file)
{
}
@@ -156,7 +157,7 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
if (metadata_snapshot->hasPrimaryKey())
{
- index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
+ index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings.query_write_settings);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
}
@@ -172,7 +173,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
data_part->volume->getDisk(),
part_path + stream_name, index_helper->getSerializedFileExtension(),
part_path + stream_name, marks_file_extension,
- default_codec, settings.max_compress_block_size));
+ default_codec, settings.max_compress_block_size, settings.query_write_settings));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_accumulated_marks.push_back(0);
}

View File

@@ -55,7 +55,8 @@ public:
const std::string & marks_path_,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec_,
- size_t max_compress_block_size_);
+ size_t max_compress_block_size_,
+ const WriteSettings & query_write_settings);
String escaped_column_name;
std::string data_file_extension;

View File

@@ -115,7 +115,8 @@ void MergeTreeDataPartWriterWide::addStreams(
part_path + stream_name, DATA_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
compression_codec,
- settings.max_compress_block_size);
+ settings.max_compress_block_size,
+ settings.query_write_settings);
};
ISerialization::SubstreamPath path;
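The three writer hunks above make the same change: the query's WriteSettings, previously dropped before reaching the disk layer, is now threaded through Stream and every writeFile() call, which is how the cache-on-insert flag arrives at DiskS3. A reduced sketch of that plumbing, with the disk and settings types stubbed:

#include <iostream>
#include <string>

struct WriteSettings { bool remote_fs_cache_on_insert = false; };  /// stub

struct Disk
{
    /// The extra parameter is the point of the change: writeFile() now
    /// receives the query's settings instead of falling back to defaults.
    void writeFile(const std::string & path, size_t buf_size, const WriteSettings & settings)
    {
        std::cout << path << " (buf " << buf_size << "), cache_on_insert="
                  << settings.remote_fs_cache_on_insert << '\n';
    }
};

/// Mirrors MergeTreeDataPartWriterOnDisk::Stream: the data file and the
/// marks file are both opened with the query's write settings.
struct Stream
{
    Stream(Disk & disk, const std::string & data_path, const std::string & marks_path,
           const WriteSettings & query_write_settings)
    {
        disk.writeFile(data_path, 1 << 20, query_write_settings);
        disk.writeFile(marks_path, 4096, query_write_settings);
    }
};

int main()
{
    Disk disk;
    WriteSettings query_write_settings;
    query_write_settings.remote_fs_cache_on_insert = true;
    Stream stream(disk, "part/data.bin", "part/data.mrk2", query_write_settings);
}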

View File

@@ -127,7 +127,6 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
MergeTreeData::DataPart::Checksums * additional_column_checksums,
const WriteSettings & write_settings)
{
- std::cerr << "\n\n\n\nCACHE ON INSERT: " << write_settings.remote_fs_cache_on_insert << "\n\n\n";
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;

View File

@@ -31,9 +31,11 @@ void StorageSystemRemoteFilesystemCache::fillData(MutableColumns & res_columns,
{
auto caches = FileCacheFactory::instance().getAll();
- for (const auto & [cache_base_path, cache] : caches)
+ for (const auto & [cache_base_path, cache_data] : caches)
{
+ auto & cache = cache_data.cache;
auto holder = cache->getAll();
for (const auto & file_segment : holder.file_segments)
{
res_columns[0]->insert(cache_base_path);
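Both consumers of FileCacheFactory::getAll(), the SYSTEM DROP handler above and this system table, adapt the same way: the map now yields CacheData, so the cache pointer sits one member access deeper. A minimal illustration with the same stubs as earlier:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

struct IFileCache { void tryRemoveAll() { std::cout << "dropped\n"; } };  /// stub
struct FileCacheSettings {};                                              /// stub
struct CacheData { std::shared_ptr<IFileCache> cache; FileCacheSettings settings; };

int main()
{
    std::unordered_map<std::string, CacheData> caches;
    caches.emplace("./disks/s3/data_cache/", CacheData{std::make_shared<IFileCache>(), {}});

    /// The binding's second element is now the whole record, not the pointer.
    for (const auto & [cache_base_path, cache_data] : caches)
    {
        std::cout << cache_base_path << '\n';
        cache_data.cache->tryRemoveAll();
    }
}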

View File

@@ -2,10 +2,10 @@
-- { echo }
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
- DROP TABLE IF EXISTS test;
+ SELECT * FROM test FORMAT Null;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
./disks/s3/data_cache/ (0,745) 746

View File

@@ -2,10 +2,10 @@
-- { echo }
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
- DROP TABLE IF EXISTS test;
+ SELECT * FROM test FORMAT Null;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;

View File

@@ -2,15 +2,17 @@
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
- -- CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
- CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';
+ CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=1;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
./disks/s3/data_cache/ (0,745) 746
SELECT count() FROM system.remote_filesystem_cache;
8
SELECT * FROM test FORMAT Null;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
./disks/s3/data_cache/ (0,745) 746
./disks/s3/data_cache/ (0,745) 746
SELECT count() size FROM system.remote_filesystem_cache;
9
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 100);
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
SELECT count() size FROM system.remote_filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0;
SELECT count() size FROM system.remote_filesystem_cache;
14

View File

@@ -1,16 +1,17 @@
- -- Tags: no-parallel, no-fasttest, long
+ -- Tags: no-parallel, no-fasttest
-- { echo }
DROP TABLE IF EXISTS test;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
- -- CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
- CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';
+ CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=1;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
SELECT count() FROM system.remote_filesystem_cache;
SELECT * FROM test FORMAT Null;
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
SELECT count() size FROM system.remote_filesystem_cache;
SYSTEM DROP REMOTE FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 100);
SELECT cache_base_path, file_segment_range, size FROM system.remote_filesystem_cache;
SELECT count() size FROM system.remote_filesystem_cache;
+ INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes to the cache because the config-level cache_on_insert setting is used
+ SELECT count() size FROM system.remote_filesystem_cache;