From b1426ddbdd4bf38f0efd33b64d7f7e26c883ea69 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 18 Jan 2023 20:52:16 +0100 Subject: [PATCH] Refactoring --- src/Interpreters/Cache/FileCache.cpp | 472 ++++++------------ src/Interpreters/Cache/FileCache.h | 156 +++--- src/Interpreters/Cache/FileCacheSettings.cpp | 2 +- src/Interpreters/Cache/FileCacheSettings.h | 2 +- src/Interpreters/Cache/FileCache_fwd.h | 4 +- src/Interpreters/Cache/FileSegment.cpp | 9 +- src/Interpreters/Cache/FileSegment.h | 2 +- src/Interpreters/Cache/Guards.h | 3 +- src/Interpreters/Cache/IFileCachePriority.h | 103 ++-- .../Cache/LRUFileCachePriority.cpp | 41 +- src/Interpreters/Cache/LRUFileCachePriority.h | 27 +- .../InterpreterDescribeCacheQuery.cpp | 4 +- 12 files changed, 367 insertions(+), 458 deletions(-) diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index c38450132a8..9bb233bea25 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -26,17 +26,18 @@ FileCache::FileCache( const String & cache_base_path_, const FileCacheSettings & cache_settings_) : cache_base_path(cache_base_path_) - , max_size(cache_settings_.max_size) - , max_element_size(cache_settings_.max_elements) , max_file_segment_size(cache_settings_.max_file_segment_size) , allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files) - , enable_bypass_cache_with_threshold(cache_settings_.enable_bypass_cache_with_threashold) - , bypass_cache_threshold(enable_bypass_cache_with_threshold ? cache_settings_.bypass_cache_threashold : 0) + , bypass_cache_threshold(cache_settings_.enable_bypass_cache_with_threashold ? cache_settings_.bypass_cache_threashold : 0) , log(&Poco::Logger::get("FileCache")) - , main_priority(std::make_unique()) - , stash(std::make_unique(cache_settings_.max_elements, cache_settings_.enable_cache_hits_threshold, std::make_unique())) - , query_limit(cache_settings_.enable_filesystem_query_cache_limit ? std::make_unique() : nullptr) { + main_priority = std::make_unique(cache_settings_.max_size, cache_settings_.max_elements); + + if (cache_settings_.cache_hits_threshold) + stash = std::make_unique(cache_settings_.cache_hits_threshold, cache_settings_.max_elements); + + if (cache_settings_.enable_filesystem_query_cache_limit) + query_limit = std::make_unique(); } FileCache::Key FileCache::createKeyForPath(const String & path) @@ -497,7 +498,7 @@ FileCache::CacheCells::iterator FileCache::addCell( FileSegment::State state, const CreateFileSegmentSettings & settings, KeyTransaction & key_transaction, - CachePriorityQueueGuard::Lock * queue_lock) + CachePriorityQueueGuard::LockPtr * queue_lock) { /// Create a file segment cell and put it in `files` map by [key][offset]. @@ -516,30 +517,32 @@ FileCache::CacheCells::iterator FileCache::addCell( /// `stash` - a queue of "stashed" key-offset pairs. Implements counting of /// cache entries and allows caching only if cache hit threadhold is reached. 
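The `stash` mentioned in the comment above implements an admission policy: a (key, offset) pair is only admitted into the cache once it has been requested often enough, and until then only its hit count is tracked. The hunk below contains the actual refactored logic; the following is just a simplified, self-contained sketch of the idea, with illustrative names rather than the real ClickHouse types (the real stash bounds its counters with an LRU queue, not a FIFO):

    #include <cstddef>
    #include <deque>
    #include <map>
    #include <utility>

    // Hit-count admission: cache an entry only after `hits_threshold` accesses.
    struct AdmissionStash
    {
        using KeyAndOffset = std::pair<size_t, size_t>; // simplified key type
        size_t hits_threshold;
        size_t max_entries;
        std::map<KeyAndOffset, size_t> hits;  // per-entry access counters
        std::deque<KeyAndOffset> order;       // bounds the number of tracked counters

        bool shouldCache(const KeyAndOffset & entry)
        {
            auto it = hits.find(entry);
            if (it == hits.end())
            {
                // First access: start counting, do not cache yet.
                hits.emplace(entry, 1);
                order.push_back(entry);
                if (order.size() > max_entries)
                {
                    hits.erase(order.front()); // drop the oldest counter
                    order.pop_front();
                }
                return false;
            }
            // Admit once the counter reaches the threshold.
            return ++it->second >= hits_threshold;
        }
    };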
- if (stash && stash->cache_hits_threshold && state == FileSegment::State::EMPTY) + if (stash && state == FileSegment::State::EMPTY) { - // auto stash_lock = stash.lock(); - // KeyAndOffset stash_key(key, offset); + if (!queue_lock) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Using stash requires queue_lock"); - // auto record_it = stash.records.find(stash_key); - // if (record_it == stash.records.end()) - // { - // auto & stash_queue = *stash.queue; - // auto & stash_records = stash.records; + KeyAndOffset stash_key(key, offset); - // stash_records.insert({stash_key, stash_queue.add(key, offset, 0, key_transaction.getCreator(), lock)}); + auto record_it = stash->records.find(stash_key); + if (record_it == stash->records.end()) + { + auto stash_queue = LockedCachePriority(*queue_lock, *stash->queue); + auto & stash_records = stash->records; - // if (stash_queue.getElementsNum(lock) > stash.max_stash_queue_size) - // stash_records.erase(stash_queue.pop(lock)); + stash_records.emplace(stash_key, stash_queue.add(key, offset, 0, key_transaction.getCreator())); - // result_state = FileSegment::State::SKIP_CACHE; - // } - // else - // { - // result_state = record_it->second->use() >= stash.cache_hits_threshold - // ? FileSegment::State::EMPTY - // : FileSegment::State::SKIP_CACHE; - // } + if (stash_queue.getElementsCount() > stash->queue->getElementsLimit()) + stash_queue.pop(); + + result_state = FileSegment::State::SKIP_CACHE; + } + else + { + result_state = LockedCachePriorityIterator(*queue_lock, record_it->second).use() >= stash->hits_threshold + ? FileSegment::State::EMPTY + : FileSegment::State::SKIP_CACHE; + } } else { @@ -549,7 +552,9 @@ FileCache::CacheCells::iterator FileCache::addCell( auto file_segment = std::make_shared( offset, size, key, key_transaction.getCreator(), this, result_state, settings); - FileSegmentCell cell(std::move(file_segment), key_transaction, *main_priority, queue_lock); + std::optional locked_queue(queue_lock ? LockedCachePriority(*queue_lock, *main_priority) : std::optional{}); + + FileSegmentCell cell(std::move(file_segment), key_transaction, locked_queue ? &*locked_queue : nullptr); auto [cell_it, inserted] = key_transaction.getOffsets().emplace(offset, std::move(cell)); assert(inserted); @@ -560,9 +565,9 @@ FileCache::CacheCells::iterator FileCache::addCell( bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) { assertInitialized(); - auto queue_lock = main_priority->lock(); + auto lock = priority_queue_guard.lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); - return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock); + return tryReserveUnlocked(key, offset, size, key_transaction, lock); } bool FileCache::tryReserveUnlocked( @@ -570,24 +575,20 @@ bool FileCache::tryReserveUnlocked( size_t offset, size_t size, KeyTransactionPtr key_transaction, - const CachePriorityQueueGuard::Lock & queue_lock) + CachePriorityQueueGuard::LockPtr queue_lock) { auto query_context = query_limit ? 
query_limit->tryGetQueryContext(queue_lock) : nullptr; - bool reserved; + if (query_context) { - const bool query_limit_exceeded = query_context->getCacheSize() + size > query_context->getMaxCacheSize(); - if (!query_limit_exceeded) - reserved = tryReserveInCache(key, offset, size, query_context, key_transaction, queue_lock); - else if (query_context->isSkipDownloadIfExceed()) - reserved = false; - else - reserved = tryReserveInQueryCache(key, offset, size, query_context, key_transaction, queue_lock); + const bool query_limit_exceeded = query_context->getSize() + size > query_context->getSizeLimit(); + reserved = (!query_limit_exceeded || query_context->recacheOnQueryLimitExceeded()) + && tryReserveImpl(query_context->getPriority(), key, offset, size, key_transaction, query_context.get(), queue_lock); } else { - reserved = tryReserveInCache(key, offset, size, nullptr, key_transaction, queue_lock); + reserved = tryReserveImpl(*main_priority, key, offset, size, key_transaction, nullptr, queue_lock); } if (reserved && !key_transaction->getOffsets().created_base_directory) @@ -595,148 +596,14 @@ bool FileCache::tryReserveUnlocked( fs::create_directories(getPathInLocalCache(key)); key_transaction->getOffsets().created_base_directory = true; } + return reserved; } -bool FileCache::tryReserveInQueryCache( - const Key & key, - size_t offset, - size_t size, - QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr, - const CachePriorityQueueGuard::Lock & queue_lock) -{ - LOG_TEST(log, "Reserving query cache space {} for {}:{}", size, key.toString(), offset); - - auto & query_priority = query_context->getPriority(); - if (query_priority.getElementsNum(queue_lock)) {} -// struct Segment -// { -// Key key; -// size_t offset; -// size_t size; -// -// Segment(Key key_, size_t offset_, size_t size_) -// : key(key_), offset(offset_), size(size_) {} -// }; -// -// std::vector ghost; - - // size_t queue_size = main_priority->getElementsNum(lock); - // size_t removed_size = 0; - - // auto is_overflow = [&] - // { - // return (max_size != 0 && main_priority->getCacheSize() + size - removed_size > max_size) - // || (max_element_size != 0 && queue_size > max_element_size) - // || (query_context->getCacheSize() + size - removed_size > query_context->getMaxCacheSize()); - // }; - - // query_priority.iterate([&](const QueueEntry & entry) -> IterationResult - // { - // if (!is_overflow()) - // return IterationResult::BREAK; - - // // auto * cell = key_transaction.getOffsets().tryGet(iter->offset()); - - // return IterationResult::CONTINUE; - // }, lock); -// -// auto * cell = key_transaction.getOffsets().tryGet(iter->offset()); -// -// if (!cell) -// { -// /// The cache corresponding to this record may be swapped out by -// /// other queries, so it has become invalid. 
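The branch above folds the old three-way reservation logic into a single expression: when the query has its own filesystem cache limit, a reservation is only attempted if the request still fits under that limit or if the query is allowed to re-cache past it. A minimal restatement of that predicate (hypothetical helper, not part of the patch):

    #include <cstddef>

    // Mirrors the check in tryReserveUnlocked: attempt a reservation for a
    // query with its own cache limit only if the request still fits, or if
    // recaching past the limit is allowed for this query.
    bool shouldAttemptReserve(size_t query_cache_size, size_t query_cache_limit,
                              size_t requested_size, bool recache_on_limit_exceeded)
    {
        const bool limit_exceeded = query_cache_size + requested_size > query_cache_limit;
        return !limit_exceeded || recache_on_limit_exceeded;
    }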
-// removed_size += iter->size(); -// ghost.push_back(Segment(iter->key(), iter->offset(), iter->size())); -// /// next() -// iter->removeAndGetNext(); -// } -// else -// { -// size_t cell_size = cell->size(); -// assert(iter->size() == cell_size); -// -// if (cell->releasable()) -// { -// auto & file_segment = cell->file_segment; -// -// if (file_segment->isPersistent() && allow_persistent_files) -// continue; -// -// switch (file_segment->state()) -// { -// case FileSegment::State::DOWNLOADED: -// { -// to_evict.push_back(cell); -// break; -// } -// default: -// { -// trash.push_back(cell); -// break; -// } -// } -// removed_size += cell_size; -// --queue_size; -// } -// -// iter->next(); -// } -// } -// -// // auto remove_file_segment = [&](FileSegmentPtr file_segment, size_t file_segment_size) -// // { -// // /// FIXME: key transaction is incorrect -// // query_context->remove(file_segment->key(), file_segment->offset(), file_segment_size, key_transaction); -// // remove(file_segment); -// // }; -// -// assert(trash.empty()); -// //for (auto & cell : trash) -// //{ -// // if (auto file_segment = cell->file_segment) -// // // remove_file_segment(file_segment, cell->size()); -// //} -// -// for (auto & entry : ghost) -// /// FIXME: key transaction is incorrect -// query_context->remove(entry.key, entry.offset, entry.size, key_transaction); -// -// if (is_overflow()) -// return false; -// -// if (cell_for_reserve) -// { -// auto queue_iterator = cell_for_reserve->queue_iterator; -// if (queue_iterator) -// queue_iterator->incrementSize(size); -// else -// cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction.getCreator()); -// } -// -// // for (auto & cell : to_evict) -// // { -// // if (auto file_segment = cell->file_segment) -// // remove_file_segment(file_segment, cell->size()); -// // } -// -// query_context->reserve(key, offset, size, key_transaction); -// -// if (reserved && !key_transaction.getOffsets().created_base_directory) -// { -// fs::create_directories(getPathInLocalCache(key)); -// key_transaction.getOffsets().created_base_directory = true; -// } - return true; -} - void FileCache::iterateAndCollectKeyLocks( - IFileCachePriority & priority, + LockedCachePriority & priority, IterateAndCollectLocksFunc && func, - KeyTransactionsMap & locked_map, - const CachePriorityQueueGuard::Lock & queue_lock) + KeyTransactionsMap & locked_map) { priority.iterate([&, func = std::move(func)](const IFileCachePriority::Entry & entry) { @@ -754,25 +621,37 @@ void FileCache::iterateAndCollectKeyLocks( locked_map.emplace(entry.key, current); return res.iteration_result; - }, queue_lock); + }); } -bool FileCache::tryReserveInCache( +bool FileCache::tryReserveImpl( + IFileCachePriority & priority_queue, const Key & key, size_t offset, size_t size, - QueryLimit::QueryContextPtr query_context, KeyTransactionPtr key_transaction, - const CachePriorityQueueGuard::Lock & queue_lock) + QueryLimit::LockedQueryContext * query_context, + CachePriorityQueueGuard::LockPtr priority_lock) { + /// Iterate cells in the priority of `priority_queue`. + /// If some entry is in `priority_queue` it must be guaranteed to have a + /// corresponding cache cell in key_transaction->offsets() and in + /// query_context->records (if query_context != nullptr). + /// When we evict some entry, then it must be removed from both: + /// main_priority and query_context::priority (if query_context != nullptr). 
+ /// If we successfulkly reserved space, entry must be added to both: + /// cells and query_context::records (if query_context != nullptr); + LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset); - size_t queue_size = main_priority->getElementsNum(queue_lock); - chassert(queue_size <= max_element_size); + LockedCachePriority locked_priority_queue(priority_lock, priority_queue); + LockedCachePriority locked_main_priority(priority_lock, *main_priority); - auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset); + size_t queue_size = locked_priority_queue.getElementsCount(); + chassert(queue_size <= locked_priority_queue.getElementsLimit()); /// A cell acquires a LRUQueue iterator on first successful space reservation attempt. + auto * cell_for_reserve = key_transaction->getOffsets().tryGet(offset); if (!cell_for_reserve || !cell_for_reserve->queue_iterator) queue_size += 1; @@ -781,8 +660,9 @@ bool FileCache::tryReserveInCache( { /// max_size == 0 means unlimited cache size, /// max_element_size means unlimited number of cache elements. - return (max_size != 0 && main_priority->getCacheSize(queue_lock) + size - removed_size > max_size) - || (max_element_size != 0 && queue_size > max_element_size); + return (main_priority->getSizeLimit() != 0 && locked_main_priority.getSize() + size - removed_size > main_priority->getSizeLimit()) + || (main_priority->getElementsLimit() != 0 && queue_size > main_priority->getElementsLimit()) + || (query_context && query_context->getSize() + size - removed_size > query_context->getSizeLimit()); }; KeyTransactionsMap locked; @@ -792,7 +672,7 @@ bool FileCache::tryReserveInCache( using IterationResult = IFileCachePriority::IterationResult; iterateAndCollectKeyLocks( - *main_priority, + locked_priority_queue, [&](const QueueEntry & entry, KeyTransaction & locked_key) -> IterateAndLockResult { if (!is_overflow()) @@ -800,7 +680,9 @@ bool FileCache::tryReserveInCache( auto * cell = locked_key.getOffsets().get(entry.offset); + chassert(cell->queue_iterator); chassert(entry.size == cell->size()); + const size_t cell_size = cell->size(); bool remove_current_it = false; @@ -832,7 +714,7 @@ bool FileCache::tryReserveInCache( { remove_current_it = true; cell->queue_iterator = {}; - locked_key.remove(file_segment, queue_lock); + locked_key.remove(file_segment, priority_lock); break; } } @@ -845,41 +727,48 @@ bool FileCache::tryReserveInCache( return { IterationResult::REMOVE_AND_CONTINUE, save_key_transaction }; return { IterationResult::CONTINUE, save_key_transaction }; - }, locked, queue_lock); + }, locked); if (is_overflow()) return false; - if (cell_for_reserve) - { - /// queue_iteratir is std::nullopt here if no space has been reserved yet, a cache cell - /// acquires queue iterator on first successful space reservation attempt. - /// If queue iterator already exists, we need to update the size after each space reservation. - auto queue_iterator = cell_for_reserve->queue_iterator; - if (queue_iterator) - queue_iterator->incrementSize(size, queue_lock); - else - { - /// Space reservation is incremental, so cache cell is created first (with state empty), - /// and queue_iterator is assigned on first space reservation attempt. 
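Eviction in `tryReserveImpl` is driven by the `is_overflow()` lambda: entries keep being removed until the pending reservation fits the size and element limits (and, when present, the per-query limit). A standalone restatement of that predicate, with a limit of 0 meaning "unlimited" as in the original code (helper name is illustrative):

    #include <cstddef>

    // Simplified form of is_overflow(); the real lambda additionally checks
    // query_context->getSize() against the per-query limit when it is set.
    bool isOverflow(size_t current_size, size_t size_limit,
                    size_t queue_elements, size_t elements_limit,
                    size_t requested_size, size_t removed_size)
    {
        const bool size_overflow = size_limit != 0
            && current_size + requested_size - removed_size > size_limit;
        const bool elements_overflow = elements_limit != 0
            && queue_elements > elements_limit;
        return size_overflow || elements_overflow;
    }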
- cell_for_reserve->queue_iterator = main_priority->add(key, offset, size, key_transaction->getCreator(), queue_lock); - } - } - for (auto & [_, transaction] : locked) { for (const auto & offset_to_delete : transaction->delete_offsets) { auto * cell = transaction->getOffsets().get(offset_to_delete); - transaction->remove(cell->file_segment, queue_lock); + transaction->remove(cell->file_segment, priority_lock); + if (query_context) + query_context->remove(key, offset); } } - if (main_priority->getCacheSize(queue_lock) > (1ull << 63)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); + if (cell_for_reserve) + { + /// queue_iteratir is std::nullopt here if no space has been reserved yet, a cache cell + /// acquires queue iterator on first successful space reservation attempt. + /// If queue iterator already exists, we need to update the size after each space reservation. + if (cell_for_reserve->queue_iterator) + LockedCachePriorityIterator(priority_lock, cell_for_reserve->queue_iterator).incrementSize(size); + else + { + /// Space reservation is incremental, so cache cell is created first (with state empty), + /// and queue_iterator is assigned on first space reservation attempt. + cell_for_reserve->queue_iterator = locked_main_priority.add(key, offset, size, key_transaction->getCreator()); + } + } if (query_context) - query_context->reserve(key, offset, size, *key_transaction, queue_lock); + { + auto queue_iterator = query_context->tryGet(key, offset); + if (queue_iterator) + LockedCachePriorityIterator(priority_lock, queue_iterator).incrementSize(size); + else + query_context->add(key, offset, LockedCachePriority(priority_lock, query_context->getPriority()).add(key, offset, size, key_transaction->getCreator())); + } + + if (locked_main_priority.getSize() > (1ull << 63)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); return true; } @@ -888,7 +777,7 @@ void FileCache::removeIfExists(const Key & key) { assertInitialized(); - auto queue_lock = main_priority->lock(); + auto queue_lock = priority_queue_guard.lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); if (!key_transaction) return; @@ -927,8 +816,8 @@ void FileCache::removeAllReleasable() /// `remove_persistent_files` defines whether non-evictable by some criteria files /// (they do not comply with the cache eviction policy) should also be removed. - auto queue_lock = main_priority->lock(); - main_priority->iterate([&](const QueueEntry & entry) -> IterationResult + auto queue_lock = priority_queue_guard.lock(); + LockedCachePriority(queue_lock, *main_priority).iterate([&](const QueueEntry & entry) -> IterationResult { auto key_transaction = entry.createKeyTransaction(); auto * cell = key_transaction->getOffsets().get(entry.offset); @@ -940,14 +829,13 @@ void FileCache::removeAllReleasable() return IterationResult::REMOVE_AND_CONTINUE; } return IterationResult::CONTINUE; - }, queue_lock); + }); if (stash) { /// Remove all access information. 
- auto lock = stash->queue->lock(); stash->records.clear(); - stash->queue->removeAll(lock); + LockedCachePriority(queue_lock, *stash->queue).removeAll(); } } @@ -966,7 +854,7 @@ KeyTransaction::~KeyTransaction() cleanupKeyDirectory(); } -void KeyTransaction::remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock & queue_lock) +void KeyTransaction::remove(FileSegmentPtr file_segment, CachePriorityQueueGuard::LockPtr queue_lock) { /// We must hold pointer to file segment while removing it. chassert(file_segment->key() == key); @@ -982,7 +870,7 @@ bool KeyTransaction::isLastHolder(size_t offset) void KeyTransaction::remove( size_t offset, const FileSegmentGuard::Lock & segment_lock, - const CachePriorityQueueGuard::Lock & queue_lock) + CachePriorityQueueGuard::LockPtr queue_lock) { LOG_DEBUG( log, "Remove from cache. Key: {}, offset: {}", @@ -991,7 +879,7 @@ void KeyTransaction::remove( auto * cell = offsets->get(offset); if (cell->queue_iterator) - cell->queue_iterator->remove(queue_lock); + LockedCachePriorityIterator(queue_lock, cell->queue_iterator).remove(); const auto cache_file_path = cell->file_segment->getPathInLocalCache(); cell->file_segment->detach(segment_lock, *this); @@ -1041,7 +929,8 @@ void KeyTransaction::cleanupKeyDirectory() const void FileCache::loadMetadata() { - auto queue_lock = main_priority->lock(); + auto queue_lock = priority_queue_guard.lock(); + LockedCachePriority priority_queue(queue_lock, *main_priority); UInt64 offset = 0; size_t size = 0; @@ -1137,7 +1026,7 @@ void FileCache::loadMetadata() log, "Cache capacity changed (max size: {}, used: {}), " "cached file `{}` does not fit in cache anymore (size: {})", - max_size, main_priority->getCacheSize(queue_lock), key_it->path().string(), size); + priority_queue.getSizeLimit(), priority_queue.getSize(), key_it->path().string(), size); fs::remove(offset_it->path()); } @@ -1148,7 +1037,7 @@ void FileCache::loadMetadata() /// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority. 
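Both `removeAllReleasable()` above and the reservation path rely on the `iterate()` callback protocol: the callback decides, per entry, whether to stop, keep the entry, or drop it and continue. A compact model of how a priority queue implementation honours those results (size counters and metrics omitted; see LRUFileCachePriority::iterate further below for the real version):

    #include <functional>
    #include <list>

    enum class IterationResult { BREAK, CONTINUE, REMOVE_AND_CONTINUE };

    template <typename Entry>
    void iterateQueue(std::list<Entry> & queue,
                      const std::function<IterationResult(const Entry &)> & func)
    {
        for (auto it = queue.begin(); it != queue.end();)
        {
            switch (func(*it))
            {
                case IterationResult::BREAK:
                    return;               // stop early, e.g. once enough space is freed
                case IterationResult::CONTINUE:
                    ++it;                 // keep the entry
                    break;
                case IterationResult::REMOVE_AND_CONTINUE:
                    it = queue.erase(it); // drop the entry and keep going
                    break;
            }
        }
    }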
pcg64 generator(randomSeed()); std::shuffle(queue_entries.begin(), queue_entries.end(), generator); - for (const auto & [it, file_segment] : queue_entries) + for (auto & [it, file_segment] : queue_entries) { /// Cell cache size changed and, for example, 1st file segment fits into cache /// and 2nd file segment will fit only if first was evicted, then first will be removed and @@ -1156,14 +1045,14 @@ void FileCache::loadMetadata() if (file_segment.expired()) continue; - it->use(queue_lock); + LockedCachePriorityIterator(queue_lock, it).use(); } } void KeyTransaction::reduceSizeToDownloaded( size_t offset, const FileSegmentGuard::Lock & segment_lock, - const CachePriorityQueueGuard::Lock & queue_lock) + CachePriorityQueueGuard::LockPtr queue_lock) { /** * In case file was partially downloaded and it's download cannot be continued @@ -1184,7 +1073,7 @@ void KeyTransaction::reduceSizeToDownloaded( file_segment->getInfoForLogUnlocked(segment_lock)); } - [[maybe_unused]] const auto & entry = **cell->queue_iterator; + [[maybe_unused]] const auto & entry = *LockedCachePriorityIterator(queue_lock, cell->queue_iterator); assert(file_segment->downloaded_size <= file_segment->reserved_size); assert(entry.size == file_segment->reserved_size); assert(entry.size >= file_segment->downloaded_size); @@ -1192,7 +1081,7 @@ void KeyTransaction::reduceSizeToDownloaded( if (file_segment->reserved_size > file_segment->downloaded_size) { int64_t extra_size = static_cast(cell->file_segment->reserved_size) - static_cast(file_segment->downloaded_size); - cell->queue_iterator->incrementSize(-extra_size, queue_lock); + LockedCachePriorityIterator(queue_lock, cell->queue_iterator).incrementSize(-extra_size); } CreateFileSegmentSettings create_settings(file_segment->getKind()); @@ -1235,13 +1124,13 @@ FileSegmentsHolderPtr FileCache::dumpQueue() using IterationResult = IFileCachePriority::IterationResult; FileSegments file_segments; - main_priority->iterate([&](const QueueEntry & entry) + LockedCachePriority(priority_queue_guard.lock(), *main_priority).iterate([&](const QueueEntry & entry) { auto tx = entry.createKeyTransaction(); auto * cell = tx->getOffsets().get(entry.offset); file_segments.push_back(FileSegment::getSnapshot(cell->file_segment)); return IterationResult::CONTINUE; - }, main_priority->lock()); + }); return std::make_unique(std::move(file_segments)); } @@ -1266,21 +1155,18 @@ std::vector FileCache::tryGetCachePaths(const Key & key) size_t FileCache::getUsedCacheSize() const { - auto lock = main_priority->lock(); - return main_priority->getCacheSize(lock); + return LockedCachePriority(priority_queue_guard.lock(), *main_priority).getSize(); } size_t FileCache::getFileSegmentsNum() const { - auto lock = main_priority->lock(); - return main_priority->getElementsNum(lock); + return LockedCachePriority(priority_queue_guard.lock(), *main_priority).getElementsCount(); } FileCache::FileSegmentCell::FileSegmentCell( FileSegmentPtr file_segment_, KeyTransaction & key_transaction, - IFileCachePriority & priority_queue, - CachePriorityQueueGuard::Lock * queue_lock) + LockedCachePriority * locked_queue) : file_segment(file_segment_) { /** @@ -1293,17 +1179,15 @@ FileCache::FileSegmentCell::FileSegmentCell( { case FileSegment::State::DOWNLOADED: { - if (!queue_lock) + if (!locked_queue) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Adding file segment with state DOWNLOADED requires locked queue lock"); } - queue_iterator = priority_queue.add( - file_segment->key(), file_segment->offset(), file_segment->range().size(), 
- key_transaction.getCreator(), *queue_lock); + queue_iterator = locked_queue->add( + file_segment->key(), file_segment->offset(), file_segment->range().size(), key_transaction.getCreator()); - /// TODO: add destructor break; } case FileSegment::State::SKIP_CACHE: @@ -1384,10 +1268,11 @@ void FileCache::assertCacheCorrectness() using QueueEntry = IFileCachePriority::Entry; using IterationResult = IFileCachePriority::IterationResult; - auto lock = main_priority->lock(); + auto lock = priority_queue_guard.lock(); + LockedCachePriority queue(lock, *main_priority); [[maybe_unused]] size_t total_size = 0; - main_priority->iterate([&](const QueueEntry & entry) -> IterationResult + queue.iterate([&](const QueueEntry & entry) -> IterationResult { auto key_transaction = entry.createKeyTransaction(); auto * cell = key_transaction->getOffsets().get(entry.offset); @@ -1402,11 +1287,11 @@ void FileCache::assertCacheCorrectness() total_size += entry.size; return IterationResult::CONTINUE; - }, lock); + }); - assert(total_size == main_priority->getCacheSize(lock)); - assert(main_priority->getCacheSize(lock) <= max_size); - assert(main_priority->getElementsNum(lock) <= max_element_size); + chassert(queue.getSize() == total_size); + chassert(queue.getSize() <= queue.getSizeLimit()); + chassert(queue.getElementsCount() <= queue.getElementsLimit()); } FileCache::QueryContextHolder::QueryContextHolder( @@ -1424,28 +1309,23 @@ FileCache::QueryContextHolder::~QueryContextHolder() /// If only the query_map and the current holder hold the context_query, /// the query has been completed and the query_context is released. if (context && context.use_count() == 2) - cache->query_limit->removeQueryContext(query_id, cache->main_priority->lock()); + { + auto lock = cache->priority_queue_guard.lock(); + cache->query_limit->removeQueryContext(query_id, lock); + } } -FileCache::QueryLimit::QueryContext::QueryContext( - size_t max_cache_size_, - bool skip_download_if_exceeds_query_cache_) - : max_cache_size(max_cache_size_) - , skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) -{ -} - -FileCache::QueryLimit::QueryContextPtr -FileCache::QueryLimit::tryGetQueryContext(const CachePriorityQueueGuard::Lock &) +FileCache::QueryLimit::LockedQueryContextPtr +FileCache::QueryLimit::tryGetQueryContext(CachePriorityQueueGuard::LockPtr lock) { if (!isQueryInitialized()) return nullptr; auto query_iter = query_map.find(std::string(CurrentThread::getQueryId())); - return (query_iter == query_map.end()) ? nullptr : query_iter->second; + return (query_iter == query_map.end()) ? 
nullptr : std::make_unique(query_iter->second, lock); } -void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, const CachePriorityQueueGuard::Lock &) +void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, CachePriorityQueueGuard::LockPtr) { auto query_iter = query_map.find(query_id); if (query_iter == query_map.end()) @@ -1461,7 +1341,7 @@ void FileCache::QueryLimit::removeQueryContext(const std::string & query_id, con FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryContext( const std::string & query_id, const ReadSettings & settings, - const CachePriorityQueueGuard::Lock &) + CachePriorityQueueGuard::LockPtr) { if (query_id.empty()) return nullptr; @@ -1470,7 +1350,7 @@ FileCache::QueryLimit::QueryContextPtr FileCache::QueryLimit::getOrSetQueryConte if (inserted) { it->second = std::make_shared( - settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); + settings.max_query_cache_size, !settings.skip_download_if_exceeds_query_cache); } return it->second; @@ -1482,70 +1362,40 @@ FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder( if (!query_limit || settings.max_query_cache_size == 0) return {}; - auto context = query_limit->getOrSetQueryContext(query_id, settings, main_priority->lock()); + auto lock = priority_queue_guard.lock(); + auto context = query_limit->getOrSetQueryContext(query_id, settings, lock); return std::make_unique(query_id, this, std::move(context)); } -void FileCache::QueryLimit::QueryContext::remove( - const Key & key, - size_t offset, - size_t size, - const CachePriorityQueueGuard::Lock & queue_lock) +void FileCache::QueryLimit::LockedQueryContext::add(const Key & key, size_t offset, IFileCachePriority::Iterator iterator) { - std::lock_guard lock(mutex); - if (cache_size < size) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size"); - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record != records.end()) - { - record->second->remove(queue_lock); - records.erase({key, offset}); - } - } - cache_size -= size; -} - -void FileCache::QueryLimit::QueryContext::reserve( - const Key & key, - size_t offset, - size_t size, - const KeyTransaction & key_transaction, - const CachePriorityQueueGuard::Lock & queue_lock) -{ - std::lock_guard lock(mutex); - if (cache_size + size > max_cache_size) + auto [_, inserted] = context->records.emplace(KeyAndOffset{key, offset}, iterator); + if (!inserted) { throw Exception( ErrorCodes::LOGICAL_ERROR, - "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", - key.toString(), offset); + "Cannot add offset {} to query context under key {}, it already exists", + offset, key.toString()); } - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record == records.end()) - { - auto queue_iter = priority->add(key, offset, 0, key_transaction.getCreator(), queue_lock); - record = records.insert({{key, offset}, queue_iter}).first; - } - record->second->incrementSize(size, queue_lock); - } - cache_size += size; } -void FileCache::QueryLimit::QueryContext::use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock & queue_lock) +void FileCache::QueryLimit::LockedQueryContext::remove(const Key & key, size_t offset) { - if (skip_download_if_exceeds_query_cache) - return; + auto record = context->records.find({key, offset}); + if (record == context->records.end()) + throw 
Exception(ErrorCodes::LOGICAL_ERROR, "There is no {}:{} in query context", key.toString(), offset); + + LockedCachePriorityIterator(lock, record->second).remove(); + context->records.erase({key, offset}); +} + +IFileCachePriority::Iterator FileCache::QueryLimit::LockedQueryContext::tryGet(const Key & key, size_t offset) +{ + auto it = context->records.find({key, offset}); + if (it == context->records.end()) + return nullptr; + return it->second; - std::lock_guard lock(mutex); - auto record = records.find({key, offset}); - if (record != records.end()) - record->second->use(queue_lock); } KeyTransactionPtr KeyTransactionCreator::create() diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 6349be639f3..c0ed8a1da7f 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -116,13 +117,9 @@ private: String cache_base_path; - const size_t max_size; - const size_t max_element_size; const size_t max_file_segment_size; - const bool allow_persistent_files; - const bool enable_bypass_cache_with_threshold; - const size_t bypass_cache_threshold; + const size_t bypass_cache_threshold = 0; Poco::Logger * log; @@ -143,8 +140,7 @@ private: FileSegmentCell( FileSegmentPtr file_segment_, KeyTransaction & key_transaction, - IFileCachePriority & priority_queue, - CachePriorityQueueGuard::Lock * queue_lock); + LockedCachePriority * locked_queue); FileSegmentCell(FileSegmentCell && other) noexcept : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} @@ -214,72 +210,88 @@ private: mutable PendingRemoveFilesMetadata remove_files_metadata; FileCachePriorityPtr main_priority; + mutable CachePriorityQueueGuard priority_queue_guard; struct HitsCountStash { - HitsCountStash(size_t max_stash_queue_size_, size_t cache_hits_threshold_, FileCachePriorityPtr queue_) - : max_stash_queue_size(max_stash_queue_size_) - , cache_hits_threshold(cache_hits_threshold_) - , queue(std::move(queue_)) {} - - const size_t max_stash_queue_size; - const size_t cache_hits_threshold; - - auto lock() const { return queue->lock(); } + HitsCountStash(size_t hits_threashold_, size_t queue_size_) + : hits_threshold(hits_threashold_), queue(std::make_unique(0, queue_size_)) + { + if (!queue_size_) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Queue size for hits queue must be non-zero"); + } + const size_t hits_threshold; FileCachePriorityPtr queue; - using Records = std::unordered_map; Records records; }; mutable std::unique_ptr stash; - struct QueryLimit + class QueryLimit { - /// Used to track and control the cache access of each query. - /// Through it, we can realize the processing of different queries by the cache layer. 
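The `LockedQueryContext` helpers shown earlier (`add`, `remove`, `tryGet`) maintain the per-query records that the `QueryLimit` class below owns: each reserved (key, offset) holds exactly one iterator into the query's own priority queue, and double insertion or removal of an unknown entry is a logic error. A toy model of that bookkeeping (illustrative types only):

    #include <cstddef>
    #include <map>
    #include <stdexcept>
    #include <utility>

    struct QueryRecords
    {
        using KeyAndOffset = std::pair<size_t, size_t>; // simplified key type
        using Iterator = int;                           // stand-in for a priority-queue iterator
        std::map<KeyAndOffset, Iterator> records;

        void add(const KeyAndOffset & entry, Iterator it)
        {
            if (!records.emplace(entry, it).second)
                throw std::logic_error("entry already tracked for this query");
        }

        const Iterator * tryGet(const KeyAndOffset & entry) const
        {
            auto it = records.find(entry);
            return it == records.end() ? nullptr : &it->second;
        }

        void remove(const KeyAndOffset & entry)
        {
            if (records.erase(entry) == 0)
                throw std::logic_error("entry is not tracked for this query");
        }
    };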
- struct QueryContext - { - std::mutex mutex; - HitsCountStash::Records records; - FileCachePriorityPtr priority; - - size_t cache_size = 0; - size_t max_cache_size; - - bool skip_download_if_exceeds_query_cache; - - QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_); - - size_t getMaxCacheSize() const { return max_cache_size; } - - size_t getCacheSize() const { return cache_size; } - - IFileCachePriority & getPriority() const { return *priority; } - - bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } - - void remove(const Key & key, size_t offset, size_t size, const CachePriorityQueueGuard::Lock &); - - void reserve(const Key & key, size_t offset, size_t size, const KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &); - - void use(const Key & key, size_t offset, const CachePriorityQueueGuard::Lock &); - }; - + friend class FileCache; + public: + class QueryContext; using QueryContextPtr = std::shared_ptr; - using QueryContextMap = std::unordered_map; + class LockedQueryContext; + using LockedQueryContextPtr = std::unique_ptr; - QueryContextMap query_map; - - QueryContextPtr tryGetQueryContext(const CachePriorityQueueGuard::Lock & lock); - - void removeQueryContext(const std::string & query_id, const CachePriorityQueueGuard::Lock & lock); + LockedQueryContextPtr tryGetQueryContext(CachePriorityQueueGuard::LockPtr lock); QueryContextPtr getOrSetQueryContext( - const std::string & query_id, - const ReadSettings & settings, - const CachePriorityQueueGuard::Lock & lock); + const std::string & query_id, const ReadSettings & settings, CachePriorityQueueGuard::LockPtr); + + void removeQueryContext(const std::string & query_id, CachePriorityQueueGuard::LockPtr); + + private: + using QueryContextMap = std::unordered_map; + QueryContextMap query_map; + + public: + class QueryContext + { + public: + QueryContext(size_t query_cache_size, bool recache_on_query_limit_exceeded_) + : priority(std::make_unique(query_cache_size, 0)) + , recache_on_query_limit_exceeded(recache_on_query_limit_exceeded_) {} + + private: + friend class QueryLimit::LockedQueryContext; + + using Records = std::unordered_map; + Records records; + FileCachePriorityPtr priority; + const bool recache_on_query_limit_exceeded; + }; + + class LockedQueryContext + { + public: + LockedQueryContext(QueryContextPtr context_, CachePriorityQueueGuard::LockPtr lock_) + : context(context_), lock(lock_), priority(lock_, *context->priority) {} + + IFileCachePriority & getPriority() { return *context->priority; } + const IFileCachePriority & getPriority() const { return *context->priority; } + + size_t getSize() const { return priority.getSize(); } + + size_t getSizeLimit() const { return priority.getSizeLimit(); } + + bool recacheOnQueryLimitExceeded() const { return context->recache_on_query_limit_exceeded; } + + IFileCachePriority::Iterator tryGet(const Key & key, size_t offset); + + void add(const Key & key, size_t offset, IFileCachePriority::Iterator iterator); + + void remove(const Key & key, size_t offset); + + private: + QueryContextPtr context; + CachePriorityQueueGuard::LockPtr lock; + LockedCachePriority priority; + }; }; using QueryLimitPtr = std::unique_ptr; @@ -334,30 +346,23 @@ private: FileSegment::State state, const CreateFileSegmentSettings & create_settings, KeyTransaction & key_transaction, - CachePriorityQueueGuard::Lock * queue_lock); + CachePriorityQueueGuard::LockPtr * queue_lock); bool tryReserveUnlocked( const Key & key, size_t offset, 
size_t size, KeyTransactionPtr key_transaction, - const CachePriorityQueueGuard::Lock &); + CachePriorityQueueGuard::LockPtr); - bool tryReserveInCache( + bool tryReserveImpl( + IFileCachePriority & priority_queue, const Key & key, size_t offset, size_t size, - QueryLimit::QueryContextPtr query_context, KeyTransactionPtr key_transaction, - const CachePriorityQueueGuard::Lock &); - - bool tryReserveInQueryCache( - const Key & key, - size_t offset, - size_t size, - QueryLimit::QueryContextPtr query_context, - KeyTransactionPtr key_transaction, - const CachePriorityQueueGuard::Lock &); + QueryLimit::LockedQueryContext * query_context, + CachePriorityQueueGuard::LockPtr priority_lock); struct IterateAndLockResult { @@ -366,10 +371,9 @@ private: }; using IterateAndCollectLocksFunc = std::function; static void iterateAndCollectKeyLocks( - IFileCachePriority & priority, + LockedCachePriority & priority, IterateAndCollectLocksFunc && func, - KeyTransactionsMap & locked_map, - const CachePriorityQueueGuard::Lock & queue_lock); + KeyTransactionsMap & locked_map); }; struct KeyTransactionCreator @@ -396,11 +400,11 @@ struct KeyTransaction : private boost::noncopyable KeyTransactionCreatorPtr getCreator() const { return std::make_unique(key, offsets, cache); } - void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); + void reduceSizeToDownloaded(size_t offset, const FileSegmentGuard::Lock &, CachePriorityQueueGuard::LockPtr); - void remove(FileSegmentPtr file_segment, const CachePriorityQueueGuard::Lock &); + void remove(FileSegmentPtr file_segment, CachePriorityQueueGuard::LockPtr); - void remove(size_t offset, const FileSegmentGuard::Lock &, const CachePriorityQueueGuard::Lock &); + void remove(size_t offset, const FileSegmentGuard::Lock &, CachePriorityQueueGuard::LockPtr); bool isLastHolder(size_t offset); diff --git a/src/Interpreters/Cache/FileCacheSettings.cpp b/src/Interpreters/Cache/FileCacheSettings.cpp index b13cdd2ed04..3090e036d84 100644 --- a/src/Interpreters/Cache/FileCacheSettings.cpp +++ b/src/Interpreters/Cache/FileCacheSettings.cpp @@ -33,7 +33,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false); enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false); - enable_cache_hits_threshold = config.getUInt64(config_prefix + ".enable_cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD); + cache_hits_threshold = config.getUInt64(config_prefix + ".cache_hits_threshold", REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD); enable_bypass_cache_with_threashold = config.getUInt64(config_prefix + ".enable_bypass_cache_with_threashold", false); diff --git a/src/Interpreters/Cache/FileCacheSettings.h b/src/Interpreters/Cache/FileCacheSettings.h index 80f7b5fa93f..66cb720ab05 100644 --- a/src/Interpreters/Cache/FileCacheSettings.h +++ b/src/Interpreters/Cache/FileCacheSettings.h @@ -15,7 +15,7 @@ struct FileCacheSettings bool cache_on_write_operations = false; - size_t enable_cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD; + size_t cache_hits_threshold = REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD; bool enable_filesystem_query_cache_limit = false; bool do_not_evict_index_and_mark_files = true; diff --git a/src/Interpreters/Cache/FileCache_fwd.h b/src/Interpreters/Cache/FileCache_fwd.h index 72dc1144fb9..476c04affdf 
100644 --- a/src/Interpreters/Cache/FileCache_fwd.h +++ b/src/Interpreters/Cache/FileCache_fwd.h @@ -6,8 +6,8 @@ namespace DB static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 100 * 1024 * 1024; static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS = 1024 * 1024; -static constexpr int REMOTE_FS_OBJECTS_CACHE_ENABLE_HITS_THRESHOLD = 0; -static constexpr size_t REMOTE_FS_OBJECTS_CACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;; +static constexpr int REMOTE_FS_OBJECTS_CACHE_DEFAULT_HITS_THRESHOLD = 0; +static constexpr size_t REMOTE_FS_OBJECTS_CACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024; class FileCache; using FileCachePtr = std::shared_ptr; diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index e336ba435b5..0c150520e45 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -536,12 +536,12 @@ void FileSegment::setBroken() void FileSegment::complete() { - auto queue_lock = cache->main_priority->lock(); + auto queue_lock = cache->priority_queue_guard.lock(); auto key_transaction = createKeyTransaction(); return completeUnlocked(*key_transaction, queue_lock); } -void FileSegment::completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock & queue_lock) +void FileSegment::completeUnlocked(KeyTransaction & key_transaction, CachePriorityQueueGuard::LockPtr queue_lock) { auto segment_lock = segment_guard.lock(); @@ -817,7 +817,8 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() return file_segments.erase(file_segments.begin()); } - auto queue_lock = file_segment.cache->main_priority->lock(); + auto queue_lock = file_segment.cache->priority_queue_guard.lock(); + /// File segment pointer must be reset right after calling complete() and /// under the same mutex, because complete() checks for segment pointers. auto key_transaction = file_segment.createKeyTransaction(/* assert_exists */false); @@ -825,7 +826,7 @@ FileSegments::iterator FileSegmentsHolder::completeAndPopFrontImpl() { auto queue_iter = key_transaction->getOffsets().tryGet(file_segment.offset())->queue_iterator; if (queue_iter) - queue_iter->use(queue_lock); + LockedCachePriorityIterator(queue_lock, queue_iter).use(); if (!file_segment.isCompleted()) { diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 13fd1f3d86a..9f1e0aef5a3 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -294,7 +294,7 @@ private: /// Function might check if the caller of the method /// is the last alive holder of the segment. Therefore, completion and destruction /// of the file segment pointer must be done under the same cache mutex. 
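`FileSegment::complete()` above now takes the queue lock once and hands it down as a `CachePriorityQueueGuard::LockPtr`. The guard change itself is in the Guards.h hunk below; a minimal standalone model of the pattern, with simplified names, is:

    #include <memory>
    #include <mutex>

    // A guard whose lock() hands out a shared, copyable lock token. Because the
    // token is a shared_ptr, it can be passed by value down a call chain
    // (complete() -> completeUnlocked() -> iterator helpers) and the mutex is
    // released only when the last copy of the token goes out of scope.
    struct QueueGuard
    {
        std::mutex mutex;

        struct Lock
        {
            explicit Lock(QueueGuard & guard) : lock(guard.mutex) {}
            std::unique_lock<std::mutex> lock;
        };
        using LockPtr = std::shared_ptr<Lock>;

        LockPtr lock() { return std::make_shared<Lock>(*this); }
    };

    void leafOperation(QueueGuard::LockPtr /*token*/)
    {
        // Touches queue state; the caller proves it holds the lock by passing the token.
    }

    void topLevelOperation(QueueGuard & guard)
    {
        auto token = guard.lock(); // taken once at the top level
        leafOperation(token);      // handed down by value
    }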
- void completeUnlocked(KeyTransaction & key_transaction, const CachePriorityQueueGuard::Lock &); + void completeUnlocked(KeyTransaction & key_transaction, CachePriorityQueueGuard::LockPtr); void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; diff --git a/src/Interpreters/Cache/Guards.h b/src/Interpreters/Cache/Guards.h index 3189d4daf88..78903ba1168 100644 --- a/src/Interpreters/Cache/Guards.h +++ b/src/Interpreters/Cache/Guards.h @@ -41,10 +41,11 @@ struct CachePriorityQueueGuard explicit Lock(CachePriorityQueueGuard & guard) : lock(guard.mutex) {} std::unique_lock lock; }; + using LockPtr = std::shared_ptr; std::mutex mutex; - Lock lock() { return Lock(*this); } + LockPtr lock() { return std::make_shared(*this); } std::shared_ptr lockShared() { return std::make_shared(*this); } CachePriorityQueueGuard() = default; diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h index ccef858eb4d..5382db93a22 100644 --- a/src/Interpreters/Cache/IFileCachePriority.h +++ b/src/Interpreters/Cache/IFileCachePriority.h @@ -20,13 +20,13 @@ using KeyTransactionCreatorPtr = std::unique_ptr; /// IFileCachePriority is used to maintain the priority of cached data. class IFileCachePriority { +friend class LockedCachePriority; public: class IIterator; using Key = FileCacheKey; using KeyAndOffset = FileCacheKeyAndOffset; using Iterator = std::shared_ptr; using ConstIterator = std::shared_ptr; - using Lock = CachePriorityQueueGuard::Lock; struct Entry { @@ -48,43 +48,28 @@ public: /// can only traverse the records in the low priority queue. class IIterator { + friend class LockedCachePriorityIterator; public: virtual ~IIterator() = default; + protected: virtual Entry & operator *() = 0; virtual const Entry & operator *() const = 0; - /// Mark a cache record as recently used, it will update the priority - /// of the cache record according to different cache algorithms. - /// Return result hits count. - virtual size_t use(const CachePriorityQueueGuard::Lock &) = 0; + virtual size_t use() = 0; - virtual void incrementSize(ssize_t, const CachePriorityQueueGuard::Lock &) = 0; + virtual void incrementSize(ssize_t) = 0; - /// Remove current cached record. Return iterator to the next value. - virtual Iterator remove(const CachePriorityQueueGuard::Lock &) = 0; + virtual Iterator remove() = 0; }; + IFileCachePriority(size_t max_size_, size_t max_elements_) : max_size(max_size_), max_elements(max_elements_) {} + virtual ~IFileCachePriority() = default; - size_t getCacheSize(const CachePriorityQueueGuard::Lock &) const { return cache_size; } - - virtual size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const = 0; - - /// Lock current priority queue. All methods must be called under this lock. - CachePriorityQueueGuard::Lock lock() { return guard.lock(); } - - /// Add a cache record that did not exist before, and throw a - /// logical exception if the cache block already exists. 
- virtual Iterator add( - const Key & key, - size_t offset, - size_t size, - KeyTransactionCreatorPtr key_transaction_creator, - const CachePriorityQueueGuard::Lock &) = 0; - - virtual void removeAll(const CachePriorityQueueGuard::Lock &) = 0; + size_t getElementsLimit() const { return max_elements; } + size_t getSizeLimit() const { return max_size; } enum class IterationResult { @@ -92,15 +77,71 @@ public: CONTINUE, REMOVE_AND_CONTINUE, }; - using IterateFunc = std::function; - virtual void iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) = 0; - protected: - CachePriorityQueueGuard guard; + const size_t max_size = 0; + const size_t max_elements = 0; - size_t max_cache_size = 0; - size_t cache_size = 0; + virtual size_t getSize() const = 0; + + virtual size_t getElementsCount() const = 0; + + virtual Iterator add( + const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) = 0; + + virtual void pop() = 0; + + virtual void removeAll() = 0; + + using IterateFunc = std::function; + virtual void iterate(IterateFunc && func) = 0; +}; + +class LockedCachePriority +{ +public: + LockedCachePriority(CachePriorityQueueGuard::LockPtr lock_, IFileCachePriority & priority_queue_) + : lock(lock_), queue(priority_queue_) {} + + size_t getElementsLimit() const { return queue.max_elements; } + + size_t getSizeLimit() const { return queue.max_size; } + + size_t getSize() const { return queue.getSize(); } + + size_t getElementsCount() const { return queue.getElementsCount(); } + + IFileCachePriority::Iterator add(const FileCacheKey & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) { return queue.add(key, offset, size, std::move(key_transaction_creator)); } + + void pop() { queue.pop(); } + + void removeAll() { queue.removeAll(); } + + void iterate(IFileCachePriority::IterateFunc && func) { queue.iterate(std::move(func)); } + +private: + CachePriorityQueueGuard::LockPtr lock; + IFileCachePriority & queue; +}; + +class LockedCachePriorityIterator +{ +public: + LockedCachePriorityIterator(CachePriorityQueueGuard::LockPtr lock_, IFileCachePriority::Iterator & iterator_) + : lock(lock_), iterator(iterator_) {} + + IFileCachePriority::Entry & operator *() { return **iterator; } + const IFileCachePriority::Entry & operator *() const { return **iterator; } + + size_t use() { return iterator->use(); } + + void incrementSize(ssize_t size) { return iterator->incrementSize(size); } + + IFileCachePriority::Iterator remove() { return iterator->remove(); } + +private: + CachePriorityQueueGuard::LockPtr lock; + IFileCachePriority::Iterator & iterator; }; using FileCachePriorityPtr = std::unique_ptr; diff --git a/src/Interpreters/Cache/LRUFileCachePriority.cpp b/src/Interpreters/Cache/LRUFileCachePriority.cpp index 49bbb9d05da..215ee257c3b 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/LRUFileCachePriority.cpp @@ -17,11 +17,7 @@ namespace ErrorCodes } IFileCachePriority::Iterator LRUFileCachePriority::add( - const Key & key, - size_t offset, - size_t size, - KeyTransactionCreatorPtr key_transaction_creator, - const CachePriorityQueueGuard::Lock &) + const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) { #ifndef NDEBUG for (const auto & entry : queue) @@ -34,8 +30,18 @@ IFileCachePriority::Iterator LRUFileCachePriority::add( } #endif + const auto & size_limit = getSizeLimit(); + if (size_limit && current_size + size > size_limit) + { + throw Exception( 
+ ErrorCodes::LOGICAL_ERROR, + "Not enough space to add {}:{} with size {}: current size: {}/{}", + key.toString(), offset, size, current_size, getSizeLimit()); + } + + current_size += size; + auto iter = queue.insert(queue.end(), Entry(key, offset, size, std::move(key_transaction_creator))); - cache_size += size; CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size); CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements); @@ -45,20 +51,25 @@ IFileCachePriority::Iterator LRUFileCachePriority::add( return std::make_shared(this, iter); } -void LRUFileCachePriority::removeAll(const CachePriorityQueueGuard::Lock &) +void LRUFileCachePriority::removeAll() { - CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size); + CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, current_size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size()); LOG_TRACE(log, "Removed all entries from LRU queue"); queue.clear(); - cache_size = 0; + current_size = 0; +} + +void LRUFileCachePriority::pop() +{ + remove(queue.begin()); } LRUFileCachePriority::LRUQueueIterator LRUFileCachePriority::remove(LRUQueueIterator it) { - cache_size -= it->size; + current_size -= it->size; CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, it->size); CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements); @@ -73,7 +84,7 @@ LRUFileCachePriority::LRUFileCacheIterator::LRUFileCacheIterator( { } -void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) +void LRUFileCachePriority::iterate(IterateFunc && func) { for (auto it = queue.begin(); it != queue.end();) { @@ -98,19 +109,19 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityQueue } } -LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remove(const CachePriorityQueueGuard::Lock &) +LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remove() { return std::make_shared(cache_priority, cache_priority->remove(queue_iter)); } -void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(ssize_t size, const CachePriorityQueueGuard::Lock &) +void LRUFileCachePriority::LRUFileCacheIterator::incrementSize(ssize_t size) { - cache_priority->cache_size += size; + cache_priority->current_size += size; CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size); queue_iter->size += size; } -size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CachePriorityQueueGuard::Lock &) +size_t LRUFileCachePriority::LRUFileCacheIterator::use() { cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, queue_iter); return ++queue_iter->hits; diff --git a/src/Interpreters/Cache/LRUFileCachePriority.h b/src/Interpreters/Cache/LRUFileCachePriority.h index 1288f5b0e81..fa916fd2a06 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.h +++ b/src/Interpreters/Cache/LRUFileCachePriority.h @@ -18,25 +18,26 @@ private: using LRUQueueIterator = typename LRUQueue::iterator; public: - LRUFileCachePriority() = default; + LRUFileCachePriority(size_t max_size_, size_t max_elements_) : IFileCachePriority(max_size_, max_elements_) {} - Iterator add( - const Key & key, - size_t offset, - size_t size, - KeyTransactionCreatorPtr key_transaction_creator, - const CachePriorityQueueGuard::Lock &) override; + size_t getSize() const override { return current_size; } - void removeAll(const CachePriorityQueueGuard::Lock &) override; + size_t getElementsCount() const override { return queue.size(); } - void 
iterate(IterateFunc && func, const CachePriorityQueueGuard::Lock &) override; + Iterator add(const Key & key, size_t offset, size_t size, KeyTransactionCreatorPtr key_transaction_creator) override; - size_t getElementsNum(const CachePriorityQueueGuard::Lock &) const override { return queue.size(); } + void pop() override; + + void removeAll() override; + + void iterate(IterateFunc && func) override; private: LRUQueue queue; Poco::Logger * log = &Poco::Logger::get("LRUFileCachePriority"); + size_t current_size = 0; + LRUQueueIterator remove(LRUQueueIterator it); }; @@ -50,11 +51,11 @@ public: Entry & operator *() override { return *queue_iter; } const Entry & operator *() const override { return *queue_iter; } - size_t use(const CachePriorityQueueGuard::Lock &) override; + size_t use() override; - Iterator remove(const CachePriorityQueueGuard::Lock &) override; + Iterator remove() override; - void incrementSize(ssize_t size, const CachePriorityQueueGuard::Lock &) override; + void incrementSize(ssize_t size) override; private: LRUFileCachePriority * cache_priority; diff --git a/src/Interpreters/InterpreterDescribeCacheQuery.cpp b/src/Interpreters/InterpreterDescribeCacheQuery.cpp index b8f6a9b308d..ca875ee57b2 100644 --- a/src/Interpreters/InterpreterDescribeCacheQuery.cpp +++ b/src/Interpreters/InterpreterDescribeCacheQuery.cpp @@ -20,7 +20,7 @@ static Block getSampleBlock() ColumnWithTypeAndName{std::make_shared(), "max_elements"}, ColumnWithTypeAndName{std::make_shared(), "max_file_segment_size"}, ColumnWithTypeAndName{std::make_shared>(), "cache_on_write_operations"}, - ColumnWithTypeAndName{std::make_shared>(), "enable_cache_hits_threshold"}, + ColumnWithTypeAndName{std::make_shared>(), "cache_hits_threshold"}, ColumnWithTypeAndName{std::make_shared(), "current_size"}, ColumnWithTypeAndName{std::make_shared(), "current_elements"}, ColumnWithTypeAndName{std::make_shared(), "path"}, @@ -45,7 +45,7 @@ BlockIO InterpreterDescribeCacheQuery::execute() res_columns[1]->insert(settings.max_elements); res_columns[2]->insert(settings.max_file_segment_size); res_columns[3]->insert(settings.cache_on_write_operations); - res_columns[4]->insert(settings.enable_cache_hits_threshold); + res_columns[4]->insert(settings.cache_hits_threshold); res_columns[5]->insert(cache->getUsedCacheSize()); res_columns[6]->insert(cache->getFileSegmentsNum()); res_columns[7]->insert(cache->getBasePath());
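For reference, the LRU behaviour that `LRUFileCachePriority` keeps after this refactoring boils down to the following self-contained sketch (illustrative names; metrics, logging and the entry payload are omitted): `use()` re-links an entry to the most-recently-used end and returns its hit count, and `pop()` evicts the least-recently-used entry.

    #include <cstddef>
    #include <list>

    struct TinyLru
    {
        struct Entry { size_t key; size_t hits = 0; };
        std::list<Entry> queue;

        std::list<Entry>::iterator add(size_t key)
        {
            return queue.insert(queue.end(), Entry{key});
        }

        size_t use(std::list<Entry>::iterator it)
        {
            queue.splice(queue.end(), queue, it); // re-link to the most-recent end
            return ++it->hits;                    // iterator stays valid after splice
        }

        void pop()
        {
            queue.erase(queue.begin()); // evict the least recently used entry (queue must be non-empty)
        }
    };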