#include "FileCache.h"

/// NOTE(review): the targets of the includes below were lost when this file was extracted
/// (the original had ten `#include <...>` lines here). They are reconstructed from the names
/// used in this file — confirm them against the repository.
#include <Common/CurrentThread.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Common/randomSeed.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <base/logger_useful.h>
#include <pcg_random.hpp>

#include <algorithm>
#include <filesystem>
#include <utility>
#include <vector>

namespace fs = std::filesystem;

namespace DB
{

namespace ErrorCodes
{
    extern const int FILE_CACHE_ERROR;
}

namespace
{
    /// Lowercase hex representation of a cache key; used both in file system paths and in log messages.
    String keyToStr(const FileCache::Key & key)
    {
        return getHexUIntLowercase(key);
    }
}

FileCache::FileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
    : cache_base_path(cache_base_path_), max_size(max_size_), max_element_size(max_element_size_)
{
}

/// Cache key of a (remote) file path: its 128-bit SipHash.
FileCache::Key FileCache::hash(const String & path)
{
    return sipHash128(path.data(), path.size());
}

/// Path of the cache file holding the segment which starts at `offset`:
/// cache_base_path / first 3 hex chars of key / key / offset
String FileCache::path(const Key & key, size_t offset)
{
    auto key_str = keyToStr(key);
    return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
}

/// Directory holding all cached segments of a key: cache_base_path / first 3 hex chars of key / key
String FileCache::path(const Key & key)
{
    auto key_str = keyToStr(key);
    return fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
}

/// Restores the cache state from `cache_base_path` if it exists, otherwise creates the directory.
LRUFileCache::LRUFileCache(const String & cache_base_path_, size_t max_size_, size_t max_element_size_)
    : FileCache(cache_base_path_, max_size_, max_element_size_), log(&Poco::Logger::get("LRUFileCache"))
{
    if (fs::exists(cache_base_path))
        restore();
    else
        fs::create_directories(cache_base_path);

    /// From now on setImpl() refuses to create DOWNLOADED cells directly.
    startup_restore_finished = true;
}

/// Appends the cell's file segment to `result` and, if the cell is in the LRU queue,
/// marks it as most recently used. Requires `cache_lock`.
void LRUFileCache::useCell(
    const FileSegmentCell & cell, FileSegments & result, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    result.push_back(cell.file_segment);

    /**
     * A cell receives a queue iterator on first successful space reservation attempt
     * (space is reserved incrementally on each read buffer nextImpl() call).
     */
    if (cell.queue_iterator)
    {
        /// Move to the end of the queue. The iterator remains valid.
        queue.splice(queue.end(), queue, *cell.queue_iterator);
    }
}

/// Returns the cell for (key, offset), or nullptr if there is none. Requires `cache_lock`.
LRUFileCache::FileSegmentCell * LRUFileCache::getCell(
    const Key & key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    auto it = files.find(key);
    if (it == files.end())
        return nullptr;

    auto & offsets = it->second;
    auto cell_it = offsets.find(offset);
    if (cell_it == offsets.end())
        return nullptr;

    return &cell_it->second;
}

/// Returns all cached segments of `key` which intersect `range`, in ascending offset order,
/// marking each of them as most recently used. Requires `cache_lock`.
FileSegments LRUFileCache::getImpl(
    const Key & key, const FileSegment::Range & range, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    /// Given range = [left, right] and non-overlapping ordered set of file segments,
    /// find list [segment1, ..., segmentN] of segments which intersect with given range.

    auto it = files.find(key);
    if (it == files.end())
        return {};

    const auto & file_segments = it->second;
    if (file_segments.empty())
    {
        removeFileKey(key);
        return {};
    }

    FileSegments result;
    auto segment_it = file_segments.lower_bound(range.left);
    if (segment_it == file_segments.end())
    {
        /// N - last cached segment for given file key, segment{N}.offset < range.left:
        ///   segment{N}                       segment{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left

        const auto & cell = (--file_segments.end())->second;
        if (cell.file_segment->range().right < range.left)
            return {};

        useCell(cell, result, cache_lock);
    }
    else /// segment_it <-- segment{k}
    {
        if (segment_it != file_segments.begin())
        {
            const auto & prev_cell = std::prev(segment_it)->second;
            const auto & prev_cell_range = prev_cell.file_segment->range();

            if (range.left <= prev_cell_range.right)
            {
                ///   segment{k-1}  segment{k}
                ///   [________]   [_____
                ///       [___________
                ///       ^
                ///       range.left

                useCell(prev_cell, result, cache_lock);
            }
        }

        ///  segment{k} ...       segment{k-1}  segment{k}                      segment{k}
        ///  [______              [______]     [____                        [________
        ///  [_________     OR              [________      OR    [______]   ^
        ///  ^                              ^                           ^   segment{k}.offset
        ///  range.left                     range.left                  range.right

        while (segment_it != file_segments.end())
        {
            const auto & cell = segment_it->second;
            if (range.right < cell.file_segment->range().left)
                break;

            useCell(cell, result, cache_lock);
            ++segment_it;
        }
    }

    /// TODO: remove this extra debug logging.
    String ranges;
    for (const auto & s : result)
        ranges += "\nRange: " + s->range().toString() + ", download state: " + FileSegment::toString(s->state()) + " ";

    LOG_TEST(log, "Cache get. Key: {}, range: {}, file_segments number: {}, ranges: {}",
             keyToStr(key), range.toString(), result.size(), ranges);

    return result;
}

/// Returns a holder of file segments which together cover [offset, offset + size - 1]:
/// already cached segments plus newly created EMPTY cells for every uncovered gap.
/// NOTE(review): `size` is presumably expected to be > 0 — with size == 0 the range end underflows.
FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t size)
{
    FileSegment::Range range(offset, offset + size - 1);

    std::lock_guard cache_lock(mutex);

    /// Get all segments which intersect with the given range.
    auto file_segments = getImpl(key, range, cache_lock);

    if (file_segments.empty())
    {
        auto * cell = setImpl(key, offset, size, FileSegment::State::EMPTY, cache_lock);
        file_segments = {cell->file_segment};
    }
    else
    {
        /// There are segments [segment1, ..., segmentN]
        /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
        /// intersect with given range.

        /// It can have holes:
        /// [____________________]         -- requested range
        ///     [____]  [_]   [_________]  -- intersecting cache [segment1, ..., segmentN]
        ///
        /// For each such hole create a cell with file segment state EMPTY.

        auto it = file_segments.begin();
        auto segment_range = (*it)->range();

        size_t current_pos;
        if (segment_range.left < range.left)
        {
            ///    [_______     -- requested range
            /// [_______
            /// ^
            /// segment1

            current_pos = segment_range.right + 1;
            ++it;
        }
        else
            current_pos = range.left;

        while (current_pos <= range.right && it != file_segments.end())
        {
            segment_range = (*it)->range();

            if (current_pos == segment_range.left)
            {
                current_pos = segment_range.right + 1;
                ++it;
                continue;
            }

            assert(current_pos < segment_range.left);

            /// Fill the hole [current_pos, segment_range.left - 1] and keep the list ordered.
            auto hole_size = segment_range.left - current_pos;
            auto * cell = setImpl(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock);
            file_segments.insert(it, cell->file_segment);

            current_pos = segment_range.right + 1;
            ++it;
        }

        if (current_pos <= range.right)
        {
            /// ________]     -- requested range
            /// _____]
            ///      ^
            /// segmentN

            auto hole_size = range.right - current_pos + 1;
            auto * cell = setImpl(key, current_pos, hole_size, FileSegment::State::EMPTY, cache_lock);
            file_segments.push_back(cell->file_segment);
        }
    }

    assert(!file_segments.empty());
    return FileSegmentsHolder(std::move(file_segments));
}

/// Creates a new cell (and the key directory on disk if this is the key's first cell).
/// Returns nullptr for size == 0; throws if a cell for (key, offset) already exists. Requires `cache_lock`.
LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
    const Key & key, size_t offset, size_t size, FileSegment::State state,
    [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    if (!size)
        return nullptr; /// Empty files are not cached.

    if (files[key].contains(offset))
        throw Exception(
            ErrorCodes::FILE_CACHE_ERROR,
            "Cache already exists for key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);

    auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
    FileSegmentCell cell(std::move(file_segment));

    auto & offsets = files[key];

    if (offsets.empty())
    {
        auto key_path = path(key);
        if (!fs::exists(key_path))
            fs::create_directories(key_path);
    }

    auto [it, inserted] = offsets.insert({offset, std::move(cell)});
    if (!inserted)
        throw Exception(
            ErrorCodes::FILE_CACHE_ERROR,
            "Failed to insert into cache key: `{}`, offset: {}, size: {}", keyToStr(key), offset, size);

    return &(it->second);
}

/// Reserves `size` bytes for an existing cell, using the cell's current download state.
/// Returns whether the reservation succeeded. Requires both the segment and the cache locks.
bool LRUFileCache::set(
    const Key & key, size_t offset, size_t size,
    [[maybe_unused]] std::lock_guard<std::mutex> & segment_lock, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    auto * cell = getCell(key, offset, cache_lock);
    if (!cell)
        throw Exception(
            ErrorCodes::FILE_CACHE_ERROR,
            "Cannot set cell, because it was not created for key: {}, offset: {}", keyToStr(key), offset);

    auto state = cell->file_segment->download_state;
    return setImpl(key, offset, size, state, cache_lock) != nullptr;
}

/// State-dependent cell setup:
///  - EMPTY: create the cell without reserving space and without putting it into the LRU queue;
///  - DOWNLOADED (startup restore only): reserve space, create the cell and append it to the queue;
///  - DOWNLOADING: reserve space for an existing cell and append it to the queue.
/// Returns nullptr if size == 0 or space could not be reserved. Requires `cache_lock`.
LRUFileCache::FileSegmentCell * LRUFileCache::setImpl(
    const Key & key, size_t offset, size_t size, FileSegment::State state,
    [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    if (!size)
        return nullptr;

    LOG_TEST(log, "SetImpl. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size);

    switch (state)
    {
        case FileSegment::State::EMPTY:
        {
            /**
             * A new cell in cache becomes EMPTY at first, does not have any reserved space and
             * is not present in LRUQueue, but is visible to cache users as a valid cache cell.
             * EMPTY cells acquire DOWNLOADING state when their owner successfully calls getOrSetDownloader()
             * and are put into LRUQueue on first (successful) space reservation attempt.
             */
            return addCell(key, offset, size, state, cache_lock);
        }
        case FileSegment::State::DOWNLOADED:
        {
            if (startup_restore_finished)
                throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Setting DOWNLOADED state cell is allowed only at startup");

            if (tryReserve(size, cache_lock))
            {
                auto * cell = addCell(key, offset, size, state, cache_lock);
                cell->queue_iterator = queue.insert(queue.end(), std::make_pair(key, offset));
                return cell;
            }

            return nullptr;
        }
        case FileSegment::State::DOWNLOADING:
        {
            auto * cell = getCell(key, offset, cache_lock);
            if (!cell)
                throw Exception(
                    ErrorCodes::FILE_CACHE_ERROR,
                    "Cannot set cell, because it was not created for key: {}, offset: {}", keyToStr(key), offset);

            if (tryReserve(size, cache_lock))
            {
                cell->queue_iterator = queue.insert(queue.end(), std::make_pair(key, offset));
                return cell;
            }

            return nullptr;
        }
        default:
            throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Unexpected state: {}", FileSegment::toString(state));
    }
}

/// Tries to make room for `size` more bytes (and one more queue element), walking the LRU queue
/// from its least recently used end. Releasable DOWNLOADED cells are evicted only if the
/// reservation succeeds; other releasable cells are removed immediately.
/// On success adds `size` (minus evicted sizes) to current_size and returns true. Requires `cache_lock`.
bool LRUFileCache::tryReserve(size_t size, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    auto queue_size = queue.size() + 1;

    /// Must be size_t, not int: sizes are size_t and an int counter can overflow.
    size_t removed_size = 0;
    /// Part of removed_size which belongs to cells removed immediately (not deferred to to_evict).
    size_t removed_immediately_size = 0;

    auto is_overflow = [&]
    {
        return (current_size + size - removed_size > max_size)
            || (max_element_size != 0 && queue_size > max_element_size);
    };

    std::vector<std::pair<Key, size_t>> to_evict;

    auto key_it = queue.begin();
    while (is_overflow() && key_it != queue.end())
    {
        /// Advance before a possible remove() below, which erases this queue entry.
        const auto [key, offset] = *key_it++;

        auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);

        size_t cell_size = cell->size();

        /// It is guaranteed that cell is not removed from cache as long as
        /// pointer to corresponding file segment is hold by any other thread.
        if (!cell->releasable())
            continue;

        if (cell->file_segment->state() == FileSegment::State::DOWNLOADED)
        {
            /// Cell will actually be removed only if
            /// we managed to reserve enough space.
            to_evict.emplace_back(key, offset);
        }
        else
        {
            remove(key, offset, cache_lock);
            removed_immediately_size += cell_size;
        }

        removed_size += cell_size;
        --queue_size;
    }

    if (is_overflow())
    {
        /// Cells removed immediately are gone regardless of the outcome, so their space must be released as well.
        current_size -= removed_immediately_size;
        return false;
    }

    for (auto & [key, offset] : to_evict)
        remove(key, offset, cache_lock);

    current_size += size - removed_size;
    if (current_size > (1ull << 63))
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Cache became inconsistent. There must be a bug");

    return true;
}

/// Removes the cell for (key, offset): its LRU queue entry, its map entry and its file on disk
/// (and the key directory once the key has no more cells, after startup). Throws if there is no such cell.
/// Does not change current_size — callers account for released space themselves.
/// `key` is taken by value because callers may pass a reference into a queue entry which is erased here.
void LRUFileCache::remove(
    Key key, size_t offset, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    LOG_TEST(log, "Remove. Key: {}, offset: {}", keyToStr(key), offset);

    auto * cell = getCell(key, offset, cache_lock);
    if (!cell)
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "No cache cell for key: {}, offset: {}", keyToStr(key), offset);

    if (cell->queue_iterator)
        queue.erase(*cell->queue_iterator);

    auto & offsets = files[key];
    offsets.erase(offset);

    auto cache_file_path = path(key, offset);
    if (fs::exists(cache_file_path))
    {
        try
        {
            fs::remove(cache_file_path);

            if (startup_restore_finished && offsets.empty())
                removeFileKey(key);
        }
        catch (...)
        {
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
                keyToStr(key), offset, cache_file_path, getCurrentExceptionMessage(false));
        }
    }
}

/// Loads cache state from disk (layout: cache_base_path / key_prefix / key / offset) as DOWNLOADED cells.
/// Files which cannot be reserved any more (e.g. the capacity shrank) are deleted.
/// The restored cells are put into the LRU queue in random order.
void LRUFileCache::restore()
{
    std::lock_guard cache_lock(mutex);

    Key key;
    UInt64 offset;
    size_t size;

    /// Keep (key, offset) rather than cell pointers: a later setImpl() -> tryReserve() may evict
    /// a cell restored earlier (if the cache capacity shrank), which would leave a pointer dangling.
    std::vector<std::pair<Key, size_t>> restored_cells;

    /// cache_base_path / key_prefix / key / offset

    fs::directory_iterator key_prefix_it{cache_base_path};
    for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
    {
        fs::directory_iterator key_it{key_prefix_it->path()};
        for (; key_it != fs::directory_iterator(); ++key_it)
        {
            key = unhexUInt<Key>(key_it->path().filename().string().data());

            fs::directory_iterator offset_it{key_it->path()};
            for (; offset_it != fs::directory_iterator(); ++offset_it)
            {
                bool parsed = tryParse(offset, offset_it->path().filename());
                if (!parsed)
                    throw Exception(
                        ErrorCodes::FILE_CACHE_ERROR,
                        "Unexpected file in cache: cannot parse offset. Path: {}", offset_it->path().string());

                size = offset_it->file_size();

                auto * cell = setImpl(key, offset, size, FileSegment::State::DOWNLOADED, cache_lock);
                if (cell)
                {
                    restored_cells.emplace_back(key, offset);
                }
                else
                {
                    LOG_WARNING(log,
                                "Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {})",
                                max_size, available(), offset_it->path().string(), size);
                    fs::remove(path(key, offset));
                }
            }
        }
    }

    /// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
    pcg64 generator(randomSeed());
    std::shuffle(restored_cells.begin(), restored_cells.end(), generator);
    for (const auto & [restored_key, restored_offset] : restored_cells)
    {
        /// Skip cells which were evicted while restoring later ones.
        if (auto * cell = getCell(restored_key, restored_offset, cache_lock))
            queue.splice(queue.end(), queue, *cell->queue_iterator);
    }
}

/// Removes every cell of `key` and then the key itself (map entry and directory).
/// NOTE(review): like remove(key, offset, lock), this does not decrease current_size — confirm this is intended.
void LRUFileCache::remove(const Key & key)
{
    std::lock_guard cache_lock(mutex);

    auto it = files.find(key);
    if (it == files.end())
        return;

    /// Collect offsets first: remove(key, offset, ...) erases from this very map (and may erase
    /// the whole key entry), which would invalidate the iterator of a range-for over it.
    std::vector<size_t> offsets;
    offsets.reserve(it->second.size());
    for (const auto & [offset, _] : it->second)
        offsets.push_back(offset);

    for (size_t offset : offsets)
        remove(key, offset, cache_lock);

    removeFileKey(key);
}

/// Drops the map entry of `key` and deletes its directory if it exists.
void LRUFileCache::removeFileKey(const Key & key)
{
    auto key_path = path(key);

    files.erase(key);

    if (fs::exists(key_path))
        fs::remove(key_path);
}

/// Snapshot of queue length, available space and the number of DOWNLOADED / DOWNLOADING cells in the queue.
LRUFileCache::Stat LRUFileCache::getStat()
{
    std::lock_guard cache_lock(mutex);

    Stat stat
    {
        .size = queue.size(),
        .available = available(),
        .downloaded_size = 0,
        .downloading_size = 0,
    };

    for (const auto & [key, offset] : queue)
    {
        const auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);

        switch (cell->file_segment->state())
        {
            case FileSegment::State::DOWNLOADED:
            {
                ++stat.downloaded_size;
                break;
            }
            case FileSegment::State::DOWNLOADING:
            {
                ++stat.downloading_size;
                break;
            }
            default:
                break;
        }
    }

    return stat;
}

/// Number of owners of `file_segment` other than the cache itself. Requires `cache_lock`.
size_t LRUFileCache::getUseCount(const FileSegment & file_segment, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
    auto * cell = getCell(file_segment.key(), file_segment.range().left, cache_lock);
    if (!cell)
        throw Exception(
            ErrorCodes::FILE_CACHE_ERROR,
            "Cache became inconsistent. Key: {}, offset: {}", keyToStr(file_segment.key()), file_segment.range().left);

    assert(cell->file_segment.use_count() >= 1);
    return cell->file_segment.use_count() - 1; /// Do not consider pointer which lies in cache itself.
}

/// Id of the calling query; throws if the current thread has no query id.
String FileSegment::getCallerId()
{
    if (!CurrentThread::isInitialized() || CurrentThread::getQueryId().size == 0)
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Cannot use cache without query id");

    return CurrentThread::getQueryId().toString();
}

/// Makes the caller the downloader (and moves the segment to DOWNLOADING) if there is none yet.
/// Returns the id of the current downloader.
String FileSegment::getOrSetDownloader()
{
    std::lock_guard segment_lock(mutex);

    if (downloader_id.empty())
    {
        downloader_id = getCallerId();
        download_state = State::DOWNLOADING;
    }

    return downloader_id;
}

bool FileSegment::isDownloader() const
{
    std::lock_guard segment_lock(mutex);
    return getCallerId() == downloader_id;
}

/// Appends `size` bytes to the segment's cache file, opening it on the first write.
/// Throws if fewer than `size` bytes are available in the reservation.
void FileSegment::write(const char * from, size_t size)
{
    if (available() < size)
        throw Exception(
            ErrorCodes::FILE_CACHE_ERROR,
            "Not enough space is reserved. Available: {}, expected: {}", available(), size);

    if (!download_buffer)
    {
        assert(!downloaded_size);
        auto download_path = cache->path(key(), range().left);
        /// NOTE(review): the buffer type was lost in extraction; WriteBufferFromFile is assumed — confirm against the header.
        download_buffer = std::make_unique<WriteBufferFromFile>(download_path);
    }

    download_buffer->write(from, size);
    downloaded_size += size;
}

/// Finishes the current downloader's work on this segment and wakes up waiters.
/// Depending on state: removes the cell (NO_SPACE, or nothing downloaded and no other users),
/// marks it DOWNLOADED (fully downloaded, or partially downloaded with no other users — then the
/// segment is shrunk to the downloaded part), or leaves it DOWNLOADING for another user to continue.
void FileSegment::complete()
{
    {
        std::lock_guard segment_lock(mutex);

        /// TODO: There is a gap between the moment the next thread calls getOrSetDownloader() and
        /// this one; no one will remove the cell during this gap.

        if (downloader_id != getCallerId())
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "File segment can be completed only by downloader or downloader's FileSegmentsHodler");

        downloader_id.clear();
        download_buffer.reset();

        switch (download_state)
        {
            case State::EMPTY:
            case State::DOWNLOADED:
            {
                /// Download not even started or already completed successfully.
                break;
            }
            case State::NO_SPACE: /// Space reservation failed.
            {
                assert(!downloaded_size);
                reserved_size = 0;

                std::lock_guard cache_lock(cache->mutex);
                cache->remove(key(), range().left, cache_lock);

                break;
            }
            case State::DOWNLOADING:
            {
                if (downloaded_size == range().size())
                {
                    std::lock_guard cache_lock(cache->mutex);

                    download_state = State::DOWNLOADED;
                    reserved_size = downloaded_size;
                }
                else
                {
                    /**
                     * If file segment downloader did not finish download completely, check if there is some other
                     * thread holding the same file segment. It can finish download.
                     * Since pointers to file segments are returned to users in FileSegmentsHolder,
                     * which calls complete() on destruction, removal of failed file segments is guaranteed.
                     */

                    std::lock_guard cache_lock(cache->mutex);

                    size_t users_count = cache->getUseCount(*this, cache_lock);
                    assert(users_count >= 1);

                    if (users_count == 1)
                    {
                        if (downloaded_size > 0)
                        {
                            segment_range.right = segment_range.left + downloaded_size - 1;
                            reserved_size = downloaded_size;
                            download_state = State::DOWNLOADED;
                        }
                        else
                            cache->remove(key(), range().left, cache_lock);
                    }
                }
            }
        }

        LOG_TEST(&Poco::Logger::get("kssenii"), "Complete on: {} with state: {}", range().toString(), FileSegment::toString(download_state));
    }

    cv.notify_all();
}

/// Waits (up to 60 seconds) for a DOWNLOADING segment to be completed by its downloader;
/// returns immediately for DOWNLOADED / NO_SPACE. Returns the state observed afterwards.
FileSegment::State FileSegment::wait()
{
    std::unique_lock segment_lock(mutex);

    switch (download_state)
    {
        case State::DOWNLOADING:
        {
            LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {}", range().toString());

            cv.wait_for(segment_lock, std::chrono::seconds(60)); /// TODO: use value defined by setting
            break;
        }
        case State::DOWNLOADED:
            [[fallthrough]];
        case State::NO_SPACE:
        {
            break;
        }
        default:
        {
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "Trying to wait for segment with incorrect state: {}", FileSegment::toString(download_state));
        }
    }

    LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {} finished with state: {}", range().toString(), FileSegment::toString(download_state));
    return download_state;
}

/// Ensures at least `size` bytes of reserved-but-unwritten space for the downloader.
/// On failure the segment moves to NO_SPACE. Only the downloader may call this.
bool FileSegment::reserve(size_t size)
{
    if (!size)
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Zero space reservation is not allowed");

    std::lock_guard segment_lock(mutex);

    if (downloaded_size == range().size())
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Attempt to reserve space for fully downloaded file segment");

    if (downloader_id != getCallerId())
        throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Space can be reserved only by downloader");

    assert(reserved_size >= downloaded_size);

    /**
     * It is possible to have downloaded_size < reserved_size when reserve is called
     * in case previous downloader did not fully download current file_segment
     * and the caller is going to continue.
     */
    size_t free_space = reserved_size - downloaded_size;

    /// Enough space is already reserved; `size - free_space` would underflow otherwise.
    if (size <= free_space)
        return true;

    size_t size_to_reserve = size - free_space;

    std::lock_guard cache_lock(cache->mutex);

    bool reserved;
    /// A cell enters the LRU queue on its first successful reservation (via set()); once anything
    /// is reserved it is already queued, so only plain tryReserve() is needed. Checking
    /// downloaded_size here would call set() again (duplicate queue entry) for a cell which has
    /// reserved space but nothing downloaded yet.
    if (reserved_size)
        reserved = cache->tryReserve(size_to_reserve, cache_lock);
    else
        reserved = cache->set(key(), range().left, size_to_reserve, segment_lock, cache_lock);

    if (reserved)
        reserved_size += size_to_reserve; /// Exactly what the cache has accounted for.
    else
        download_state = State::NO_SPACE;

    return reserved;
}

/// Debug representation of the LRU queue: ranges and states, least recently used first.
String LRUFileCache::dump()
{
    std::lock_guard cache_lock(mutex);

    WriteBufferFromOwnString result;
    for (auto it = queue.begin(); it != queue.end(); ++it)
    {
        auto [key, offset] = *it;
        auto * cell = getCell(key, offset, cache_lock);
        if (!cell)
            throw Exception(
                ErrorCodes::FILE_CACHE_ERROR,
                "Cache became inconsistent. Key: {}, offset: {}", keyToStr(key), offset);

        result << (it != queue.begin() ? ", " : "") << cell->file_segment->range().toString();
        result << "(state: " << cell->file_segment->state() << ")";
    }
    return result.str();
}

String FileSegment::toString(FileSegment::State state)
{
    switch (state)
    {
        case FileSegment::State::DOWNLOADED:
            return "DOWNLOADED";
        case FileSegment::State::DOWNLOADING:
            return "DOWNLOADING";
        case FileSegment::State::EMPTY:
            return "EMPTY";
        case FileSegment::State::NO_SPACE:
            return "NO_SPACE";
    }
    /// All enumerators are handled above; do not fall off the end of a non-void function.
    __builtin_unreachable();
}

}