Fix data race in slru

This commit is contained in:
kssenii 2024-01-15 21:19:11 +01:00
parent be371409ab
commit e2c5e5fa70
7 changed files with 54 additions and 51 deletions

View File

@ -817,7 +817,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
}
file_segment.reserved_size += size;
chassert(file_segment.reserved_size == queue_iterator->getEntry().size);
chassert(file_segment.reserved_size == queue_iterator->getEntry()->size);
if (query_context)
{

View File

@ -789,9 +789,9 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) cons
const auto & entry = it->getEntry();
UNUSED(entry);
chassert(entry.size == reserved_size);
chassert(entry.key == key());
chassert(entry.offset == offset());
chassert(entry->size == reserved_size);
chassert(entry->key == key());
chassert(entry->offset == offset());
};
if (download_state == State::DOWNLOADED)

View File

@ -31,13 +31,14 @@ public:
std::atomic<size_t> size;
size_t hits = 0;
};
using EntryPtr = std::shared_ptr<Entry>;
class Iterator
{
public:
virtual ~Iterator() = default;
virtual const Entry & getEntry() const = 0;
virtual EntryPtr getEntry() const = 0;
virtual size_t increasePriority(const CacheGuard::Lock &) = 0;

View File

@ -36,49 +36,49 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
const CacheGuard::Lock & lock,
bool /* is_startup */)
{
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
return std::make_shared<LRUIterator>(add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock));
}
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(EntryPtr entry, const CacheGuard::Lock & lock)
{
if (entry.size == 0)
if (entry->size == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding zero size entries to LRU queue is not allowed "
"(key: {}, offset: {})", entry.key, entry.offset);
"(key: {}, offset: {})", entry->key, entry->offset);
}
#ifndef NDEBUG
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry->key && queue_entry->offset == entry->offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
"(Key: {}, offset: {}, size: {})",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);
}
#endif
const auto & size_limit = getSizeLimit(lock);
if (size_limit && current_size + entry.size > size_limit)
if (size_limit && current_size + entry->size > size_limit)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Not enough space to add {}:{} with size {}: current size: {}/{}",
entry.key, entry.offset, entry.size, current_size, size_limit);
entry->key, entry->offset, entry->size, current_size, size_limit);
}
auto iterator = queue.insert(queue.end(), entry);
updateSize(entry.size);
updateSize(entry->size);
updateElementsCount(1);
LOG_TEST(
log, "Added entry into LRU queue, key: {}, offset: {}, size: {}",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);
return LRUIterator(this, iterator);
}
@ -86,15 +86,16 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, cons
LRUFileCachePriority::LRUQueue::iterator LRUFileCachePriority::remove(LRUQueue::iterator it, const CacheGuard::Lock &)
{
/// If size is 0, entry is invalidated, current_elements_num was already updated.
if (it->size)
const auto & entry = **it;
if (entry.size)
{
updateSize(-it->size);
updateSize(-entry.size);
updateElementsCount(-1);
}
LOG_TEST(
log, "Removed entry from LRU queue, key: {}, offset: {}, size: {}",
it->key, it->offset, it->size);
entry.key, entry.offset, entry.size);
return queue.erase(it);
}
@ -143,27 +144,28 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
{
for (auto it = queue.begin(); it != queue.end();)
{
auto locked_key = it->key_metadata->tryLock();
if (!locked_key || it->size == 0)
const auto & entry = **it;
auto locked_key = entry.key_metadata->tryLock();
if (!locked_key || entry.size == 0)
{
it = remove(it, lock);
continue;
}
auto metadata = locked_key->tryGetByOffset(it->offset);
auto metadata = locked_key->tryGetByOffset(entry.offset);
if (!metadata)
{
it = remove(it, lock);
continue;
}
if (metadata->size() != it->size)
if (metadata->size() != entry.size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata "
"and priority queue: {} != {} ({})",
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
entry.size, metadata->size(), metadata->file_segment->getInfoForLog());
}
auto result = func(*locked_key, metadata);
@ -249,7 +251,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &)
{
const auto & entry = it.getEntry();
const auto & entry = *it.getEntry();
if (entry.size == 0)
{
throw Exception(
@ -261,7 +263,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry.key && queue_entry->offset == entry.offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
@ -347,34 +349,36 @@ void LRUFileCachePriority::LRUIterator::invalidate()
{
assertValid();
const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Invalidating entry in LRU queue. Key: {}, offset: {}, previous size: {}",
iterator->key, iterator->offset, iterator->size);
entry->key, entry->offset, entry->size);
cache_priority->updateSize(-iterator->size);
cache_priority->updateSize(-entry->size);
cache_priority->updateElementsCount(-1);
iterator->size = 0;
entry->size = 0;
}
void LRUFileCachePriority::LRUIterator::updateSize(int64_t size)
{
assertValid();
const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Update size with {} in LRU queue for key: {}, offset: {}, previous size: {}",
size, iterator->key, iterator->offset, iterator->size);
size, entry->key, entry->offset, entry->size);
cache_priority->updateSize(size);
iterator->size += size;
entry->size += size;
}
size_t LRUFileCachePriority::LRUIterator::increasePriority(const CacheGuard::Lock &)
{
assertValid();
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
return ++iterator->hits;
return ++((*iterator)->hits);
}
void LRUFileCachePriority::LRUIterator::assertValid() const

View File

@ -15,7 +15,7 @@ class LRUFileCachePriority final : public IFileCachePriority
{
private:
class LRUIterator;
using LRUQueue = std::list<Entry>;
using LRUQueue = std::list<EntryPtr>;
friend class SLRUFileCachePriority;
public:
@ -76,7 +76,7 @@ private:
void iterate(IterateFunc && func, const CacheGuard::Lock &);
LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &);
LRUIterator add(Entry && entry, const CacheGuard::Lock &);
LRUIterator add(EntryPtr entry, const CacheGuard::Lock &);
};
class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
@ -91,7 +91,7 @@ public:
LRUIterator & operator =(const LRUIterator & other);
bool operator ==(const LRUIterator & other) const;
const Entry & getEntry() const override { return *iterator; }
EntryPtr getEntry() const override { return *iterator; }
size_t increasePriority(const CacheGuard::Lock &) override;

View File

@ -61,18 +61,18 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT
/// because we do not know the distribution between queues after server restart.
if (probationary_queue.canFit(size, lock))
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
else
{
auto lru_iterator = protected_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = protected_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), true);
}
}
else
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
}
@ -151,7 +151,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
const size_t size = iterator.getEntry()->size;
if (size > protected_queue.getSizeLimit(lock))
{
/// Entry size is bigger than the whole protected queue limit.
@ -205,7 +205,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// All checks passed, now we can move downgrade candidates to
/// probationary queue and our entry to protected queue.
Entry entry_copy = iterator.getEntry();
EntryPtr entry = iterator.getEntry();
iterator.lru_iterator.remove(lock);
for (const auto & [key, key_candidates] : downgrade_candidates)
@ -218,7 +218,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
}
}
iterator.lru_iterator = protected_queue.add(std::move(entry_copy), lock);
iterator.lru_iterator = protected_queue.add(entry, lock);
iterator.is_protected = true;
}
@ -257,21 +257,22 @@ SLRUFileCachePriority::SLRUIterator::SLRUIterator(
bool is_protected_)
: cache_priority(cache_priority_)
, lru_iterator(lru_iterator_)
, entry(lru_iterator.getEntry())
, is_protected(is_protected_)
{
}
const SLRUFileCachePriority::Entry & SLRUFileCachePriority::SLRUIterator::getEntry() const
SLRUFileCachePriority::EntryPtr SLRUFileCachePriority::SLRUIterator::getEntry() const
{
assertValid();
return lru_iterator.getEntry();
chassert(entry == lru_iterator.getEntry());
return entry;
}
size_t SLRUFileCachePriority::SLRUIterator::increasePriority(const CacheGuard::Lock & lock)
{
assertValid();
cache_priority->increasePriority(*this, lock);
return getEntry().hits;
return getEntry()->hits;
}
void SLRUFileCachePriority::SLRUIterator::updateSize(int64_t size)

View File

@ -11,10 +11,6 @@ namespace DB
/// the head of the queue, and the record with the highest priority is stored at the tail.
class SLRUFileCachePriority : public IFileCachePriority
{
private:
using LRUIterator = LRUFileCachePriority::LRUIterator;
using LRUQueue = std::list<Entry>;
public:
class SLRUIterator;
@ -62,10 +58,10 @@ class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
public:
SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUIterator && lru_iterator_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
bool is_protected_);
const Entry & getEntry() const override;
EntryPtr getEntry() const override;
size_t increasePriority(const CacheGuard::Lock &) override;
@ -81,7 +77,8 @@ private:
void assertValid() const;
SLRUFileCachePriority * cache_priority;
mutable LRUIterator lru_iterator;
LRUFileCachePriority::LRUIterator lru_iterator;
const EntryPtr entry;
bool is_protected;
};