Refactoring

This commit is contained in:
kssenii 2023-01-18 20:52:16 +01:00
parent 9bb283f050
commit b1426ddbdd
12 changed files with 367 additions and 458 deletions

View File

@ -26,17 +26,18 @@ FileCache::FileCache(
const String & cache_base_path_, const String & cache_base_path_,
const FileCacheSettings & cache_settings_) const FileCacheSettings & cache_settings_)
: cache_base_path(cache_base_path_) : cache_base_path(cache_base_path_)
, max_size(cache_settings_.max_size)
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size) , max_file_segment_size(cache_settings_.max_file_segment_size)
, allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files) , allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files)
, enable_bypass_cache_with_threshold(cache_settings_.enable_bypass_cache_with_threashold) , bypass_cache_threshold(cache_settings_.enable_bypass_cache_with_threashold ? cache_settings_.bypass_cache_threashold : 0)
, bypass_cache_threshold(enable_bypass_cache_with_threshold ? cache_settings_.bypass_cache_threashold : 0)
, log(&Poco::Logger::get("FileCache")) , log(&Poco::Logger::get("FileCache"))
, main_priority(std::make_unique<LRUFileCachePriority>())
, stash(std::make_unique<HitsCountStash>(cache_settings_.max_elements, cache_settings_.enable_cache_hits_threshold, std::make_unique<LRUFileCachePriority>()))
, query_limit(cache_settings_.enable_filesystem_query_cache_limit ? std::make_unique<QueryLimit>() : nullptr)
{ {
main_priority = std::make_unique<LRUFileCachePriority>(cache_settings_.max_size, cache_settings_.max_elements);
if (cache_settings_.cache_hits_threshold)
stash = std::make_unique<HitsCountStash>(cache_settings_.cache_hits_threshold, cache_settings_.max_elements);
if (cache_settings_.enable_filesystem_query_cache_limit)
query_limit = std::make_unique<QueryLimit>();
} }
FileCache::Key FileCache::createKeyForPath(const String & path) FileCache::Key FileCache::createKeyForPath(const String & path)
@ -497,7 +498,7 @@ FileCache::CacheCells::iterator FileCache::addCell(
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & settings, const CreateFileSegmentSettings & settings,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
CachePriorityQueueGuard::Lock * queue_lock) CachePriorityQueueGuard::LockPtr * queue_lock)
{ {
/// Create a file segment cell and put it in `files` map by [key][offset]. /// Create a file segment cell and put it in `files` map by [key][offset].
@ -516,30 +517,32 @@ FileCache::CacheCells::iterator FileCache::addCell(
/// `stash` - a queue of "stashed" key-offset pairs. Implements counting of /// `stash` - a queue of "stashed" key-offset pairs. Implements counting of
/// cache entries and allows caching only if cache hit threshold is reached. /// cache entries and allows caching only if cache hit threshold is reached.
if (stash && stash->cache_hits_threshold && state == FileSegment::State::EMPTY) if (stash && state == FileSegment::State::EMPTY)
{ {
// auto stash_lock = stash.lock(); if (!queue_lock)
// KeyAndOffset stash_key(key, offset); throw Exception(ErrorCodes::LOGICAL_ERROR, "Using stash requires queue_lock");
// auto record_it = stash.records.find(stash_key); KeyAndOffset stash_key(key, offset);
// if (record_it == stash.records.end())
// {
// auto & stash_queue = *stash.queue;
// auto & stash_records = stash.records;
// stash_records.insert({stash_key, stash_queue.add(key, offset, 0, key_transaction.getCreator(), lock)}); auto record_it = stash->records.find(stash_key);
if (record_it == stash->records.end())
{
auto stash_queue = LockedCachePriority(*queue_lock, *stash->queue);
auto & stash_records = stash->records;
// if (stash_queue.getElementsNum(lock) > stash.max_stash_queue_size) stash_records.emplace(stash_key, stash_queue.add(key, offset, 0, key_transaction.getCreator()));
// stash_records.erase(stash_queue.pop(lock));
// result_state = FileSegment::State::SKIP_CACHE; if (stash_queue.getElementsCount() > stash->queue->getElementsLimit())
// } stash_queue.pop();
// else
// { result_state = FileSegment::State::SKIP_CACHE;
// result_state = record_it->second->use() >= stash.cache_hits_threshold }
// ? FileSegment::State::EMPTY else
// : FileSegment::State::SKIP_CACHE; {
// } result_state = LockedCachePriorityIterator(*queue_lock, record_it->second).use() >= stash->hits_threshold
? FileSegment::State::EMPTY
: FileSegment::State::SKIP_CACHE;
}
} }
else else
{ {
@ -549,7 +552,9 @@ FileCache::CacheCells::iterator FileCache::addCell(
auto file_segment = std::make_shared<FileSegment>( auto file_segment = std::make_shared<FileSegment>(
offset, size, key, key_transaction.getCreator(), this, result_state, settings); offset, size, key, key_transaction.getCreator(), this, result_state, settings);
FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority, queue_lock); std::optional<LockedCachePriority> locked_queue(queue_lock ? LockedCachePriority(*queue_lock, *main_priority) : std::optional<LockedCachePriority>{});
FileSegmentCell cell(std::move(file_segment), key_transaction, locked_queue ? &*locked_queue : nullptr);
auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell)); auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell));
assert(inserted); assert(inserted);
@ -560,9 +565,9 @@ FileCache::CacheCells::iterator FileCache::addCell(
bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) bool FileCache::tryReserve(const Key & key, size_t offset, size_t size)
{ {
assertInitialized(); assertInitialized();
auto queue_lock = main_priority->lock(); auto lock = priority_queue_guard.lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW);
return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock); return tryReserveUnlocked(key, offset, size, key_transaction, lock);
} }
bool FileCache::tryReserveUnlocked( bool FileCache::tryReserveUnlocked(
@ -570,24 +575,20 @@ bool FileCache::tryReserveUnlocked(
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock) CachePriorityQueueGuard::LockPtr queue_lock)
{ {
auto query_context = query_limit ? query_limit->tryGetQueryContext(queue_lock) : nullptr; auto query_context = query_limit ? query_limit->tryGetQueryContext(queue_lock) : nullptr;
bool reserved; bool reserved;
if (query_context) if (query_context)
{ {
const bool query_limit_exceeded = query_context->getCacheSize() + size > query_context->getMaxCacheSize(); const bool query_limit_exceeded = query_context->getSize() + size > query_context->getSizeLimit();
if (!query_limit_exceeded) reserved = (!query_limit_exceeded || query_context->recacheOnQueryLimitExceeded())
reserved = tryReserveInCache(key, offset, size, query_context, key_transaction, queue_lock); && tryReserveImpl(query_context->getPriority(), key, offset, size, key_transaction, query_context.get(), queue_lock);
else if (query_context->isSkipDownloadIfExceed())
reserved = false;
else
reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction, queue_lock);
} }
else else
{ {
reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction, queue_lock); reserved = tryReserveImpl(*main_priority, key, offset, size, key_transaction, nullptr, queue_lock);
} }
if (reserved && !key_transaction->getOffsets().created_base_directory) if (reserved && !key_transaction->getOffsets().created_base_directory)
@ -595,148 +596,14 @@ bool FileCache::tryReserveUnlocked(
fs::create_directories(getPathInLocalCache(key)); fs::create_directories(getPathInLocalCache(key));
key_transaction->getOffsets().created_base_directory = true; key_transaction->getOffsets().created_base_directory = true;
} }
return reserved; return reserved;
} }
bool FileCache::tryReserveInQueryCache(
const Key & key,
size_t offset,
size_t size,
QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr,
const CachePriorityQueueGuard::Lock & queue_lock)
{
LOG_TEST(log, "Reserving query cache space {} for {}:{}", size, key.toString(), offset);
auto & query_priority = query_context->getPriority();
if (query_priority.getElementsNum(queue_lock)) {}
// struct Segment
// {
// Key key;
// size_t offset;
// size_t size;
//
// Segment(Key key_, size_t offset_, size_t size_)
// : key(key_), offset(offset_), size(size_) {}
// };
//
// std::vector<Segment> ghost;
// size_t queue_size = main_priority->getElementsNum(lock);
// size_t removed_size = 0;
// auto is_overflow = [&]
// {
// return (max_size != 0 && main_priority->getCacheSize() + size - removed_size > max_size)
// || (max_element_size != 0 && queue_size > max_element_size)
// || (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize());
// };
// query_priority.iterate([&](const QueueEntry & entry) -> IterationResult
// {
// if (!is_overflow())
// return IterationResult::BREAK;
// // auto * cell = key_transaction.getOffsets().tryGet(iter->offset());
// return IterationResult::CONTINUE;
// }, lock);
//
// auto * cell = key_transaction.getOffsets().tryGet(iter->offset());
//
// if (!cell)
// {
// /// The cache corresponding to this record may be swapped out by
// /// other queries, so it has become invalid.
// removed_size += iter->size();
// ghost.push_back(Segment(iter->key(), iter->offset(), iter->size()));
// /// next()
// iter->removeAndGetNext();
// }
// else
// {
// size_t cell_size = cell->size();
// assert(iter->size() == cell_size);
//
// if (cell->releasable())
// {
// auto & file_segment = cell->file_segment;
//
// if (file_segment->isPersistent() && allow_persistent_files)
// continue;
//
// switch (file_segment->state())
// {
// case FileSegment::State::DOWNLOADED:
// {
// to_evict.push_back(cell);
// break;
// }
// default:
// {
// trash.push_back(cell);
// break;
// }
// }
// removed_size += cell_size;
// --queue_size;
// }
//
// iter->next();
// }
// }
//
// // auto remove_file_segment = [&](FileSegmentPtr file_segment, size_t file_segment_size)
// // {
// // /// FIXME: key transaction is incorrect
// // query_context->remove(file_segment->key(), file_segment->offset(), file_segment_size, key_transaction);
// // remove(file_segment);
// // };
//
// assert(trash.empty());
// //for (auto & cell : trash)
// //{
// // if (auto file_segment = cell->file_segment)
// // // remove_file_segment(file_segment, cell->size());
// //}
//
// for (auto & entry : ghost)
// /// FIXME: key transaction is incorrect
// query_context->remove(entry.key, entry.offset, entry.size, key_transaction);
//
// if (is_overflow())
// return false;
//
// if (cell_for_reserve)
// {
// auto queue_iterator = cell_for_reserve->queue_iterator;
// if (queue_iterator)
// queue_iterator->incrementSize(size);
// else
// cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction.getCreator());
// }
//
// // for (auto & cell : to_evict)
// // {
// // if (auto file_segment = cell->file_segment)
// // remove_file_segment(file_segment, cell->size());
// // }
//
// query_context->reserve(key, offset, size, key_transaction);
//
// if (reserved && !key_transaction.getOffsets().created_base_directory)
// {
// fs::create_directories(getPathInLocalCache(key));
// key_transaction.getOffsets().created_base_directory = true;
// }
return true;
}
void FileCache::iterateAndCollectKeyLocks( void FileCache::iterateAndCollectKeyLocks(
IFileCachePriority & priority, LockedCachePriority & priority,
IterateAndCollectLocksFunc && func, IterateAndCollectLocksFunc && func,
KeyTransactionsMap & locked_map, KeyTransactionsMap & locked_map)
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry) priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry)
{ {
@ -754,25 +621,37 @@ void FileCache::iterateAndCollectKeyLocks(
locked_map.emplace(entry.key, current); locked_map.emplace(entry.key, current);
return res.iteration_result; return res.iteration_result;
}, queue_lock); });
} }
bool FileCache::tryReserveInCache( bool FileCache::tryReserveImpl(
IFileCachePriority & priority_queue,
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock) QueryLimit::LockedQueryContext * query_context,
CachePriorityQueueGuard::LockPtr priority_lock)
{ {
/// Iterate cells in the priority of `priority_queue`.
/// If some entry is in `priority_queue` it must be guaranteed to have a
/// corresponding cache cell in key_transaction->offsets() and in
/// query_context->records (if query_context != nullptr).
/// When we evict some entry, then it must be removed from both:
/// main_priority and query_context::priority (if query_context != nullptr).
/// If we successfully reserved space, entry must be added to both:
/// cells and query_context::records (if query_context != nullptr);
LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset); LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset);
size_t queue_size = main_priority->getElementsNum(queue_lock); LockedCachePriority locked_priority_queue(priority_lock, priority_queue);
chassert(queue_size <= max_element_size); LockedCachePriority locked_main_priority(priority_lock, *main_priority);
auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset); size_t queue_size = locked_priority_queue.getElementsCount();
chassert(queue_size <= locked_priority_queue.getElementsLimit());
/// A cell acquires a LRUQueue iterator on first successful space reservation attempt. /// A cell acquires a LRUQueue iterator on first successful space reservation attempt.
auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset);
if (!cell_for_reserve || !cell_for_reserve->queue_iterator) if (!cell_for_reserve || !cell_for_reserve->queue_iterator)
queue_size += 1; queue_size += 1;
@ -781,8 +660,9 @@ bool FileCache::tryReserveInCache(
{ {
/// max_size == 0 means unlimited cache size, /// max_size == 0 means unlimited cache size,
/// max_element_size == 0 means unlimited number of cache elements. /// max_element_size == 0 means unlimited number of cache elements.
return (max_size != 0 && main_priority->getCacheSize(queue_lock) + size - removed_size > max_size) return (main_priority->getSizeLimit() != 0 && locked_main_priority.getSize() + size - removed_size > main_priority->getSizeLimit())
|| (max_element_size != 0 && queue_size > max_element_size); || (main_priority->getElementsLimit() != 0 && queue_size > main_priority->getElementsLimit())
|| (query_context && query_context->getSize() + size - removed_size > query_context->getSizeLimit());
}; };
KeyTransactionsMap locked; KeyTransactionsMap locked;
@ -792,7 +672,7 @@ bool FileCache::tryReserveInCache(
using IterationResult = IFileCachePriority::IterationResult; using IterationResult = IFileCachePriority::IterationResult;
iterateAndCollectKeyLocks( iterateAndCollectKeyLocks(
*main_priority, locked_priority_queue,
[&](const QueueEntry & entry, KeyTransaction & locked_key) -> IterateAndLockResult [&](const QueueEntry & entry, KeyTransaction & locked_key) -> IterateAndLockResult
{ {
if (!is_overflow()) if (!is_overflow())
@ -800,7 +680,9 @@ bool FileCache::tryReserveInCache(
auto * cell = locked_key.getOffsets().get(entry.offset); auto * cell = locked_key.getOffsets().get(entry.offset);
chassert(cell->queue_iterator);
chassert(entry.size == cell->size()); chassert(entry.size == cell->size());
const size_t cell_size = cell->size(); const size_t cell_size = cell->size();
bool remove_current_it = false; bool remove_current_it = false;
@ -832,7 +714,7 @@ bool FileCache::tryReserveInCache(
{ {
remove_current_it = true; remove_current_it = true;
cell->queue_iterator = {}; cell->queue_iterator = {};
locked_key.remove(file_segment, queue_lock); locked_key.remove(file_segment, priority_lock);
break; break;
} }
} }
@ -845,41 +727,48 @@ bool FileCache::tryReserveInCache(
return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction }; return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction };
return { IterationResult::CONTINUE, save_key_transaction }; return { IterationResult::CONTINUE, save_key_transaction };
}, locked, queue_lock); }, locked);
if (is_overflow()) if (is_overflow())
return false; return false;
if (cell_for_reserve)
{
/// queue_iterator is std::nullopt here if no space has been reserved yet, a cache cell
/// acquires queue iterator on first successful space reservation attempt.
/// If queue iterator already exists, we need to update the size after each space reservation.
auto queue_iterator = cell_for_reserve->queue_iterator;
if (queue_iterator)
queue_iterator->incrementSize(size, queue_lock);
else
{
/// Space reservation is incremental, so cache cell is created first (with state empty),
/// and queue_iterator is assigned on first space reservation attempt.
cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), queue_lock);
}
}
for (auto & [_, transaction] : locked) for (auto & [_, transaction] : locked)
{ {
for (const auto & offset_to_delete : transaction->delete_offsets) for (const auto & offset_to_delete : transaction->delete_offsets)
{ {
auto * cell = transaction->getOffsets().get(offset_to_delete); auto * cell = transaction->getOffsets().get(offset_to_delete);
transaction->remove(cell->file_segment, queue_lock); transaction->remove(cell->file_segment, priority_lock);
if (query_context)
query_context->remove(key, offset);
} }
} }
if (main_priority->getCacheSize(queue_lock) > (1ull << 63)) if (cell_for_reserve)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); {
/// queue_iterator is std::nullopt here if no space has been reserved yet, a cache cell
/// acquires queue iterator on first successful space reservation attempt.
/// If queue iterator already exists, we need to update the size after each space reservation.
if (cell_for_reserve->queue_iterator)
LockedCachePriorityIterator(priority_lock, cell_for_reserve->queue_iterator).incrementSize(size);
else
{
/// Space reservation is incremental, so cache cell is created first (with state empty),
/// and queue_iterator is assigned on first space reservation attempt.
cell_for_reserve->queue_iterator = locked_main_priority.add(key, offset, size, key_transaction->getCreator());
}
}
if (query_context) if (query_context)
query_context->reserve(key, offset, size, *key_transaction, queue_lock); {
auto queue_iterator = query_context->tryGet(key, offset);
if (queue_iterator)
LockedCachePriorityIterator(priority_lock, queue_iterator).incrementSize(size);
else
query_context->add(key, offset, LockedCachePriority(priority_lock, query_context->getPriority()).add(key, offset, size, key_transaction->getCreator()));
}
if (locked_main_priority.getSize() > (1ull << 63))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
return true; return true;
} }
@ -888,7 +777,7 @@ void FileCache::removeIfExists(const Key & key)
{ {
assertInitialized(); assertInitialized();
auto queue_lock = main_priority->lock(); auto queue_lock = priority_queue_guard.lock();
auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL);
if (!key_transaction) if (!key_transaction)
return; return;
@ -927,8 +816,8 @@ void FileCache::removeAllReleasable()
/// `remove_persistent_files` defines whether non-evictable by some criteria files /// `remove_persistent_files` defines whether non-evictable by some criteria files
/// (they do not comply with the cache eviction policy) should also be removed. /// (they do not comply with the cache eviction policy) should also be removed.
auto queue_lock = main_priority->lock(); auto queue_lock = priority_queue_guard.lock();
main_priority->iterate([&](const QueueEntry & entry) -> IterationResult LockedCachePriority(queue_lock, *main_priority).iterate([&](const QueueEntry & entry) -> IterationResult
{ {
auto key_transaction = entry.createKeyTransaction(); auto key_transaction = entry.createKeyTransaction();
auto * cell = key_transaction->getOffsets().get(entry.offset); auto * cell = key_transaction->getOffsets().get(entry.offset);
@ -940,14 +829,13 @@ void FileCache::removeAllReleasable()
return IterationResult::REMOVE_AND_CONTINUE; return IterationResult::REMOVE_AND_CONTINUE;
} }
return IterationResult::CONTINUE; return IterationResult::CONTINUE;
}, queue_lock); });
if (stash) if (stash)
{ {
/// Remove all access information. /// Remove all access information.
auto lock = stash->queue->lock();
stash->records.clear(); stash->records.clear();
stash->queue->removeAll(lock); LockedCachePriority(queue_lock, *stash->queue).removeAll();
} }
} }
@ -966,7 +854,7 @@ KeyTransaction::~KeyTransaction()
cleanupKeyDirectory(); cleanupKeyDirectory();
} }
void KeyTransaction::remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock & queue_lock) void KeyTransaction::remove(FileSegmentPtr file_segment, CachePriorityQueueGuard::LockPtr queue_lock)
{ {
/// We must hold pointer to file segment while removing it. /// We must hold pointer to file segment while removing it.
chassert(file_segment->key() == key); chassert(file_segment->key() == key);
@ -982,7 +870,7 @@ bool KeyTransaction::isLastHolder(size_t offset)
void KeyTransaction::remove( void KeyTransaction::remove(
size_t offset, size_t offset,
const FileSegmentGuard::Lock & segment_lock, const FileSegmentGuard::Lock & segment_lock,
const CachePriorityQueueGuard::Lock & queue_lock) CachePriorityQueueGuard::LockPtr queue_lock)
{ {
LOG_DEBUG( LOG_DEBUG(
log, "Remove from cache. Key: {}, offset: {}", log, "Remove from cache. Key: {}, offset: {}",
@ -991,7 +879,7 @@ void KeyTransaction::remove(
auto * cell = offsets->get(offset); auto * cell = offsets->get(offset);
if (cell->queue_iterator) if (cell->queue_iterator)
cell->queue_iterator->remove(queue_lock); LockedCachePriorityIterator(queue_lock, cell->queue_iterator).remove();
const auto cache_file_path = cell->file_segment->getPathInLocalCache(); const auto cache_file_path = cell->file_segment->getPathInLocalCache();
cell->file_segment->detach(segment_lock, *this); cell->file_segment->detach(segment_lock, *this);
@ -1041,7 +929,8 @@ void KeyTransaction::cleanupKeyDirectory() const
void FileCache::loadMetadata() void FileCache::loadMetadata()
{ {
auto queue_lock = main_priority->lock(); auto queue_lock = priority_queue_guard.lock();
LockedCachePriority priority_queue(queue_lock, *main_priority);
UInt64 offset = 0; UInt64 offset = 0;
size_t size = 0; size_t size = 0;
@ -1137,7 +1026,7 @@ void FileCache::loadMetadata()
log, log,
"Cache capacity changed (max size: {}, used: {}), " "Cache capacity changed (max size: {}, used: {}), "
"cached file `{}` does not fit in cache anymore (size: {})", "cached file `{}` does not fit in cache anymore (size: {})",
max_size, main_priority->getCacheSize(queue_lock), key_it->path().string(), size); priority_queue.getSizeLimit(), priority_queue.getSize(), key_it->path().string(), size);
fs::remove(offset_it->path()); fs::remove(offset_it->path());
} }
@ -1148,7 +1037,7 @@ void FileCache::loadMetadata()
/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority. /// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
pcg64 generator(randomSeed()); pcg64 generator(randomSeed());
std::shuffle(queue_entries.begin(), queue_entries.end(), generator); std::shuffle(queue_entries.begin(), queue_entries.end(), generator);
for (const auto & [it, file_segment] : queue_entries) for (auto & [it, file_segment] : queue_entries)
{ {
/// Cell cache size changed and, for example, 1st file segment fits into cache /// Cell cache size changed and, for example, 1st file segment fits into cache
/// and 2nd file segment will fit only if first was evicted, then first will be removed and /// and 2nd file segment will fit only if first was evicted, then first will be removed and
@ -1156,14 +1045,14 @@ void FileCache::loadMetadata()
if (file_segment.expired()) if (file_segment.expired())
continue; continue;
it->use(queue_lock); LockedCachePriorityIterator(queue_lock, it).use();
} }
} }
void KeyTransaction::reduceSizeToDownloaded( void KeyTransaction::reduceSizeToDownloaded(
size_t offset, size_t offset,
const FileSegmentGuard::Lock & segment_lock, const FileSegmentGuard::Lock & segment_lock,
const CachePriorityQueueGuard::Lock & queue_lock) CachePriorityQueueGuard::LockPtr queue_lock)
{ {
/** /**
* In case file was partially downloaded and its download cannot be continued * In case file was partially downloaded and its download cannot be continued
@ -1184,7 +1073,7 @@ void KeyTransaction::reduceSizeToDownloaded(
file_segment->getInfoForLogUnlocked(segment_lock)); file_segment->getInfoForLogUnlocked(segment_lock));
} }
[[maybe_unused]] const auto & entry = **cell->queue_iterator; [[maybe_unused]] const auto & entry = *LockedCachePriorityIterator(queue_lock, cell->queue_iterator);
assert(file_segment->downloaded_size <= file_segment->reserved_size); assert(file_segment->downloaded_size <= file_segment->reserved_size);
assert(entry.size == file_segment->reserved_size); assert(entry.size == file_segment->reserved_size);
assert(entry.size >= file_segment->downloaded_size); assert(entry.size >= file_segment->downloaded_size);
@ -1192,7 +1081,7 @@ void KeyTransaction::reduceSizeToDownloaded(
if (file_segment->reserved_size > file_segment->downloaded_size) if (file_segment->reserved_size > file_segment->downloaded_size)
{ {
int64_t extra_size = static_cast<ssize_t>(cell->file_segment->reserved_size) - static_cast<ssize_t>(file_segment->downloaded_size); int64_t extra_size = static_cast<ssize_t>(cell->file_segment->reserved_size) - static_cast<ssize_t>(file_segment->downloaded_size);
cell->queue_iterator->incrementSize(-extra_size, queue_lock); LockedCachePriorityIterator(queue_lock, cell->queue_iterator).incrementSize(-extra_size);
} }
CreateFileSegmentSettings create_settings(file_segment->getKind()); CreateFileSegmentSettings create_settings(file_segment->getKind());
@ -1235,13 +1124,13 @@ FileSegmentsHolderPtr FileCache::dumpQueue()
using IterationResult = IFileCachePriority::IterationResult; using IterationResult = IFileCachePriority::IterationResult;
FileSegments file_segments; FileSegments file_segments;
main_priority->iterate([&](const QueueEntry & entry) LockedCachePriority(priority_queue_guard.lock(), *main_priority).iterate([&](const QueueEntry & entry)
{ {
auto tx = entry.createKeyTransaction(); auto tx = entry.createKeyTransaction();
auto * cell = tx->getOffsets().get(entry.offset); auto * cell = tx->getOffsets().get(entry.offset);
file_segments.push_back(FileSegment::getSnapshot(cell->file_segment)); file_segments.push_back(FileSegment::getSnapshot(cell->file_segment));
return IterationResult::CONTINUE; return IterationResult::CONTINUE;
}, main_priority->lock()); });
return std::make_unique<FileSegmentsHolder>(std::move(file_segments)); return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
} }
@ -1266,21 +1155,18 @@ std::vector<String> FileCache::tryGetCachePaths(const Key & key)
size_t FileCache::getUsedCacheSize() const size_t FileCache::getUsedCacheSize() const
{ {
auto lock = main_priority->lock(); return LockedCachePriority(priority_queue_guard.lock(), *main_priority).getSize();
return main_priority->getCacheSize(lock);
} }
size_t FileCache::getFileSegmentsNum() const size_t FileCache::getFileSegmentsNum() const
{ {
auto lock = main_priority->lock(); return LockedCachePriority(priority_queue_guard.lock(), *main_priority).getElementsCount();
return main_priority->getElementsNum(lock);
} }
FileCache::FileSegmentCell::FileSegmentCell( FileCache::FileSegmentCell::FileSegmentCell(
FileSegmentPtr file_segment_, FileSegmentPtr file_segment_,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
IFileCachePriority & priority_queue, LockedCachePriority * locked_queue)
CachePriorityQueueGuard::Lock * queue_lock)
: file_segment(file_segment_) : file_segment(file_segment_)
{ {
/** /**
@ -1293,17 +1179,15 @@ FileCache::FileSegmentCell::FileSegmentCell(
{ {
case FileSegment::State::DOWNLOADED: case FileSegment::State::DOWNLOADED:
{ {
if (!queue_lock) if (!locked_queue)
{ {
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Adding file segment with state DOWNLOADED requires locked queue lock"); "Adding file segment with state DOWNLOADED requires locked queue lock");
} }
queue_iterator = priority_queue.add( queue_iterator = locked_queue->add(
file_segment->key(), file_segment->offset(), file_segment->range().size(), file_segment->key(), file_segment->offset(), file_segment->range().size(), key_transaction.getCreator());
key_transaction.getCreator(), *queue_lock);
/// TODO: add destructor
break; break;
} }
case FileSegment::State::SKIP_CACHE: case FileSegment::State::SKIP_CACHE:
@ -1384,10 +1268,11 @@ void FileCache::assertCacheCorrectness()
using QueueEntry = IFileCachePriority::Entry; using QueueEntry = IFileCachePriority::Entry;
using IterationResult = IFileCachePriority::IterationResult; using IterationResult = IFileCachePriority::IterationResult;
auto lock = main_priority->lock(); auto lock = priority_queue_guard.lock();
LockedCachePriority queue(lock, *main_priority);
[[maybe_unused]] size_t total_size = 0; [[maybe_unused]] size_t total_size = 0;
main_priority->iterate([&](const QueueEntry & entry) -> IterationResult queue.iterate([&](const QueueEntry & entry) -> IterationResult
{ {
auto key_transaction = entry.createKeyTransaction(); auto key_transaction = entry.createKeyTransaction();
auto * cell = key_transaction->getOffsets().get(entry.offset); auto * cell = key_transaction->getOffsets().get(entry.offset);
@ -1402,11 +1287,11 @@ void FileCache::assertCacheCorrectness()
total_size += entry.size; total_size += entry.size;
return IterationResult::CONTINUE; return IterationResult::CONTINUE;
}, lock); });
assert(total_size == main_priority->getCacheSize(lock)); chassert(queue.getSize() == total_size);
assert(main_priority->getCacheSize(lock) <= max_size); chassert(queue.getSize() <= queue.getSizeLimit());
assert(main_priority->getElementsNum(lock) <= max_element_size); chassert(queue.getElementsCount() <= queue.getElementsLimit());
} }
FileCache::QueryContextHolder::QueryContextHolder( FileCache::QueryContextHolder::QueryContextHolder(
@ -1424,28 +1309,23 @@ FileCache::QueryContextHolder::~QueryContextHolder()
/// If only the query_map and the current holder hold the context_query, /// If only the query_map and the current holder hold the context_query,
/// the query has been completed and the query_context is released. /// the query has been completed and the query_context is released.
if (context && context.use_count() == 2) if (context && context.use_count() == 2)
cache->query_limit->removeQueryContext(query_id, cache->main_priority->lock()); {
auto lock = cache->priority_queue_guard.lock();
cache->query_limit->removeQueryContext(query_id, lock);
}
} }
FileCache::QueryLimit::QueryContext::QueryContext( FileCache::QueryLimit::LockedQueryContextPtr
size_t max_cache_size_, FileCache::QueryLimit::tryGetQueryContext(CachePriorityQueueGuard::LockPtr lock)
bool skip_download_if_exceeds_query_cache_)
: max_cache_size(max_cache_size_)
, skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_)
{
}
FileCache::QueryLimit::QueryContextPtr
FileCache::QueryLimit::tryGetQueryContext(const CachePriorityQueueGuard::Lock &)
{ {
if (!isQueryInitialized()) if (!isQueryInitialized())
return nullptr; return nullptr;
auto query_iter = query_map.find(std::string(CurrentThread::getQueryId())); auto query_iter = query_map.find(std::string(CurrentThread::getQueryId()));
return (query_iter == query_map.end()) ? nullptr : query_iter->second; return (query_iter == query_map.end()) ? nullptr : std::make_unique<LockedQueryContext>(query_iter->second, lock);
} }
void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, const CachePriorityQueueGuard::Lock &) void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, CachePriorityQueueGuard::LockPtr)
{ {
auto query_iter = query_map.find(query_id); auto query_iter = query_map.find(query_id);
if (query_iter == query_map.end()) if (query_iter == query_map.end())
@ -1461,7 +1341,7 @@ void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, con
FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext( FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext(
const std::string & query_id, const std::string & query_id,
const ReadSettings & settings, const ReadSettings & settings,
const CachePriorityQueueGuard::Lock &) CachePriorityQueueGuard::LockPtr)
{ {
if (query_id.empty()) if (query_id.empty())
return nullptr; return nullptr;
@ -1470,7 +1350,7 @@ FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryConte
if (inserted) if (inserted)
{ {
it->second = std::make_shared<QueryContext>( it->second = std::make_shared<QueryContext>(
settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); settings.max_query_cache_size, !settings.skip_download_if_exceeds_query_cache);
} }
return it->second; return it->second;
@ -1482,70 +1362,40 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
if (!query_limit || settings.max_query_cache_size == 0) if (!query_limit || settings.max_query_cache_size == 0)
return {}; return {};
auto context = query_limit->getOrSetQueryContext(query_id, settings, main_priority->lock()); auto lock = priority_queue_guard.lock();
auto context = query_limit->getOrSetQueryContext(query_id, settings, lock);
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context)); return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
} }
void FileCache::QueryLimit::QueryContext::remove( void FileCache::QueryLimit::LockedQueryContext::add(const Key & key, size_t offset, IFileCachePriority::Iterator iterator)
const Key & key,
size_t offset,
size_t size,
const CachePriorityQueueGuard::Lock & queue_lock)
{ {
std::lock_guard lock(mutex); auto [_, inserted] = context->records.emplace(KeyAndOffset{key, offset}, iterator);
if (cache_size < size) if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size");
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record != records.end())
{
record->second->remove(queue_lock);
records.erase({key, offset});
}
}
cache_size -= size;
}
void FileCache::QueryLimit::QueryContext::reserve(
const Key & key,
size_t offset,
size_t size,
const KeyTransaction & key_transaction,
const CachePriorityQueueGuard::Lock & queue_lock)
{
std::lock_guard lock(mutex);
if (cache_size + size > max_cache_size)
{ {
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", "Cannot add offset {} to query context under key {}, it already exists",
key.toString(), offset); offset, key.toString());
} }
if (!skip_download_if_exceeds_query_cache)
{
auto record = records.find({key, offset});
if (record == records.end())
{
auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), queue_lock);
record = records.insert({{key, offset}, queue_iter}).first;
}
record->second->incrementSize(size, queue_lock);
}
cache_size += size;
} }
void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock & queue_lock) void FileCache::QueryLimit::LockedQueryContext::remove(const Key & key, size_t offset)
{ {
if (skip_download_if_exceeds_query_cache) auto record = context->records.find({key, offset});
return; if (record == context->records.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no {}:{} in query context", key.toString(), offset);
LockedCachePriorityIterator(lock, record->second).remove();
context->records.erase({key, offset});
}
IFileCachePriority::Iterator FileCache::QueryLimit::LockedQueryContext::tryGet(const Key & key, size_t offset)
{
auto it = context->records.find({key, offset});
if (it == context->records.end())
return nullptr;
return it->second;
std::lock_guard lock(mutex);
auto record = records.find({key, offset});
if (record != records.end())
record->second->use(queue_lock);
} }
KeyTransactionPtr KeyTransactionCreator::create() KeyTransactionPtr KeyTransactionCreator::create()

View File

@ -16,6 +16,7 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileCache_fwd.h> #include <Interpreters/Cache/FileCache_fwd.h>
#include <Interpreters/Cache/FileSegment.h> #include <Interpreters/Cache/FileSegment.h>
@ -116,13 +117,9 @@ private:
String cache_base_path; String cache_base_path;
const size_t max_size;
const size_t max_element_size;
const size_t max_file_segment_size; const size_t max_file_segment_size;
const bool allow_persistent_files; const bool allow_persistent_files;
const bool enable_bypass_cache_with_threshold; const size_t bypass_cache_threshold = 0;
const size_t bypass_cache_threshold;
Poco::Logger * log; Poco::Logger * log;
@ -143,8 +140,7 @@ private:
FileSegmentCell( FileSegmentCell(
FileSegmentPtr file_segment_, FileSegmentPtr file_segment_,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
IFileCachePriority & priority_queue, LockedCachePriority * locked_queue);
CachePriorityQueueGuard::Lock * queue_lock);
FileSegmentCell(FileSegmentCell && other) noexcept FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {}
@ -214,72 +210,88 @@ private:
mutable PendingRemoveFilesMetadata remove_files_metadata; mutable PendingRemoveFilesMetadata remove_files_metadata;
FileCachePriorityPtr main_priority; FileCachePriorityPtr main_priority;
mutable CachePriorityQueueGuard priority_queue_guard;
struct HitsCountStash struct HitsCountStash
{ {
HitsCountStash(size_t max_stash_queue_size_, size_t cache_hits_threshold_, FileCachePriorityPtr queue_) HitsCountStash(size_t hits_threashold_, size_t queue_size_)
: max_stash_queue_size(max_stash_queue_size_) : hits_threshold(hits_threashold_), queue(std::make_unique<LRUFileCachePriority>(0, queue_size_))
, cache_hits_threshold(cache_hits_threshold_) {
, queue(std::move(queue_)) {} if (!queue_size_)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Queue size for hits queue must be non-zero");
const size_t max_stash_queue_size; }
const size_t cache_hits_threshold;
auto lock() const { return queue->lock(); }
const size_t hits_threshold;
FileCachePriorityPtr queue; FileCachePriorityPtr queue;
using Records = std::unordered_map<KeyAndOffset, IFileCachePriority::Iterator, FileCacheKeyAndOffsetHash>; using Records = std::unordered_map<KeyAndOffset, IFileCachePriority::Iterator, FileCacheKeyAndOffsetHash>;
Records records; Records records;
}; };
mutable std::unique_ptr<HitsCountStash> stash; mutable std::unique_ptr<HitsCountStash> stash;
struct QueryLimit class QueryLimit
{ {
/// Used to track and control the cache access of each query. friend class FileCache;
/// Through it, we can realize the processing of different queries by the cache layer. public:
struct QueryContext class QueryContext;
{
std::mutex mutex;
HitsCountStash::Records records;
FileCachePriorityPtr priority;
size_t cache_size = 0;
size_t max_cache_size;
bool skip_download_if_exceeds_query_cache;
QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_);
size_t getMaxCacheSize() const { return max_cache_size; }
size_t getCacheSize() const { return cache_size; }
IFileCachePriority & getPriority() const { return *priority; }
bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; }
void remove(const Key & key, size_t offset, size_t size, const CachePriorityQueueGuard::Lock &);
void reserve(const Key & key, size_t offset, size_t size, const KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &);
void use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &);
};
using QueryContextPtr = std::shared_ptr<QueryContext>; using QueryContextPtr = std::shared_ptr<QueryContext>;
using QueryContextMap = std::unordered_map<String, QueryContextPtr>; class LockedQueryContext;
using LockedQueryContextPtr = std::unique_ptr<LockedQueryContext>;
QueryContextMap query_map; LockedQueryContextPtr tryGetQueryContext(CachePriorityQueueGuard::LockPtr lock);
QueryContextPtr tryGetQueryContext(const CachePriorityQueueGuard::Lock & lock);
void removeQueryContext(const std::string & query_id, const CachePriorityQueueGuard::Lock & lock);
QueryContextPtr getOrSetQueryContext( QueryContextPtr getOrSetQueryContext(
const std::string & query_id, const std::string & query_id, const ReadSettings & settings, CachePriorityQueueGuard::LockPtr);
const ReadSettings & settings,
const CachePriorityQueueGuard::Lock & lock); void removeQueryContext(const std::string & query_id, CachePriorityQueueGuard::LockPtr);
private:
using QueryContextMap = std::unordered_map<String, QueryContextPtr>;
QueryContextMap query_map;
public:
/// Per-query cache state: the set of (key, offset) records the query has
/// registered and a dedicated priority queue for them.
/// All state is private; the only accessor is LockedQueryContext (declared a
/// friend below).
class QueryContext
{
    friend class QueryLimit::LockedQueryContext;

    using Records = std::unordered_map<KeyAndOffset, IFileCachePriority::Iterator, FileCacheKeyAndOffsetHash>;

    /// Maps (key, offset) to the iterator of the corresponding queue entry.
    Records records;

    /// Queue owned by this query context.
    FileCachePriorityPtr priority;

    /// Flag exposed through LockedQueryContext::recacheOnQueryLimitExceeded().
    const bool recache_on_query_limit_exceeded;

public:
    /// @param query_cache_size  Passed as the size limit of the per-query LRU queue.
    ///                          The elements limit of that queue is passed as 0
    ///                          (NOTE(review): presumably "no elements limit" -- confirm
    ///                          against LRUFileCachePriority).
    /// @param recache_on_query_limit_exceeded_  Stored as is.
    QueryContext(size_t query_cache_size, bool recache_on_query_limit_exceeded_)
        : priority(std::make_unique<LRUFileCachePriority>(query_cache_size, 0))
        , recache_on_query_limit_exceeded(recache_on_query_limit_exceeded_)
    {
    }
};
/// A QueryContext bundled with a (shared) priority-queue lock.
/// Holding an instance keeps the lock object alive, so the context's records
/// and its priority queue are only reachable through an object that owns the lock.
class LockedQueryContext
{
public:
    /// Both arguments are sink parameters: they are taken by value and moved into
    /// the members, which avoids a second atomic refcount increment/decrement per
    /// shared_ptr compared to copying them.
    ///
    /// `priority` is built from the already-initialized members `lock` and `context`
    /// (not from the moved-from parameters); this relies on the member declaration
    /// order below.
    LockedQueryContext(QueryContextPtr context_, CachePriorityQueueGuard::LockPtr lock_)
        : context(std::move(context_))
        , lock(std::move(lock_))
        , priority(lock, *context->priority)
    {
    }

    /// Direct access to the query's priority queue (not wrapped by the lock holder).
    IFileCachePriority & getPriority() { return *context->priority; }
    const IFileCachePriority & getPriority() const { return *context->priority; }

    /// Current size and size limit of the query's priority queue.
    size_t getSize() const { return priority.getSize(); }
    size_t getSizeLimit() const { return priority.getSizeLimit(); }

    bool recacheOnQueryLimitExceeded() const { return context->recache_on_query_limit_exceeded; }

    /// Returns the queue iterator registered for (key, offset), or nullptr if there is none.
    IFileCachePriority::Iterator tryGet(const Key & key, size_t offset);

    /// Registers `iterator` for (key, offset); throws LOGICAL_ERROR if a record already exists.
    void add(const Key & key, size_t offset, IFileCachePriority::Iterator iterator);

    /// Removes the queue entry and the record for (key, offset); throws LOGICAL_ERROR if absent.
    void remove(const Key & key, size_t offset);

private:
    /// Declaration order matters: `context` and `lock` must precede `priority`,
    /// because the constructor initializes `priority` from them.
    QueryContextPtr context;
    CachePriorityQueueGuard::LockPtr lock;
    LockedCachePriority priority;
};
}; };
using QueryLimitPtr = std::unique_ptr<QueryLimit>; using QueryLimitPtr = std::unique_ptr<QueryLimit>;
@ -334,30 +346,23 @@ private:
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & create_settings, const CreateFileSegmentSettings & create_settings,
KeyTransaction & key_transaction, KeyTransaction & key_transaction,
CachePriorityQueueGuard::Lock * queue_lock); CachePriorityQueueGuard::LockPtr * queue_lock);
bool tryReserveUnlocked( bool tryReserveUnlocked(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &); CachePriorityQueueGuard::LockPtr);
bool tryReserveInCache( bool tryReserveImpl(
IFileCachePriority & priority_queue,
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction, KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &); QueryLimit::LockedQueryContext * query_context,
CachePriorityQueueGuard::LockPtr priority_lock);
bool tryReserveInQueryCache(
const Key & key,
size_t offset,
size_t size,
QueryLimit::QueryContextPtr query_context,
KeyTransactionPtr key_transaction,
const CachePriorityQueueGuard::Lock &);
struct IterateAndLockResult struct IterateAndLockResult
{ {
@ -366,10 +371,9 @@ private:
}; };
using IterateAndCollectLocksFunc = std::function<IterateAndLockResult(const IFileCachePriority::Entry &, KeyTransaction &)>; using IterateAndCollectLocksFunc = std::function<IterateAndLockResult(const IFileCachePriority::Entry &, KeyTransaction &)>;
static void iterateAndCollectKeyLocks( static void iterateAndCollectKeyLocks(
IFileCachePriority & priority, LockedCachePriority & priority,
IterateAndCollectLocksFunc && func, IterateAndCollectLocksFunc && func,
KeyTransactionsMap & locked_map, KeyTransactionsMap & locked_map);
const CachePriorityQueueGuard::Lock & queue_lock);
}; };
struct KeyTransactionCreator struct KeyTransactionCreator
@ -396,11 +400,11 @@ struct KeyTransaction : private boost::noncopyable
KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(key, offsets, cache); } KeyTransactionCreatorPtr getCreator() const { return std::make_unique<KeyTransactionCreator>(key, offsets, cache); }
void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, CachePriorityQueueGuard::LockPtr);
void remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock &); void remove(FileSegmentPtr file_segment, CachePriorityQueueGuard::LockPtr);
void remove(size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); void remove(size_t offset, const FileSegmentGuard::Lock &, CachePriorityQueueGuard::LockPtr);
bool isLastHolder(size_t offset); bool isLastHolder(size_t offset);

View File

@ -33,7 +33,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false); cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false); enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD); cache_hits_threshold = config.getUInt64(config_prefix + ".cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD);
enable_bypass_cache_with_threashold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threashold", false); enable_bypass_cache_with_threashold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threashold", false);

View File

@ -15,7 +15,7 @@ struct FileCacheSettings
bool cache_on_write_operations = false; bool cache_on_write_operations = false;
size_t enable_cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD; size_t cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD;
bool enable_filesystem_query_cache_limit = false; bool enable_filesystem_query_cache_limit = false;
bool do_not_evict_index_and_mark_files = true; bool do_not_evict_index_and_mark_files = true;

View File

@ -6,8 +6,8 @@ namespace DB
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024;
static constexpr int REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD = 0; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD = 0;
static constexpr size_t REMOTE_FS_OBJECTS_CACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;; static constexpr size_t REMOTE_FS_OBJECTS_CACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;
class FileCache; class FileCache;
using FileCachePtr = std::shared_ptr<FileCache>; using FileCachePtr = std::shared_ptr<FileCache>;

View File

@ -536,12 +536,12 @@ void FileSegment::setBroken()
void FileSegment::complete() void FileSegment::complete()
{ {
auto queue_lock = cache->main_priority->lock(); auto queue_lock = cache->priority_queue_guard.lock();
auto key_transaction = createKeyTransaction(); auto key_transaction = createKeyTransaction();
return completeUnlocked(*key_transaction, queue_lock); return completeUnlocked(*key_transaction, queue_lock);
} }
void FileSegment::completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock & queue_lock) void FileSegment::completeUnlocked(KeyTransaction & key_transaction, CachePriorityQueueGuard::LockPtr queue_lock)
{ {
auto segment_lock = segment_guard.lock(); auto segment_lock = segment_guard.lock();
@ -817,7 +817,8 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
return file_segments.erase(file_segments.begin()); return file_segments.erase(file_segments.begin());
} }
auto queue_lock = file_segment.cache->main_priority->lock(); auto queue_lock = file_segment.cache->priority_queue_guard.lock();
/// File segment pointer must be reset right after calling complete() and /// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers. /// under the same mutex, because complete() checks for segment pointers.
auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false); auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false);
@ -825,7 +826,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl()
{ {
auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator; auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator;
if (queue_iter) if (queue_iter)
queue_iter->use(queue_lock); LockedCachePriorityIterator(queue_lock, queue_iter).use();
if (!file_segment.isCompleted()) if (!file_segment.isCompleted())
{ {

View File

@ -294,7 +294,7 @@ private:
/// Function might check if the caller of the method /// Function might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completion and destruction /// is the last alive holder of the segment. Therefore, completion and destruction
/// of the file segment pointer must be done under the same cache mutex. /// of the file segment pointer must be done under the same cache mutex.
void completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &); void completeUnlocked(KeyTransaction & key_transaction, CachePriorityQueueGuard::LockPtr);
void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock);
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;

View File

@ -41,10 +41,11 @@ struct CachePriorityQueueGuard
explicit Lock(CachePriorityQueueGuard & guard) : lock(guard.mutex) {} explicit Lock(CachePriorityQueueGuard & guard) : lock(guard.mutex) {}
std::unique_lock<std::mutex> lock; std::unique_lock<std::mutex> lock;
}; };
using LockPtr = std::shared_ptr<Lock>;
std::mutex mutex; std::mutex mutex;
Lock lock() { return Lock(*this); } LockPtr lock() { return std::make_shared<Lock>(*this); }
std::shared_ptr<Lock> lockShared() { return std::make_shared<Lock>(*this); } std::shared_ptr<Lock> lockShared() { return std::make_shared<Lock>(*this); }
CachePriorityQueueGuard() = default; CachePriorityQueueGuard() = default;

View File

@ -20,13 +20,13 @@ using KeyTransactionCreatorPtr = std::unique_ptr<KeyTransactionCreator>;
/// IFileCachePriority is used to maintain the priority of cached data. /// IFileCachePriority is used to maintain the priority of cached data.
class IFileCachePriority class IFileCachePriority
{ {
friend class LockedCachePriority;
public: public:
class IIterator; class IIterator;
using Key = FileCacheKey; using Key = FileCacheKey;
using KeyAndOffset = FileCacheKeyAndOffset; using KeyAndOffset = FileCacheKeyAndOffset;
using Iterator = std::shared_ptr<IIterator>; using Iterator = std::shared_ptr<IIterator>;
using ConstIterator = std::shared_ptr<const IIterator>; using ConstIterator = std::shared_ptr<const IIterator>;
using Lock = CachePriorityQueueGuard::Lock;
struct Entry struct Entry
{ {
@ -48,43 +48,28 @@ public:
/// can only traverse the records in the low priority queue. /// can only traverse the records in the low priority queue.
class IIterator class IIterator
{ {
friend class LockedCachePriorityIterator;
public: public:
virtual ~IIterator() = default; virtual ~IIterator() = default;
protected:
virtual Entry & operator *() = 0; virtual Entry & operator *() = 0;
virtual const Entry & operator *() const = 0; virtual const Entry & operator *() const = 0;
/// Mark a cache record as recently used, it will update the priority virtual size_t use() = 0;
/// of the cache record according to different cache algorithms.
/// Return result hits count.
virtual size_t use(const CachePriorityQueueGuard::Lock &) = 0;
virtual void incrementSize(ssize_t, const CachePriorityQueueGuard::Lock &) = 0; virtual void incrementSize(ssize_t) = 0;
/// Remove current cached record. Return iterator to the next value. virtual Iterator remove() = 0;
virtual Iterator remove(const CachePriorityQueueGuard::Lock &) = 0;
}; };
IFileCachePriority(size_t max_size_, size_t max_elements_) : max_size(max_size_), max_elements(max_elements_) {}
virtual ~IFileCachePriority() = default; virtual ~IFileCachePriority() = default;
size_t getCacheSize(const CachePriorityQueueGuard::Lock &) const { return cache_size; } size_t getElementsLimit() const { return max_elements; }
virtual size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const = 0;
/// Lock current priority queue. All methods must be called under this lock.
CachePriorityQueueGuard::Lock lock() { return guard.lock(); }
/// Add a cache record that did not exist before, and throw a
/// logical exception if the cache block already exists.
virtual Iterator add(
const Key & key,
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &) = 0;
virtual void removeAll(const CachePriorityQueueGuard::Lock &) = 0;
size_t getSizeLimit() const { return max_size; }
enum class IterationResult enum class IterationResult
{ {
@ -92,15 +77,71 @@ public:
CONTINUE, CONTINUE,
REMOVE_AND_CONTINUE, REMOVE_AND_CONTINUE,
}; };
using IterateFunc = std::function<IterationResult(const Entry &)>;
virtual void iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) = 0;
protected: protected:
CachePriorityQueueGuard guard; const size_t max_size = 0;
const size_t max_elements = 0;
size_t max_cache_size = 0; virtual size_t getSize() const = 0;
size_t cache_size = 0;
virtual size_t getElementsCount() const = 0;
virtual Iterator add(
const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) = 0;
virtual void pop() = 0;
virtual void removeAll() = 0;
using IterateFunc = std::function<IterationResult(const Entry &)>;
virtual void iterate(IterateFunc && func) = 0;
};
/// Access wrapper around an IFileCachePriority that also owns a (shared) queue lock.
/// IFileCachePriority declares this class a friend, so the queue's protected
/// operations (add / pop / removeAll / iterate / size getters) are reachable only
/// through an object that holds a lock pointer.
class LockedCachePriority
{
public:
    /// `lock_` is a sink parameter: it is moved into the member instead of copied,
    /// saving one atomic refcount increment/decrement on the shared_ptr.
    /// `priority_queue_` is stored by reference and must outlive this wrapper.
    LockedCachePriority(CachePriorityQueueGuard::LockPtr lock_, IFileCachePriority & priority_queue_)
        : lock(std::move(lock_))
        , queue(priority_queue_)
    {
    }

    /// Configured limits of the underlying queue.
    size_t getElementsLimit() const { return queue.max_elements; }
    size_t getSizeLimit() const { return queue.max_size; }

    /// Current totals of the underlying queue.
    size_t getSize() const { return queue.getSize(); }
    size_t getElementsCount() const { return queue.getElementsCount(); }

    /// Forwards to IFileCachePriority::add and returns the iterator of the new entry.
    IFileCachePriority::Iterator add(
        const FileCacheKey & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator)
    {
        return queue.add(key, offset, size, std::move(key_transaction_creator));
    }

    void pop() { queue.pop(); }

    void removeAll() { queue.removeAll(); }

    void iterate(IFileCachePriority::IterateFunc && func) { queue.iterate(std::move(func)); }

private:
    CachePriorityQueueGuard::LockPtr lock;
    IFileCachePriority & queue;
};
/// Access wrapper around a priority-queue iterator that also owns a (shared) queue lock.
/// IIterator declares this class a friend, so its protected operations
/// (dereference / use / incrementSize / remove) are reachable through this wrapper.
///
/// NOTE(review): the iterator is stored as a reference to the caller's
/// IFileCachePriority::Iterator (a shared_ptr), not as a copy. The referenced
/// shared_ptr object must therefore stay alive for as long as this wrapper is used.
class LockedCachePriorityIterator
{
public:
/// Stores a copy of the lock pointer and a reference to `iterator_`.
LockedCachePriorityIterator(CachePriorityQueueGuard::LockPtr lock_, IFileCachePriority::Iterator & iterator_)
: lock(lock_), iterator(iterator_) {}
/// Access to the queue entry the iterator points to.
IFileCachePriority::Entry & operator *() { return **iterator; }
const IFileCachePriority::Entry & operator *() const { return **iterator; }
/// Forwards to IIterator::use() and returns its result.
size_t use() { return iterator->use(); }
/// Forwards to IIterator::incrementSize().
void incrementSize(ssize_t size) { return iterator->incrementSize(size); }
/// Forwards to IIterator::remove() and returns the iterator it yields.
IFileCachePriority::Iterator remove() { return iterator->remove(); }
private:
CachePriorityQueueGuard::LockPtr lock;
IFileCachePriority::Iterator & iterator;
}; };
using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>; using FileCachePriorityPtr = std::unique_ptr<IFileCachePriority>;

View File

@ -17,11 +17,7 @@ namespace ErrorCodes
} }
IFileCachePriority::Iterator LRUFileCachePriority::add( IFileCachePriority::Iterator LRUFileCachePriority::add(
const Key & key, const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator)
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &)
{ {
#ifndef NDEBUG #ifndef NDEBUG
for (const auto & entry : queue) for (const auto & entry : queue)
@ -34,8 +30,18 @@ IFileCachePriority::Iterator LRUFileCachePriority::add(
} }
#endif #endif
const auto & size_limit = getSizeLimit();
if (size_limit && current_size + size > size_limit)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Not enough space to add {}:{} with size {}: current size: {}/{}",
key.toString(), offset, size, current_size, getSizeLimit());
}
current_size += size;
auto iter = queue.insert(queue.end(), Entry(key, offset, size, std::move(key_transaction_creator))); auto iter = queue.insert(queue.end(), Entry(key, offset, size, std::move(key_transaction_creator)));
cache_size += size;
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size); CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements); CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
@ -45,20 +51,25 @@ IFileCachePriority::Iterator LRUFileCachePriority::add(
return std::make_shared<LRUFileCacheIterator>(this, iter); return std::make_shared<LRUFileCacheIterator>(this, iter);
} }
void LRUFileCachePriority::removeAll(const CachePriorityQueueGuard::Lock &) void LRUFileCachePriority::removeAll()
{ {
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, current_size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size()); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
LOG_TRACE(log, "Removed all entries from LRU queue"); LOG_TRACE(log, "Removed all entries from LRU queue");
queue.clear(); queue.clear();
cache_size = 0; current_size = 0;
}
/// Removes the entry at the front of the queue. New entries are inserted at the
/// back (see add()) and use() moves an entry to the back, so the front is the
/// least recently added/used entry.
/// NOTE(review): there is no empty-queue check here, and remove() dereferences the
/// iterator it is given (it->size); presumably callers must guarantee the queue is
/// non-empty -- confirm.
void LRUFileCachePriority::pop()
{
remove(queue.begin());
} }
LRUFileCachePriority::LRUQueueIterator LRUFileCachePriority::remove(LRUQueueIterator it) LRUFileCachePriority::LRUQueueIterator LRUFileCachePriority::remove(LRUQueueIterator it)
{ {
cache_size -= it->size; current_size -= it->size;
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, it->size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, it->size);
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
@ -73,7 +84,7 @@ LRUFileCachePriority::LRUFileCacheIterator::LRUFileCacheIterator(
{ {
} }
void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) void LRUFileCachePriority::iterate(IterateFunc && func)
{ {
for (auto it = queue.begin(); it != queue.end();) for (auto it = queue.begin(); it != queue.end();)
{ {
@ -98,19 +109,19 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityQueue
} }
} }
LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remove(const CachePriorityQueueGuard::Lock &) LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remove()
{ {
return std::make_shared<LRUFileCacheIterator>(cache_priority, cache_priority->remove(queue_iter)); return std::make_shared<LRUFileCacheIterator>(cache_priority, cache_priority->remove(queue_iter));
} }
void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(ssize_t size, const CachePriorityQueueGuard::Lock &) void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(ssize_t size)
{ {
cache_priority->cache_size += size; cache_priority->current_size += size;
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size); CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
queue_iter->size += size; queue_iter->size += size;
} }
size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CachePriorityQueueGuard::Lock &) size_t LRUFileCachePriority::LRUFileCacheIterator::use()
{ {
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, queue_iter); cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, queue_iter);
return ++queue_iter->hits; return ++queue_iter->hits;

View File

@ -18,25 +18,26 @@ private:
using LRUQueueIterator = typename LRUQueue::iterator; using LRUQueueIterator = typename LRUQueue::iterator;
public: public:
LRUFileCachePriority() = default; LRUFileCachePriority(size_t max_size_, size_t max_elements_) : IFileCachePriority(max_size_, max_elements_) {}
Iterator add( size_t getSize() const override { return current_size; }
const Key & key,
size_t offset,
size_t size,
KeyTransactionCreatorPtr key_transaction_creator,
const CachePriorityQueueGuard::Lock &) override;
void removeAll(const CachePriorityQueueGuard::Lock &) override; size_t getElementsCount() const override { return queue.size(); }
void iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) override; Iterator add(const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) override;
size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const override { return queue.size(); } void pop() override;
void removeAll() override;
void iterate(IterateFunc && func) override;
private: private:
LRUQueue queue; LRUQueue queue;
Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority"); Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority");
size_t current_size = 0;
LRUQueueIterator remove(LRUQueueIterator it); LRUQueueIterator remove(LRUQueueIterator it);
}; };
@ -50,11 +51,11 @@ public:
Entry & operator *() override { return *queue_iter; } Entry & operator *() override { return *queue_iter; }
const Entry & operator *() const override { return *queue_iter; } const Entry & operator *() const override { return *queue_iter; }
size_t use(const CachePriorityQueueGuard::Lock &) override; size_t use() override;
Iterator remove(const CachePriorityQueueGuard::Lock &) override; Iterator remove() override;
void incrementSize(ssize_t size, const CachePriorityQueueGuard::Lock &) override; void incrementSize(ssize_t size) override;
private: private:
LRUFileCachePriority * cache_priority; LRUFileCachePriority * cache_priority;

View File

@ -20,7 +20,7 @@ static Block getSampleBlock()
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_elements"}, ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_elements"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_file_segment_size"}, ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "max_file_segment_size"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_on_write_operations"}, ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_on_write_operations"},
ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "enable_cache_hits_threshold"}, ColumnWithTypeAndName{std::make_shared<DataTypeNumber<UInt8>>(), "cache_hits_threshold"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_size"}, ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_size"},
ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_elements"}, ColumnWithTypeAndName{std::make_shared<DataTypeUInt64>(), "current_elements"},
ColumnWithTypeAndName{std::make_shared<DataTypeString>(), "path"}, ColumnWithTypeAndName{std::make_shared<DataTypeString>(), "path"},
@ -45,7 +45,7 @@ BlockIO InterpreterDescribeCacheQuery::execute()
res_columns[1]->insert(settings.max_elements); res_columns[1]->insert(settings.max_elements);
res_columns[2]->insert(settings.max_file_segment_size); res_columns[2]->insert(settings.max_file_segment_size);
res_columns[3]->insert(settings.cache_on_write_operations); res_columns[3]->insert(settings.cache_on_write_operations);
res_columns[4]->insert(settings.enable_cache_hits_threshold); res_columns[4]->insert(settings.cache_hits_threshold);
res_columns[5]->insert(cache->getUsedCacheSize()); res_columns[5]->insert(cache->getUsedCacheSize());
res_columns[6]->insert(cache->getFileSegmentsNum()); res_columns[6]->insert(cache->getFileSegmentsNum());
res_columns[7]->insert(cache->getBasePath()); res_columns[7]->insert(cache->getBasePath());