Adjustments after merge with master

This commit is contained in:
kssenii 2023-12-07 19:49:30 +01:00
parent 725571461d
commit ff65d0e727
12 changed files with 152 additions and 97 deletions

View File

@ -1173,10 +1173,10 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
return file_segments; return file_segments;
} }
std::vector<FileSegment::Info> FileCache::dumpQueue() IFileCachePriority::QueueEntriesDumps FileCache::dumpQueue()
{ {
assertInitialized(); assertInitialized();
return main_priority->dump(lockCache()); return main_priority->dump(*this, lockCache());
} }
std::vector<String> FileCache::tryGetCachePaths(const Key & key) std::vector<String> FileCache::tryGetCachePaths(const Key & key)

View File

@ -126,7 +126,7 @@ public:
std::vector<FileSegment::Info> getFileSegmentInfos(const Key & key); std::vector<FileSegment::Info> getFileSegmentInfos(const Key & key);
std::vector<FileSegment::Info> dumpQueue(); IFileCachePriority::QueueEntriesDumps dumpQueue();
void deactivateBackgroundOperations(); void deactivateBackgroundOperations();

View File

@ -480,7 +480,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
bool is_file_segment_size_exceeded; bool is_file_segment_size_exceeded;
{ {
auto lock = segment_guard.lock(); auto lock = lockFileSegment();
assertNotDetachedUnlocked(lock); assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock); assertIsDownloaderUnlocked("reserve", lock);

View File

@ -11,8 +11,8 @@
#include <IO/OpenedFileCache.h> #include <IO/OpenedFileCache.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileSegmentInfo.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h> #include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <queue>
namespace Poco { class Logger; } namespace Poco { class Logger; }
@ -28,23 +28,6 @@ namespace DB
class ReadBufferFromFileBase; class ReadBufferFromFileBase;
struct FileCacheReserveStat; struct FileCacheReserveStat;
/*
* FileSegmentKind is used to specify the eviction policy for file segments.
*/
enum class FileSegmentKind
{
/* `Regular` file segment is still in cache after usage, and can be evicted
* (unless there're some holders).
*/
Regular,
/* `Temporary` file segment is removed right after releasing.
* Also corresponding files are removed during cache loading (if any).
*/
Temporary,
};
String toString(FileSegmentKind kind);
struct CreateFileSegmentSettings struct CreateFileSegmentSettings
{ {
@ -69,40 +52,8 @@ public:
using Downloader = std::string; using Downloader = std::string;
using DownloaderId = std::string; using DownloaderId = std::string;
using Priority = IFileCachePriority; using Priority = IFileCachePriority;
using State = FileSegmentState;
enum class State using Info = FileSegmentInfo;
{
DOWNLOADED,
/**
* When file segment is first created and returned to user, it has state EMPTY.
* EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully
* by any owner of EMPTY state file segment.
*/
EMPTY,
/**
* A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
* because each cache user might acquire multiple file segments and read them one by one,
* so only user which actually needs to read this segment earlier than others - becomes a downloader.
*/
DOWNLOADING,
/**
* Space reservation for a file segment is incremental, i.e. downloader reads buffer_size bytes
* from remote fs -> tries to reserve buffer_size bytes to put them to cache -> writes to cache
* on successful reservation and stops cache write otherwise. Those, who waited for the same file
* segment, will read downloaded part from cache and remaining part directly from remote fs.
*/
PARTIALLY_DOWNLOADED_NO_CONTINUATION,
/**
* If downloader did not finish download of current file segment for any reason apart from running
* out of cache space, then download can be continued by other owners of this file segment.
*/
PARTIALLY_DOWNLOADED,
/**
* If file segment cannot possibly be downloaded (first space reservation attempt failed), mark
* this file segment as out of cache scope.
*/
DETACHED,
};
FileSegment( FileSegment(
const Key & key_, const Key & key_,
@ -205,22 +156,7 @@ public:
/// exception. /// exception.
void detach(const FileSegmentGuard::Lock &, const LockedKey &); void detach(const FileSegmentGuard::Lock &, const LockedKey &);
struct Info static FileSegmentInfo getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
{
FileSegment::Key key;
size_t offset;
std::string path;
uint64_t range_left;
uint64_t range_right;
FileSegmentKind kind;
State state;
uint64_t size;
uint64_t downloaded_size;
uint64_t cache_hits;
uint64_t references;
bool is_unbound;
};
static Info getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
bool isDetached() const; bool isDetached() const;

View File

@ -0,0 +1,73 @@
#pragma once
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Interpreters/Cache/FileCacheKey.h>
namespace DB
{
/**
* Download state of a file segment.
* Each enumerator below is documented with the condition under which a segment is in that state.
*/
enum class FileSegmentState
{
/// NOTE(review): not documented in the original; presumably the segment is fully downloaded into cache - confirm.
DOWNLOADED,
/**
* When a file segment is first created and returned to the user, it has state EMPTY.
* EMPTY state can become DOWNLOADING when getOrSetDownloader is called successfully
* by any owner of an EMPTY state file segment.
*/
EMPTY,
/**
* A newly created file segment never has DOWNLOADING state until a call to getOrSetDownloader,
* because each cache user might acquire multiple file segments and read them one by one,
* so only the user which actually needs to read this segment earlier than others becomes a downloader.
*/
DOWNLOADING,
/**
* Space reservation for a file segment is incremental, i.e. the downloader reads buffer_size bytes
* from remote fs -> tries to reserve buffer_size bytes to put them to cache -> writes to cache
* on successful reservation and stops cache write otherwise. Those who waited for the same file
* segment will read the downloaded part from cache and the remaining part directly from remote fs.
*/
PARTIALLY_DOWNLOADED_NO_CONTINUATION,
/**
* If the downloader did not finish download of the current file segment for any reason apart from running
* out of cache space, then download can be continued by other owners of this file segment.
*/
PARTIALLY_DOWNLOADED,
/**
* If the file segment cannot possibly be downloaded (first space reservation attempt failed), mark
* this file segment as out of cache scope.
*/
DETACHED,
};
/**
* FileSegmentKind is used to specify the eviction policy for file segments.
*/
enum class FileSegmentKind
{
/**
* `Regular` file segment is still in cache after usage, and can be evicted
* (unless there are some holders).
*/
Regular,
/**
* `Temporary` file segment is removed right after releasing.
* Also corresponding files are removed during cache loading (if any).
*/
Temporary,
};
/// Returns a string for the given kind. Only declared here; the definition is not part of this header.
std::string toString(FileSegmentKind kind);
/**
 * Plain-data description of a single file segment.
 *
 * Every scalar/enum member carries an in-class initializer so that a
 * default-initialized object (`FileSegmentInfo info;`) never exposes
 * indeterminate values. The initializers equal what `FileSegmentInfo{}`
 * already produced through value-initialization, and the struct remains
 * an aggregate, so existing brace/designated initialization is unaffected.
 */
struct FileSegmentInfo
{
    /// Cache key the segment belongs to.
    FileCacheKey key;
    /// Offset of the segment. NOTE(review): presumably the offset within the cached file - confirm.
    size_t offset = 0;
    /// Path associated with the segment. NOTE(review): presumably its file on the local cache disk - confirm.
    std::string path;
    /// Left and right bounds of the segment's range (used together to build a FileSegment::Range).
    uint64_t range_left = 0;
    uint64_t range_right = 0;
    /// Kind and state of the segment; default to the first enumerator, same as value-initialization.
    FileSegmentKind kind{};
    FileSegmentState state{};
    /// NOTE(review): the exact meaning of the counters below is inferred from their names only - confirm at the producer.
    uint64_t size = 0;
    uint64_t downloaded_size = 0;
    uint64_t cache_hits = 0;
    uint64_t references = 0;
    bool is_unbound = false;
};
}

View File

@ -3,8 +3,9 @@
#include <memory> #include <memory>
#include <Core/Types.h> #include <Core/Types.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileSegmentInfo.h>
#include <Interpreters/Cache/Guards.h> #include <Interpreters/Cache/Guards.h>
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h> #include <Interpreters/Cache/FileCache_fwd_internal.h>
namespace DB namespace DB
@ -71,7 +72,13 @@ public:
virtual void shuffle(const CacheGuard::Lock &) = 0; virtual void shuffle(const CacheGuard::Lock &) = 0;
virtual FileSegments dump(const CacheGuard::Lock &) = 0; struct QueueEntryDump
{
FileSegmentInfo info;
bool is_protected = false;
};
using QueueEntriesDumps = std::vector<QueueEntryDump>;
virtual QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) = 0;
using FinalizeEvictionFunc = std::function<void(const CacheGuard::Lock & lk)>; using FinalizeEvictionFunc = std::function<void(const CacheGuard::Lock & lk)>;
virtual bool collectCandidatesForEviction( virtual bool collectCandidatesForEviction(

View File

@ -277,12 +277,12 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
return LRUIterator(this, it.iterator); return LRUIterator(this, it.iterator);
} }
FileSegments LRUFileCachePriority::dump(const CacheGuard::Lock & lock) IFileCachePriority::QueueEntriesDumps LRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{ {
FileSegments res; QueueEntriesDumps res;
iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata) iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{ {
res.push_back(FileSegment::getSnapshot(segment_metadata->file_segment)); res.emplace_back(FileSegment::getInfo(segment_metadata->file_segment, cache));
return IterationResult::CONTINUE; return IterationResult::CONTINUE;
}, lock); }, lock);
return res; return res;

View File

@ -44,7 +44,7 @@ public:
void shuffle(const CacheGuard::Lock &) override; void shuffle(const CacheGuard::Lock &) override;
FileSegments dump(const CacheGuard::Lock &) override; QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) override;
void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); } void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }

View File

@ -222,10 +222,16 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
iterator.is_protected = true; iterator.is_protected = true;
} }
FileSegments SLRUFileCachePriority::dump(const CacheGuard::Lock & lock) IFileCachePriority::QueueEntriesDumps SLRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{ {
auto res = probationary_queue.dump(lock); auto res = probationary_queue.dump(cache, lock);
auto part_res = protected_queue.dump(lock); for (auto & entry : res)
entry.is_protected = false;
auto part_res = protected_queue.dump(cache, lock);
for (auto & entry : part_res)
entry.is_protected = true;
res.insert(res.end(), part_res.begin(), part_res.end()); res.insert(res.end(), part_res.begin(), part_res.end());
return res; return res;
} }

View File

@ -43,7 +43,7 @@ public:
void shuffle(const CacheGuard::Lock &) override; void shuffle(const CacheGuard::Lock &) override;
FileSegments dump(const CacheGuard::Lock &) override; QueueEntriesDumps dump(FileCache & cache, const CacheGuard::Lock &) override;
private: private:
LRUFileCachePriority protected_queue; LRUFileCachePriority protected_queue;

View File

@ -0,0 +1,6 @@
-- Dictionary with DIRECT layout whose source is an inline ClickHouse query
-- producing a single row: id = 'test', timestamp = now().
CREATE DICTIONARY dict (`id` String, `timestamp` DateTime)
PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT \'test\' as id, now() as timestamp')) LAYOUT(DIRECT());
-- Table whose only column (and sorting key) is a LowCardinality(String).
CREATE table t (id LowCardinality(String)) engine = MergeTree() ORDER BY id;
-- View that calls dictGet with the dictionary passed as a bare identifier and the LowCardinality column as the key.
-- NOTE(review): presumably the test only verifies that these statements are accepted - confirm against the reference file.
CREATE VIEW v AS select dictGet(dict, 'timestamp', id) from t;

View File

@ -142,22 +142,49 @@ void assertEqual(const std::vector<FileSegment::Info> & file_segments, const Ran
} }
} }
void assertProtectedOrProbationary(const FileSegments & file_segments, const Ranges & expected, bool assert_protected) void assertEqual(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "File segments: ";
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
std::cerr << range.toString() << ", ";
}
ASSERT_EQ(file_segments.size(), expected_ranges.size());
if (!expected_states.empty())
ASSERT_EQ(file_segments.size(), expected_states.size());
auto get_expected_state = [&](size_t i)
{
if (expected_states.empty())
return State::DOWNLOADED;
else
return expected_states[i];
};
size_t i = 0;
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
ASSERT_EQ(range, expected_ranges[i]);
ASSERT_EQ(f.info.state, get_expected_state(i));
++i;
}
}
void assertProtectedOrProbationary(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected, bool assert_protected)
{ {
std::cerr << "File segments: "; std::cerr << "File segments: ";
std::vector<Range> res; std::vector<Range> res;
for (const auto & f : file_segments) for (const auto & f : file_segments)
{ {
std::cerr << f->range().toString() << ", "; auto range = FileSegment::Range(f.info.range_left, f.info.range_right);
if (auto it = f->getQueueIterator()) std::cerr << range.toString() << ", ";
if ((f.is_protected && assert_protected) || (!f.is_protected && !assert_protected))
{ {
if (auto * slru_it = dynamic_cast<SLRUFileCachePriority::SLRUIterator *>(it.get())) res.push_back(range);
{
if ((slru_it->isProtected() && assert_protected) || (!slru_it->isProtected() && !assert_protected))
{
res.push_back(f->range());
}
}
} }
} }
@ -168,12 +195,12 @@ void assertProtectedOrProbationary(const FileSegments & file_segments, const Ran
} }
} }
void assertProtected(const FileSegments & file_segments, const Ranges & expected) void assertProtected(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected)
{ {
assertProtectedOrProbationary(file_segments, expected, true); assertProtectedOrProbationary(file_segments, expected, true);
} }
void assertProbationary(const FileSegments & file_segments, const Ranges & expected) void assertProbationary(const IFileCachePriority::QueueEntriesDumps & file_segments, const Ranges & expected)
{ {
assertProtectedOrProbationary(file_segments, expected, false); assertProtectedOrProbationary(file_segments, expected, false);
} }
@ -1151,7 +1178,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
add_range(0, 10); add_range(0, 10);
add_range(10, 5); add_range(10, 5);
assertEqual(cache.getSnapshot(key), { Range(0, 9), Range(10, 14) }); assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14) });
assertEqual(cache.dumpQueue(), { Range(0, 9), Range(10, 14) }); assertEqual(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
ASSERT_EQ(cache.getFileSegmentsNum(), 2); ASSERT_EQ(cache.getFileSegmentsNum(), 2);
@ -1181,7 +1208,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
assertProbationary(cache.dumpQueue(), { Range(17, 20), Range(24, 26), Range(27, 27) }); assertProbationary(cache.dumpQueue(), { Range(17, 20), Range(24, 26), Range(27, 27) });
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(10, 14) }); assertProtected(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
assertEqual(cache.getSnapshot(key), { Range(0, 9), Range(10, 14), Range(17, 20), Range(24, 26), Range(27, 27) }); assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14), Range(17, 20), Range(24, 26), Range(27, 27) });
ASSERT_EQ(cache.getFileSegmentsNum(), 5); ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getUsedCacheSize(), 23); ASSERT_EQ(cache.getUsedCacheSize(), 23);
@ -1201,7 +1228,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
assertProbationary(cache.dumpQueue(), { Range(24, 26), Range(10, 14) }); assertProbationary(cache.dumpQueue(), { Range(24, 26), Range(10, 14) });
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(27, 27), Range(28, 30) }); assertProtected(cache.dumpQueue(), { Range(0, 9), Range(27, 27), Range(28, 30) });
assertEqual(cache.getSnapshot(key), { Range(0, 9), Range(10, 14), Range(24, 26), Range(27, 27), Range(28, 30) }); assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14), Range(24, 26), Range(27, 27), Range(28, 30) });
ASSERT_EQ(cache.getFileSegmentsNum(), 5); ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getUsedCacheSize(), 22); ASSERT_EQ(cache.getUsedCacheSize(), 22);
} }