Make querying system.filesystem_cache not memory intensive

This commit is contained in:
kssenii 2023-12-08 17:09:09 +01:00
parent 7b0f8d44e8
commit 02d19fa7ef
8 changed files with 128 additions and 25 deletions

View File

@ -956,6 +956,11 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
return true;
}
/// Returns an iterator over the cache metadata.
/// This is a plain forward to CacheMetadata::iterate() on the `metadata` member.
CacheMetadata::Iterator FileCache::iterate()
{
return metadata.iterate();
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
@ -1280,7 +1285,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos()
metadata.iterate([&](const LockedKey & locked_key)
{
for (const auto & [_, file_segment_metadata] : locked_key)
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment, *this));
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment));
});
return file_segments;
}
@ -1290,7 +1295,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
std::vector<FileSegment::Info> file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
for (const auto & [_, file_segment_metadata] : *locked_key)
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment, *this));
file_segments.push_back(FileSegment::getInfo(file_segment_metadata->file_segment));
return file_segments;
}
@ -1301,7 +1306,7 @@ std::vector<FileSegment::Info> FileCache::dumpQueue()
std::vector<FileSegment::Info> file_segments;
main_priority->iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{
file_segments.push_back(FileSegment::getInfo(segment_metadata->file_segment, *this));
file_segments.push_back(FileSegment::getInfo(segment_metadata->file_segment));
return PriorityIterationResult::CONTINUE;
}, lockCache());
@ -1342,9 +1347,7 @@ void FileCache::assertCacheCorrectness()
{
for (const auto & [_, file_segment_metadata] : locked_key)
{
const auto & file_segment = *file_segment_metadata->file_segment;
UNUSED(file_segment);
chassert(file_segment.assertCorrectness());
chassert(file_segment_metadata->getFileSegment().assertCorrectness());
}
});
}
@ -1386,7 +1389,7 @@ std::vector<FileSegment::Info> FileCache::sync()
std::vector<FileSegment::Info> file_segments;
metadata.iterate([&](LockedKey & locked_key)
{
auto broken = locked_key.sync(*this);
auto broken = locked_key.sync();
file_segments.insert(file_segments.end(), broken.begin(), broken.end());
});
return file_segments;

View File

@ -154,6 +154,8 @@ public:
std::vector<FileSegment::Info> sync();
CacheMetadata::Iterator iterate();
private:
using KeyAndOffset = FileCacheKeyAndOffset;

View File

@ -14,7 +14,7 @@ class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
struct FileSegmentMetadata;
class FileSegmentMetadata;
using FileSegmentMetadataPtr = std::shared_ptr<FileSegmentMetadata>;
struct LockedKey;

View File

@ -120,6 +120,14 @@ String FileSegment::getPathInLocalCache() const
return getKeyMetadata()->getFileSegmentPath(*this);
}
/// Non-failing counterpart of getPathInLocalCache(): asks tryGetKeyMetadata() for the
/// key metadata and, if it is available, returns the path it reports for this segment.
/// Returns an empty string when no key metadata could be obtained.
String FileSegment::tryGetPathInLocalCache() const
{
    if (auto key_metadata = tryGetKeyMetadata())
        return key_metadata->getFileSegmentPath(*this);

    /// No key metadata - there is no path to report.
    return {};
}
FileSegmentGuard::Lock FileSegment::lockFileSegment() const
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentLockMicroseconds);
@ -833,13 +841,13 @@ void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock)
}
}
FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment, FileCache & cache)
FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment)
{
auto lock = file_segment->lockFileSegment();
return Info{
.key = file_segment->key(),
.offset = file_segment->offset(),
.path = cache.getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->segment_kind),
.path = file_segment->tryGetPathInLocalCache(),
.range_left = file_segment->range().left,
.range_right = file_segment->range().right,
.kind = file_segment->segment_kind,

View File

@ -220,7 +220,7 @@ public:
uint64_t references;
bool is_unbound;
};
static Info getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
static Info getInfo(const FileSegmentPtr & file_segment);
bool isDetached() const;
@ -306,6 +306,8 @@ private:
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
FileSegmentGuard::Lock lockFileSegment() const;
String tryGetPathInLocalCache() const;
Key file_key;
Range segment_range;
const FileSegmentKind segment_kind;

View File

@ -143,7 +143,7 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata "
"and priority queue: {} != {} ({})",
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
it->size, metadata->size(), metadata->getFileSegment().getInfoForLog());
}
auto result = func(*locked_key, metadata);

View File

@ -563,7 +563,7 @@ void CacheMetadata::downloadThreadFunc()
auto file_segment = file_segment_weak.lock();
if (!file_segment
|| file_segment != file_segment_metadata->file_segment
|| file_segment.get() != &file_segment_metadata->getFileSegment()
|| file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
continue;
@ -675,6 +675,64 @@ void CacheMetadata::cancelDownload()
download_queue->cancel();
}
/// Creates an Iterator bound to this CacheMetadata instance.
/// The iterator keeps a raw pointer to `this` (see the Iterator constructor).
CacheMetadata::Iterator CacheMetadata::iterate()
{
return Iterator(this);
}
/// Advances the iteration by one element.
///
/// Returns the next file segment metadata, or nullptr once all `buckets_num`
/// buckets have been walked (in which case the iterator state is reset()).
///
/// The bucket lock object and the locked key are stored in the iterator itself,
/// so they are kept between calls until the current bucket / key is exhausted.
FileSegmentMetadataPtr CacheMetadata::Iterator::next()
{
    /// Exhausted buckets and keys are skipped by going around the loop again.
    while (current_bucket != buckets_num)
    {
        auto & bucket = metadata->metadata_buckets[current_bucket];

        /// Entering a bucket: take its lock first, then position at its first key.
        if (!bucket_lock)
        {
            bucket_lock = std::make_unique<CacheMetadataGuard::Lock>(bucket.lock());
            bucket_iterator = bucket.begin();
        }

        /// Bucket is finished: drop its lock and move on to the next bucket.
        if (bucket_iterator == bucket.end())
        {
            bucket_iterator = {};
            bucket_lock.reset();
            ++current_bucket;
            continue;
        }

        /// Entering a key: lock it, then position at its first file segment.
        if (!locked_key)
        {
            locked_key = bucket_iterator->second->lock();
            key_iterator = locked_key->begin();
        }

        /// Key is finished: drop its lock and move on to the next key of the bucket.
        if (key_iterator == locked_key->end())
        {
            key_iterator = {};
            locked_key.reset();
            ++bucket_iterator;
            continue;
        }

        /// Hand out the current element and step past it for the following call.
        const auto current = key_iterator++;
        return current->second;
    }

    reset();
    return nullptr;
}
void CacheMetadata::Iterator::reset()
{
key_iterator = {};
bucket_iterator = {};
locked_key.reset();
bucket_lock.reset();
current_bucket = buckets_num;
}
LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_)
: key_metadata(key_metadata_)
, lock(key_metadata->guard.lock())
@ -928,7 +986,7 @@ std::string LockedKey::toString() const
}
std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
std::vector<FileSegment::Info> LockedKey::sync()
{
std::vector<FileSegment::Info> broken;
for (auto it = key_metadata->begin(); it != key_metadata->end();)
@ -961,7 +1019,7 @@ std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
"File segment has DOWNLOADED state, but file does not exist ({})",
file_segment->getInfoForLog());
broken.push_back(FileSegment::getInfo(file_segment, cache));
broken.push_back(FileSegment::getInfo(file_segment));
it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */true);
continue;
}
@ -980,7 +1038,7 @@ std::vector<FileSegment::Info> LockedKey::sync(FileCache & cache)
"File segment has unexpected size. Having {}, expected {} ({})",
actual_size, expected_size, file_segment->getInfoForLog());
broken.push_back(FileSegment::getInfo(file_segment, cache));
broken.push_back(FileSegment::getInfo(file_segment));
it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */false);
}
return broken;

View File

@ -17,8 +17,11 @@ using DownloadQueuePtr = std::shared_ptr<DownloadQueue>;
using FileSegmentsHolderPtr = std::unique_ptr<FileSegmentsHolder>;
struct FileSegmentMetadata : private boost::noncopyable
class FileSegmentMetadata : private boost::noncopyable
{
friend class FileCache;
friend struct LockedKey;
public:
using Priority = IFileCachePriority;
explicit FileSegmentMetadata(FileSegmentPtr && file_segment_);
@ -31,6 +34,11 @@ struct FileSegmentMetadata : private boost::noncopyable
Priority::Iterator getQueueIterator() const { return file_segment->getQueueIterator(); }
const FileSegment & getFileSegment() const { return *file_segment; }
FileSegment::Info getFileSegmentInfo() const { return FileSegment::getInfo(file_segment); }
private:
FileSegmentPtr file_segment;
std::atomic<bool> removal_candidate{false};
};
@ -98,6 +106,14 @@ using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
struct CacheMetadata
{
private:
/// One bucket of the key -> key metadata mapping.
/// It is an unordered_map<FileCacheKey, KeyMetadataPtr> that additionally carries its own guard.
struct MetadataBucket : public std::unordered_map<FileCacheKey, KeyMetadataPtr>
{
/// Returns a lock object for this bucket.
/// NOTE(review): presumably obtained from `guard`; the definition is not visible here - confirm.
CacheMetadataGuard::Lock lock() const;
private:
/// `mutable` so that it can be used from the const lock() method.
mutable CacheMetadataGuard guard;
};
public:
using Key = FileCacheKey;
using IterateFunc = std::function<void(LockedKey &)>;
@ -154,6 +170,27 @@ public:
void cancelDownload();
/// Pull-style iterator over the file segment metadata stored in CacheMetadata.
/// Elements are obtained one at a time via next(), which returns nullptr at the end.
/// The bucket lock object and the locked key are members of the iterator,
/// so they are kept between next() calls until next() moves on or reset() is called.
class Iterator
{
public:
/// Stores a raw pointer to the metadata being iterated.
/// NOTE(review): presumably the CacheMetadata must outlive the iterator - confirm.
explicit Iterator(CacheMetadata * metadata_) : metadata(metadata_) {}
/// Returns the next file segment metadata, or nullptr when iteration is finished.
FileSegmentMetadataPtr next();
/// Clears iterators and held lock objects and marks the iteration as finished.
void reset();
private:
CacheMetadata * metadata;
/// Index into `metadata_buckets`; equals `buckets_num` once iteration is finished.
size_t current_bucket = 0;
/// Lock object of the bucket currently being walked; null when no bucket is entered.
std::unique_ptr<CacheMetadataGuard::Lock> bucket_lock{nullptr};
/// Position inside the current bucket.
CacheMetadata::MetadataBucket::iterator bucket_iterator{};
/// Locked key currently being walked; null when no key is entered.
LockedKeyPtr locked_key{nullptr};
/// Position inside the current key's file segments.
KeyMetadata::iterator key_iterator{};
};
Iterator iterate();
private:
const std::string path; /// Cache base path
const CleanupQueuePtr cleanup_queue;
@ -162,13 +199,6 @@ private:
std::shared_mutex key_prefix_directory_mutex;
Poco::Logger * log;
struct MetadataBucket : public std::unordered_map<FileCacheKey, KeyMetadataPtr>
{
CacheMetadataGuard::Lock lock() const;
private:
mutable CacheMetadataGuard guard;
};
static constexpr size_t buckets_num = 1024;
std::vector<MetadataBucket> metadata_buckets{buckets_num};
@ -243,7 +273,7 @@ struct LockedKey : private boost::noncopyable
void markAsRemoved();
std::vector<FileSegment::Info> sync(FileCache & cache);
std::vector<FileSegment::Info> sync();
std::string toString() const;