This commit is contained in:
KinderRiven 2022-06-29 17:44:38 +08:00 committed by KinderRiven
parent 164fa1ab0e
commit 1b01cc8ed9
4 changed files with 67 additions and 47 deletions

View File

@ -2,7 +2,6 @@
#include <Common/randomSeed.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/FileCacheSettings.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -40,11 +39,6 @@ FileCache::FileCache(
{
}
String FileCache::Key::toString() const
{
return getHexUIntLowercase(key);
}
FileCache::Key FileCache::hash(const String & path)
{
return Key(sipHash128(path.data(), path.size()));
@ -323,8 +317,11 @@ FileSegments FileCache::getImpl(
if (range.left <= prev_cell_range.right)
{
/// segment{k-1} segment{k}
/// [________] [_____
/// [___________
/// ^
/// range.left
useCell(prev_cell, result, cache_lock);
}
}
@ -562,7 +559,7 @@ FileCache::FileSegmentCell * FileCache::addCell(
if (stash_priority->getElementsNum(cache_lock) > max_stash_element_size)
{
auto remove_priority_iter = stash_priority->getNewIterator(cache_lock)->getWriteIterator();
auto remove_priority_iter = stash_priority->getLowestPriorityWriteIterator(cache_lock);
stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()});
remove_priority_iter->remove(cache_lock);
}
@ -648,7 +645,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
auto * cell_for_reserve = getCell(key, offset, cache_lock);
std::vector<IFileCachePriority::WriteIterator> ghost;
std::vector<std::tuple<Key, size_t, size_t>> ghost;
std::vector<FileSegmentCell *> trash;
std::vector<FileSegmentCell *> to_evict;
@ -660,7 +657,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
};
/// Select the cache from the LRU queue held by query for expulsion.
for (auto iter = query_context->getPriority()->getNewIterator(cache_lock); iter->valid(); iter->next())
for (auto iter = query_context->getPriority()->getLowestPriorityWriteIterator(cache_lock); iter->valid();)
{
if (!is_overflow())
break;
@ -671,8 +668,10 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
{
/// The cache corresponding to this record may be swapped out by
/// other queries, so it has become invalid.
ghost.push_back(iter->getWriteIterator());
removed_size += iter->size();
ghost.push_back({iter->key(), iter->offset(), iter->size()});
/// next()
iter->remove(cache_lock);
}
else
{
@ -700,6 +699,8 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
removed_size += cell_size;
--queue_size;
}
iter->next();
}
}
@ -718,8 +719,8 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
remove_file_segment(file_segment, cell->size());
}
for (auto & iter : ghost)
query_context->remove(iter->key(), iter->offset(), iter->size(), cache_lock);
for (auto & entry : ghost)
query_context->remove(std::get<0>(entry), std::get<1>(entry), std::get<2>(entry), cache_lock);
if (is_overflow())
return false;
@ -770,7 +771,7 @@ bool FileCache::tryReserveForMainList(
std::vector<FileSegmentCell *> to_evict;
std::vector<FileSegmentCell *> trash;
for (auto it = main_priority->getNewIterator(cache_lock); it->valid(); it->next())
for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
{
auto entry_key = it->key();
auto entry_offset = it->offset();
@ -926,9 +927,9 @@ void FileCache::removeIfReleasable(bool remove_persistent_files)
std::lock_guard cache_lock(mutex);
std::vector<FileSegmentPtr> to_remove;
for (auto it = main_priority->getNewIterator(cache_lock); it->valid(); it->next())
for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
{
auto key = it->key();
const auto & key = it->key();
auto offset = it->offset();
auto * cell = getCell(key, offset, cache_lock);
@ -1247,7 +1248,7 @@ String FileCache::dumpStructure(const Key & key)
return dumpStructureUnlocked(key, cache_lock);
}
String FileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> & cache_lock)
String FileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> &)
{
WriteBufferFromOwnString result;
const auto & cells_by_offset = files[key];
@ -1255,7 +1256,6 @@ String FileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mu
for (const auto & [offset, cell] : cells_by_offset)
result << cell.file_segment->getInfoForLog() << "\n";
result << "\n\nPriority: " << main_priority->toString(cache_lock);
return result.str();
}
@ -1291,9 +1291,9 @@ void FileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock)
void FileCache::assertPriorityCorrectness(std::lock_guard<std::mutex> & cache_lock)
{
[[maybe_unused]] size_t total_size = 0;
for (auto it = main_priority->getNewIterator(cache_lock); it->valid(); it->next())
for (auto it = main_priority->getLowestPriorityReadIterator(cache_lock); it->valid(); it->next())
{
auto key = it->key();
const auto & key = it->key();
auto offset = it->offset();
auto size = it->size();

View File

@ -1,5 +1,6 @@
#pragma once
#include <Core/Types.h>
#include <Common/hex.h>
namespace DB
{
@ -7,9 +8,11 @@ namespace DB
struct FileCacheKey
{
UInt128 key;
String toString() const;
String toString() const { return getHexUIntLowercase(key); }
FileCacheKey() = default;
explicit FileCacheKey(const UInt128 & key_) : key(key_) { }
bool operator==(const FileCacheKey & other) const { return key == other.key; }

View File

@ -21,13 +21,13 @@ class IFileCachePriority
public:
class IIterator;
friend class IIterator;
friend class FileCache;
using Key = FileCacheKey;
using ReadIterator = std::shared_ptr<const IIterator>;
using WriteIterator = std::shared_ptr<IIterator>;
friend class FileCache;
using Key = FileCacheKey;
struct FileCacheRecord
{
Key key;
@ -47,7 +47,7 @@ public:
public:
virtual ~IIterator() = default;
virtual Key key() const = 0;
virtual const Key & key() const = 0;
virtual size_t offset() const = 0;
@ -64,17 +64,11 @@ public:
/// of the cache record according to different cache algorithms.
virtual void use(std::lock_guard<std::mutex> &) = 0;
/// Deletes an existing cached record.
/// Deletes an existing cached record. And to avoid pointer suspension
/// the iterator should automatically point to the next record.
virtual void remove(std::lock_guard<std::mutex> &) = 0;
/// Get an iterator to handle write operations. Write iterators should only
/// be allowed to call remove, use and incrementSize methods.
virtual WriteIterator getWriteIterator() const = 0;
virtual void incrementSize(size_t, std::lock_guard<std::mutex> &) = 0;
/// Repoint the iterator to the record with the lowest priority.
virtual void seekToLowestPriority() const = 0;
};
public:
@ -84,6 +78,7 @@ public:
/// logical exception if the cache block already exists.
virtual WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) = 0;
/// This method is used for assertions in debug mode. So we do not care about complexity here.
/// Query whether a cache record exists. If it exists, return true. If not, return false.
virtual bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) = 0;
@ -91,14 +86,15 @@ public:
/// Returns an iterator pointing to the lowest priority cached record.
/// We can traverse all cached records through the iterator's next().
virtual ReadIterator getNewIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
virtual ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
/// The same as getLowestPriorityReadIterator(), but it is writeable.
virtual WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> & cache_lock) = 0;
virtual size_t getElementsNum(std::lock_guard<std::mutex> & cache_lock) const = 0;
size_t getCacheSize(std::lock_guard<std::mutex> &) const { return cache_size; }
virtual std::string toString(std::lock_guard<std::mutex> & cache_lock) const = 0;
protected:
size_t max_cache_size = 0;
size_t cache_size = 0;

View File

@ -5,11 +5,16 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Based on the LRU algorithm implementation, the record with the lowest priority is stored at
/// the head of the queue, and the record with the highest priority is stored at the tail.
class LRUFileCachePriority : public IFileCachePriority
{
public:
private:
using LRUQueue = std::list<FileCacheRecord>;
using LRUQueueIterator = typename LRUQueue::iterator;
@ -23,9 +28,9 @@ public:
void next() const override { queue_iter++; }
bool valid() const override { return (file_cache->queue.size() && (queue_iter != file_cache->queue.end())); }
bool valid() const override { return queue_iter != file_cache->queue.end(); }
Key key() const override { return queue_iter->key; }
const Key & key() const override { return queue_iter->key; }
size_t offset() const override { return queue_iter->offset; }
@ -33,14 +38,12 @@ public:
size_t hits() const override { return queue_iter->hits; }
WriteIterator getWriteIterator() const override { return std::make_shared<LRUFileCacheIterator>(file_cache, queue_iter); }
void seekToLowestPriority() const override { queue_iter = file_cache->queue.begin(); }
void remove(std::lock_guard<std::mutex> &) override
{
file_cache->cache_size -= queue_iter->size;
file_cache->queue.erase(queue_iter);
auto remove_iter = queue_iter;
queue_iter++;
file_cache->cache_size -= remove_iter->size;
file_cache->queue.erase(remove_iter);
}
void incrementSize(size_t size_increment, std::lock_guard<std::mutex> &) override
@ -65,6 +68,18 @@ public:
WriteIterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> &) override
{
#ifndef NDEBUG
for (const auto & entry : queue)
{
if (entry.key() == key && entry.offset() == offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
entry.key().toString(),
entry.offset(),
entry.size());
}
#endif
auto iter = queue.insert(queue.end(), FileCacheRecord(key, offset, size));
cache_size += size;
return std::make_shared<LRUFileCacheIterator>(this, iter);
@ -86,14 +101,20 @@ public:
cache_size = 0;
}
ReadIterator getNewIterator(std::lock_guard<std::mutex> &) override
ReadIterator getLowestPriorityReadIterator(std::lock_guard<std::mutex> &) override
{
return std::make_shared<const LRUFileCacheIterator>(this, queue.begin());
}
size_t getElementsNum(std::lock_guard<std::mutex> &) const override { return queue.size(); }
WriteIterator getLowestPriorityWriteIterator(std::lock_guard<std::mutex> &) override
{
return std::make_shared<LRUFileCacheIterator>(this, queue.begin());
}
std::string toString(std::lock_guard<std::mutex> &) const override { return {}; }
size_t getElementsNum(std::lock_guard<std::mutex> &) const override
{
return queue.size();
}
private:
LRUQueue queue;