This commit is contained in:
kssenii 2023-12-08 15:21:42 +01:00
parent ff65d0e727
commit ea63819c76
12 changed files with 151 additions and 72 deletions

View File

@ -1173,7 +1173,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
return file_segments;
}
IFileCachePriority::QueueEntriesDumps FileCache::dumpQueue()
std::vector<FileSegment::Info> FileCache::dumpQueue()
{
assertInitialized();
return main_priority->dump(*this, lockCache());

View File

@ -126,7 +126,7 @@ public:
std::vector<FileSegment::Info> getFileSegmentInfos(const Key & key);
IFileCachePriority::QueueEntriesDumps dumpQueue();
std::vector<FileSegment::Info> dumpQueue();
void deactivateBackgroundOperations();

View File

@ -849,6 +849,7 @@ FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment, File
.cache_hits = file_segment->hits_count,
.references = static_cast<uint64_t>(file_segment.use_count()),
.is_unbound = file_segment->is_unbound,
.queue_entry_type = file_segment->queue_iterator ? file_segment->queue_iterator->getType() : QueueEntryType::None,
};
}
@ -914,10 +915,6 @@ void FileSegment::increasePriority()
return;
}
/// Priority can be increased only for downloaded file segments.
if (download_state != State::DOWNLOADED)
return;
auto it = getQueueIterator();
if (it)
{

View File

@ -54,6 +54,7 @@ public:
using Priority = IFileCachePriority;
using State = FileSegmentState;
using Info = FileSegmentInfo;
using QueueEntryType = FileCacheQueueEntryType;
FileSegment(
const Key & key_,

View File

@ -53,6 +53,14 @@ namespace DB
Temporary,
};
enum class FileCacheQueueEntryType
{
None,
LRU,
SLRU_Protected,
SLRU_Probationary,
};
std::string toString(FileSegmentKind kind);
struct FileSegmentInfo
@ -69,5 +77,6 @@ namespace DB
uint64_t cache_hits;
uint64_t references;
bool is_unbound;
FileCacheQueueEntryType queue_entry_type;
};
}

View File

@ -17,6 +17,7 @@ class IFileCachePriority : private boost::noncopyable
{
public:
using Key = FileCacheKey;
using QueueEntryType = FileCacheQueueEntryType;
struct Entry
{
@ -45,6 +46,8 @@ public:
virtual void remove(const CacheGuard::Lock &) = 0;
virtual void invalidate() = 0;
virtual QueueEntryType getType() const = 0;
};
using IteratorPtr = std::shared_ptr<Iterator>;
@ -72,13 +75,7 @@ public:
virtual void shuffle(const CacheGuard::Lock &) = 0;
struct QueueEntryDump
{
FileSegmentInfo info;
bool is_protected = false;
};
using QueueEntriesDumps = std::vector<QueueEntryDump>;
virtual QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) = 0;
virtual std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) = 0;
using FinalizeEvictionFunc = std::function<void(const CacheGuard::Lock & lk)>;
virtual bool collectCandidatesForEviction(

View File

@ -277,9 +277,9 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
return LRUIterator(this, it.iterator);
}
IFileCachePriority::QueueEntriesDumps LRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{
QueueEntriesDumps res;
std::vector<FileSegmentInfo> res;
iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{
res.emplace_back(FileSegment::getInfo(segment_metadata->file_segment, cache));

View File

@ -44,7 +44,7 @@ public:
void shuffle(const CacheGuard::Lock &) override;
QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) override;
void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }
@ -99,6 +99,8 @@ public:
void updateSize(int64_t size) override;
QueueEntryType getType() const override { return QueueEntryType::LRU; }
private:
void assertValid() const;

View File

@ -222,16 +222,10 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
iterator.is_protected = true;
}
IFileCachePriority::QueueEntriesDumps SLRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
std::vector<FileSegmentInfo> SLRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{
auto res = probationary_queue.dump(cache, lock);
for (auto & entry : res)
entry.is_protected = false;
auto part_res = protected_queue.dump(cache, lock);
for (auto & entry : part_res)
entry.is_protected = true;
res.insert(res.end(), part_res.begin(), part_res.end());
return res;
}

View File

@ -43,7 +43,7 @@ public:
void shuffle(const CacheGuard::Lock &) override;
QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) override;
private:
LRUFileCachePriority protected_queue;
@ -72,7 +72,7 @@ public:
void updateSize(int64_t size) override;
bool isProtected() const { return is_protected; }
QueueEntryType getType() const override { return is_protected ? QueueEntryType::SLRU_Protected : QueueEntryType::SLRU_Probationary; }
private:
void assertValid() const;

View File

@ -1,6 +0,0 @@
CREATE DICTIONARY dict (`id` String, `timestamp` DateTime)
PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT \'test\' as id, now() as timestamp')) LAYOUT(DIRECT());
CREATE table t (id LowCardinality(String)) engine = MergeTree() ORDER BY id;
CREATE VIEW v AS select dictGet(dict, 'timestamp', id) from t;

View File

@ -84,11 +84,12 @@ using HolderPtr = FileSegmentsHolderPtr;
fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
std::string cache_base_path2 = caches_dir / "cache2" / "";
void assertEqual(const FileSegmentsHolderPtr & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "File segments: ";
std::cerr << "\nFile segments: ";
for (const auto & file_segment : *file_segments)
std::cerr << file_segment->range().toString() << ", ";
@ -116,9 +117,12 @@ void assertEqual(const FileSegmentsHolderPtr & file_segments, const Ranges & exp
void assertEqual(const std::vector<FileSegment::Info> & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "File segments: ";
std::cerr << "\nFile segments: ";
for (const auto & file_segment : file_segments)
std::cerr << FileSegment::Range(file_segment.range_left, file_segment.range_right).toString() << ", ";
std::cerr << "\nExpected: ";
for (const auto & r : expected_ranges)
std::cerr << r.toString() << ", ";
ASSERT_EQ(file_segments.size(), expected_ranges.size());
@ -142,51 +146,29 @@ void assertEqual(const std::vector<FileSegment::Info> & file_segments, const Ran
}
}
void assertEqual(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
void assertProtectedOrProbationary(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected, bool assert_protected)
{
std::cerr << "File segments: ";
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
std::cerr << range.toString() << ", ";
}
ASSERT_EQ(file_segments.size(), expected_ranges.size());
if (!expected_states.empty())
ASSERT_EQ(file_segments.size(), expected_states.size());
auto get_expected_state = [&](size_t i)
{
if (expected_states.empty())
return State::DOWNLOADED;
else
return expected_states[i];
};
size_t i = 0;
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
ASSERT_EQ(range, expected_ranges[i]);
ASSERT_EQ(f.info.state, get_expected_state(i));
++i;
}
}
void assertProtectedOrProbationary(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected, bool assert_protected)
{
std::cerr << "File segments: ";
std::cerr << "\nFile segments: ";
std::vector<Range> res;
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
std::cerr << range.toString() << ", ";
if ((f.is_protected && assert_protected) || (!f.is_protected && !assert_protected))
auto range = FileSegment::Range(f.range_left, f.range_right);
bool is_protected = (f.queue_entry_type == FileCacheQueueEntryType::SLRU_Protected);
bool is_probationary = (f.queue_entry_type == FileCacheQueueEntryType::SLRU_Probationary);
ASSERT_TRUE(is_probationary || is_protected);
std::cerr << fmt::format("{} (protected: {})", range.toString(), is_protected) << ", ";
if ((is_protected && assert_protected) || (!is_protected && !assert_protected))
{
res.push_back(range);
}
}
std::cerr << "\nExpected: ";
for (const auto & range : expected)
{
std::cerr << range.toString() << ", ";
}
ASSERT_EQ(res.size(), expected.size());
for (size_t i = 0; i < res.size(); ++i)
@ -195,13 +177,15 @@ void assertProtectedOrProbationary(const IFileCachePriority::QueueEntriesDumps &
}
}
void assertProtected(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected)
void assertProtected(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected)
{
std::cerr << "\nAssert protected";
assertProtectedOrProbationary(file_segments, expected, true);
}
void assertProbationary(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected)
void assertProbationary(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected)
{
std::cerr << "\nAssert probationary";
assertProtectedOrProbationary(file_segments, expected, false);
}
@ -251,6 +235,13 @@ void increasePriority(const HolderPtr & holder)
it->increasePriority();
}
void increasePriority(const HolderPtr & holder, size_t pos)
{
FileSegments::iterator it = holder->begin();
std::advance(it, pos);
(*it)->increasePriority();
}
class FileCacheTest : public ::testing::Test
{
public:
@ -285,7 +276,10 @@ public:
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
if (fs::exists(cache_base_path2))
fs::remove_all(cache_base_path2);
fs::create_directories(cache_base_path);
fs::create_directories(cache_base_path2);
}
void TearDown() override
@ -1232,4 +1226,95 @@ TEST_F(FileCacheTest, SLRUPolicy)
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getUsedCacheSize(), 22);
}
{
ReadSettings read_settings;
read_settings.enable_filesystem_cache = true;
read_settings.local_fs_method = LocalFSReadMethod::pread;
auto write_file = [](const std::string & filename, const std::string & s)
{
std::string file_path = fs::current_path() / filename;
auto wb = std::make_unique<WriteBufferFromFile>(file_path, DBMS_DEFAULT_BUFFER_SIZE);
wb->write(s.data(), s.size());
wb->next();
wb->finalize();
return file_path;
};
DB::FileCacheSettings settings2;
settings2.base_path = cache_base_path2;
settings2.max_file_segment_size = 5;
settings2.max_size = 30;
settings2.max_elements = 6;
settings2.boundary_alignment = 1;
settings2.cache_policy = "SLRU";
settings2.slru_size_ratio = 0.5;
auto cache = std::make_shared<DB::FileCache>("slru_2", settings2);
cache->initialize();
auto read_and_check = [&](const std::string & file, const FileCacheKey & key, const std::string & expect_result)
{
auto read_buffer_creator = [&]()
{
return createReadBufferFromFileBase(file, read_settings, std::nullopt, std::nullopt);
};
auto cached_buffer = std::make_shared<CachedOnDiskReadBufferFromFile>(
file, key, cache, read_buffer_creator, read_settings, "test", expect_result.size(), false, false, std::nullopt, nullptr);
WriteBufferFromOwnString result;
copyData(*cached_buffer, result);
ASSERT_EQ(result.str(), expect_result);
};
std::string data1(15, '*');
auto file1 = write_file("test1", data1);
auto key1 = cache->createKeyForPath(file1);
read_and_check(file1, key1, data1);
assertEqual(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProtected(cache->dumpQueue(), Ranges{});
read_and_check(file1, key1, data1);
assertEqual(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProbationary(cache->dumpQueue(), Ranges{});
assertProtected(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
std::string data2(10, '*');
auto file2 = write_file("test2", data2);
auto key2 = cache->createKeyForPath(file2);
read_and_check(file2, key2, data2);
auto dump = cache->dumpQueue();
assertEqual(dump, { Range(0, 4), Range(5, 9), Range(0, 4), Range(5, 9), Range(10, 14) });
ASSERT_EQ(dump[0].key, key2);
ASSERT_EQ(dump[1].key, key2);
ASSERT_EQ(dump[2].key, key1);
ASSERT_EQ(dump[3].key, key1);
ASSERT_EQ(dump[4].key, key1);
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9) });
assertProtected(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
read_and_check(file2, key2, data2);
dump = cache->dumpQueue();
assertEqual(dump, { Range(0, 4), Range(5, 9), Range(10, 14), Range(0, 4), Range(5, 9) });
ASSERT_EQ(dump[0].key, key1);
ASSERT_EQ(dump[1].key, key1);
ASSERT_EQ(dump[2].key, key1);
ASSERT_EQ(dump[3].key, key2);
ASSERT_EQ(dump[4].key, key2);
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9) });
assertProtected(cache->dumpQueue(), { Range(10, 14), Range(0, 4), Range(5, 9) });
}
}