Merge pull request #50021 from kssenii/fix-logical-error-in-try-reserve

Fix logical error in stress test "Not enough space to add ..."
This commit is contained in:
Kseniia Sumarokova 2023-05-23 12:05:01 +02:00 committed by GitHub
commit 13cd2d6d5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -563,15 +563,9 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
file_segment.key(), file_segment.offset());
}
size_t queue_size = main_priority->getElementsCount(cache_lock);
chassert(queue_size <= main_priority->getElementsLimit());
/// A file_segment_metadata acquires a LRUQueue iterator on first successful space reservation attempt.
auto queue_iterator = file_segment.getQueueIterator();
if (queue_iterator)
chassert(file_segment.getReservedSize() > 0);
else
queue_size += 1;
chassert(!queue_iterator || file_segment.getReservedSize() > 0);
struct EvictionCandidates
{
@ -596,6 +590,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
};
std::unordered_map<Key, EvictionCandidates> to_delete;
size_t freeable_space = 0, freeable_count = 0;
size_t removed_size = 0;
auto iterate_func = [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
@ -607,17 +602,19 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
if (releasable)
{
removed_size += segment_metadata->size();
--queue_size;
auto segment = segment_metadata->file_segment;
if (segment->state() == FileSegment::State::DOWNLOADED)
{
const auto & key = segment->key();
auto it = to_delete.find(key);
if (it == to_delete.end())
it = to_delete.emplace(key, locked_key.getKeyMetadata()).first;
it->second.add(segment_metadata);
freeable_space += segment_metadata->size();
freeable_count += 1;
return PriorityIterationResult::CONTINUE;
}
@ -632,10 +629,12 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
{
auto is_query_priority_overflow = [&]
{
const size_t new_size = query_priority->getSize(cache_lock) + size - removed_size;
const size_t new_size = query_priority->getSize(cache_lock) + size - freeable_space;
return new_size > query_priority->getSizeLimit();
};
if (is_query_priority_overflow())
{
query_priority->iterate(
[&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ return is_query_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
@ -643,6 +642,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
if (is_query_priority_overflow())
return false;
}
LOG_TEST(
log, "Query limits satisfied (while reserving for {}:{})",
@ -652,10 +652,11 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
auto is_main_priority_overflow = [&]
{
/// max_size == 0 means unlimited cache size,
/// max_element_size means unlimited number of cache elements.
/// max_element_size == 0 means unlimited number of cache elements.
const bool is_overflow = (main_priority->getSizeLimit() != 0
&& main_priority->getSize(cache_lock) + size - removed_size > main_priority->getSizeLimit())
|| (main_priority->getElementsLimit() != 0 && queue_size > main_priority->getElementsLimit());
&& (main_priority->getSize(cache_lock) + size - freeable_space > main_priority->getSizeLimit()))
|| (main_priority->getElementsLimit() != 0
&& freeable_count == 0 && main_priority->getElementsCount(cache_lock) == main_priority->getElementsLimit());
LOG_TEST(
log, "Overflow: {}, size: {}, ready to remove: {}, current cache size: {}/{}, elements: {}/{}, while reserving for {}:{}",
@ -667,6 +668,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
return is_overflow;
};
if (is_main_priority_overflow())
{
main_priority->iterate(
[&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{ return is_main_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
@ -674,6 +677,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
if (is_main_priority_overflow())
return false;
}
if (!file_segment.getKeyMetadata()->createBaseDirectory())
return false;