This commit is contained in:
kssenii 2023-03-20 13:23:24 +01:00
parent 19413f6228
commit 019de827a1
4 changed files with 25 additions and 17 deletions

View File

@ -481,7 +481,6 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
if (read_type == ReadType::CACHED) if (read_type == ReadType::CACHED)
{ {
chassert(current_file_segment->getDownloadedSize(true) == current_file_segment->range().size()); chassert(current_file_segment->getDownloadedSize(true) == current_file_segment->range().size());
chassert(current_file_segment->state() == FileSegment::State::DOWNLOADED);
} }
file_segments->popFront(); file_segments->popFront();

View File

@ -518,7 +518,16 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, KeyMetad
assertInitialized(); assertInitialized();
auto lock = cache_guard.lock(); auto lock = cache_guard.lock();
auto locked_key = createLockedKey(key, key_metadata); auto locked_key = createLockedKey(key, key_metadata);
return tryReserveUnlocked(key, offset, size, locked_key, lock);
LOG_TEST(log, "Reserving {} bytes for key {} at offset {}", size, key.toString(), offset);
const bool reserved = tryReserveUnlocked(key, offset, size, locked_key, lock);
if (reserved)
LOG_TEST(log, "Successfully reserved {} bytes for key {} at offset {}", size, key.toString(), offset);
else
LOG_TEST(log, "Failed to reserve {} bytes for key {} at offset {}", size, key.toString(), offset);
return reserved;
} }
bool FileCache::tryReserveUnlocked( bool FileCache::tryReserveUnlocked(
@ -586,7 +595,7 @@ bool FileCache::tryReserveImpl(
size_t size, size_t size,
LockedKeyPtr locked_key, LockedKeyPtr locked_key,
FileCacheQueryLimit::LockedQueryContext * query_context, FileCacheQueryLimit::LockedQueryContext * query_context,
const CacheGuard::Lock & priority_lock) const CacheGuard::Lock & cache_lock)
{ {
/// Iterate cache entries in the priority of `priority_queue`. /// Iterate cache entries in the priority of `priority_queue`.
/// If some entry is in `priority_queue` it must be guaranteed to have a /// If some entry is in `priority_queue` it must be guaranteed to have a
@ -599,8 +608,8 @@ bool FileCache::tryReserveImpl(
LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset); LOG_TEST(log, "Reserving space {} for {}:{}", size, key.toString(), offset);
LockedCachePriority locked_priority_queue(priority_lock, priority_queue); LockedCachePriority locked_priority_queue(cache_lock, priority_queue);
LockedCachePriority locked_main_priority(priority_lock, *main_priority); LockedCachePriority locked_main_priority(cache_lock, *main_priority);
size_t queue_size = locked_priority_queue.getElementsCount(); size_t queue_size = locked_priority_queue.getElementsCount();
chassert(queue_size <= locked_priority_queue.getElementsLimit()); chassert(queue_size <= locked_priority_queue.getElementsLimit());
@ -669,7 +678,7 @@ bool FileCache::tryReserveImpl(
{ {
remove_current_it = true; remove_current_it = true;
file_segment_metadata->queue_iterator = {}; file_segment_metadata->queue_iterator = {};
current_locked_key.remove(file_segment, priority_lock); current_locked_key.remove(file_segment, cache_lock);
break; break;
} }
} }
@ -693,7 +702,7 @@ bool FileCache::tryReserveImpl(
for (const auto & offset_to_delete : current_locked_key->delete_offsets) for (const auto & offset_to_delete : current_locked_key->delete_offsets)
{ {
auto * file_segment_metadata = current_locked_key->getKeyMetadata()->getByOffset(offset_to_delete); auto * file_segment_metadata = current_locked_key->getKeyMetadata()->getByOffset(offset_to_delete);
current_locked_key->remove(file_segment_metadata->file_segment, priority_lock); current_locked_key->remove(file_segment_metadata->file_segment, cache_lock);
if (query_context) if (query_context)
query_context->remove(key, offset); query_context->remove(key, offset);
} }
@ -708,7 +717,7 @@ bool FileCache::tryReserveImpl(
/// acquires queue iterator on first successful space reservation attempt. /// acquires queue iterator on first successful space reservation attempt.
/// If queue iterator already exists, we need to update the size after each space reservation. /// If queue iterator already exists, we need to update the size after each space reservation.
if (file_segment_for_reserve->queue_iterator) if (file_segment_for_reserve->queue_iterator)
LockedCachePriorityIterator(priority_lock, file_segment_for_reserve->queue_iterator).incrementSize(size); LockedCachePriorityIterator(cache_lock, file_segment_for_reserve->queue_iterator).incrementSize(size);
else else
{ {
/// Space reservation is incremental, so file_segment_metadata is created first (with state empty), /// Space reservation is incremental, so file_segment_metadata is created first (with state empty),
@ -722,11 +731,11 @@ bool FileCache::tryReserveImpl(
auto queue_iterator = query_context->tryGet(key, offset); auto queue_iterator = query_context->tryGet(key, offset);
if (queue_iterator) if (queue_iterator)
{ {
LockedCachePriorityIterator(priority_lock, queue_iterator).incrementSize(size); LockedCachePriorityIterator(cache_lock, queue_iterator).incrementSize(size);
} }
else else
{ {
auto it = LockedCachePriority(priority_lock, query_context->getPriority()).add(key, offset, size, locked_key->getKeyMetadata()); auto it = LockedCachePriority(cache_lock, query_context->getPriority()).add(key, offset, size, locked_key->getKeyMetadata());
query_context->add(key, offset, it); query_context->add(key, offset, it);
} }
} }
@ -1124,10 +1133,10 @@ size_t FileCache::getFileSegmentsNum() const
void FileCache::assertCacheCorrectness() void FileCache::assertCacheCorrectness()
{ {
return assertCacheCorrectness(metadata.lock(), cache_guard.lock()); return assertCacheCorrectness(cache_guard.lock(), metadata.lock());
} }
void FileCache::assertCacheCorrectness(const CacheMetadataGuard::Lock & metadata_lock, const CacheGuard::Lock & cache_lock) void FileCache::assertCacheCorrectness(const CacheGuard::Lock & cache_lock, const CacheMetadataGuard::Lock & metadata_lock)
{ {
iterateCacheMetadata(metadata_lock, [&](KeyMetadata & key_metadata) iterateCacheMetadata(metadata_lock, [&](KeyMetadata & key_metadata)
{ {

View File

@ -246,7 +246,7 @@ private:
void performDelayedRemovalOfDeletedKeysFromMetadata(const CacheMetadataGuard::Lock &); void performDelayedRemovalOfDeletedKeysFromMetadata(const CacheMetadataGuard::Lock &);
void assertCacheCorrectness(); void assertCacheCorrectness();
void assertCacheCorrectness(const CacheMetadataGuard::Lock &, const CacheGuard::Lock &); void assertCacheCorrectness(const CacheGuard::Lock & cache_lock, const CacheMetadataGuard::Lock & metadata_lock);
}; };
} }

View File

@ -48,7 +48,7 @@ namespace DB
/** /**
* Cache priority queue guard. * Cache priority queue guard.
*/ */
struct CacheGuard struct CacheGuard : private boost::noncopyable
{ {
struct Lock : public std::unique_lock<std::mutex> struct Lock : public std::unique_lock<std::mutex>
{ {
@ -62,7 +62,7 @@ struct CacheGuard
/** /**
* Guard for cache metadata. * Guard for cache metadata.
*/ */
struct CacheMetadataGuard struct CacheMetadataGuard : private boost::noncopyable
{ {
struct Lock : public std::unique_lock<std::mutex> struct Lock : public std::unique_lock<std::mutex>
{ {
@ -77,7 +77,7 @@ struct CacheMetadataGuard
* Guard for a set of keys. * Guard for a set of keys.
* One guard per key prefix (first three digits of the path hash). * One guard per key prefix (first three digits of the path hash).
*/ */
struct KeyGuard struct KeyGuard : private boost::noncopyable
{ {
struct Lock : public std::unique_lock<std::mutex> struct Lock : public std::unique_lock<std::mutex>
{ {
@ -92,7 +92,7 @@ using KeyGuardPtr = std::shared_ptr<KeyGuard>;
/** /**
* Guard for a file segment. * Guard for a file segment.
*/ */
struct FileSegmentGuard struct FileSegmentGuard : private boost::noncopyable
{ {
struct Lock : public std::unique_lock<std::mutex> struct Lock : public std::unique_lock<std::mutex>
{ {