mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #58842 from ClickHouse/fix-data-race-in-slru
fs cache: fix data race in slru
This commit is contained in:
commit
5fcf1b216b
@ -817,7 +817,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
|
||||
}
|
||||
|
||||
file_segment.reserved_size += size;
|
||||
chassert(file_segment.reserved_size == queue_iterator->getEntry().size);
|
||||
chassert(file_segment.reserved_size == queue_iterator->getEntry()->size);
|
||||
|
||||
if (query_context)
|
||||
{
|
||||
|
@ -789,9 +789,9 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) cons
|
||||
|
||||
const auto & entry = it->getEntry();
|
||||
UNUSED(entry);
|
||||
chassert(entry.size == reserved_size);
|
||||
chassert(entry.key == key());
|
||||
chassert(entry.offset == offset());
|
||||
chassert(entry->size == reserved_size);
|
||||
chassert(entry->key == key());
|
||||
chassert(entry->offset == offset());
|
||||
};
|
||||
|
||||
if (download_state == State::DOWNLOADED)
|
||||
|
@ -31,13 +31,14 @@ public:
|
||||
std::atomic<size_t> size;
|
||||
size_t hits = 0;
|
||||
};
|
||||
using EntryPtr = std::shared_ptr<Entry>;
|
||||
|
||||
class Iterator
|
||||
{
|
||||
public:
|
||||
virtual ~Iterator() = default;
|
||||
|
||||
virtual const Entry & getEntry() const = 0;
|
||||
virtual EntryPtr getEntry() const = 0;
|
||||
|
||||
virtual size_t increasePriority(const CacheGuard::Lock &) = 0;
|
||||
|
||||
|
@ -36,49 +36,49 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
|
||||
const CacheGuard::Lock & lock,
|
||||
bool /* is_startup */)
|
||||
{
|
||||
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
|
||||
return std::make_shared<LRUIterator>(add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock));
|
||||
}
|
||||
|
||||
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
|
||||
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(EntryPtr entry, const CacheGuard::Lock & lock)
|
||||
{
|
||||
if (entry.size == 0)
|
||||
if (entry->size == 0)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Adding zero size entries to LRU queue is not allowed "
|
||||
"(key: {}, offset: {})", entry.key, entry.offset);
|
||||
"(key: {}, offset: {})", entry->key, entry->offset);
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (const auto & queue_entry : queue)
|
||||
{
|
||||
/// entry.size == 0 means entry was invalidated.
|
||||
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
|
||||
if (queue_entry->size != 0 && queue_entry->key == entry->key && queue_entry->offset == entry->offset)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to add duplicate queue entry to queue. "
|
||||
"(Key: {}, offset: {}, size: {})",
|
||||
entry.key, entry.offset, entry.size);
|
||||
entry->key, entry->offset, entry->size);
|
||||
}
|
||||
#endif
|
||||
|
||||
const auto & size_limit = getSizeLimit(lock);
|
||||
if (size_limit && current_size + entry.size > size_limit)
|
||||
if (size_limit && current_size + entry->size > size_limit)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Not enough space to add {}:{} with size {}: current size: {}/{}",
|
||||
entry.key, entry.offset, entry.size, current_size, size_limit);
|
||||
entry->key, entry->offset, entry->size, current_size, size_limit);
|
||||
}
|
||||
|
||||
auto iterator = queue.insert(queue.end(), entry);
|
||||
|
||||
updateSize(entry.size);
|
||||
updateSize(entry->size);
|
||||
updateElementsCount(1);
|
||||
|
||||
LOG_TEST(
|
||||
log, "Added entry into LRU queue, key: {}, offset: {}, size: {}",
|
||||
entry.key, entry.offset, entry.size);
|
||||
entry->key, entry->offset, entry->size);
|
||||
|
||||
return LRUIterator(this, iterator);
|
||||
}
|
||||
@ -86,15 +86,16 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, cons
|
||||
LRUFileCachePriority::LRUQueue::iterator LRUFileCachePriority::remove(LRUQueue::iterator it, const CacheGuard::Lock &)
|
||||
{
|
||||
/// If size is 0, entry is invalidated, current_elements_num was already updated.
|
||||
if (it->size)
|
||||
const auto & entry = **it;
|
||||
if (entry.size)
|
||||
{
|
||||
updateSize(-it->size);
|
||||
updateSize(-entry.size);
|
||||
updateElementsCount(-1);
|
||||
}
|
||||
|
||||
LOG_TEST(
|
||||
log, "Removed entry from LRU queue, key: {}, offset: {}, size: {}",
|
||||
it->key, it->offset, it->size);
|
||||
entry.key, entry.offset, entry.size);
|
||||
|
||||
return queue.erase(it);
|
||||
}
|
||||
@ -143,27 +144,28 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
|
||||
{
|
||||
for (auto it = queue.begin(); it != queue.end();)
|
||||
{
|
||||
auto locked_key = it->key_metadata->tryLock();
|
||||
if (!locked_key || it->size == 0)
|
||||
const auto & entry = **it;
|
||||
auto locked_key = entry.key_metadata->tryLock();
|
||||
if (!locked_key || entry.size == 0)
|
||||
{
|
||||
it = remove(it, lock);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto metadata = locked_key->tryGetByOffset(it->offset);
|
||||
auto metadata = locked_key->tryGetByOffset(entry.offset);
|
||||
if (!metadata)
|
||||
{
|
||||
it = remove(it, lock);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (metadata->size() != it->size)
|
||||
if (metadata->size() != entry.size)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Mismatch of file segment size in file segment metadata "
|
||||
"and priority queue: {} != {} ({})",
|
||||
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
|
||||
entry.size, metadata->size(), metadata->file_segment->getInfoForLog());
|
||||
}
|
||||
|
||||
auto result = func(*locked_key, metadata);
|
||||
@ -249,7 +251,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
|
||||
|
||||
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &)
|
||||
{
|
||||
const auto & entry = it.getEntry();
|
||||
const auto & entry = *it.getEntry();
|
||||
if (entry.size == 0)
|
||||
{
|
||||
throw Exception(
|
||||
@ -261,7 +263,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
|
||||
for (const auto & queue_entry : queue)
|
||||
{
|
||||
/// entry.size == 0 means entry was invalidated.
|
||||
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
|
||||
if (queue_entry->size != 0 && queue_entry->key == entry.key && queue_entry->offset == entry.offset)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to add duplicate queue entry to queue. "
|
||||
@ -347,34 +349,36 @@ void LRUFileCachePriority::LRUIterator::invalidate()
|
||||
{
|
||||
assertValid();
|
||||
|
||||
const auto & entry = *iterator;
|
||||
LOG_TEST(
|
||||
cache_priority->log,
|
||||
"Invalidating entry in LRU queue. Key: {}, offset: {}, previous size: {}",
|
||||
iterator->key, iterator->offset, iterator->size);
|
||||
entry->key, entry->offset, entry->size);
|
||||
|
||||
cache_priority->updateSize(-iterator->size);
|
||||
cache_priority->updateSize(-entry->size);
|
||||
cache_priority->updateElementsCount(-1);
|
||||
iterator->size = 0;
|
||||
entry->size = 0;
|
||||
}
|
||||
|
||||
void LRUFileCachePriority::LRUIterator::updateSize(int64_t size)
|
||||
{
|
||||
assertValid();
|
||||
|
||||
const auto & entry = *iterator;
|
||||
LOG_TEST(
|
||||
cache_priority->log,
|
||||
"Update size with {} in LRU queue for key: {}, offset: {}, previous size: {}",
|
||||
size, iterator->key, iterator->offset, iterator->size);
|
||||
size, entry->key, entry->offset, entry->size);
|
||||
|
||||
cache_priority->updateSize(size);
|
||||
iterator->size += size;
|
||||
entry->size += size;
|
||||
}
|
||||
|
||||
size_t LRUFileCachePriority::LRUIterator::increasePriority(const CacheGuard::Lock &)
|
||||
{
|
||||
assertValid();
|
||||
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
|
||||
return ++iterator->hits;
|
||||
return ++((*iterator)->hits);
|
||||
}
|
||||
|
||||
void LRUFileCachePriority::LRUIterator::assertValid() const
|
||||
|
@ -15,7 +15,7 @@ class LRUFileCachePriority final : public IFileCachePriority
|
||||
{
|
||||
private:
|
||||
class LRUIterator;
|
||||
using LRUQueue = std::list<Entry>;
|
||||
using LRUQueue = std::list<EntryPtr>;
|
||||
friend class SLRUFileCachePriority;
|
||||
|
||||
public:
|
||||
@ -76,7 +76,7 @@ private:
|
||||
void iterate(IterateFunc && func, const CacheGuard::Lock &);
|
||||
|
||||
LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &);
|
||||
LRUIterator add(Entry && entry, const CacheGuard::Lock &);
|
||||
LRUIterator add(EntryPtr entry, const CacheGuard::Lock &);
|
||||
};
|
||||
|
||||
class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
|
||||
@ -91,7 +91,7 @@ public:
|
||||
LRUIterator & operator =(const LRUIterator & other);
|
||||
bool operator ==(const LRUIterator & other) const;
|
||||
|
||||
const Entry & getEntry() const override { return *iterator; }
|
||||
EntryPtr getEntry() const override { return *iterator; }
|
||||
|
||||
size_t increasePriority(const CacheGuard::Lock &) override;
|
||||
|
||||
|
@ -61,18 +61,18 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT
|
||||
/// because we do not know the distribution between queues after server restart.
|
||||
if (probationary_queue.canFit(size, lock))
|
||||
{
|
||||
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
|
||||
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
|
||||
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lru_iterator = protected_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
|
||||
auto lru_iterator = protected_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
|
||||
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), true);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
|
||||
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
|
||||
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
|
||||
}
|
||||
}
|
||||
@ -151,7 +151,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
|
||||
|
||||
/// Entry is in probationary queue.
|
||||
/// We need to move it to protected queue.
|
||||
const size_t size = iterator.getEntry().size;
|
||||
const size_t size = iterator.getEntry()->size;
|
||||
if (size > protected_queue.getSizeLimit(lock))
|
||||
{
|
||||
/// Entry size is bigger than the whole protected queue limit.
|
||||
@ -205,7 +205,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
|
||||
|
||||
/// All checks passed, now we can move downgrade candidates to
|
||||
/// probationary queue and our entry to protected queue.
|
||||
Entry entry_copy = iterator.getEntry();
|
||||
EntryPtr entry = iterator.getEntry();
|
||||
iterator.lru_iterator.remove(lock);
|
||||
|
||||
for (const auto & [key, key_candidates] : downgrade_candidates)
|
||||
@ -218,7 +218,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
|
||||
}
|
||||
}
|
||||
|
||||
iterator.lru_iterator = protected_queue.add(std::move(entry_copy), lock);
|
||||
iterator.lru_iterator = protected_queue.add(entry, lock);
|
||||
iterator.is_protected = true;
|
||||
}
|
||||
|
||||
@ -257,21 +257,22 @@ SLRUFileCachePriority::SLRUIterator::SLRUIterator(
|
||||
bool is_protected_)
|
||||
: cache_priority(cache_priority_)
|
||||
, lru_iterator(lru_iterator_)
|
||||
, entry(lru_iterator.getEntry())
|
||||
, is_protected(is_protected_)
|
||||
{
|
||||
}
|
||||
|
||||
const SLRUFileCachePriority::Entry & SLRUFileCachePriority::SLRUIterator::getEntry() const
|
||||
SLRUFileCachePriority::EntryPtr SLRUFileCachePriority::SLRUIterator::getEntry() const
|
||||
{
|
||||
assertValid();
|
||||
return lru_iterator.getEntry();
|
||||
chassert(entry == lru_iterator.getEntry());
|
||||
return entry;
|
||||
}
|
||||
|
||||
size_t SLRUFileCachePriority::SLRUIterator::increasePriority(const CacheGuard::Lock & lock)
|
||||
{
|
||||
assertValid();
|
||||
cache_priority->increasePriority(*this, lock);
|
||||
return getEntry().hits;
|
||||
return getEntry()->hits;
|
||||
}
|
||||
|
||||
void SLRUFileCachePriority::SLRUIterator::updateSize(int64_t size)
|
||||
|
@ -11,10 +11,6 @@ namespace DB
|
||||
/// the head of the queue, and the record with the highest priority is stored at the tail.
|
||||
class SLRUFileCachePriority : public IFileCachePriority
|
||||
{
|
||||
private:
|
||||
using LRUIterator = LRUFileCachePriority::LRUIterator;
|
||||
using LRUQueue = std::list<Entry>;
|
||||
|
||||
public:
|
||||
class SLRUIterator;
|
||||
|
||||
@ -62,10 +58,10 @@ class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
|
||||
public:
|
||||
SLRUIterator(
|
||||
SLRUFileCachePriority * cache_priority_,
|
||||
LRUIterator && lru_iterator_,
|
||||
LRUFileCachePriority::LRUIterator && lru_iterator_,
|
||||
bool is_protected_);
|
||||
|
||||
const Entry & getEntry() const override;
|
||||
EntryPtr getEntry() const override;
|
||||
|
||||
size_t increasePriority(const CacheGuard::Lock &) override;
|
||||
|
||||
@ -81,7 +77,8 @@ private:
|
||||
void assertValid() const;
|
||||
|
||||
SLRUFileCachePriority * cache_priority;
|
||||
mutable LRUIterator lru_iterator;
|
||||
LRUFileCachePriority::LRUIterator lru_iterator;
|
||||
const EntryPtr entry;
|
||||
bool is_protected;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user