Cleanup a bit

This commit is contained in:
kssenii 2023-04-13 13:27:01 +02:00
parent 09c23287aa
commit ce45105448
8 changed files with 181 additions and 222 deletions

View File

@ -513,8 +513,6 @@ KeyMetadata::iterator FileCache::addFileSegment(
"Failed to insert {}:{}: entry already exists", key, offset);
}
if (state == FileSegment::State::DOWNLOADED)
chassert(file_segment_metadata_it->second->file_segment->getQueueIterator());
return file_segment_metadata_it;
}
catch (...)
@ -565,37 +563,30 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
return PriorityIterationResult::REMOVE_AND_CONTINUE;
chassert(file_segment_metadata->file_segment->getQueueIterator());
chassert(entry.offset == file_segment_metadata->file_segment->offset());
auto iteration_result = PriorityIterationResult::CONTINUE;
const bool is_persistent = allow_persistent_files && file_segment_metadata->file_segment->isPersistent();
const bool releasable = file_segment_metadata->releasable() && !is_persistent;
if (releasable)
{
auto current_file_segment = file_segment_metadata->file_segment;
const size_t file_segment_size = entry.size;
removed_size += entry.size;
--queue_size;
if (current_file_segment->state() == FileSegment::State::DOWNLOADED)
auto segment = file_segment_metadata->file_segment;
if (segment->state() == FileSegment::State::DOWNLOADED)
{
const auto & key = current_file_segment->key();
const auto & key = segment->key();
auto it = to_delete.find(key);
if (it == to_delete.end())
it = to_delete.emplace(key, locked_key.getKeyMetadata()).first;
it->second.add(file_segment_metadata);
return PriorityIterationResult::CONTINUE;
}
else
{
/// TODO: we can resize if partially downloaded instead.
iteration_result = PriorityIterationResult::REMOVE_AND_CONTINUE;
locked_key.removeFileSegment(current_file_segment->offset(), current_file_segment->lock());
locked_key.removeFileSegment(segment->offset(), segment->lock());
return PriorityIterationResult::REMOVE_AND_CONTINUE;
}
removed_size += file_segment_size;
--queue_size;
}
return iteration_result;
return PriorityIterationResult::CONTINUE;
};
if (query_priority)
@ -676,12 +667,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
if (main_priority->getSize(cache_lock) > (1ull << 63))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug");
const auto & key_metadata = file_segment.getKeyMetadata();
if (!key_metadata->created_base_directory.exchange(true))
{
fs::create_directories(metadata.getPathInLocalCache(file_segment.key()));
}
file_segment.getKeyMetadata()->createBaseDirectory();
return true;
}
@ -994,7 +980,16 @@ void FileCache::assertCacheCorrectness()
{
for (const auto & [offset, file_segment_metadata] : locked_key)
{
locked_key.assertFileSegmentCorrectness(*file_segment_metadata->file_segment);
const auto & file_segment = *file_segment_metadata->file_segment;
if (file_segment.key() != locked_key.getKey())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected {} = {}", file_segment.key(), locked_key.getKey());
}
file_segment.assertCorrectness();
}
});
}

View File

@ -138,11 +138,6 @@ size_t FileSegment::getDownloadedSize(bool sync) const
void FileSegment::setDownloadedSize(size_t delta)
{
auto lock = segment_guard.lock();
setDownloadedSizeUnlocked(delta, lock);
}
void FileSegment::setDownloadedSizeUnlocked(size_t delta, const FileSegmentGuard::Lock &)
{
downloaded_size += delta;
assert(downloaded_size == std::filesystem::file_size(getPathInLocalCache()));
}
@ -196,7 +191,7 @@ String FileSegment::getOrSetDownloader()
return current_downloader;
}
void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock)
void FileSegment::resetDownloadingStateUnlocked(const FileSegmentGuard::Lock & lock)
{
assert(isDownloaderUnlocked(lock));
assert(download_state == State::DOWNLOADING);
@ -213,6 +208,8 @@ void FileSegment::resetDownloader()
{
auto lock = segment_guard.lock();
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("resetDownloader", lock);
@ -224,7 +221,6 @@ void FileSegment::resetDownloaderUnlocked(const FileSegmentGuard::Lock &)
{
LOG_TEST(log, "Resetting downloader from {}", downloader_id);
downloader_id.clear();
cv.notify_all();
}
void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock & lock) const
@ -292,17 +288,6 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_)
remote_file_reader = remote_file_reader_;
}
void FileSegment::resetRemoteFileReader()
{
auto lock = segment_guard.lock();
assertIsDownloaderUnlocked("resetRemoteFileReader", lock);
if (!remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist");
remote_file_reader.reset();
}
void FileSegment::write(const char * from, size_t size, size_t offset)
{
if (!size)
@ -366,7 +351,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
{
auto lock = segment_guard.lock();
wrapWithCacheInfo(e, "while writing into cache", lock);
e.addMessage(fmt::format("{}, current cache state: {}", e.what(), getInfoForLogUnlocked(lock)));
setDownloadFailedUnlocked(lock);
@ -382,7 +367,7 @@ FileSegment::State FileSegment::wait(size_t offset)
{
auto lock = segment_guard.lock();
if (downloader_id.empty())
if (downloader_id.empty() || offset < getCurrentWriteOffset(true))
return download_state;
if (download_state == State::EMPTY)
@ -395,7 +380,7 @@ FileSegment::State FileSegment::wait(size_t offset)
chassert(!getDownloaderUnlocked(lock).empty());
chassert(!isDownloaderUnlocked(lock));
[[maybe_unused]] const bool ok = cv.wait_for(lock, std::chrono::seconds(60), [&, this]()
cv.wait_for(lock, std::chrono::seconds(60), [&, this]()
{
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true);
});
@ -445,8 +430,6 @@ bool FileSegment::reserve(size_t size_to_reserve)
{
auto lock = segment_guard.lock();
LOG_TRACE(log, "Try reserve for {}", getInfoForLogUnlocked(lock));
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock);
@ -497,7 +480,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
return reserved;
}
void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock)
void FileSegment::setDownloadedUnlocked(const FileSegmentGuard::Lock &)
{
if (download_state == State::DOWNLOADED)
return;
@ -517,7 +500,9 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard:
void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
{
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(lock));
LOG_INFO(log, "Setting download as failed: {}", getInfoForLogUnlocked(lock));
SCOPE_EXIT({ cv.notify_all(); });
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock);
@ -532,11 +517,9 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
void FileSegment::completePartAndResetDownloader()
{
auto lock = segment_guard.lock();
completePartAndResetDownloaderUnlocked(lock);
}
void FileSegment::completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & lock)
{
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("completePartAndResetDownloader", lock);
@ -550,6 +533,8 @@ void FileSegment::setBroken()
{
auto lock = segment_guard.lock();
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("setBroken", lock);
@ -566,35 +551,27 @@ void FileSegment::complete()
return;
auto locked_key = lockKeyMetadata(false);
if (locked_key)
if (!locked_key)
{
completeUnlocked(*locked_key);
return;
}
/// If we failed to lock a key, it must be in detached state.
if (isDetached())
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete file segment: {}", getInfoForLog());
}
}
void FileSegment::completeUnlocked(LockedKey & locked_key)
{
auto segment_lock = segment_guard.lock();
if (isCompleted(false))
return;
const bool is_downloader = isDownloaderUnlocked(segment_lock);
const bool is_last_holder = locked_key.isLastOwnerOfFileSegment(offset());
const bool is_last_holder = locked_key->isLastOwnerOfFileSegment(offset());
const size_t current_downloaded_size = getDownloadedSize(true);
SCOPE_EXIT({
if (is_downloader)
{
cv.notify_all();
}
});
LOG_TEST(
@ -618,9 +595,9 @@ void FileSegment::completeUnlocked(LockedKey & locked_key)
if (segment_kind == FileSegmentKind::Temporary && is_last_holder)
{
LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock));
detach(segment_lock, locked_key);
detach(segment_lock, *locked_key);
setDownloadState(State::DETACHED, segment_lock);
locked_key.removeFileSegment(offset(), segment_lock);
locked_key->removeFileSegment(offset(), segment_lock);
return;
}
@ -644,12 +621,10 @@ void FileSegment::completeUnlocked(LockedKey & locked_key)
{
if (is_last_holder)
{
setDownloadState(State::DETACHED, segment_lock);
if (current_downloaded_size == 0)
{
LOG_TEST(log, "Remove file segment {} (nothing downloaded)", range().toString());
locked_key.removeFileSegment(offset(), segment_lock);
locked_key->removeFileSegment(offset(), segment_lock);
}
else
{
@ -666,13 +641,13 @@ void FileSegment::completeUnlocked(LockedKey & locked_key)
/// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state,
/// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken
/// (this will be crucial for other file segment holder, not for current one).
locked_key.shrinkFileSegmentToDownloadedSize(offset(), segment_lock);
locked_key->shrinkFileSegmentToDownloadedSize(offset(), segment_lock);
/// We mark current file segment with state DETACHED, even though the data is still in cache
/// (but a separate file segment) because is_last_holder is satisfied, so it does not matter.
}
detachAssumeStateFinalized(segment_lock);
setDetachedState(segment_lock);
}
break;
}
@ -707,11 +682,6 @@ String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const
return info.str();
}
void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, const FileSegmentGuard::Lock & lock) const
{
e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(lock)));
}
String FileSegment::stateToString(FileSegment::State state)
{
switch (state)
@ -755,15 +725,6 @@ bool FileSegment::assertCorrectness() const
return true;
}
void FileSegment::throwIfDetachedUnlocked(const FileSegmentGuard::Lock & lock) const
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogUnlocked(lock));
}
void FileSegment::assertNotDetached() const
{
auto lock = segment_guard.lock();
@ -773,7 +734,13 @@ void FileSegment::assertNotDetached() const
void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock) const
{
if (download_state == State::DETACHED)
throwIfDetachedUnlocked(lock);
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry. File segment info: {}", getInfoForLogUnlocked(lock));
}
}
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
@ -785,14 +752,12 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
file_segment->offset(),
file_segment->range().size(),
State::DETACHED,
CreateFileSegmentSettings(file_segment->getKind()));
CreateFileSegmentSettings(file_segment->getKind(), file_segment->is_unbound));
snapshot->hits_count = file_segment->getHitsCount();
snapshot->downloaded_size = file_segment->getDownloadedSize(false);
snapshot->download_state = file_segment->download_state.load();
snapshot->ref_count = file_segment.use_count();
snapshot->is_unbound = file_segment->is_unbound;
return snapshot;
}
@ -822,27 +787,32 @@ bool FileSegment::isCompleted(bool sync) const
return is_completed_state();
}
void FileSegment::setDetachedState(const FileSegmentGuard::Lock & lock)
{
setDownloadState(State::DETACHED, lock);
key_metadata.reset();
cache = nullptr;
}
void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &)
{
if (download_state == State::DETACHED)
return;
setDownloadState(State::DETACHED, lock);
resetDownloaderUnlocked(lock);
detachAssumeStateFinalized(lock);
}
void FileSegment::detachAssumeStateFinalized(const FileSegmentGuard::Lock & lock)
{
key_metadata.reset();
LOG_TEST(log, "Detached file segment: {}", getInfoForLogUnlocked(lock));
setDetachedState(lock);
}
void FileSegment::use()
{
if (!cache)
{
chassert(isCompleted(true));
return;
}
auto it = getQueueIterator();
if (it && cache)
if (it)
{
auto cache_lock = cache->lockCache();
it->use(cache_lock);
@ -869,7 +839,7 @@ String FileSegmentsHolder::toString()
if (!ranges.empty())
ranges += ", ";
ranges += file_segment->range().toString();
if (file_segment->is_unbound)
if (file_segment->isUnbound())
ranges += "(unbound)";
}
return ranges;

View File

@ -163,11 +163,10 @@ public:
size_t offset() const { return range().left; }
FileSegmentKind getKind() const { return segment_kind; }
bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; }
bool isUnbound() const { return is_unbound; }
using UniqueId = std::pair<FileCacheKey, size_t>;
UniqueId getUniqueId() const { return std::pair(key(), offset()); }
bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; }
bool isUnbound() const { return is_unbound; }
String getPathInLocalCache() const;
@ -198,6 +197,8 @@ public:
size_t getDownloadedSize(bool sync) const;
size_t getReservedSize() const;
/// Now detached status can be used in the following cases:
/// 1. there is only 1 remaining file segment holder
/// && it does not need this segment anymore
@ -218,12 +219,40 @@ public:
bool isDetached() const;
/// File segment has a completed state, if this state is final and is not going to be changed.
/// Completed states: DOWNALODED, DETACHED.
/// File segment has a completed state, if this state is final and
/// is not going to be changed. Completed states: DOWNALODED, DETACHED.
bool isCompleted(bool sync = false) const;
void use();
/**
* ========== Methods used by `cache` ========================
*/
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
CachePriorityIterator getQueueIterator() const;
void setQueueIterator(CachePriorityIterator iterator);
KeyMetadataPtr tryGetKeyMetadata() const;
KeyMetadataPtr getKeyMetadata() const;
bool assertCorrectness() const;
/**
* ========== Methods that must do cv.notify() ==================
*/
void setBroken();
void complete();
void completePartAndResetDownloader();
void resetDownloader();
/**
* ========== Methods for _only_ file segment's `downloader` ==================
*/
@ -240,70 +269,33 @@ public:
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);
void setBroken();
void complete();
/// Complete file segment's part which was last written.
void completePartAndResetDownloader();
void resetDownloader();
RemoteFileReaderPtr getRemoteFileReader();
RemoteFileReaderPtr extractRemoteFileReader();
void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);
void resetRemoteFileReader();
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
void setDownloadedSize(size_t delta);
size_t getReservedSize() const;
CachePriorityIterator getQueueIterator() const;
void setQueueIterator(CachePriorityIterator iterator);
KeyMetadataPtr getKeyMetadata() const;
void use();
private:
String getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const;
String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const;
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
void resetDownloaderUnlocked(const FileSegmentGuard::Lock &);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);
void setDownloadState(State state, const FileSegmentGuard::Lock &);
void setDownloadedSizeUnlocked(size_t delta, const FileSegmentGuard::Lock &);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);
void setDetachedState(const FileSegmentGuard::Lock &);
String getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const;
void setDownloadedUnlocked(const FileSegmentGuard::Lock &);
void setDownloadFailedUnlocked(const FileSegmentGuard::Lock &);
bool isDetached(const FileSegmentGuard::Lock &) const { return download_state == State::DETACHED; }
void detachAssumeStateFinalized(const FileSegmentGuard::Lock &);
[[noreturn]] void throwIfDetachedUnlocked(const FileSegmentGuard::Lock &) const;
void assertNotDetached() const;
void assertNotDetachedUnlocked(const FileSegmentGuard::Lock &) const;
void assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock &) const;
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
KeyMetadataPtr tryGetKeyMetadata() const;
/// completeWithoutStateUnlocked() is called from destructor of FileSegmentsHolder.
/// Function might check if the caller of the method
/// is the last alive holder of the segment. Therefore, completion and destruction
/// of the file segment pointer must be done under the same cache mutex.
void completeUnlocked(LockedKey & locked_key);
void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock);
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
void wrapWithCacheInfo(Exception & e, const String & message, const FileSegmentGuard::Lock & segment_lock) const;
Key file_key;
Range segment_range;

View File

@ -37,9 +37,6 @@ public:
const size_t offset;
size_t size;
size_t hits = 0;
/// In fact, it is guaranteed that the lifetime of key metadata is longer
/// than Entry, but it is made as weak_ptr to avoid cycle in shared pointer
/// references (because entry actually lies in key metadata).
const KeyMetadataPtr key_metadata;
};
@ -52,15 +49,17 @@ public:
public:
virtual ~IIterator() = default;
virtual size_t use(const CacheGuard::Lock &) = 0;
virtual std::shared_ptr<IIterator> remove(const CacheGuard::Lock &) = 0;
virtual const Entry & getEntry() const = 0;
virtual Entry & getEntry() = 0;
virtual size_t use(const CacheGuard::Lock &) = 0;
virtual void annul() = 0;
virtual void updateSize(ssize_t size) = 0;
virtual std::shared_ptr<IIterator> remove(const CacheGuard::Lock &) = 0;
};
using Iterator = std::shared_ptr<IIterator>;

View File

@ -95,7 +95,8 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
for (auto it = queue.begin(); it != queue.end();)
{
auto locked_key = it->key_metadata->lock();
if (locked_key->getKeyState() != KeyMetadata::KeyState::ACTIVE)
if (it->size == 0
|| locked_key->getKeyState() != KeyMetadata::KeyState::ACTIVE)
{
it = remove(it);
continue;
@ -127,6 +128,12 @@ LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remov
return std::make_shared<LRUFileCacheIterator>(cache_priority, cache_priority->remove(queue_iter));
}
void LRUFileCachePriority::LRUFileCacheIterator::annul()
{
cache_priority->current_size -= queue_iter->size;
queue_iter->size = 0;
}
void LRUFileCachePriority::LRUFileCacheIterator::updateSize(ssize_t size)
{
cache_priority->current_size += size;
@ -135,6 +142,9 @@ void LRUFileCachePriority::LRUFileCacheIterator::updateSize(ssize_t size)
else
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, size);
queue_iter->size += size;
chassert(cache_priority->current_size >= 0);
chassert(queue_iter->size >= 0);
}
size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CacheGuard::Lock &)

View File

@ -56,6 +56,8 @@ public:
Iterator remove(const CacheGuard::Lock &) override;
void annul() override;
void updateSize(ssize_t size) override;
private:

View File

@ -49,9 +49,11 @@ KeyMetadata::KeyMetadata(
bool created_base_directory_)
: key(key_)
, key_path(key_path_)
, created_base_directory(created_base_directory_)
, cleanup_queue(cleanup_queue_)
, created_base_directory(created_base_directory_)
{
if (created_base_directory)
chassert(fs::exists(key_path));
}
LockedKeyPtr KeyMetadata::lock()
@ -65,14 +67,40 @@ LockedKeyPtr KeyMetadata::lock()
"Cannot lock key {} (state: {})", key, magic_enum::enum_name(key_state));
}
void KeyMetadata::createBaseDirectory()
{
if (!created_base_directory.exchange(true))
{
fs::create_directories(key_path);
}
}
std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment)
{
return fs::path(key_path)
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
}
struct CleanupQueue
{
friend struct CacheMetadata;
public:
void add(const FileCacheKey & key);
void remove(const FileCacheKey & key);
size_t getSize() const;
private:
bool tryPop(FileCacheKey & key);
std::unordered_set<FileCacheKey> keys;
mutable std::mutex mutex;
};
CacheMetadata::CacheMetadata(const std::string & path_)
: path(path_)
, cleanup_queue(std::make_unique<CleanupQueue>())
, log(&Poco::Logger::get("CacheMetadata"))
{
}
@ -128,7 +156,7 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
it = emplace(
key, std::make_shared<KeyMetadata>(
key, getPathInLocalCache(key), cleanup_queue, is_initial_load)).first;
key, getPathInLocalCache(key), *cleanup_queue, is_initial_load)).first;
}
key_metadata = it->second;
@ -198,7 +226,7 @@ void CacheMetadata::doCleanup()
/// we perform this delayed removal.
FileCacheKey cleanup_key;
while (cleanup_queue.tryPop(cleanup_key))
while (cleanup_queue->tryPop(cleanup_key))
{
auto it = find(cleanup_key);
if (it == end())
@ -259,15 +287,9 @@ void LockedKey::removeFromCleanupQueue()
key_metadata->key_state = KeyMetadata::KeyState::ACTIVE;
}
bool LockedKey::markAsRemoved()
void LockedKey::markAsRemoved()
{
chassert(key_metadata->key_state != KeyMetadata::KeyState::REMOVED);
if (key_metadata->key_state == KeyMetadata::KeyState::ACTIVE)
return false;
key_metadata->key_state = KeyMetadata::KeyState::REMOVED;
return true;
}
bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const
@ -301,13 +323,13 @@ KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegm
auto file_segment = it->second->file_segment;
if (file_segment->queue_iterator)
file_segment->queue_iterator->updateSize(-file_segment->queue_iterator->getEntry().size);
file_segment->queue_iterator->annul();
const auto path = key_metadata->getFileSegmentPath(*it->second->file_segment);
const auto path = key_metadata->getFileSegmentPath(*file_segment);
if (fs::exists(path))
fs::remove(path);
it->second->file_segment->detach(segment_lock, *this);
file_segment->detach(segment_lock, *this);
return key_metadata->erase(it);
}
@ -326,6 +348,8 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
const size_t downloaded_size = file_segment->getDownloadedSize(false);
const size_t full_size = file_segment->range().size();
chassert(downloaded_size <= file_segment->reserved_size);
if (downloaded_size == full_size)
{
throw Exception(
@ -334,16 +358,14 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
file_segment->getInfoForLogUnlocked(segment_lock));
}
auto queue_iterator = metadata->file_segment->queue_iterator;
chassert(downloaded_size <= file_segment->reserved_size);
chassert(queue_iterator->getEntry().size == file_segment->reserved_size);
CreateFileSegmentSettings create_settings(file_segment->getKind());
auto queue_iterator = file_segment->queue_iterator;
metadata->file_segment = std::make_shared<FileSegment>(
getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, create_settings,
file_segment->cache, key_metadata, file_segment->queue_iterator);
file_segment->cache, key_metadata, queue_iterator);
chassert(queue_iterator->getEntry().size == file_segment->reserved_size);
ssize_t diff = file_segment->reserved_size - file_segment->downloaded_size;
if (diff)
queue_iterator->updateSize(-diff);
@ -352,18 +374,6 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
chassert(metadata->size() == queue_iterator->getEntry().size);
}
void LockedKey::assertFileSegmentCorrectness(const FileSegment & file_segment) const
{
if (file_segment.key() != getKey())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected {} = {}", file_segment.key(), getKey());
}
file_segment.assertCorrectness();
}
std::shared_ptr<const FileSegmentMetadata> LockedKey::getByOffset(size_t offset) const
{
auto it = key_metadata->find(offset);

View File

@ -12,6 +12,7 @@ using FileSegmentPtr = std::shared_ptr<FileSegment>;
struct LockedKey;
using LockedKeyPtr = std::shared_ptr<LockedKey>;
struct CleanupQueue;
using CleanupQueuePtr = std::shared_ptr<CleanupQueue>;
struct FileSegmentMetadata : private boost::noncopyable
@ -56,37 +57,23 @@ struct KeyMetadata : public std::map<size_t, FileSegmentMetadataPtr>,
const Key key;
const std::string key_path;
std::atomic<bool> created_base_directory = false;
LockedKeyPtr lock();
void createBaseDirectory();
std::string getFileSegmentPath(const FileSegment & file_segment);
private:
KeyState key_state = KeyState::ACTIVE;
KeyGuard guard;
CleanupQueue & cleanup_queue;
std::atomic<bool> created_base_directory = false;
};
using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
struct CleanupQueue
{
friend struct CacheMetadata;
public:
void add(const FileCacheKey & key);
void remove(const FileCacheKey & key);
size_t getSize() const;
private:
bool tryPop(FileCacheKey & key);
std::unordered_set<FileCacheKey> keys;
mutable std::mutex mutex;
};
struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>, private boost::noncopyable
{
public:
@ -124,7 +111,7 @@ public:
private:
const std::string path; /// Cache base path
CacheMetadataGuard guard;
CleanupQueue cleanup_queue;
const CleanupQueuePtr cleanup_queue;
Poco::Logger * log;
};
@ -162,26 +149,20 @@ struct LockedKey : private boost::noncopyable
KeyMetadata::KeyState getKeyState() const { return key_metadata->key_state; }
KeyMetadataPtr getKeyMetadata() const { return key_metadata; }
KeyMetadataPtr getKeyMetadata() { return key_metadata; }
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
std::shared_ptr<const KeyMetadata> getKeyMetadata() const { return key_metadata; }
std::shared_ptr<KeyMetadata> getKeyMetadata() { return key_metadata; }
void removeAllReleasable();
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
bool isLastOwnerOfFileSegment(size_t offset) const;
void assertFileSegmentCorrectness(const FileSegment & file_segment) const;
bool isRemovalCandidate() const;
bool markAsRemovalCandidate(size_t offset);
void removeFromCleanupQueue();
bool markAsRemoved();
void markAsRemoved();
std::string toString() const;