ClickHouse/src/Common/FileCache.h
KinderRiven 4e78bbd414 fix
2022-06-06 03:21:56 +08:00

419 lines
14 KiB
C++

#pragma once
#include <atomic>
#include <chrono>
#include <list>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <boost/functional/hash.hpp>
#include <boost/noncopyable.hpp>
#include <map>
#include "FileCache_fwd.h"
#include <IO/ReadSettings.h>
#include <Common/logger_useful.h>
#include <Common/FileSegment.h>
#include <Core/Types.h>
namespace DB
{
/**
* Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
*/
class IFileCache : private boost::noncopyable
{
friend class FileSegment;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
using Downloader = std::unique_ptr<SeekableReadBuffer>;
IFileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_);
virtual ~IFileCache() = default;
/// Restore cache from local filesystem.
virtual void initialize() = 0;
virtual void remove(const Key & key) = 0;
virtual void remove() = 0;
static bool isReadOnly();
/// Cache capacity in bytes.
size_t capacity() const { return max_size; }
static Key hash(const String & path);
String getPathInLocalCache(const Key & key, size_t offset);
String getPathInLocalCache(const Key & key);
const String & getBasePath() const { return cache_base_path; }
virtual std::vector<String> tryGetCachePaths(const Key & key) = 0;
/**
* Given an `offset` and `size` representing [offset, offset + size) bytes interval,
* return list of cached non-overlapping non-empty
* file segments `[segment1, ..., segmentN]` which intersect with given interval.
*
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* As long as pointers to returned file segments are hold
* it is guaranteed that these file segments are not removed from cache.
*/
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0;
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* If file segment has state EMPTY, then it is also marked as "detached". E.g. it is "detached"
* from cache (not owned by cache), and as a result will never change it's state and will be destructed
* with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
* it's state (and become DOWNLOADED).
*/
virtual FileSegmentsHolder get(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegments getSnapshot() const = 0;
/// For debug.
virtual String dumpStructure(const Key & key) = 0;
virtual size_t getUsedCacheSize() const = 0;
virtual size_t getFileSegmentsNum() const = 0;
virtual void createOrSetQueryContext(const ReadSettings & settings) = 0;
virtual void tryReleaseQueryContext() = 0;
protected:
String cache_base_path;
size_t max_size;
size_t max_element_size;
size_t max_file_segment_size;
bool is_initialized = false;
mutable std::mutex mutex;
virtual bool tryReserve(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) = 0;
virtual void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual bool isLastFileSegmentHolder(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
/// If file segment was partially downloaded and then space reservation fails (because of no
/// space left), then update corresponding cache cell metadata (file segment size).
virtual void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
void assertInitialized() const;
};
using FileCachePtr = std::shared_ptr<IFileCache>;
class LRUFileCache final : public IFileCache
{
public:
LRUFileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_);
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
FileSegmentsHolder get(const Key & key, size_t offset, size_t size) override;
FileSegments getSnapshot() const override;
void initialize() override;
void remove(const Key & key) override;
void remove() override;
std::vector<String> tryGetCachePaths(const Key & key) override;
size_t getUsedCacheSize() const override;
size_t getFileSegmentsNum() const override;
void createOrSetQueryContext(const ReadSettings & settings) override;
void tryReleaseQueryContext() override;
private:
class LRUQueue
{
public:
struct FileKeyAndOffset
{
Key key;
size_t offset;
size_t size;
size_t hits = 0;
FileKeyAndOffset(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) {}
};
using Iterator = typename std::list<FileKeyAndOffset>::iterator;
size_t getTotalWeight(std::lock_guard<std::mutex> & /* cache_lock */) const { return cache_size; }
size_t getElementsNum(std::lock_guard<std::mutex> & /* cache_lock */) const { return queue.size(); }
Iterator add(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock);
void remove(Iterator queue_it, std::lock_guard<std::mutex> & cache_lock);
void moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & cache_lock);
/// Space reservation for a file segment is incremental, so we need to be able to increment size of the queue entry.
void incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & cache_lock);
void assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
String toString(std::lock_guard<std::mutex> & cache_lock) const;
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) const;
Iterator begin() { return queue.begin(); }
Iterator end() { return queue.end(); }
void removeAll(std::lock_guard<std::mutex> & cache_lock);
private:
std::list<FileKeyAndOffset> queue;
size_t cache_size = 0;
};
struct FileSegmentCell : private boost::noncopyable
{
FileSegmentPtr file_segment;
/// Iterator is put here on first reservation attempt, if successful.
std::optional<LRUQueue::Iterator> queue_iterator;
/// Pointer to file segment is always hold by the cache itself.
/// Apart from pointer in cache, it can be hold by cache users, when they call
/// getorSet(), but cache users always hold it via FileSegmentsHolder.
bool releasable() const { return file_segment.unique(); }
size_t size() const { return file_segment->reserved_size; }
FileSegmentCell(FileSegmentPtr file_segment_, LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment))
, queue_iterator(other.queue_iterator) {}
};
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;
using CachedFiles = std::unordered_map<Key, FileSegmentsByOffset>;
using AccessKeyAndOffset = std::pair<Key, size_t>;
struct KeyAndOffsetHash
{
std::size_t operator()(const AccessKeyAndOffset & key) const
{
return std::hash<UInt128>()(key.first) ^ std::hash<UInt64>()(key.second);
}
};
using AccessRecord = std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
CachedFiles files;
LRUQueue queue;
LRUQueue stash_queue;
AccessRecord records;
size_t max_stash_element_size;
size_t enable_cache_hits_threshold;
struct QueryContext
{
LRUQueue lru_queue;
AccessRecord records;
size_t cache_size = 0;
size_t max_cache_size;
size_t ref_count = 0;
bool skip_download_if_exceeds_query_cache;
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) { }
void remove(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
lru_queue.remove(record->second, cache_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}
void reserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = lru_queue.add(key, offset, 0, cache_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->size += size;
}
cache_size += size;
}
void use(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock)
{
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
lru_queue.moveToEnd(record->second, cache_lock);
}
}
size_t getRefCount() const { return ref_count; }
void incrementRefCount() { ref_count++; }
void decrementRefCount() { ref_count--; }
size_t getMaxCacheSize() { return max_cache_size; }
size_t getCacheSize() { return cache_size; }
LRUQueue & queue() { return lru_queue; }
bool isSkipDownloadIfExceed() { return skip_download_if_exceeds_query_cache; }
};
using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
enum class ReserveResult
{
FINISHED,
NO_ENOUGH_SPACE,
NO_NEED,
};
QueryContextMap query_map;
Poco::Logger * log;
FileSegments getImpl(
const Key & key, const FileSegment::Range & range,
std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell * getCell(
const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell * addCell(
const Key & key, size_t offset, size_t size,
FileSegment::State state, std::lock_guard<std::mutex> & cache_lock);
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
QueryContextPtr getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock) const;
QueryContextPtr getQueryContext(const String & query_id, std::lock_guard<std::mutex> & cache_lock) const;
bool tryReserve(
const Key & key, size_t offset, size_t size,
std::lock_guard<std::mutex> & cache_lock) override;
bool tryReserveForMainList(
const Key & key, size_t offset, size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
/// Limit the maximum cache size for current query.
LRUFileCache::ReserveResult tryReserveForQuery(
const Key & key, size_t offset, size_t size,
QueryContextPtr query_context,
std::lock_guard<std::mutex> & cache_lock);
void remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) override;
bool isLastFileSegmentHolder(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) override;
void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) override;
size_t getAvailableCacheSize() const;
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);
FileSegments splitRangeIntoCells(
const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock);
String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
public:
String dumpStructure(const Key & key_) override;
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
void assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock);
};
}