2022-01-13 11:57:56 +00:00
|
|
|
#include "FileCache.h"
|
|
|
|
|
|
|
|
#include <Common/randomSeed.h>
|
|
|
|
#include <Common/SipHash.h>
|
|
|
|
#include <Common/hex.h>
|
2022-03-14 19:15:07 +00:00
|
|
|
#include <Common/FileCacheSettings.h>
|
2022-01-13 11:57:56 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
2022-01-21 15:39:34 +00:00
|
|
|
#include <IO/WriteBufferFromFile.h>
|
|
|
|
#include <IO/ReadSettings.h>
|
|
|
|
#include <IO/WriteBufferFromString.h>
|
|
|
|
#include <IO/Operators.h>
|
2022-01-13 11:57:56 +00:00
|
|
|
#include <pcg-random/pcg_random.hpp>
|
|
|
|
#include <filesystem>
|
|
|
|
|
|
|
|
namespace fs = std::filesystem;
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2022-01-21 15:39:34 +00:00
|
|
|
/// Error codes raised by this translation unit; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int REMOTE_FS_OBJECT_CACHE_ERROR;
    extern const int LOGICAL_ERROR;
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2022-02-18 15:38:23 +00:00
|
|
|
String keyToStr(const IFileCache::Key & key)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
|
|
|
return getHexUIntLowercase(key);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-05 16:00:59 +00:00
|
|
|
static bool isQueryInitialized()
|
|
|
|
{
|
2022-06-07 03:50:56 +00:00
|
|
|
return CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() && CurrentThread::getQueryId().size != 0;
|
2022-06-05 16:00:59 +00:00
|
|
|
}
|
|
|
|
|
2022-02-18 15:38:23 +00:00
|
|
|
/// Stores the cache root directory and copies the limits from the settings:
/// total size, element count, maximum file segment size and the per-query limit switch.
IFileCache::IFileCache(
    const String & cache_base_path_,
    const FileCacheSettings & cache_settings_)
    : cache_base_path(cache_base_path_),
      max_size(cache_settings_.max_size),
      max_element_size(cache_settings_.max_elements),
      max_file_segment_size(cache_settings_.max_file_segment_size),
      enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
{
}
|
|
|
|
|
2022-02-18 15:38:23 +00:00
|
|
|
/// Computes the cache key of a path as the 128-bit SipHash of its bytes.
IFileCache::Key IFileCache::hash(const String & path)
{
    const char * bytes = path.data();
    const size_t length = path.size();
    return sipHash128(bytes, length);
}
|
|
|
|
|
2022-02-18 15:38:23 +00:00
|
|
|
/// Path of a single cached file segment:
/// <cache_base_path>/<first 3 hex chars of key>/<hex key>/<offset>.
String IFileCache::getPathInLocalCache(const Key & key, size_t offset)
{
    const auto hex_key = keyToStr(key);

    fs::path segment_path(cache_base_path);
    segment_path /= hex_key.substr(0, 3);
    segment_path /= hex_key;
    segment_path /= std::to_string(offset);
    return segment_path;
}
|
|
|
|
|
2022-02-18 15:38:23 +00:00
|
|
|
/// Path of the directory that holds all segments of one key:
/// <cache_base_path>/<first 3 hex chars of key>/<hex key>.
String IFileCache::getPathInLocalCache(const Key & key)
{
    const auto hex_key = keyToStr(key);

    fs::path key_dir(cache_base_path);
    key_dir /= hex_key.substr(0, 3);
    key_dir /= hex_key;
    return key_dir;
}
|
|
|
|
|
2022-04-07 16:46:46 +00:00
|
|
|
bool IFileCache::isReadOnly()
|
2022-01-24 22:07:02 +00:00
|
|
|
{
|
2022-06-06 08:48:30 +00:00
|
|
|
return (!isQueryInitialized());
|
2022-01-24 22:07:02 +00:00
|
|
|
}
|
|
|
|
|
2022-03-09 09:36:52 +00:00
|
|
|
void IFileCache::assertInitialized() const
|
|
|
|
{
|
|
|
|
if (!is_initialized)
|
|
|
|
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
|
|
|
|
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Looks up the cache query context registered for the query of the calling thread.
/// Returns nullptr when the thread is not attached to a query or nothing is registered for its id.
IFileCache::QueryContextPtr IFileCache::getCurrentQueryContext(std::lock_guard<std::mutex> & cache_lock)
{
    if (isQueryInitialized())
        return getQueryContext(CurrentThread::getQueryId().toString(), cache_lock);

    return nullptr;
}
|
|
|
|
|
|
|
|
/// Returns the context stored in query_map for `query_id`, or nullptr if there is none.
/// The lock parameter is unused; it only documents that the caller must hold the cache mutex.
IFileCache::QueryContextPtr IFileCache::getQueryContext(const String & query_id, std::lock_guard<std::mutex> &)
{
    if (auto found = query_map.find(query_id); found != query_map.end())
        return found->second;

    return nullptr;
}
|
|
|
|
|
|
|
|
void IFileCache::removeQueryContext(const String & query_id)
|
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(mutex);
|
|
|
|
auto query_iter = query_map.find(query_id);
|
|
|
|
|
|
|
|
if (query_iter == query_map.end())
|
2022-06-08 06:19:50 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to release query context that does not exist");
|
2022-06-07 11:10:57 +00:00
|
|
|
|
|
|
|
query_map.erase(query_iter);
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the context registered for `query_id`, creating and registering one from
/// `settings` (max_query_cache_size, skip_download_if_exceeds_query_cache) if it is missing.
/// An empty query id never gets a context: nullptr is returned.
IFileCache::QueryContextPtr IFileCache::getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard<std::mutex> & cache_lock)
{
    if (query_id.empty())
        return nullptr;

    if (auto existing = getQueryContext(query_id, cache_lock))
        return existing;

    auto created = std::make_shared<QueryContext>(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache);
    return query_map.insert({query_id, created}).first->second;
}
|
|
|
|
|
|
|
|
/// Builds the holder that ties a query to its cache context.
/// A context is looked up or created only if enable_filesystem_query_cache_limit is set
/// and settings.max_query_cache_size is non-zero; otherwise an empty holder is returned.
IFileCache::QueryContextHolder IFileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings)
{
    std::lock_guard cache_lock(mutex);

    const bool limit_applies = enable_filesystem_query_cache_limit && settings.max_query_cache_size;
    if (!limit_applies)
        return QueryContextHolder();

    auto context = getOrSetQueryContext(query_id, settings, cache_lock);
    return QueryContextHolder(query_id, this, context);
}
|
|
|
|
|
2022-06-07 17:42:46 +00:00
|
|
|
/// Remembers the query id, the owning cache and a shared reference to the query context.
IFileCache::QueryContextHolder::QueryContextHolder(const String & query_id_, IFileCache * cache_, IFileCache::QueryContextPtr context_)
    : query_id(query_id_)
    , cache(cache_)
    , context(context_)
{
}
|
|
|
|
|
|
|
|
/// Releases the query context once this holder is its last user.
IFileCache::QueryContextHolder::~QueryContextHolder()
{
    /// A holder without a context has nothing to release.
    if (!context)
        return;

    /// A use count of exactly two means only query_map and this holder still reference
    /// the context, i.e. the query has finished and its entry can be dropped from the cache.
    if (context.use_count() == 2)
        cache->removeQueryContext(query_id);
}
|
|
|
|
|
2022-03-14 19:15:07 +00:00
|
|
|
/// Forwards the path and settings to IFileCache, then sets the stash capacity
/// (taken from max_elements), the cache-hits threshold and the logger.
LRUFileCache::LRUFileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_)
    : IFileCache(cache_base_path_, cache_settings_),
      max_stash_element_size(cache_settings_.max_elements),
      enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold),
      log(&Poco::Logger::get("LRUFileCache"))
{
}
|
|
|
|
|
|
|
|
void LRUFileCache::initialize()
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
2022-04-28 09:11:25 +00:00
|
|
|
std::lock_guard cache_lock(mutex);
|
|
|
|
if (!is_initialized)
|
|
|
|
{
|
|
|
|
if (fs::exists(cache_base_path))
|
2022-04-28 10:45:01 +00:00
|
|
|
loadCacheInfoIntoMemory(cache_lock);
|
2022-04-28 09:11:25 +00:00
|
|
|
else
|
|
|
|
fs::create_directories(cache_base_path);
|
|
|
|
is_initialized = true;
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void LRUFileCache::useCell(
|
2022-05-04 15:12:35 +00:00
|
|
|
const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
2022-01-30 11:35:28 +00:00
|
|
|
auto file_segment = cell.file_segment;
|
2022-02-23 10:12:14 +00:00
|
|
|
|
|
|
|
if (file_segment->isDownloaded()
|
2022-02-18 15:38:23 +00:00
|
|
|
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset())) == 0)
|
2022-01-30 11:35:28 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Cannot have zero size downloaded file segments. Current file segment: {}",
|
|
|
|
file_segment->range().toString());
|
|
|
|
|
2022-01-13 11:57:56 +00:00
|
|
|
result.push_back(cell.file_segment);
|
2022-01-21 15:39:34 +00:00
|
|
|
|
|
|
|
/**
|
|
|
|
* A cell receives a queue iterator on first successful space reservation attempt
|
|
|
|
* (space is reserved incrementally on each read buffer nextImpl() call).
|
|
|
|
*/
|
|
|
|
if (cell.queue_iterator)
|
|
|
|
{
|
|
|
|
/// Move to the end of the queue. The iterator remains valid.
|
2022-05-04 15:12:35 +00:00
|
|
|
queue.moveToEnd(*cell.queue_iterator, cache_lock);
|
2022-01-21 15:39:34 +00:00
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Finds the cell stored for exactly (key, offset).
/// Returns nullptr when either the key or the offset is not present in `files`.
LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
    const Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */)
{
    auto file_it = files.find(key);
    if (file_it != files.end())
    {
        auto & cells_by_offset = file_it->second;
        auto found = cells_by_offset.find(offset);
        if (found != cells_by_offset.end())
            return &found->second;
    }

    return nullptr;
}
|
|
|
|
|
|
|
|
/// Given range = [left, right] and the non-overlapping, offset-ordered set of file segments
/// cached for `key`, returns the list [segment1, ..., segmentN] of segments that intersect
/// the range. Every returned segment goes through useCell().
/// Returns an empty list when nothing is cached for the key or nothing intersects.
FileSegments LRUFileCache::getImpl(
    const Key & key, const FileSegment::Range & range, std::lock_guard<std::mutex> & cache_lock)
{
    auto files_it = files.find(key);
    if (files_it == files.end())
        return {};

    const auto & cells = files_it->second;
    if (cells.empty())
    {
        /// The key has no segments left: drop its map entry and its directory (if present).
        auto key_path = getPathInLocalCache(key);

        files.erase(key);

        if (fs::exists(key_path))
            fs::remove(key_path);

        return {};
    }

    FileSegments result;
    auto cell_it = cells.lower_bound(range.left);

    if (cell_it == cells.end())
    {
        /// Every cached segment starts before range.left. Only the last one, segment{N},
        /// can still reach into the range:
        ///   - it intersects if segment{N}.right >= range.left;
        ///   - otherwise it ends before the range starts and nothing intersects.
        const auto & last_cell = cells.rbegin()->second;
        if (last_cell.file_segment->range().right < range.left)
            return {};

        useCell(last_cell, result, cache_lock);
        return result;
    }

    /// cell_it points to segment{k}, the first segment with offset >= range.left.
    if (cell_it != cells.begin())
    {
        /// segment{k-1} starts before range.left but may extend past it,
        /// in which case it is the first intersecting segment.
        const auto & prev_cell = std::prev(cell_it)->second;
        const auto & prev_cell_range = prev_cell.file_segment->range();

        if (range.left <= prev_cell_range.right)
            useCell(prev_cell, result, cache_lock);
    }

    /// Collect segment{k}, segment{k+1}, ... until one starts after range.right.
    for (; cell_it != cells.end(); ++cell_it)
    {
        const auto & cell = cell_it->second;
        if (range.right < cell.file_segment->range().left)
            break;

        useCell(cell, result, cache_lock);
    }

    return result;
}
|
|
|
|
|
2022-03-21 11:30:25 +00:00
|
|
|
/// Covers [offset, offset + size) with consecutive new cells of at most
/// max_file_segment_size bytes each, all created in `state`, and returns their file segments in order.
FileSegments LRUFileCache::splitRangeIntoCells(
    const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock)
{
    assert(size > 0);

    const auto range_end = offset + size; /// One past the last byte of the range.
    size_t bytes_left = size;

    FileSegments file_segments;
    for (auto pos = offset; pos < range_end;)
    {
        const size_t chunk_size = std::min(bytes_left, max_file_segment_size);
        bytes_left -= chunk_size;

        auto * cell = addCell(key, pos, chunk_size, state, cache_lock);
        if (cell)
            file_segments.push_back(cell->file_segment);
        assert(cell);

        pos += chunk_size;
    }

    assert(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
    return file_segments;
}
|
|
|
|
|
2022-04-11 15:51:49 +00:00
|
|
|
void LRUFileCache::fillHolesWithEmptyFileSegments(
|
2022-05-04 15:12:35 +00:00
|
|
|
FileSegments & file_segments,
|
|
|
|
const Key & key,
|
|
|
|
const FileSegment::Range & range,
|
|
|
|
bool fill_with_detached_file_segments,
|
|
|
|
std::lock_guard<std::mutex> & cache_lock)
|
2022-04-11 15:51:49 +00:00
|
|
|
{
|
|
|
|
/// There are segments [segment1, ..., segmentN]
|
|
|
|
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
|
|
|
|
/// intersect with given range.
|
|
|
|
|
|
|
|
/// It can have holes:
|
|
|
|
/// [____________________] -- requested range
|
|
|
|
/// [____] [_] [_________] -- intersecting cache [segment1, ..., segmentN]
|
|
|
|
///
|
|
|
|
/// For each such hole create a cell with file segment state EMPTY.
|
|
|
|
|
|
|
|
auto it = file_segments.begin();
|
|
|
|
auto segment_range = (*it)->range();
|
|
|
|
|
|
|
|
size_t current_pos;
|
|
|
|
if (segment_range.left < range.left)
|
|
|
|
{
|
|
|
|
/// [_______ -- requested range
|
|
|
|
/// [_______
|
|
|
|
/// ^
|
|
|
|
/// segment1
|
|
|
|
|
|
|
|
current_pos = segment_range.right + 1;
|
|
|
|
++it;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
current_pos = range.left;
|
|
|
|
|
|
|
|
while (current_pos <= range.right && it != file_segments.end())
|
|
|
|
{
|
|
|
|
segment_range = (*it)->range();
|
|
|
|
|
|
|
|
if (current_pos == segment_range.left)
|
|
|
|
{
|
|
|
|
current_pos = segment_range.right + 1;
|
|
|
|
++it;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(current_pos < segment_range.left);
|
|
|
|
|
|
|
|
auto hole_size = segment_range.left - current_pos;
|
|
|
|
|
|
|
|
if (fill_with_detached_file_segments)
|
|
|
|
{
|
|
|
|
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
|
2022-05-03 17:17:54 +00:00
|
|
|
{
|
|
|
|
std::lock_guard segment_lock(file_segment->mutex);
|
|
|
|
file_segment->markAsDetached(segment_lock);
|
|
|
|
}
|
2022-04-11 15:51:49 +00:00
|
|
|
file_segments.insert(it, file_segment);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
|
|
|
|
}
|
|
|
|
|
|
|
|
current_pos = segment_range.right + 1;
|
|
|
|
++it;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (current_pos <= range.right)
|
|
|
|
{
|
|
|
|
/// ________] -- requested range
|
|
|
|
/// _____]
|
|
|
|
/// ^
|
|
|
|
/// segmentN
|
|
|
|
|
|
|
|
auto hole_size = range.right - current_pos + 1;
|
|
|
|
|
|
|
|
if (fill_with_detached_file_segments)
|
|
|
|
{
|
|
|
|
auto file_segment = std::make_shared<FileSegment>(current_pos, hole_size, key, this, FileSegment::State::EMPTY);
|
2022-05-03 17:17:54 +00:00
|
|
|
{
|
|
|
|
std::lock_guard segment_lock(file_segment->mutex);
|
|
|
|
file_segment->markAsDetached(segment_lock);
|
|
|
|
}
|
2022-04-11 15:51:49 +00:00
|
|
|
file_segments.insert(file_segments.end(), file_segment);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-05-04 15:12:35 +00:00
|
|
|
file_segments.splice(
|
|
|
|
file_segments.end(), splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock));
|
2022-04-11 15:51:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-13 11:57:56 +00:00
|
|
|
/// Returns a holder with segments covering [offset, offset + size - 1]: the cached segments
/// intersecting the range plus newly created EMPTY cells for every part that is not cached yet.
/// Throws if the cache is not initialized.
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
    assertInitialized();

    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

#ifndef NDEBUG
    assertCacheCorrectness(key, cache_lock);
#endif

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (!file_segments.empty())
        fillHolesWithEmptyFileSegments(file_segments, key, range, false, cache_lock);
    else
        file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, cache_lock);

    assert(!file_segments.empty());
    return FileSegmentsHolder(std::move(file_segments));
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-04-11 15:51:49 +00:00
|
|
|
/// Like getOrSet(), but adds no cells to the cache: parts of the range that are not cached
/// are represented by detached EMPTY segments. Throws if the cache is not initialized.
FileSegmentsHolder LRUFileCache::get(const Key & key, size_t offset, size_t size)
{
    assertInitialized();

    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

#ifndef NDEBUG
    assertCacheCorrectness(key, cache_lock);
#endif

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (!file_segments.empty())
    {
        fillHolesWithEmptyFileSegments(file_segments, key, range, true, cache_lock);
    }
    else
    {
        /// Nothing is cached for the range: hand out one detached segment spanning all of it.
        auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::EMPTY);
        {
            std::lock_guard segment_lock(file_segment->mutex);
            file_segment->markAsDetached(segment_lock);
        }
        file_segments = { file_segment };
    }

    return FileSegmentsHolder(std::move(file_segments));
}
|
|
|
|
|
2022-01-21 15:39:34 +00:00
|
|
|
/// Creates a file segment cell and puts it into the `files` map by [key][offset].
/// Returns nullptr for a zero size (empty files are not cached); throws LOGICAL_ERROR if a
/// cell already exists at (key, offset) or the insertion fails. When the hits threshold is
/// enabled and `state` is EMPTY, the stash decides whether the segment is created as EMPTY
/// or as SKIP_CACHE.
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    const Key & key, size_t offset, size_t size, FileSegment::State state,
    std::lock_guard<std::mutex> & cache_lock)
{
    if (!size)
        return nullptr; /// Empty files are not cached.

    if (files[key].contains(offset))
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
            keyToStr(key), offset, size, dumpStructureUnlocked(key, cache_lock));

    auto skip_or_download = [&]() -> FileSegmentPtr
    {
        /// The hits threshold only applies to EMPTY segments, and only when it is enabled.
        if (state != FileSegment::State::EMPTY || !enable_cache_hits_threshold)
            return std::make_shared<FileSegment>(offset, size, key, this, state);

        auto record = records.find({key, offset});

        if (record != records.end())
        {
            /// Seen before: count the hit, refresh its stash position and create the segment
            /// as EMPTY once the threshold is reached, otherwise as SKIP_CACHE.
            auto queue_iter = record->second;
            queue_iter->hits++;
            stash_queue.moveToEnd(queue_iter, cache_lock);

            state = queue_iter->hits >= enable_cache_hits_threshold ? FileSegment::State::EMPTY : FileSegment::State::SKIP_CACHE;
            return std::make_shared<FileSegment>(offset, size, key, this, state);
        }

        /// First access: remember it in the stash, evicting the oldest record on overflow.
        auto queue_iter = stash_queue.add(key, offset, 0, cache_lock);
        records.insert({{key, offset}, queue_iter});

        if (stash_queue.getElementsNum(cache_lock) > max_stash_element_size)
        {
            auto remove_queue_iter = stash_queue.begin();
            records.erase({remove_queue_iter->key, remove_queue_iter->offset});
            stash_queue.remove(remove_queue_iter, cache_lock);
        }

        /// For segments that do not reach the download threshold, we do not download them, but directly read them
        return std::make_shared<FileSegment>(offset, size, key, this, FileSegment::State::SKIP_CACHE);
    };

    FileSegmentCell cell(skip_or_download(), this, cache_lock);
    auto & offsets = files[key];

    /// First cell for this key: make sure its directory exists.
    if (offsets.empty())
    {
        auto key_path = getPathInLocalCache(key);
        if (!fs::exists(key_path))
            fs::create_directories(key_path);
    }

    auto [it, inserted] = offsets.insert({offset, std::move(cell)});
    if (!inserted)
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Failed to insert into cache key: `{}`, offset: {}, size: {}",
            keyToStr(key), offset, size);

    return &(it->second);
}
|
|
|
|
|
2022-05-10 17:50:43 +00:00
|
|
|
/// Creates cells in DOWNLOADING state covering [offset, offset + size) and returns them in a holder.
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if a cell already exists at exactly (key, offset).
FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size)
{
    std::lock_guard cache_lock(mutex);

#ifndef NDEBUG
    assertCacheCorrectness(key, cache_lock);
#endif

    if (getCell(key, offset, cache_lock))
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            "Cache cell already exists for key `{}` and offset {}",
            keyToStr(key), offset);

    return FileSegmentsHolder(splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock));
}
|
|
|
|
|
2022-06-05 08:21:36 +00:00
|
|
|
bool LRUFileCache::tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
|
|
|
|
{
|
2022-06-08 06:19:50 +00:00
|
|
|
auto query_context = enable_filesystem_query_cache_limit ? getCurrentQueryContext(cache_lock) : nullptr;
|
2022-06-05 16:00:59 +00:00
|
|
|
|
2022-06-06 08:48:30 +00:00
|
|
|
/// If the context can be found, subsequent cache replacements are made through the Query context.
|
2022-06-08 06:19:50 +00:00
|
|
|
if (query_context)
|
2022-06-05 16:00:59 +00:00
|
|
|
{
|
|
|
|
auto res = tryReserveForQuery(key, offset, size, query_context, cache_lock);
|
2022-06-07 03:50:56 +00:00
|
|
|
switch (res)
|
2022-06-05 16:00:59 +00:00
|
|
|
{
|
2022-06-08 12:13:20 +00:00
|
|
|
case ReserveResult::FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED :
|
2022-06-07 03:50:56 +00:00
|
|
|
{
|
|
|
|
/// When the maximum cache size of the query is reached, the cache will be
|
|
|
|
/// evicted from the history cache accessed by the current query.
|
|
|
|
return true;
|
|
|
|
}
|
2022-06-08 12:13:20 +00:00
|
|
|
case ReserveResult::EXCEEDS_QUERY_LIMIT :
|
2022-06-05 16:00:59 +00:00
|
|
|
{
|
|
|
|
/// The query currently does not have enough space to reserve.
|
|
|
|
/// It returns false and reads data directly from the remote fs.
|
|
|
|
return false;
|
|
|
|
}
|
2022-06-08 12:13:20 +00:00
|
|
|
case ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST :
|
2022-06-05 16:00:59 +00:00
|
|
|
{
|
|
|
|
/// When the maximum cache capacity of the request is not reached, the cache
|
|
|
|
/// block is evicted from the main LRU queue.
|
|
|
|
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
|
|
|
|
}
|
|
|
|
}
|
2022-06-07 03:50:56 +00:00
|
|
|
__builtin_unreachable();
|
2022-06-05 16:00:59 +00:00
|
|
|
}
|
|
|
|
else
|
2022-06-07 17:42:46 +00:00
|
|
|
{
|
2022-06-05 16:00:59 +00:00
|
|
|
return tryReserveForMainList(key, offset, size, query_context, cache_lock);
|
2022-06-07 17:42:46 +00:00
|
|
|
}
|
2022-06-05 16:00:59 +00:00
|
|
|
}
|
2022-06-05 08:21:36 +00:00
|
|
|
|
2022-06-05 16:00:59 +00:00
|
|
|
/// Query-level part of space reservation for the cell at (key, offset).
///
/// Returns:
///  - FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST when the query is still below its own
///    limit; nothing is reserved here and the caller has to go through the main LRU list;
///  - EXCEEDS_QUERY_LIMIT when the query limit would be exceeded and either the context asks
///    to skip the download in that case, or evicting the query's own entries does not free enough;
///  - FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED when enough of the query's own entries were
///    evicted and the reservation was fully accounted for here (main queue entry updated and
///    query_context->reserve() called), so the caller must not reserve again from the main list.
LRUFileCache::ReserveResult LRUFileCache::tryReserveForQuery(const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
{
    /// The maximum cache capacity of the request is not reached, thus the
    /// cache block is evicted from the main LRU queue by tryReserveForMainList().
    if (query_context->getCacheSize() + size <= query_context->getMaxCacheSize())
        return ReserveResult::FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST;

    /// When skip_download_if_exceeds_query_cache is true, there is no need
    /// to evict old data, skip the cache and read directly from remote fs.
    if (query_context->isSkipDownloadIfExceed())
        return ReserveResult::EXCEEDS_QUERY_LIMIT;

    /// The maximum cache size of the query is reached, the cache will be
    /// evicted from the history cache accessed by the current query.
    size_t removed_size = 0;
    size_t queue_size = queue.getElementsNum(cache_lock);

    auto * cell_for_reserve = getCell(key, offset, cache_lock);

    std::vector<IFileCache::LRUQueue::Iterator> ghost;
    std::vector<FileSegmentCell *> trash;
    std::vector<FileSegmentCell *> to_evict;

    auto is_overflow = [&]
    {
        return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
            || (max_element_size != 0 && queue_size > max_element_size)
            || (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize());
    };

    /// Select the cache from the LRU queue held by query for expulsion.
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); ++iter)
    {
        if (!is_overflow())
            break;

        auto * cell = getCell(iter->key, iter->offset, cache_lock);

        if (!cell)
        {
            /// The cache corresponding to this record may be swapped out by
            /// other queries, so it has become invalid.
            ghost.push_back(iter);
            removed_size += iter->size;
        }
        else
        {
            size_t cell_size = cell->size();
            assert(iter->size == cell_size);

            if (cell->releasable())
            {
                auto & file_segment = cell->file_segment;
                std::lock_guard segment_lock(file_segment->mutex);

                switch (file_segment->download_state)
                {
                    case FileSegment::State::DOWNLOADED:
                    {
                        to_evict.push_back(cell);
                        break;
                    }
                    default:
                    {
                        trash.push_back(cell);
                        break;
                    }
                }
                removed_size += cell_size;
                --queue_size;
            }
        }
    }

    /// Releasable cells in a state other than DOWNLOADED are not expected here;
    /// release builds still clean them up below.
    assert(trash.empty());
    for (auto & cell : trash)
    {
        auto file_segment = cell->file_segment;
        if (file_segment)
        {
            query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);

            std::lock_guard segment_lock(file_segment->mutex);
            remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
        }
    }

    /// Drop the query's records whose cells no longer exist in the cache.
    for (auto & iter : ghost)
        query_context->remove(iter->key, iter->offset, iter->size, cache_lock);

    /// Even after releasing everything releasable the request does not fit.
    if (is_overflow())
        return ReserveResult::EXCEEDS_QUERY_LIMIT;

    /// Account for the reserved bytes in the main LRU queue.
    if (cell_for_reserve)
    {
        auto queue_iterator = cell_for_reserve->queue_iterator;
        if (queue_iterator)
            queue.incrementSize(*queue_iterator, size, cache_lock);
        else
            cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
    }

    for (auto & cell : to_evict)
    {
        auto file_segment = cell->file_segment;
        if (file_segment)
        {
            query_context->remove(file_segment->key(), file_segment->offset(), cell->size(), cache_lock);

            std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
            remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
        }
    }

    query_context->reserve(key, offset, size, cache_lock);

    /// The reservation is already complete at this point. Reporting
    /// FITS_IN_QUERY_LIMIT_NEED_RESERVE_FROM_MAIN_LIST here would make tryReserve() run
    /// tryReserveForMainList() for the same request on top of the accounting done above.
    return ReserveResult::FITS_IN_QUERY_LIMIT_AND_RESERVATION_COMPLETED;
}
|
|
|
|
|
|
|
|
bool LRUFileCache::tryReserveForMainList(
|
2022-06-05 16:00:59 +00:00
|
|
|
const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard<std::mutex> & cache_lock)
|
2022-01-21 15:39:34 +00:00
|
|
|
{
|
2022-01-23 16:51:18 +00:00
|
|
|
auto removed_size = 0;
|
2022-05-04 15:12:35 +00:00
|
|
|
size_t queue_size = queue.getElementsNum(cache_lock);
|
2022-01-23 16:51:18 +00:00
|
|
|
assert(queue_size <= max_element_size);
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-01-30 11:35:28 +00:00
|
|
|
/// Since space reservation is incremental, cache cell already exists if it's state is EMPTY.
|
2022-01-23 16:51:18 +00:00
|
|
|
/// And it cache cell does not exist on startup -- as we first check for space and then add a cell.
|
2022-05-05 14:11:26 +00:00
|
|
|
auto * cell_for_reserve = getCell(key, offset, cache_lock);
|
2022-01-22 22:56:24 +00:00
|
|
|
|
2022-01-23 16:51:18 +00:00
|
|
|
/// A cell acquires a LRUQueue iterator on first successful space reservation attempt.
|
2022-02-21 12:54:03 +00:00
|
|
|
/// cell_for_reserve can be nullptr here when we call tryReserve() from loadCacheInfoIntoMemory().
|
2022-01-23 16:51:18 +00:00
|
|
|
if (!cell_for_reserve || !cell_for_reserve->queue_iterator)
|
|
|
|
queue_size += 1;
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
auto is_overflow = [&]
|
|
|
|
{
|
2022-03-21 13:56:38 +00:00
|
|
|
/// max_size == 0 means unlimited cache size, max_element_size means unlimited number of cache elements.
|
2022-05-04 15:12:35 +00:00
|
|
|
return (max_size != 0 && queue.getTotalWeight(cache_lock) + size - removed_size > max_size)
|
2022-01-13 11:57:56 +00:00
|
|
|
|| (max_element_size != 0 && queue_size > max_element_size);
|
|
|
|
};
|
|
|
|
|
2022-02-23 10:12:14 +00:00
|
|
|
std::vector<FileSegmentCell *> to_evict;
|
2022-05-04 15:12:35 +00:00
|
|
|
std::vector<FileSegmentCell *> trash;
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-05-25 13:05:15 +00:00
|
|
|
for (const auto & [entry_key, entry_offset, entry_size, _] : queue)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
2022-05-04 15:12:35 +00:00
|
|
|
if (!is_overflow())
|
2022-05-05 14:11:26 +00:00
|
|
|
break;
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
auto * cell = getCell(entry_key, entry_offset, cache_lock);
|
2022-01-13 11:57:56 +00:00
|
|
|
if (!cell)
|
2022-05-05 14:11:26 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
|
|
|
|
"Cache became inconsistent. Key: {}, offset: {}",
|
|
|
|
keyToStr(key), offset);
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
size_t cell_size = cell->size();
|
2022-05-05 14:11:26 +00:00
|
|
|
assert(entry_size == cell_size);
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
/// It is guaranteed that cell is not removed from cache as long as
|
|
|
|
/// pointer to corresponding file segment is hold by any other thread.
|
|
|
|
|
|
|
|
if (cell->releasable())
|
|
|
|
{
|
2022-02-23 10:12:14 +00:00
|
|
|
auto & file_segment = cell->file_segment;
|
|
|
|
|
|
|
|
std::lock_guard segment_lock(file_segment->mutex);
|
|
|
|
|
|
|
|
switch (file_segment->download_state)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
|
|
|
case FileSegment::State::DOWNLOADED:
|
|
|
|
{
|
|
|
|
/// Cell will actually be removed only if
|
|
|
|
/// we managed to reserve enough space.
|
|
|
|
|
2022-02-23 10:12:14 +00:00
|
|
|
to_evict.push_back(cell);
|
2022-01-21 15:39:34 +00:00
|
|
|
break;
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
default:
|
|
|
|
{
|
2022-05-04 15:12:35 +00:00
|
|
|
trash.push_back(cell);
|
2022-01-13 11:57:56 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
removed_size += cell_size;
|
|
|
|
--queue_size;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
/// This case is very unlikely, can happen in case of exception from
|
|
|
|
/// file_segment->complete(), which would be a logical error.
|
2022-05-04 15:12:35 +00:00
|
|
|
assert(trash.empty());
|
|
|
|
for (auto & cell : trash)
|
|
|
|
{
|
|
|
|
auto file_segment = cell->file_segment;
|
|
|
|
if (file_segment)
|
|
|
|
{
|
2022-05-05 14:11:26 +00:00
|
|
|
std::lock_guard segment_lock(file_segment->mutex);
|
2022-05-04 15:12:35 +00:00
|
|
|
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (is_overflow())
|
|
|
|
return false;
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
/// cache cell is nullptr on server startup because we first check for space and then add a cell.
|
|
|
|
if (cell_for_reserve)
|
|
|
|
{
|
|
|
|
/// queue_iteratir is std::nullopt here if no space has been reserved yet, a cache cell
|
|
|
|
/// acquires queue iterator on first successful space reservation attempt.
|
|
|
|
/// If queue iterator already exists, we need to update the size after each space reservation.
|
|
|
|
auto queue_iterator = cell_for_reserve->queue_iterator;
|
|
|
|
if (queue_iterator)
|
|
|
|
queue.incrementSize(*queue_iterator, size, cache_lock);
|
|
|
|
else
|
2022-05-05 14:11:26 +00:00
|
|
|
cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
2022-01-23 16:51:18 +00:00
|
|
|
|
2022-02-23 10:12:14 +00:00
|
|
|
for (auto & cell : to_evict)
|
|
|
|
{
|
2022-03-01 16:00:54 +00:00
|
|
|
auto file_segment = cell->file_segment;
|
2022-02-24 14:20:51 +00:00
|
|
|
if (file_segment)
|
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
|
|
|
|
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
|
|
|
}
|
2022-02-23 10:12:14 +00:00
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
if (queue.getTotalWeight(cache_lock) > (1ull << 63))
|
2022-02-18 15:38:23 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-06-05 16:00:59 +00:00
|
|
|
if (query_context)
|
|
|
|
query_context->reserve(key, offset, size, cache_lock);
|
2022-06-05 17:29:49 +00:00
|
|
|
|
2022-01-13 11:57:56 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-03-07 13:30:57 +00:00
|
|
|
void LRUFileCache::remove(const Key & key)
|
|
|
|
{
|
2022-03-09 09:36:52 +00:00
|
|
|
assertInitialized();
|
|
|
|
|
2022-03-07 13:30:57 +00:00
|
|
|
std::lock_guard cache_lock(mutex);
|
|
|
|
|
|
|
|
auto it = files.find(key);
|
|
|
|
if (it == files.end())
|
|
|
|
return;
|
|
|
|
|
|
|
|
auto & offsets = it->second;
|
|
|
|
|
|
|
|
std::vector<FileSegmentCell *> to_remove;
|
|
|
|
to_remove.reserve(offsets.size());
|
|
|
|
|
|
|
|
for (auto & [offset, cell] : offsets)
|
|
|
|
to_remove.push_back(&cell);
|
|
|
|
|
2022-06-06 18:51:58 +00:00
|
|
|
bool some_cells_were_skipped = false;
|
2022-03-07 13:30:57 +00:00
|
|
|
for (auto & cell : to_remove)
|
|
|
|
{
|
2022-06-06 11:32:58 +00:00
|
|
|
/// In ordinary case we remove data from cache when it's not used by anyone.
|
|
|
|
/// But if we have multiple replicated zero-copy tables on the same server
|
|
|
|
/// it became possible to start removing something from cache when it is used
|
|
|
|
/// by other "zero-copy" tables. That is why it's not an error.
|
2022-03-10 09:56:48 +00:00
|
|
|
if (!cell->releasable())
|
2022-06-06 18:51:58 +00:00
|
|
|
{
|
|
|
|
some_cells_were_skipped = true;
|
2022-06-06 11:32:58 +00:00
|
|
|
continue;
|
2022-06-06 18:51:58 +00:00
|
|
|
}
|
2022-03-10 09:56:48 +00:00
|
|
|
|
2022-03-07 13:30:57 +00:00
|
|
|
auto file_segment = cell->file_segment;
|
|
|
|
if (file_segment)
|
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
|
|
|
|
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
auto key_path = getPathInLocalCache(key);
|
|
|
|
|
2022-06-06 18:51:58 +00:00
|
|
|
if (!some_cells_were_skipped)
|
2022-06-06 11:32:58 +00:00
|
|
|
{
|
|
|
|
files.erase(key);
|
2022-03-07 13:30:57 +00:00
|
|
|
|
2022-06-06 11:32:58 +00:00
|
|
|
if (fs::exists(key_path))
|
|
|
|
fs::remove(key_path);
|
|
|
|
}
|
2022-03-07 13:30:57 +00:00
|
|
|
}
|
|
|
|
|
2022-05-10 17:50:43 +00:00
|
|
|
void LRUFileCache::remove()
|
2022-03-21 11:30:25 +00:00
|
|
|
{
|
|
|
|
/// Try remove all cached files by cache_base_path.
|
|
|
|
/// Only releasable file segments are evicted.
|
|
|
|
|
|
|
|
std::lock_guard cache_lock(mutex);
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
std::vector<FileSegment *> to_remove;
|
2022-03-21 11:30:25 +00:00
|
|
|
for (auto it = queue.begin(); it != queue.end();)
|
|
|
|
{
|
2022-05-25 13:05:15 +00:00
|
|
|
const auto & [key, offset, size, _] = *it++;
|
2022-03-21 11:30:25 +00:00
|
|
|
auto * cell = getCell(key, offset, cache_lock);
|
2022-04-25 14:43:15 +00:00
|
|
|
if (!cell)
|
2022-05-04 15:12:35 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Cache is in inconsistent state: LRU queue contains entries with no cache cell");
|
2022-04-25 14:43:15 +00:00
|
|
|
|
2022-05-10 17:50:43 +00:00
|
|
|
if (cell->releasable())
|
2022-03-21 11:30:25 +00:00
|
|
|
{
|
|
|
|
auto file_segment = cell->file_segment;
|
|
|
|
if (file_segment)
|
|
|
|
{
|
2022-05-04 12:18:52 +00:00
|
|
|
std::lock_guard segment_lock(file_segment->mutex);
|
2022-05-10 17:50:43 +00:00
|
|
|
file_segment->detach(cache_lock, segment_lock);
|
2022-03-21 11:30:25 +00:00
|
|
|
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-05-26 08:36:05 +00:00
|
|
|
|
|
|
|
/// Remove all access information.
|
|
|
|
records.clear();
|
|
|
|
stash_queue.removeAll(cache_lock);
|
2022-03-21 11:30:25 +00:00
|
|
|
}
|
|
|
|
|
2022-01-21 15:39:34 +00:00
|
|
|
void LRUFileCache::remove(
|
2022-02-23 10:12:14 +00:00
|
|
|
Key key, size_t offset,
|
|
|
|
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
2022-01-21 15:39:34 +00:00
|
|
|
LOG_TEST(log, "Remove. Key: {}, offset: {}", keyToStr(key), offset);
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-01-23 17:33:22 +00:00
|
|
|
auto * cell = getCell(key, offset, cache_lock);
|
|
|
|
if (!cell)
|
2022-05-03 09:45:22 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);
|
2022-01-23 17:33:22 +00:00
|
|
|
|
|
|
|
if (cell->queue_iterator)
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
|
|
|
queue.remove(*cell->queue_iterator, cache_lock);
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-01-21 15:39:34 +00:00
|
|
|
auto & offsets = files[key];
|
|
|
|
offsets.erase(offset);
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-02-18 15:38:23 +00:00
|
|
|
auto cache_file_path = getPathInLocalCache(key, offset);
|
2022-01-13 11:57:56 +00:00
|
|
|
if (fs::exists(cache_file_path))
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
fs::remove(cache_file_path);
|
|
|
|
|
2022-03-01 17:12:34 +00:00
|
|
|
if (is_initialized && offsets.empty())
|
2022-02-18 15:38:23 +00:00
|
|
|
{
|
|
|
|
auto key_path = getPathInLocalCache(key);
|
|
|
|
|
|
|
|
files.erase(key);
|
|
|
|
|
|
|
|
if (fs::exists(key_path))
|
|
|
|
fs::remove(key_path);
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2022-02-18 15:38:23 +00:00
|
|
|
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
|
2022-01-13 11:57:56 +00:00
|
|
|
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
|
|
|
|
keyToStr(key), offset, cache_file_path, getCurrentExceptionMessage(false));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-28 10:47:02 +00:00
|
|
|
void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock)
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
|
|
|
Key key;
|
2022-03-30 15:12:23 +00:00
|
|
|
UInt64 offset = 0;
|
|
|
|
size_t size = 0;
|
2022-05-04 15:12:35 +00:00
|
|
|
std::vector<std::pair<LRUQueue::Iterator, std::weak_ptr<FileSegment>>> queue_entries;
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
/// cache_base_path / key_prefix / key / offset
|
|
|
|
|
|
|
|
fs::directory_iterator key_prefix_it{cache_base_path};
|
|
|
|
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
|
|
|
|
{
|
|
|
|
fs::directory_iterator key_it{key_prefix_it->path()};
|
|
|
|
for (; key_it != fs::directory_iterator(); ++key_it)
|
|
|
|
{
|
2022-01-21 15:39:34 +00:00
|
|
|
key = unhexUInt<UInt128>(key_it->path().filename().string().data());
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
fs::directory_iterator offset_it{key_it->path()};
|
|
|
|
for (; offset_it != fs::directory_iterator(); ++offset_it)
|
|
|
|
{
|
|
|
|
bool parsed = tryParse<UInt64>(offset, offset_it->path().filename());
|
|
|
|
if (!parsed)
|
2022-01-23 16:51:18 +00:00
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Unexpected file: ", offset_it->path().string());
|
2022-02-15 09:11:33 +00:00
|
|
|
continue; /// Or just remove? Some unexpected file.
|
2022-01-23 16:51:18 +00:00
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
|
|
|
size = offset_it->file_size();
|
2022-01-23 16:51:18 +00:00
|
|
|
if (!size)
|
|
|
|
{
|
|
|
|
fs::remove(offset_it->path());
|
|
|
|
continue;
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-01-23 16:51:18 +00:00
|
|
|
if (tryReserve(key, offset, size, cache_lock))
|
2022-01-13 11:57:56 +00:00
|
|
|
{
|
2022-01-23 16:51:18 +00:00
|
|
|
auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, cache_lock);
|
|
|
|
if (cell)
|
2022-04-26 18:31:42 +00:00
|
|
|
queue_entries.emplace_back(*cell->queue_iterator, cell->file_segment);
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
LOG_WARNING(log,
|
|
|
|
"Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {})",
|
2022-05-05 14:11:26 +00:00
|
|
|
max_size, getAvailableCacheSizeUnlocked(cache_lock), key_it->path().string(), size);
|
2022-01-23 16:51:18 +00:00
|
|
|
fs::remove(offset_it->path());
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
|
|
|
|
pcg64 generator(randomSeed());
|
2022-04-26 18:31:42 +00:00
|
|
|
std::shuffle(queue_entries.begin(), queue_entries.end(), generator);
|
|
|
|
for (const auto & [it, file_segment] : queue_entries)
|
2022-03-11 11:57:57 +00:00
|
|
|
{
|
|
|
|
/// Cell cache size changed and, for example, 1st file segment fits into cache
|
|
|
|
/// and 2nd file segment will fit only if first was evicted, then first will be removed and
|
|
|
|
/// cell is nullptr here.
|
2022-04-26 18:31:42 +00:00
|
|
|
if (file_segment.expired())
|
|
|
|
continue;
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
queue.moveToEnd(it, cache_lock);
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
2022-05-04 15:12:35 +00:00
|
|
|
#ifndef NDEBUG
|
|
|
|
assertCacheCorrectness(cache_lock);
|
|
|
|
#endif
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
void LRUFileCache::reduceSizeToDownloaded(
|
2022-02-23 10:12:14 +00:00
|
|
|
const Key & key, size_t offset,
|
|
|
|
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
|
2022-01-21 15:39:34 +00:00
|
|
|
{
|
2022-01-22 22:56:24 +00:00
|
|
|
/**
|
|
|
|
* In case file was partially downloaded and it's download cannot be continued
|
|
|
|
* because of no space left in cache, we need to be able to cut cell's size to downloaded_size.
|
|
|
|
*/
|
2022-01-21 15:39:34 +00:00
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
auto * cell = getCell(key, offset, cache_lock);
|
2022-01-21 15:39:34 +00:00
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
if (!cell)
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"No cell found for key: {}, offset: {}",
|
|
|
|
keyToStr(key), offset);
|
|
|
|
}
|
2022-01-21 15:39:34 +00:00
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
const auto & file_segment = cell->file_segment;
|
2022-01-21 15:39:34 +00:00
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
size_t downloaded_size = file_segment->downloaded_size;
|
|
|
|
if (downloaded_size == file_segment->range().size())
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Nothing to reduce, file segment fully downloaded, key: {}, offset: {}",
|
|
|
|
keyToStr(key), offset);
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-01-22 22:56:24 +00:00
|
|
|
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED);
|
2022-01-21 15:39:34 +00:00
|
|
|
}
|
|
|
|
|
2022-01-23 16:51:18 +00:00
|
|
|
bool LRUFileCache::isLastFileSegmentHolder(
|
2022-02-23 10:12:14 +00:00
|
|
|
const Key & key, size_t offset,
|
|
|
|
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
|
2022-01-21 15:39:34 +00:00
|
|
|
{
|
2022-01-23 16:51:18 +00:00
|
|
|
auto * cell = getCell(key, offset, cache_lock);
|
|
|
|
|
|
|
|
if (!cell)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cell found for key: {}, offset: {}", keyToStr(key), offset);
|
|
|
|
|
2022-02-15 09:11:33 +00:00
|
|
|
/// The caller of this method is last file segment holder if use count is 2 (the second pointer is cache itself)
|
2022-01-23 16:51:18 +00:00
|
|
|
return cell->file_segment.use_count() == 2;
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
2022-04-07 16:46:46 +00:00
|
|
|
/// Returns snapshots (made by FileSegment::getSnapshot) of the file segments of all cells
/// in `files`. Takes the cache mutex.
FileSegments LRUFileCache::getSnapshot() const
{
    std::lock_guard cache_lock(mutex);

    FileSegments snapshot;
    for (const auto & [file_key, cells] : files)
        for (const auto & [cell_offset, cell] : cells)
            snapshot.push_back(FileSegment::getSnapshot(cell.file_segment, cache_lock));

    return snapshot;
}
|
|
|
|
|
2022-03-23 12:01:18 +00:00
|
|
|
std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
|
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(mutex);
|
|
|
|
|
|
|
|
std::vector<String> cache_paths;
|
2022-03-30 11:47:44 +00:00
|
|
|
|
2022-03-23 12:01:18 +00:00
|
|
|
const auto & cells_by_offset = files[key];
|
|
|
|
|
|
|
|
for (const auto & [offset, cell] : cells_by_offset)
|
2022-03-23 17:11:52 +00:00
|
|
|
{
|
2022-03-23 12:01:18 +00:00
|
|
|
if (cell.file_segment->state() == FileSegment::State::DOWNLOADED)
|
|
|
|
cache_paths.push_back(getPathInLocalCache(key, offset));
|
2022-03-23 17:11:52 +00:00
|
|
|
}
|
2022-03-23 12:01:18 +00:00
|
|
|
|
|
|
|
return cache_paths;
|
|
|
|
}
|
|
|
|
|
2022-05-03 17:55:35 +00:00
|
|
|
size_t LRUFileCache::getUsedCacheSize() const
|
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(mutex);
|
2022-05-05 14:11:26 +00:00
|
|
|
return getUsedCacheSizeUnlocked(cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
size_t LRUFileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
|
|
|
return queue.getTotalWeight(cache_lock);
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t LRUFileCache::getAvailableCacheSize() const
|
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(mutex);
|
2022-05-05 14:11:26 +00:00
|
|
|
return getAvailableCacheSizeUnlocked(cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
size_t LRUFileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
2022-05-05 14:11:26 +00:00
|
|
|
return max_size - getUsedCacheSizeUnlocked(cache_lock);
|
2022-05-03 17:55:35 +00:00
|
|
|
}
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
size_t LRUFileCache::getFileSegmentsNum() const
|
2022-05-03 17:55:35 +00:00
|
|
|
{
|
|
|
|
std::lock_guard cache_lock(mutex);
|
2022-05-05 14:11:26 +00:00
|
|
|
return getFileSegmentsNumUnlocked(cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
size_t LRUFileCache::getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
|
|
|
return queue.getElementsNum(cache_lock);
|
2022-05-03 17:55:35 +00:00
|
|
|
}
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
/// Creates a cache cell for `file_segment_`.
///
/// A segment in DOWNLOADED state is added to the LRU queue of `cache` right away.
/// For SKIP_CACHE, EMPTY and DOWNLOADING segments no queue entry is created here.
/// Any other state is rejected with REMOTE_FS_OBJECT_CACHE_ERROR.
LRUFileCache::FileSegmentCell::FileSegmentCell(
    FileSegmentPtr file_segment_,
    LRUFileCache * cache,
    std::lock_guard<std::mutex> & cache_lock)
    : file_segment(file_segment_)
{
    switch (file_segment->download_state)
    {
        case FileSegment::State::SKIP_CACHE:
        case FileSegment::State::EMPTY:
        case FileSegment::State::DOWNLOADING:
            /// No queue entry is created for these states in the constructor.
            break;

        case FileSegment::State::DOWNLOADED:
            queue_iterator = cache->queue.add(
                file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock);
            break;

        default:
            throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                            "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}",
                            FileSegment::stateToString(file_segment->download_state));
    }
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Appends an entry (key, offset, size) to the end of the queue and adds `size` to the total
/// queue weight. Returns the iterator of the new entry.
/// In debug builds throws LOGICAL_ERROR if an entry with the same key and offset already exists.
IFileCache::LRUQueue::Iterator IFileCache::LRUQueue::add(
    const IFileCache::Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & /* cache_lock */)
{
#ifndef NDEBUG
    /// Linear scan for duplicates, compiled in debug builds only.
    for (const auto & [queued_key, queued_offset, queued_size, queued_hits] : queue)
    {
        const bool is_duplicate = (queued_key == key) && (queued_offset == offset);
        if (is_duplicate)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Attempt to add duplicate queue entry to queue. (Key: {}, offset: {}, size: {})",
                keyToStr(key), offset, size);
    }
#endif

    cache_size += size;
    const auto tail = queue.end();
    return queue.insert(tail, FileKeyAndOffset(key, offset, size));
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Erases the entry pointed to by `queue_it` and subtracts its size from the total queue weight.
void IFileCache::LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
{
    /// Read the size before the entry is erased.
    const auto entry_size = queue_it->size;
    cache_size -= entry_size;
    queue.erase(queue_it);
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
void IFileCache::LRUQueue::removeAll(std::lock_guard<std::mutex> & /* cache_lock */)
|
2022-05-26 08:36:05 +00:00
|
|
|
{
|
|
|
|
queue.clear();
|
|
|
|
cache_size = 0;
|
|
|
|
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Moves the entry pointed to by `queue_it` to the end of the queue.
void IFileCache::LRUQueue::moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & /* cache_lock */)
{
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Grows the entry pointed to by `queue_it` by `size_increment` bytes and adds the same amount
/// to the total queue weight.
void IFileCache::LRUQueue::incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & /* cache_lock */)
{
    queue_it->size += size_increment;
    cache_size += size_increment;
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
bool IFileCache::LRUQueue::contains(
|
2022-05-04 15:12:35 +00:00
|
|
|
const IFileCache::Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */) const
|
|
|
|
{
|
|
|
|
/// This method is used for assertions in debug mode.
|
|
|
|
/// So we do not care about complexity here.
|
2022-05-25 13:05:15 +00:00
|
|
|
for (const auto & [entry_key, entry_offset, size, _] : queue)
|
2022-05-04 15:12:35 +00:00
|
|
|
{
|
2022-05-05 22:07:22 +00:00
|
|
|
if (key == entry_key && offset == entry_offset)
|
2022-05-04 15:12:35 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
/// Renders the queue as a comma-separated list of "key: [first_byte, last_byte]" items,
/// in queue order.
String IFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_lock */) const
{
    String description;
    for (const auto & [key, offset, size, _] : queue)
    {
        const auto last_byte = offset + size - 1;

        if (!description.empty())
            description.append(", ");
        description += fmt::format("{}: [{}, {}]", keyToStr(key), offset, last_byte);
    }
    return description;
}
|
|
|
|
|
2022-03-21 18:48:13 +00:00
|
|
|
/// Locking wrapper around dumpStructureUnlocked(): takes the cache mutex and forwards the call.
String LRUFileCache::dumpStructure(const Key & key)
{
    std::lock_guard cache_lock(mutex);
    String structure = dumpStructureUnlocked(key, cache_lock);
    return structure;
}
|
2022-01-21 15:39:34 +00:00
|
|
|
|
2022-05-05 14:11:26 +00:00
|
|
|
/// Builds a textual description of the cache state for `key`: one line of
/// FileSegment::getInfoForLog() per cell of the key, followed by the dump of the whole LRU queue.
/// The caller must already hold the cache lock.
String LRUFileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> & cache_lock)
{
    WriteBufferFromOwnString result;

    /// Look the key up with find(): operator[] would insert an empty entry into `files`
    /// as a side effect of dumping an unknown key.
    if (const auto it = files.find(key); it != files.end())
    {
        for (const auto & [offset, cell] : it->second)
            result << cell.file_segment->getInfoForLog() << "\n";
    }

    result << "\n\nQueue: " << queue.toString(cache_lock);
    return result.str();
}
|
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
void LRUFileCache::assertCacheCellsCorrectness(
|
|
|
|
const FileSegmentsByOffset & cells_by_offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
|
2022-03-21 18:48:13 +00:00
|
|
|
{
|
|
|
|
for (const auto & [_, cell] : cells_by_offset)
|
2022-01-21 15:39:34 +00:00
|
|
|
{
|
2022-03-21 18:48:13 +00:00
|
|
|
const auto & file_segment = cell.file_segment;
|
|
|
|
file_segment->assertCorrectness();
|
2022-05-04 15:12:35 +00:00
|
|
|
|
|
|
|
if (file_segment->reserved_size != 0)
|
|
|
|
{
|
|
|
|
assert(cell.queue_iterator);
|
2022-05-30 14:06:58 +00:00
|
|
|
assert(queue.contains(file_segment->key(), file_segment->offset(), cache_lock));
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
2022-01-21 15:39:34 +00:00
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
|
2022-05-04 15:12:35 +00:00
|
|
|
void LRUFileCache::assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock)
|
|
|
|
{
|
|
|
|
assertCacheCellsCorrectness(files[key], cache_lock);
|
2022-06-07 11:10:57 +00:00
|
|
|
assertQueueCorrectness(cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void LRUFileCache::assertCacheCorrectness(std::lock_guard<std::mutex> & cache_lock)
|
|
|
|
{
|
|
|
|
for (const auto & [key, cells_by_offset] : files)
|
|
|
|
assertCacheCellsCorrectness(files[key], cache_lock);
|
2022-06-07 11:10:57 +00:00
|
|
|
assertQueueCorrectness(cache_lock);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
void LRUFileCache::assertQueueCorrectness(std::lock_guard<std::mutex> & cache_lock)
|
2022-06-05 08:21:36 +00:00
|
|
|
{
|
2022-06-07 11:10:57 +00:00
|
|
|
[[maybe_unused]] size_t total_size = 0;
|
|
|
|
for (auto it = queue.begin(); it != queue.end();)
|
|
|
|
{
|
|
|
|
auto & [key, offset, size, _] = *it++;
|
2022-06-05 16:00:59 +00:00
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
auto * cell = getCell(key, offset, cache_lock);
|
|
|
|
if (!cell)
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Cache is in inconsistent state: LRU queue contains entries with no cache cell (assertCorrectness())");
|
|
|
|
}
|
2022-06-05 08:21:36 +00:00
|
|
|
|
2022-06-07 11:10:57 +00:00
|
|
|
assert(cell->size() == size);
|
|
|
|
total_size += size;
|
|
|
|
}
|
2022-06-05 08:21:36 +00:00
|
|
|
|
2022-06-07 19:19:07 +00:00
|
|
|
assert(total_size == queue.getTotalWeight(cache_lock));
|
|
|
|
assert(queue.getTotalWeight(cache_lock) <= max_size);
|
2022-06-07 19:23:40 +00:00
|
|
|
assert(queue.getElementsNum(cache_lock) <= max_element_size);
|
2022-05-04 15:12:35 +00:00
|
|
|
}
|
|
|
|
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|