mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
Allow non-evictable file segments based on predicate
This commit is contained in:
parent
9804c39de7
commit
37242a0103
@ -45,10 +45,10 @@ IFileCache::Key IFileCache::hash(const String & path)
|
||||
return sipHash128(path.data(), path.size());
|
||||
}
|
||||
|
||||
String IFileCache::getPathInLocalCache(const Key & key, size_t offset)
|
||||
String IFileCache::getPathInLocalCache(const Key & key, size_t offset, bool is_persistent)
|
||||
{
|
||||
auto key_str = keyToStr(key);
|
||||
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
|
||||
return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str / (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
|
||||
}
|
||||
|
||||
String IFileCache::getPathInLocalCache(const Key & key)
|
||||
@ -92,7 +92,7 @@ void LRUFileCache::useCell(
|
||||
auto file_segment = cell.file_segment;
|
||||
|
||||
if (file_segment->isDownloaded()
|
||||
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset())) == 0)
|
||||
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot have zero size downloaded file segments. Current file segment: {}",
|
||||
file_segment->range().toString());
|
||||
@ -205,7 +205,7 @@ FileSegments LRUFileCache::getImpl(
|
||||
}
|
||||
|
||||
FileSegments LRUFileCache::splitRangeIntoCells(
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock)
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
assert(size > 0);
|
||||
|
||||
@ -221,7 +221,7 @@ FileSegments LRUFileCache::splitRangeIntoCells(
|
||||
current_cell_size = std::min(remaining_size, max_file_segment_size);
|
||||
remaining_size -= current_cell_size;
|
||||
|
||||
auto * cell = addCell(key, current_pos, current_cell_size, state, cache_lock);
|
||||
auto * cell = addCell(key, current_pos, current_cell_size, state, is_persistent, cache_lock);
|
||||
if (cell)
|
||||
file_segments.push_back(cell->file_segment);
|
||||
assert(cell);
|
||||
@ -233,7 +233,7 @@ FileSegments LRUFileCache::splitRangeIntoCells(
|
||||
return file_segments;
|
||||
}
|
||||
|
||||
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
|
||||
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent)
|
||||
{
|
||||
assertInitialized();
|
||||
|
||||
@ -250,7 +250,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
|
||||
|
||||
if (file_segments.empty())
|
||||
{
|
||||
file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, cache_lock);
|
||||
file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, is_persistent, cache_lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -295,7 +295,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
|
||||
assert(current_pos < segment_range.left);
|
||||
|
||||
auto hole_size = segment_range.left - current_pos;
|
||||
file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
|
||||
file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, is_persistent, cache_lock));
|
||||
|
||||
current_pos = segment_range.right + 1;
|
||||
++it;
|
||||
@ -309,7 +309,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
|
||||
/// segmentN
|
||||
|
||||
auto hole_size = range.right - current_pos + 1;
|
||||
file_segments.splice(file_segments.end(), splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
|
||||
file_segments.splice(file_segments.end(), splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, is_persistent, cache_lock));
|
||||
}
|
||||
}
|
||||
|
||||
@ -318,7 +318,8 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t
|
||||
}
|
||||
|
||||
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state,
|
||||
const Key & key, size_t offset, size_t size,
|
||||
FileSegment::State state, bool is_persistent,
|
||||
std::lock_guard<std::mutex> & cache_lock)
|
||||
{
|
||||
/// Create a file segment cell and put it in `files` map by [key][offset].
|
||||
@ -332,7 +333,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
|
||||
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
|
||||
keyToStr(key), offset, size, dumpStructureImpl(key, cache_lock));
|
||||
|
||||
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
|
||||
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state, is_persistent);
|
||||
FileSegmentCell cell(std::move(file_segment), queue);
|
||||
|
||||
auto & offsets = files[key];
|
||||
@ -354,7 +355,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
|
||||
return &(it->second);
|
||||
}
|
||||
|
||||
FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size)
|
||||
FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent)
|
||||
{
|
||||
std::lock_guard cache_lock(mutex);
|
||||
|
||||
@ -365,7 +366,7 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
|
||||
"Cache cell already exists for key `{}` and offset {}",
|
||||
keyToStr(key), offset);
|
||||
|
||||
auto file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
|
||||
auto file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, is_persistent, cache_lock);
|
||||
return FileSegmentsHolder(std::move(file_segments));
|
||||
}
|
||||
|
||||
@ -534,13 +535,15 @@ void LRUFileCache::remove(
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
bool is_persistent_file_segment = cell->file_segment->isPersistent();
|
||||
|
||||
if (cell->queue_iterator)
|
||||
queue.erase(*cell->queue_iterator);
|
||||
|
||||
auto & offsets = files[key];
|
||||
offsets.erase(offset);
|
||||
|
||||
auto cache_file_path = getPathInLocalCache(key, offset);
|
||||
auto cache_file_path = getPathInLocalCache(key, offset, is_persistent_file_segment);
|
||||
if (fs::exists(cache_file_path))
|
||||
{
|
||||
try
|
||||
@ -588,7 +591,19 @@ void LRUFileCache::loadCacheInfoIntoMemory()
|
||||
fs::directory_iterator offset_it{key_it->path()};
|
||||
for (; offset_it != fs::directory_iterator(); ++offset_it)
|
||||
{
|
||||
bool parsed = tryParse<UInt64>(offset, offset_it->path().filename());
|
||||
auto offset_with_suffix = offset_it->path().filename().string();
|
||||
auto delim_pos = offset_with_suffix.find('_');
|
||||
bool parsed;
|
||||
bool is_persistent = false;
|
||||
|
||||
if (delim_pos == std::string::npos)
|
||||
parsed = tryParse<UInt64>(offset, offset_with_suffix);
|
||||
else
|
||||
{
|
||||
parsed = tryParse<UInt64>(offset, offset_with_suffix.substr(0, delim_pos+1));
|
||||
is_persistent = offset_with_suffix.substr(delim_pos) == "persistent";
|
||||
}
|
||||
|
||||
if (!parsed)
|
||||
{
|
||||
LOG_WARNING(log, "Unexpected file: ", offset_it->path().string());
|
||||
@ -604,7 +619,7 @@ void LRUFileCache::loadCacheInfoIntoMemory()
|
||||
|
||||
if (tryReserve(key, offset, size, cache_lock))
|
||||
{
|
||||
auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, cache_lock);
|
||||
auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, is_persistent, cache_lock);
|
||||
if (cell)
|
||||
cells.push_back(cell);
|
||||
}
|
||||
@ -685,14 +700,14 @@ void LRUFileCache::reduceSizeToDownloaded(
|
||||
if (!cell)
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "No cell found for key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
const auto & file_segment = cell->file_segment;
|
||||
auto file_segment = cell->file_segment;
|
||||
|
||||
size_t downloaded_size = file_segment->downloaded_size;
|
||||
if (downloaded_size == file_segment->range().size())
|
||||
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
|
||||
"Nothing to reduce, file segment fully downloaded, key: {}, offset: {}", keyToStr(key), offset);
|
||||
|
||||
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED);
|
||||
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED, file_segment->isPersistent());
|
||||
}
|
||||
|
||||
bool LRUFileCache::isLastFileSegmentHolder(
|
||||
@ -734,7 +749,7 @@ std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
|
||||
for (const auto & [offset, cell] : cells_by_offset)
|
||||
{
|
||||
if (cell.file_segment->state() == FileSegment::State::DOWNLOADED)
|
||||
cache_paths.push_back(getPathInLocalCache(key, offset));
|
||||
cache_paths.push_back(getPathInLocalCache(key, offset, cell.file_segment->isPersistent()));
|
||||
}
|
||||
|
||||
return cache_paths;
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
|
||||
static Key hash(const String & path);
|
||||
|
||||
String getPathInLocalCache(const Key & key, size_t offset);
|
||||
String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent);
|
||||
|
||||
String getPathInLocalCache(const Key & key);
|
||||
|
||||
@ -70,9 +70,9 @@ public:
|
||||
* As long as pointers to returned file segments are held
|
||||
* it is guaranteed that these file segments are not removed from cache.
|
||||
*/
|
||||
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) = 0;
|
||||
virtual FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) = 0;
|
||||
|
||||
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
|
||||
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent) = 0;
|
||||
|
||||
virtual FileSegments getSnapshot() const = 0;
|
||||
|
||||
@ -122,11 +122,11 @@ public:
|
||||
const String & cache_base_path_,
|
||||
const FileCacheSettings & cache_settings_);
|
||||
|
||||
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size) override;
|
||||
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) override;
|
||||
|
||||
FileSegments getSnapshot() const override;
|
||||
|
||||
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
|
||||
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent) override;
|
||||
|
||||
void initialize() override;
|
||||
|
||||
@ -151,7 +151,7 @@ private:
|
||||
/// Pointer to file segment is always held by the cache itself.
|
||||
/// Apart from the pointer in the cache, it can be held by cache users when they call
|
||||
/// getOrSet(), but cache users always hold it via FileSegmentsHolder.
|
||||
bool releasable() const { return file_segment.unique(); }
|
||||
bool releasable() const { return file_segment.unique() && !file_segment->isPersistent(); }
|
||||
|
||||
size_t size() const { return file_segment->reserved_size; }
|
||||
|
||||
@ -181,7 +181,8 @@ private:
|
||||
|
||||
FileSegmentCell * addCell(
|
||||
const Key & key, size_t offset, size_t size,
|
||||
FileSegment::State state, std::lock_guard<std::mutex> & cache_lock);
|
||||
FileSegment::State state, bool is_persistent,
|
||||
std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
@ -209,7 +210,7 @@ private:
|
||||
void loadCacheInfoIntoMemory();
|
||||
|
||||
FileSegments splitRangeIntoCells(
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock);
|
||||
const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
String dumpStructureImpl(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
|
||||
|
||||
|
@ -20,7 +20,8 @@ FileSegment::FileSegment(
|
||||
size_t size_,
|
||||
const Key & key_,
|
||||
IFileCache * cache_,
|
||||
State download_state_)
|
||||
State download_state_,
|
||||
bool is_persistent_)
|
||||
: segment_range(offset_, offset_ + size_ - 1)
|
||||
, download_state(download_state_)
|
||||
, file_key(key_)
|
||||
@ -30,6 +31,7 @@ FileSegment::FileSegment(
|
||||
#else
|
||||
, log(&Poco::Logger::get("FileSegment"))
|
||||
#endif
|
||||
, is_persistent(is_persistent_)
|
||||
{
|
||||
/// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING.
|
||||
switch (download_state)
|
||||
@ -223,7 +225,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_, bool fin
|
||||
"Cache writer was finalized (downloaded size: {}, state: {})",
|
||||
downloaded_size, stateToString(download_state));
|
||||
|
||||
auto download_path = cache->getPathInLocalCache(key(), offset());
|
||||
auto download_path = cache->getPathInLocalCache(key(), offset(), is_persistent);
|
||||
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ public:
|
||||
|
||||
FileSegment(
|
||||
size_t offset_, size_t size_, const Key & key_,
|
||||
IFileCache * cache_, State download_state_);
|
||||
IFileCache * cache_, State download_state_, bool is_persistent_ = false);
|
||||
|
||||
State state() const;
|
||||
|
||||
@ -91,6 +91,8 @@ public:
|
||||
|
||||
size_t offset() const { return range().left; }
|
||||
|
||||
bool isPersistent() const { return is_persistent; }
|
||||
|
||||
State wait();
|
||||
|
||||
bool reserve(size_t size);
|
||||
@ -195,6 +197,8 @@ private:
|
||||
std::atomic<bool> is_downloaded{false};
|
||||
std::atomic<size_t> hits_count = 0; /// cache hits.
|
||||
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
|
||||
|
||||
bool is_persistent;
|
||||
};
|
||||
|
||||
struct FileSegmentsHolder : private boost::noncopyable
|
||||
|
@ -111,7 +111,7 @@ TEST(LRUFileCache, get)
|
||||
auto key = cache.hash("key1");
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 0, 10); /// Add range [0, 9]
|
||||
auto holder = cache.getOrSet(key, 0, 10, false); /// Add range [0, 9]
|
||||
auto segments = fromHolder(holder);
|
||||
/// Range was not present in cache. It should be added in cache as one whole file segment.
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
@ -138,7 +138,7 @@ TEST(LRUFileCache, get)
|
||||
|
||||
{
|
||||
/// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache.
|
||||
auto holder = cache.getOrSet(key, 5, 10);
|
||||
auto holder = cache.getOrSet(key, 5, 10, false);
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 2);
|
||||
|
||||
@ -156,14 +156,14 @@ TEST(LRUFileCache, get)
|
||||
/// 0 910 14
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 9, 1); /// Get [9, 9]
|
||||
auto holder = cache.getOrSet(key, 9, 1, false); /// Get [9, 9]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(7, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 9, 2); /// Get [9, 10]
|
||||
auto holder = cache.getOrSet(key, 9, 2, false); /// Get [9, 10]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 2);
|
||||
assertRange(8, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED);
|
||||
@ -171,15 +171,15 @@ TEST(LRUFileCache, get)
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 10, 1); /// Get [10, 10]
|
||||
auto holder = cache.getOrSet(key, 10, 1, false); /// Get [10, 10]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED);
|
||||
}
|
||||
|
||||
complete(cache.getOrSet(key, 17, 4)); /// Get [17, 20]
|
||||
complete(cache.getOrSet(key, 24, 3)); /// Get [24, 26]
|
||||
complete(cache.getOrSet(key, 27, 1)); /// Get [27, 27]
|
||||
complete(cache.getOrSet(key, 17, 4, false)); /// Get [17, 20]
|
||||
complete(cache.getOrSet(key, 24, 3, false)); /// Get [24, 26]
|
||||
complete(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27]
|
||||
|
||||
/// Current cache: [__________][_____] [____] [___][]
|
||||
/// ^ ^^ ^ ^ ^ ^ ^^^
|
||||
@ -187,7 +187,7 @@ TEST(LRUFileCache, get)
|
||||
///
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 0, 26); /// Get [0, 25]
|
||||
auto holder = cache.getOrSet(key, 0, 26, false); /// Get [0, 25]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 6);
|
||||
|
||||
@ -221,14 +221,14 @@ TEST(LRUFileCache, get)
|
||||
/// as max elements size is reached, next attempt to put something in cache should fail.
|
||||
/// This will also check that [27, 27] was indeed evicted.
|
||||
|
||||
auto holder1 = cache.getOrSet(key, 27, 1);
|
||||
auto holder1 = cache.getOrSet(key, 27, 1, false);
|
||||
auto segments_1 = fromHolder(holder1); /// Get [27, 27]
|
||||
ASSERT_EQ(segments_1.size(), 1);
|
||||
assertRange(17, segments_1[0], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY);
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 12, 10); /// Get [12, 21]
|
||||
auto holder = cache.getOrSet(key, 12, 10, false); /// Get [12, 21]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 4);
|
||||
|
||||
@ -252,7 +252,7 @@ TEST(LRUFileCache, get)
|
||||
ASSERT_EQ(cache.getStat().size, 5);
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 23, 5); /// Get [23, 28]
|
||||
auto holder = cache.getOrSet(key, 23, 5, false); /// Get [23, 28]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
@ -273,12 +273,12 @@ TEST(LRUFileCache, get)
|
||||
/// 17 21 2324 26 28
|
||||
|
||||
{
|
||||
auto holder5 = cache.getOrSet(key, 2, 3); /// Get [2, 4]
|
||||
auto holder5 = cache.getOrSet(key, 2, 3,false); /// Get [2, 4]
|
||||
auto s5 = fromHolder(holder5);
|
||||
ASSERT_EQ(s5.size(), 1);
|
||||
assertRange(25, s5[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::EMPTY);
|
||||
|
||||
auto holder1 = cache.getOrSet(key, 30, 2); /// Get [30, 31]
|
||||
auto holder1 = cache.getOrSet(key, 30, 2, false); /// Get [30, 31]
|
||||
auto s1 = fromHolder(holder1);
|
||||
ASSERT_EQ(s1.size(), 1);
|
||||
assertRange(26, s1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::EMPTY);
|
||||
@ -294,20 +294,20 @@ TEST(LRUFileCache, get)
|
||||
/// ^ ^ ^ ^ ^ ^ ^ ^
|
||||
/// 2 4 23 24 26 27 30 31
|
||||
|
||||
auto holder2 = cache.getOrSet(key, 23, 1); /// Get [23, 23]
|
||||
auto holder2 = cache.getOrSet(key, 23, 1, false); /// Get [23, 23]
|
||||
auto s2 = fromHolder(holder2);
|
||||
ASSERT_EQ(s2.size(), 1);
|
||||
|
||||
auto holder3 = cache.getOrSet(key, 24, 3); /// Get [24, 26]
|
||||
auto holder3 = cache.getOrSet(key, 24, 3, false); /// Get [24, 26]
|
||||
auto s3 = fromHolder(holder3);
|
||||
ASSERT_EQ(s3.size(), 1);
|
||||
|
||||
auto holder4 = cache.getOrSet(key, 27, 1); /// Get [27, 27]
|
||||
auto holder4 = cache.getOrSet(key, 27, 1, false); /// Get [27, 27]
|
||||
auto s4 = fromHolder(holder4);
|
||||
ASSERT_EQ(s4.size(), 1);
|
||||
|
||||
/// All cache is now unreleasable because pointers are still held
|
||||
auto holder6 = cache.getOrSet(key, 0, 40);
|
||||
auto holder6 = cache.getOrSet(key, 0, 40, false);
|
||||
auto f = fromHolder(holder6);
|
||||
ASSERT_EQ(f.size(), 9);
|
||||
|
||||
@ -328,7 +328,7 @@ TEST(LRUFileCache, get)
|
||||
}
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 2, 3); /// Get [2, 4]
|
||||
auto holder = cache.getOrSet(key, 2, 3, false); /// Get [2, 4]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 1);
|
||||
assertRange(31, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED);
|
||||
@ -339,7 +339,7 @@ TEST(LRUFileCache, get)
|
||||
/// 2 4 23 24 26 27 30 31
|
||||
|
||||
{
|
||||
auto holder = cache.getOrSet(key, 25, 5); /// Get [25, 29]
|
||||
auto holder = cache.getOrSet(key, 25, 5, false); /// Get [25, 29]
|
||||
auto segments = fromHolder(holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
@ -363,7 +363,7 @@ TEST(LRUFileCache, get)
|
||||
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
|
||||
thread_status_1.attachQueryContext(query_context_1);
|
||||
|
||||
auto holder_2 = cache.getOrSet(key, 25, 5); /// Get [25, 29] once again.
|
||||
auto holder_2 = cache.getOrSet(key, 25, 5, false); /// Get [25, 29] once again.
|
||||
auto segments_2 = fromHolder(holder_2);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
|
||||
@ -406,7 +406,7 @@ TEST(LRUFileCache, get)
|
||||
/// and notify_all() is also called from destructor of holder.
|
||||
|
||||
std::optional<DB::FileSegmentsHolder> holder;
|
||||
holder.emplace(cache.getOrSet(key, 3, 23)); /// Get [3, 25]
|
||||
holder.emplace(cache.getOrSet(key, 3, 23, false)); /// Get [3, 25]
|
||||
|
||||
auto segments = fromHolder(*holder);
|
||||
ASSERT_EQ(segments.size(), 3);
|
||||
@ -432,7 +432,7 @@ TEST(LRUFileCache, get)
|
||||
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
|
||||
thread_status_1.attachQueryContext(query_context_1);
|
||||
|
||||
auto holder_2 = cache.getOrSet(key, 3, 23); /// Get [3, 25] once again
|
||||
auto holder_2 = cache.getOrSet(key, 3, 23, false); /// Get [3, 25] once again
|
||||
auto segments_2 = fromHolder(*holder);
|
||||
ASSERT_EQ(segments_2.size(), 3);
|
||||
|
||||
@ -481,7 +481,7 @@ TEST(LRUFileCache, get)
|
||||
|
||||
ASSERT_EQ(cache2.getStat().downloaded_size, 5);
|
||||
|
||||
auto holder1 = cache2.getOrSet(key, 2, 28); /// Get [2, 29]
|
||||
auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29]
|
||||
auto segments1 = fromHolder(holder1);
|
||||
ASSERT_EQ(segments1.size(), 5);
|
||||
|
||||
@ -500,7 +500,7 @@ TEST(LRUFileCache, get)
|
||||
auto cache2 = DB::LRUFileCache(caches_dir / "cache2", settings2);
|
||||
cache2.initialize();
|
||||
|
||||
auto holder1 = cache2.getOrSet(key, 0, 25); /// Get [0, 24]
|
||||
auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24]
|
||||
auto segments1 = fromHolder(holder1);
|
||||
|
||||
ASSERT_EQ(segments1.size(), 3);
|
||||
|
@ -565,6 +565,7 @@ class IColumn;
|
||||
M(UInt64, filesystem_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
|
||||
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
|
||||
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "", 0) \
|
||||
M(Bool, filesystem_cache_do_not_evict_index_and_marks_files, true, "", 0) \
|
||||
\
|
||||
M(UInt64, http_max_tries, 10, "Max attempts to read via http.", 0) \
|
||||
M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
|
||||
|
@ -20,6 +20,11 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
class CachedWriteBuffer final : public WriteBufferFromFileDecorator
|
||||
{
|
||||
public:
|
||||
@ -45,7 +50,7 @@ public:
|
||||
|
||||
size_t remaining_size = size;
|
||||
|
||||
auto file_segments_holder = cache->setDownloading(key, current_download_offset, size);
|
||||
auto file_segments_holder = cache->setDownloading(key, current_download_offset, size, false);
|
||||
auto & file_segments = file_segments_holder.file_segments;
|
||||
|
||||
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it)
|
||||
@ -88,6 +93,13 @@ DiskCache::DiskCache(
|
||||
{
|
||||
}
|
||||
|
||||
static bool isFilePersistent(const String & path)
|
||||
{
|
||||
return path.ends_with("idx") // index files.
|
||||
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") /// mark files.
|
||||
|| path.ends_with("txt") || path.ends_with("dat");
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskCache::readFile(
|
||||
const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
|
||||
{
|
||||
@ -97,6 +109,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskCache::readFile(
|
||||
if (IFileCache::isReadOnly())
|
||||
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
|
||||
|
||||
if (settings.filesystem_cache_do_not_evict_index_and_marks_files && isFilePersistent(path))
|
||||
read_settings.cache_file_as_persistent = true;
|
||||
|
||||
return DiskDecorator::readFile(path, read_settings, read_hint, file_size);
|
||||
}
|
||||
|
||||
@ -201,13 +216,6 @@ void registerDiskCache(DiskFactory & factory)
|
||||
FileCacheSettings file_cache_settings;
|
||||
file_cache_settings.loadFromConfig(config, config_prefix);
|
||||
|
||||
// auto check_non_relesable_path = [] (const String & path)
|
||||
// {
|
||||
// return path.ends_with("idx") // index files.
|
||||
// || path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") /// mark files.
|
||||
// || path.ends_with("txt") || path.ends_with("dat");
|
||||
// };
|
||||
|
||||
auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, file_cache_settings);
|
||||
cache->initialize();
|
||||
|
||||
|
@ -41,12 +41,13 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
|
||||
, settings(settings_)
|
||||
, read_until_position(read_until_position_)
|
||||
, remote_file_reader_creator(remote_file_reader_creator_)
|
||||
, is_persistent(settings_.cache_file_as_persistent)
|
||||
{
|
||||
}
|
||||
|
||||
void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
|
||||
{
|
||||
file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size));
|
||||
file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size, is_persistent));
|
||||
|
||||
/**
|
||||
* Segments in returned list are ordered in ascending order and represent a full contiguous
|
||||
@ -63,7 +64,7 @@ void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
|
||||
|
||||
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getCacheReadBuffer(size_t offset) const
|
||||
{
|
||||
auto path = cache->getPathInLocalCache(cache_key, offset);
|
||||
auto path = cache->getPathInLocalCache(cache_key, offset, is_persistent);
|
||||
auto buf = std::make_shared<ReadBufferFromFile>(path, settings.local_fs_buffer_size);
|
||||
if (buf->size() == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path);
|
||||
|
@ -102,6 +102,8 @@ private:
|
||||
size_t first_offset = 0;
|
||||
String nextimpl_step_log_info;
|
||||
String last_caller_id;
|
||||
|
||||
bool is_persistent = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -9,11 +9,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
static String getDiskMetadataPath(
|
||||
const String & name,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
|
@ -80,6 +80,8 @@ struct ReadSettings
|
||||
bool enable_filesystem_cache = true;
|
||||
size_t filesystem_cache_max_wait_sec = 1;
|
||||
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
|
||||
bool filesystem_cache_do_not_evict_index_and_marks_files = true;
|
||||
bool cache_file_as_persistent = false;
|
||||
|
||||
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
||||
|
||||
|
@ -34,7 +34,6 @@ const int S3_WARN_MAX_PARTS = 10000;
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int S3_ERROR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
struct WriteBufferFromS3::UploadPartTask
|
||||
|
@ -3317,6 +3317,7 @@ ReadSettings Context::getReadSettings() const
|
||||
res.enable_filesystem_cache = settings.enable_filesystem_cache;
|
||||
res.filesystem_cache_max_wait_sec = settings.filesystem_cache_max_wait_sec;
|
||||
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
|
||||
res.filesystem_cache_do_not_evict_index_and_marks_files = settings.filesystem_cache_do_not_evict_index_and_marks_files;
|
||||
|
||||
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
|
||||
|
||||
|
@ -23,6 +23,7 @@ NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
|
||||
{"cache_hits", std::make_shared<DataTypeUInt64>()},
|
||||
{"references", std::make_shared<DataTypeUInt64>()},
|
||||
{"downloaded_size", std::make_shared<DataTypeUInt64>()},
|
||||
{"persistent", std::make_shared<DataTypeNumber<UInt8>>()}
|
||||
};
|
||||
}
|
||||
|
||||
@ -43,7 +44,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
|
||||
for (const auto & file_segment : file_segments)
|
||||
{
|
||||
res_columns[0]->insert(cache_base_path);
|
||||
res_columns[1]->insert(cache->getPathInLocalCache(file_segment->key(), file_segment->offset()));
|
||||
res_columns[1]->insert(cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent()));
|
||||
|
||||
const auto & range = file_segment->range();
|
||||
res_columns[2]->insert(range.left);
|
||||
@ -53,6 +54,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
|
||||
res_columns[6]->insert(file_segment->getHitsCount());
|
||||
res_columns[7]->insert(file_segment->getRefCount());
|
||||
res_columns[8]->insert(file_segment->getDownloadedSize());
|
||||
res_columns[9]->insert(file_segment->isPersistent());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user