Review fixes

Commit 0117fd40a6 by kssenii, 2022-05-05 16:11:26 +02:00
Parent: cac3cb5086
6 changed files with 57 additions and 127 deletions

View File

@@ -393,7 +393,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
keyToStr(key), offset, size, dumpStructureImpl(key, cache_lock));
keyToStr(key), offset, size, dumpStructureUnlocked(key, cache_lock));
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);
FileSegmentCell cell(std::move(file_segment), this, cache_lock);
@@ -421,6 +421,10 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
{
std::lock_guard cache_lock(mutex);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
@@ -433,7 +437,7 @@ FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset,
}
bool LRUFileCache::tryReserve(
const Key & key_, size_t offset_, size_t size, std::lock_guard<std::mutex> & cache_lock)
const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
size_t removed_size = 0;
size_t queue_size = queue.getElementsNum(cache_lock);
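Note: the eviction loop below consults an is_overflow() predicate on every iteration. A minimal sketch of what such a predicate checks in an LRU cache with both a byte budget and an element-count budget; the free-function form and all parameter names here are illustrative, not the actual ClickHouse lambda:

    #include <cstddef>

    /// Reserving `size` new bytes overflows if, even after evicting
    /// `removed_size` bytes, either the byte budget or the element
    /// budget would still be exceeded (a limit of 0 means "unlimited").
    bool is_overflow(std::size_t current_size, std::size_t size, std::size_t removed_size,
                     std::size_t max_size, std::size_t queue_size, std::size_t max_element_size)
    {
        return (max_size != 0 && current_size + size - removed_size > max_size)
            || (max_element_size != 0 && queue_size > max_element_size);
    }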
@@ -441,7 +445,7 @@ bool LRUFileCache::tryReserve(
/// Since space reservation is incremental, the cache cell already exists if its state is EMPTY.
/// The cache cell can be missing only on startup, because we first check for space and only then add a cell.
auto * cell_for_reserve = getCell(key_, offset_, cache_lock);
auto * cell_for_reserve = getCell(key, offset, cache_lock);
/// A cell acquires a LRUQueue iterator on first successful space reservation attempt.
/// cell_for_reserve can be nullptr here when we call tryReserve() from loadCacheInfoIntoMemory().
@@ -458,17 +462,20 @@ bool LRUFileCache::tryReserve(
std::vector<FileSegmentCell *> to_evict;
std::vector<FileSegmentCell *> trash;
auto func = [&](const LRUQueue::Iterator & it)
for (const auto & [entry_key, entry_offset, entry_size] : queue)
{
if (!is_overflow())
return false;
break;
auto * cell = getCell(it->key, it->offset, cache_lock);
auto * cell = getCell(entry_key, entry_offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache became inconsistent. Key: {}, offset: {}", keyToStr(it->key), it->offset);
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache became inconsistent. Key: {}, offset: {}",
keyToStr(entry_key), entry_offset);
size_t cell_size = cell->size();
assert(entry_size == cell_size);
/// It is guaranteed that a cell is not removed from the cache as long as
/// a pointer to the corresponding file segment is held by any other thread.
@@ -499,19 +506,18 @@ bool LRUFileCache::tryReserve(
removed_size += cell_size;
--queue_size;
}
}
return true;
};
queue.iterate(func, cache_lock);
/// This case is very unlikely; it can happen only on an exception from
/// file_segment->complete(), which would be a logical error.
assert(trash.empty());
for (auto & cell : trash)
{
auto file_segment = cell->file_segment;
if (file_segment)
{
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
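Note: the hunks above replace the callback-based queue.iterate(func) with a plain range-for over the queue, so early termination becomes an ordinary break instead of returning false through a std::function. The pattern in isolation; Entry and the container here are illustrative stand-ins for LRUQueue:

    #include <cstddef>
    #include <list>

    struct Entry { int key; std::size_t offset; std::size_t size; };

    /// Sum entry sizes until a limit is hit. With exposed begin()/end(),
    /// early exit is just `break` instead of `return false` from a lambda.
    std::size_t sumUntil(const std::list<Entry> & queue, std::size_t limit)
    {
        std::size_t total = 0;
        for (const auto & [key, offset, size] : queue)
        {
            if (total + size > limit)
                break;
            total += size;
        }
        return total;
    }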
@@ -529,7 +535,7 @@ bool LRUFileCache::tryReserve(
if (queue_iterator)
queue.incrementSize(*queue_iterator, size, cache_lock);
else
cell_for_reserve->queue_iterator = queue.add(key_, offset_, size, cache_lock);
cell_for_reserve->queue_iterator = queue.add(key, offset, size, cache_lock);
}
for (auto & cell : to_evict)
@@ -588,6 +594,10 @@ void LRUFileCache::remove(const Key & key)
if (fs::exists(key_path))
fs::remove(key_path);
#ifndef NDEBUG
assertCacheCorrectness(cache_lock);
#endif
}
void LRUFileCache::remove(bool force_remove_unreleasable)
@@ -598,9 +608,10 @@ void LRUFileCache::remove(bool force_remove_unreleasable)
std::lock_guard cache_lock(mutex);
std::vector<FileSegment *> to_remove;
auto func = [&](const LRUQueue::Iterator & it)
for (auto it = queue.begin(); it != queue.end();)
{
auto * cell = getCell(it->key, it->offset, cache_lock);
const auto & [key, offset, size] = *it++;
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@@ -610,19 +621,12 @@ void LRUFileCache::remove(bool force_remove_unreleasable)
{
auto file_segment = cell->file_segment;
if (file_segment)
to_remove.push_back(file_segment.get());
{
std::lock_guard segment_lock(file_segment->mutex);
file_segment->detach(cache_lock, segment_lock);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
return true;
};
queue.iterate(func, cache_lock);
for (const auto & file_segment : to_remove)
{
std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
file_segment->detach(cache_lock, segment_lock);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
}
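Note: in the rewritten remove(bool) above, `*it++` binds the current entry and only then advances the iterator, so the nested remove() call, which erases that entry from the queue, cannot invalidate the loop iterator (std::list erasure invalidates only iterators to the erased element). The idiom in isolation:

    #include <iostream>
    #include <list>

    int main()
    {
        std::list<int> values{1, 2, 3, 4};

        for (auto it = values.begin(); it != values.end();)
        {
            auto current = it++;        /// `it` is already past `current`
            if (*current % 2 == 0)
                values.erase(current);  /// invalidates only `current`
        }

        for (int v : values)
            std::cout << v << '\n';     /// prints 1 and 3
    }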
@@ -714,7 +718,7 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_l
{
LOG_WARNING(log,
"Cache capacity changed (max size: {}, available: {}), cached file `{}` does not fit in cache anymore (size: {})",
max_size, getAvailableCacheSizeImpl(cache_lock), key_it->path().string(), size);
max_size, getAvailableCacheSizeUnlocked(cache_lock), key_it->path().string(), size);
fs::remove(offset_it->path());
}
}
@@ -734,52 +738,9 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_l
queue.moveToEnd(it, cache_lock);
}
}
LRUFileCache::Stat LRUFileCache::getStat()
{
std::lock_guard cache_lock(mutex);
#ifndef NDEBUG
assertCacheCorrectness(cache_lock);
#endif
Stat stat
{
.size = queue.getElementsNum(cache_lock),
.available = getAvailableCacheSizeImpl(cache_lock),
.downloaded_size = 0,
.downloading_size = 0,
};
auto func = [&](const LRUQueue::Iterator & it)
{
const auto * cell = getCell(it->key, it->offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache became inconsistent. Key: {}, offset: {}", keyToStr(it->key), it->offset);
switch (cell->file_segment->download_state)
{
case FileSegment::State::DOWNLOADED:
{
++stat.downloaded_size;
break;
}
case FileSegment::State::DOWNLOADING:
{
++stat.downloading_size;
break;
}
default:
break;
}
return true;
};
queue.iterate(func, cache_lock);
return stat;
}
void LRUFileCache::reduceSizeToDownloaded(
@@ -863,10 +824,10 @@ std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
size_t LRUFileCache::getUsedCacheSize() const
{
std::lock_guard cache_lock(mutex);
return getUsedCacheSizeImpl(cache_lock);
return getUsedCacheSizeUnlocked(cache_lock);
}
size_t LRUFileCache::getUsedCacheSizeImpl(std::lock_guard<std::mutex> & cache_lock) const
size_t LRUFileCache::getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
return queue.getTotalWeight(cache_lock);
}
@@ -874,21 +835,21 @@ size_t LRUFileCache::getUsedCacheSizeImpl(std::lock_guard<std::mutex> & cache_lo
size_t LRUFileCache::getAvailableCacheSize() const
{
std::lock_guard cache_lock(mutex);
return getAvailableCacheSizeImpl(cache_lock);
return getAvailableCacheSizeUnlocked(cache_lock);
}
size_t LRUFileCache::getAvailableCacheSizeImpl(std::lock_guard<std::mutex> & cache_lock) const
size_t LRUFileCache::getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
return max_size - getUsedCacheSizeImpl(cache_lock);
return max_size - getUsedCacheSizeUnlocked(cache_lock);
}
size_t LRUFileCache::getFileSegmentsNum() const
{
std::lock_guard cache_lock(mutex);
return getFileSegmentsNumImpl(cache_lock);
return getFileSegmentsNumUnlocked(cache_lock);
}
size_t LRUFileCache::getFileSegmentsNumImpl(std::lock_guard<std::mutex> & cache_lock) const
size_t LRUFileCache::getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const
{
return queue.getElementsNum(cache_lock);
}
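Note: the Impl-to-Unlocked renames above follow a lock-witness convention: the public accessor takes the mutex itself, while the Unlocked variant demands a std::lock_guard& parameter as evidence that the caller already holds the lock, so an unlocked call cannot be written by accident. A minimal sketch with hypothetical names:

    #include <cstddef>
    #include <mutex>

    class Cache
    {
    public:
        std::size_t getUsedSize() const
        {
            std::lock_guard cache_lock(mutex);
            return getUsedSizeUnlocked(cache_lock);
        }

    private:
        /// The guard reference is never read; it exists so this function
        /// cannot be called without naming a lock that is already held.
        std::size_t getUsedSizeUnlocked(std::lock_guard<std::mutex> & /* cache_lock */) const
        {
            return used_size;
        }

        mutable std::mutex mutex;
        std::size_t used_size = 0;
    };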
@@ -959,24 +920,14 @@ void LRUFileCache::LRUQueue::incrementSize(Iterator queue_it, size_t size_increm
queue_it->size += size_increment;
}
void LRUFileCache::LRUQueue::iterate(IterateFunc func, std::lock_guard<std::mutex> & /* cache_lock */)
{
for (auto it = queue.begin(); it != queue.end(); ++it)
{
bool should_continue = func(it);
if (!should_continue)
break;
}
}
bool LRUFileCache::LRUQueue::contains(
const IFileCache::Key & key, size_t offset, std::lock_guard<std::mutex> & /* cache_lock */) const
{
/// This method is used for assertions in debug mode.
/// So we do not care about complexity here.
for (const auto [key_, offset_, size] : queue)
for (const auto [queue_key, queue_offset, size] : queue)
{
if (key == key_ && offset == offset_)
if (key == queue_key && offset == queue_offset)
return true;
}
return false;
@@ -1021,10 +972,10 @@ String LRUFileCache::LRUQueue::toString(std::lock_guard<std::mutex> & /* cache_l
String LRUFileCache::dumpStructure(const Key & key)
{
std::lock_guard cache_lock(mutex);
return dumpStructureImpl(key, cache_lock);
return dumpStructureUnlocked(key, cache_lock);
}
String LRUFileCache::dumpStructureImpl(const Key & key, std::lock_guard<std::mutex> & cache_lock)
String LRUFileCache::dumpStructureUnlocked(const Key & key, std::lock_guard<std::mutex> & cache_lock)
{
WriteBufferFromOwnString result;
const auto & cells_by_offset = files[key];

View File

@@ -165,13 +165,12 @@ private:
{
Key key;
size_t offset;
size_t size = 0;
size_t size;
FileKeyAndOffset(const Key & key_, size_t offset_, size_t size_) : key(key_), offset(offset_), size(size_) {}
};
using Impl = std::list<FileKeyAndOffset>;
using Iterator = typename Impl::iterator;
using Iterator = typename std::list<FileKeyAndOffset>::iterator;
size_t getTotalWeight(std::lock_guard<std::mutex> & /* cache_lock */) const { return cache_size; }
@@ -183,20 +182,21 @@ private:
void moveToEnd(Iterator queue_it, std::lock_guard<std::mutex> & cache_lock);
/// Space reservation for a file segment is incremental, so we need to be able to increment size of the queue entry.
void incrementSize(Iterator queue_it, size_t size_increment, std::lock_guard<std::mutex> & cache_lock);
/// If func returns true, iteration continues; otherwise it stops.
using IterateFunc = std::function<bool(const Iterator &)>;
void iterate(IterateFunc func, std::lock_guard<std::mutex> & cache_lock);
void assertCorrectness(LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
String toString(std::lock_guard<std::mutex> & cache_lock) const;
bool contains(const Key & key, size_t offset, std::lock_guard<std::mutex> & cache_lock) const;
Iterator begin() { return queue.begin(); }
Iterator end() { return queue.end(); }
private:
Impl queue;
std::list<FileKeyAndOffset> queue;
size_t cache_size = 0;
};
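Note: exposing begin()/end() is safe here because LRUQueue is backed by a std::list: list iterators stay valid until their own element is erased, which is also why each cell can persistently store its queue_iterator and why moveToEnd() can reorder without invalidating it. A small demonstration:

    #include <cassert>
    #include <iterator>
    #include <list>

    int main()
    {
        std::list<int> queue{10, 20, 30};
        auto it = std::next(queue.begin());   /// points at 20

        queue.push_back(40);                  /// inserting elsewhere: `it` still valid
        queue.erase(queue.begin());           /// erasing another element: still valid
        assert(*it == 20);

        /// "Move to end" via splice keeps `it` valid and pointing
        /// at the same element, now at the back of the list.
        queue.splice(queue.end(), queue, it);
        assert(*it == 20 && &*it == &queue.back());
    }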
@@ -216,7 +216,7 @@ private:
FileSegmentCell(FileSegmentPtr file_segment_, LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell(FileSegmentCell && other) noexcept
FileSegmentCell(FileSegmentCell && other)
: file_segment(std::move(other.file_segment))
, queue_iterator(std::move(other.queue_iterator)) {}
};
@@ -267,30 +267,20 @@ private:
FileSegments splitRangeIntoCells(
const Key & key, size_t offset, size_t size, FileSegment::State state, std::lock_guard<std::mutex> & cache_lock);
String dumpStructureImpl(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
String dumpStructureUnlocked(const Key & key_, std::lock_guard<std::mutex> & cache_lock);
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
size_t getUsedCacheSizeImpl(std::lock_guard<std::mutex> & cache_lock) const;
size_t getUsedCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getAvailableCacheSizeImpl(std::lock_guard<std::mutex> & cache_lock) const;
size_t getAvailableCacheSizeUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
size_t getFileSegmentsNumImpl(std::lock_guard<std::mutex> & cache_lock) const;
size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
public:
struct Stat
{
size_t size;
size_t available;
size_t downloaded_size;
size_t downloading_size;
};
Stat getStat();
String dumpStructure(const Key & key_) override;
void assertCacheCorrectness(const Key & key, std::lock_guard<std::mutex> & cache_lock);
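Note: the new assertCacheCorrectness() call sites in the .cpp file are wrapped in #ifndef NDEBUG so the full invariant walk (potentially O(n) over all cells) runs only in debug builds, matching how assert() itself disappears under NDEBUG. The pattern, reduced to a toy invariant:

    #include <cassert>
    #include <cstddef>
    #include <list>
    #include <numeric>

    struct Queue
    {
        std::list<std::size_t> sizes;
        std::size_t cached_total = 0;

        void add(std::size_t s)
        {
            sizes.push_back(s);
            cached_total += s;
    #ifndef NDEBUG
            assertCorrectness();   /// expensive O(n) recount, debug builds only
    #endif
        }

        void assertCorrectness() const
        {
            /// The maintained aggregate must match a full recount.
            std::size_t total = std::accumulate(sizes.begin(), sizes.end(), std::size_t{0});
            assert(total == cached_total);
        }
    };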

View File

@@ -89,11 +89,6 @@ size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_l
}
String FileSegment::getCallerId()
{
return getCallerIdImpl();
}
String FileSegment::getCallerIdImpl()
{
if (!CurrentThread::isInitialized()
|| !CurrentThread::get().getQueryContext()
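Note: getCallerId() had become a one-line forwarder to getCallerIdImpl(); the commit inlines the body here and drops the Impl declaration from the header (next file). The general shape of the cleanup, with illustrative types and a placeholder return value:

    #include <string>

    struct Before
    {
        std::string getCallerId() const { return getCallerIdImpl(); }   /// pure forwarding

    private:
        std::string getCallerIdImpl() const { return "unknown"; }
    };

    struct After
    {
        /// The Impl body simply becomes the public method's body.
        std::string getCallerId() const { return "unknown"; }
    };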

View File

@@ -177,8 +177,6 @@ private:
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
static String getCallerIdImpl();
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);
const Range segment_range;

View File

@@ -257,7 +257,6 @@ TEST(LRUFileCache, get)
/// 10 17 21 24 26
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getStat().size, 5);
{
auto holder = cache.getOrSet(key, 23, 5); /// Get [23, 28]
@@ -487,8 +486,6 @@ TEST(LRUFileCache, get)
auto cache2 = DB::LRUFileCache(cache_base_path, settings);
cache2.initialize();
ASSERT_EQ(cache2.getStat().downloaded_size, 5);
auto holder1 = cache2.getOrSet(key, 2, 28); /// Get [2, 29]
auto segments1 = fromHolder(holder1);
ASSERT_EQ(segments1.size(), 5);
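Note: Stat and getStat() are deleted by this commit, so the tests drop their getStat() assertions; segment counts remain observable through getFileSegmentsNum() (asserted a few lines up) and byte usage through getUsedCacheSize(). A hypothetical replacement check in the same gtest style, assuming a `cache` and `settings` as in the surrounding test:

    /// Illustrative only: checks expressible with the surviving accessors.
    ASSERT_EQ(cache.getFileSegmentsNum(), 5);
    ASSERT_LE(cache.getUsedCacheSize(), settings.max_size);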

View File

@@ -31,7 +31,6 @@ namespace DB
// because a custom S3 implementation may allow relaxed requirements on that.
const int S3_WARN_MAX_PARTS = 10000;
namespace ErrorCodes
{
extern const int S3_ERROR;