This commit is contained in:
kssenii 2023-12-14 15:55:15 +01:00
parent cc71b40002
commit ea7e55b929
7 changed files with 25 additions and 111 deletions

View File

@ -839,9 +839,13 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
return true;
}
CacheMetadata::Iterator FileCache::iterate()
void FileCache::iterate(IterateFunc && func)
{
return metadata.iterate();
return metadata.iterate([&](const LockedKey & locked_key)
{
for (const auto & file_segment_metadata : locked_key)
func(FileSegment::getInfo(file_segment_metadata.second->file_segment));
});
}
void FileCache::removeKey(const Key & key)
@ -1181,7 +1185,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
std::vector<FileSegment::Info> FileCache::dumpQueue()
{
assertInitialized();
return main_priority->dump(*this);
return main_priority->dump(lockCache());
}
std::vector<String> FileCache::tryGetCachePaths(const Key & key)
@ -1218,7 +1222,7 @@ void FileCache::assertCacheCorrectness()
{
for (const auto & [_, file_segment_metadata] : locked_key)
{
chassert(file_segment_metadata->getFileSegment().assertCorrectness());
chassert(file_segment_metadata->file_segment->assertCorrectness());
}
});
}

View File

@ -150,7 +150,8 @@ public:
std::vector<FileSegment::Info> sync();
CacheMetadata::Iterator iterate();
using IterateFunc = std::function<void(const FileSegmentInfo &)>;
void iterate(IterateFunc && func);
private:
using KeyAndOffset = FileCacheKeyAndOffset;

View File

@ -14,7 +14,7 @@ class FileSegment;
using FileSegmentPtr = std::shared_ptr<FileSegment>;
using FileSegments = std::list<FileSegmentPtr>;
class FileSegmentMetadata;
struct FileSegmentMetadata;
using FileSegmentMetadataPtr = std::shared_ptr<FileSegmentMetadata>;
struct LockedKey;

View File

@ -160,7 +160,7 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata "
"and priority queue: {} != {} ({})",
it->size, metadata->size(), metadata->getFileSegment().getInfoForLog());
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
}
auto result = func(*locked_key, metadata);
@ -277,7 +277,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
return LRUIterator(this, it.iterator);
}
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(FileCache & cache)
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(const CacheGuard::Lock & lock)
{
std::vector<FileSegmentInfo> res;
iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)

View File

@ -563,7 +563,7 @@ void CacheMetadata::downloadThreadFunc()
auto file_segment = file_segment_weak.lock();
if (!file_segment
|| file_segment.get() != &file_segment_metadata->getFileSegment()
|| file_segment != file_segment_metadata->file_segment
|| file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
continue;
@ -675,64 +675,6 @@ void CacheMetadata::cancelDownload()
download_queue->cancel();
}
CacheMetadata::Iterator CacheMetadata::iterate()
{
return Iterator(this);
}
FileSegmentMetadataPtr CacheMetadata::Iterator::next()
{
if (current_bucket == buckets_num)
{
reset();
return nullptr;
}
auto & bucket = metadata->metadata_buckets[current_bucket];
if (!bucket_lock)
{
bucket_lock = std::make_unique<CacheMetadataGuard::Lock>(bucket.lock());
bucket_iterator = bucket.begin();
}
if (bucket_iterator == bucket.end())
{
bucket_iterator = {};
bucket_lock.reset();
++current_bucket;
return next();
}
if (!locked_key)
{
locked_key = bucket_iterator->second->lock();
key_iterator = locked_key->begin();
}
if (key_iterator == locked_key->end())
{
key_iterator = {};
locked_key.reset();
++bucket_iterator;
return next();
}
return (key_iterator++)->second;
}
void CacheMetadata::Iterator::reset()
{
key_iterator = {};
bucket_iterator = {};
locked_key.reset();
bucket_lock.reset();
current_bucket = buckets_num;
}
LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_)
: key_metadata(key_metadata_)
, lock(key_metadata->guard.lock())

View File

@ -17,11 +17,8 @@ using DownloadQueuePtr = std::shared_ptr<DownloadQueue>;
using FileSegmentsHolderPtr = std::unique_ptr<FileSegmentsHolder>;
class FileSegmentMetadata : private boost::noncopyable
struct FileSegmentMetadata : private boost::noncopyable
{
friend class FileCache;
friend struct LockedKey;
public:
using Priority = IFileCachePriority;
explicit FileSegmentMetadata(FileSegmentPtr && file_segment_);
@ -34,11 +31,6 @@ public:
Priority::IteratorPtr getQueueIterator() const { return file_segment->getQueueIterator(); }
const FileSegment & getFileSegment() const { return *file_segment; }
FileSegment::Info getFileSegmentInfo() const { return FileSegment::getInfo(file_segment); }
private:
FileSegmentPtr file_segment;
std::atomic<bool> removal_candidate{false};
};
@ -106,14 +98,6 @@ using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
struct CacheMetadata
{
private:
struct MetadataBucket : public std::unordered_map<FileCacheKey, KeyMetadataPtr>
{
CacheMetadataGuard::Lock lock() const;
private:
mutable CacheMetadataGuard guard;
};
public:
using Key = FileCacheKey;
using IterateFunc = std::function<void(LockedKey &)>;
@ -131,6 +115,7 @@ public:
static String getFileNameForFileSegment(size_t offset, FileSegmentKind segment_kind);
void iterate(IterateFunc && func);
bool isEmpty() const;
enum class KeyNotFoundPolicy
@ -170,27 +155,6 @@ public:
void cancelDownload();
class Iterator
{
public:
explicit Iterator(CacheMetadata * metadata_) : metadata(metadata_) {}
FileSegmentMetadataPtr next();
void reset();
private:
CacheMetadata * metadata;
size_t current_bucket = 0;
std::unique_ptr<CacheMetadataGuard::Lock> bucket_lock{nullptr};
CacheMetadata::MetadataBucket::iterator bucket_iterator{};
LockedKeyPtr locked_key{nullptr};
KeyMetadata::iterator key_iterator{};
};
Iterator iterate();
private:
const std::string path; /// Cache base path
const CleanupQueuePtr cleanup_queue;
@ -199,6 +163,13 @@ private:
std::shared_mutex key_prefix_directory_mutex;
Poco::Logger * log;
struct MetadataBucket : public std::unordered_map<FileCacheKey, KeyMetadataPtr>
{
CacheMetadataGuard::Lock lock() const;
private:
mutable CacheMetadataGuard guard;
};
static constexpr size_t buckets_num = 1024;
std::vector<MetadataBucket> metadata_buckets{buckets_num};

View File

@ -44,11 +44,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
for (const auto & [cache_name, cache_data] : caches)
{
const auto & cache = cache_data->cache;
auto iterator = cache->iterate();
while (auto metadata = iterator.next())
{
const auto & file_segment = metadata->getFileSegmentInfo();
cache->iterate([&](const FileSegment::Info & file_segment){
size_t i = 0;
res_columns[i++]->insert(cache_name);
res_columns[i++]->insert(cache->getBasePath());
@ -75,7 +71,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[i++]->insert(size);
else
res_columns[i++]->insertDefault();
}
});
}
}