Fix stress test

This commit is contained in:
kssenii 2023-03-03 17:20:27 +01:00
parent c272f4befb
commit 7b369e516c
9 changed files with 95 additions and 76 deletions

View File

@ -228,7 +228,7 @@ bool CachedOnDiskReadBufferFromFile::canStartFromCache(size_t current_offset, co
/// requested_range: [__________]
/// ^
/// current_offset
size_t first_non_downloaded_offset = file_segment.getFirstNonDownloadedOffset();
size_t first_non_downloaded_offset = file_segment.getFirstNonDownloadedOffset(true);
return first_non_downloaded_offset > current_offset;
}
@ -321,7 +321,8 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegment & file_s
return getCacheReadBuffer(file_segment);
}
if (file_segment.getCurrentWriteOffset() < file_offset_of_buffer_end)
auto current_write_offset = file_segment.getCurrentWriteOffset(false);
if (current_write_offset < file_offset_of_buffer_end)
{
/// segment{1}
/// cache: [_____|___________
@ -332,8 +333,8 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegment & file_s
/// file_offset_of_buffer_end
LOG_TEST(log, "Predownload. File segment info: {}", file_segment.getInfoForLog());
chassert(file_offset_of_buffer_end > file_segment.getCurrentWriteOffset());
bytes_to_predownload = file_offset_of_buffer_end - file_segment.getCurrentWriteOffset();
chassert(file_offset_of_buffer_end > current_write_offset);
bytes_to_predownload = file_offset_of_buffer_end - current_write_offset;
chassert(bytes_to_predownload < file_segment.range().size());
}
@ -438,7 +439,7 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegment & file_segme
if (bytes_to_predownload)
{
size_t current_write_offset = file_segment.getCurrentWriteOffset();
size_t current_write_offset = file_segment.getCurrentWriteOffset(false);
read_buffer_for_file_segment->seek(current_write_offset, SEEK_SET);
}
else
@ -449,7 +450,7 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegment & file_segme
assert(static_cast<size_t>(read_buffer_for_file_segment->getFileOffsetOfBufferEnd()) == file_offset_of_buffer_end);
}
auto current_write_offset = file_segment.getCurrentWriteOffset();
auto current_write_offset = file_segment.getCurrentWriteOffset(false);
if (current_write_offset != static_cast<size_t>(read_buffer_for_file_segment->getPosition()))
{
throw Exception(
@ -522,8 +523,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
/// download from offset a'' < a', but return buffer from offset a'.
LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId());
chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset());
size_t current_offset = file_segment.getCurrentWriteOffset();
chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset(false));
size_t current_offset = file_segment.getCurrentWriteOffset(false);
const auto & current_range = file_segment.range();
while (true)
@ -549,7 +550,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
"current download offset: {}, expected: {}, eof: {}",
bytes_to_predownload,
current_range.toString(),
file_segment.getCurrentWriteOffset(),
file_segment.getCurrentWriteOffset(false),
file_offset_of_buffer_end,
implementation_buffer->eof());
@ -559,7 +560,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
{
nextimpl_working_buffer_offset = implementation_buffer->offset();
auto current_write_offset = file_segment.getCurrentWriteOffset();
auto current_write_offset = file_segment.getCurrentWriteOffset(false);
if (current_write_offset != static_cast<size_t>(implementation_buffer->getPosition())
|| current_write_offset != file_offset_of_buffer_end)
{
@ -588,7 +589,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
chassert(file_segment.getCurrentWriteOffset(false) == static_cast<size_t>(implementation_buffer->getPosition()));
bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, file_segment);
if (success)
@ -677,7 +678,7 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded()
/// ^
/// file_offset_of_buffer_end
auto current_write_offset = file_segment.getCurrentWriteOffset();
auto current_write_offset = file_segment.getCurrentWriteOffset(true);
bool cached_part_is_finished = current_write_offset == file_offset_of_buffer_end;
LOG_TEST(log, "Current write offset: {}, file offset of buffer end: {}", current_write_offset, file_offset_of_buffer_end);
@ -914,15 +915,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
bool success = file_segment.reserve(size);
if (success)
{
chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
chassert(file_segment.getCurrentWriteOffset(false) == static_cast<size_t>(implementation_buffer->getPosition()));
success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, file_segment);
if (success)
{
chassert(file_segment.getCurrentWriteOffset() <= file_segment.range().right + 1);
chassert(file_segment.getCurrentWriteOffset(false) <= file_segment.range().right + 1);
chassert(
/* last_file_segment */file_segments->size() == 1
|| file_segment.getCurrentWriteOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
|| file_segment.getCurrentWriteOffset(false) == implementation_buffer->getFileOffsetOfBufferEnd());
LOG_TEST(log, "Successfully written {} bytes", size);
}
@ -987,7 +988,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
current_read_range.toString(),
file_offset_of_buffer_end,
FileSegment::stateToString(file_segment.state()),
file_segment.getCurrentWriteOffset(),
file_segment.getCurrentWriteOffset(false),
toString(read_type),
read_until_position,
first_offset,

View File

@ -68,7 +68,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
while (size > 0)
{
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize();
size_t available_size = file_segment->range().size() - file_segment->getDownloadedSize(false);
if (available_size == 0)
{
completeFileSegment(*file_segment);
@ -157,7 +157,7 @@ void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_s
if (cache_log)
{
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize() - 1;
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
FilesystemCacheLogElement elem
{

View File

@ -130,7 +130,7 @@ FileSegments FileCache::getImpl(
{
if (file_segment_metadata.file_segment->isDownloaded())
{
if (file_segment_metadata.file_segment->getDownloadedSize() == 0)
if (file_segment_metadata.file_segment->getDownloadedSize(true) == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -504,11 +504,11 @@ KeyMetadata::iterator FileCache::addFileSegment(
return file_segment_metadata_it;
}
bool FileCache::tryReserve(const Key & key, size_t offset, size_t size)
bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, KeyMetadataPtr key_metadata)
{
assertInitialized();
auto lock = cache_guard.lock();
auto locked_key = createLockedKey(key, KeyNotFoundPolicy::THROW);
auto locked_key = createLockedKey(key, key_metadata);
return tryReserveUnlocked(key, offset, size, locked_key, lock);
}
@ -1016,7 +1016,6 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
{
assertInitialized();
auto cache_lock = cache_guard.lock();
auto lock = metadata.lock();
performDelayedRemovalOfDeletedKeysFromMetadata(lock);

View File

@ -100,7 +100,7 @@ public:
size_t getMaxFileSegmentSize() const { return max_file_segment_size; }
bool tryReserve(const Key & key, size_t offset, size_t size);
bool tryReserve(const Key & key, size_t offset, size_t size, KeyMetadataPtr key_metadata);
FileSegmentsHolderPtr getSnapshot();

View File

@ -96,36 +96,23 @@ size_t FileSegment::getReservedSize() const
return reserved_size;
}
size_t FileSegment::getFirstNonDownloadedOffset() const
size_t FileSegment::getFirstNonDownloadedOffset(bool sync) const
{
auto lock = segment_guard.lock();
return getFirstNonDownloadedOffsetUnlocked(lock);
return range().left + getDownloadedSize(sync);
}
size_t FileSegment::getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock & lock) const
size_t FileSegment::getCurrentWriteOffset(bool sync) const
{
return range().left + getDownloadedSizeUnlocked(lock);
return getFirstNonDownloadedOffset(sync);
}
size_t FileSegment::getCurrentWriteOffset() const
{
auto lock = segment_guard.lock();
return getCurrentWriteOffsetUnlocked(lock);
}
size_t FileSegment::getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock & lock) const
{
return getFirstNonDownloadedOffsetUnlocked(lock);
}
size_t FileSegment::getDownloadedSize() const
{
auto lock = segment_guard.lock();
return getDownloadedSizeUnlocked(lock);
}
size_t FileSegment::getDownloadedSizeUnlocked(const FileSegmentGuard::Lock &) const
size_t FileSegment::getDownloadedSize(bool sync) const
{
if (sync)
{
std::lock_guard lock(download_mutex);
return downloaded_size;
}
return downloaded_size;
}
@ -195,7 +182,7 @@ void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] const FileSegme
assert(isDownloaderUnlocked(lock));
assert(download_state == State::DOWNLOADING);
size_t current_downloaded_size = getDownloadedSizeUnlocked(lock);
size_t current_downloaded_size = getDownloadedSize(true);
/// range().size() can equal 0 in case of write-though cache.
if (current_downloaded_size != 0 && current_downloaded_size == range().size())
setDownloadedUnlocked(lock);
@ -312,14 +299,14 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
ErrorCodes::LOGICAL_ERROR,
"Expected DOWNLOADING state, got {}", stateToString(download_state));
size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(lock);
size_t first_non_downloaded_offset = getFirstNonDownloadedOffset(false);
if (offset != first_non_downloaded_offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset, first_non_downloaded_offset);
size_t current_downloaded_size = getDownloadedSizeUnlocked(lock);
size_t current_downloaded_size = getDownloadedSize(false);
chassert(reserved_size >= current_downloaded_size);
size_t free_reserved_size = reserved_size - current_downloaded_size;
@ -347,6 +334,8 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
{
cache_writer->write(from, size);
std::lock_guard lock(download_mutex);
cache_writer->next();
downloaded_size += size;
@ -366,7 +355,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
throw;
}
chassert(getFirstNonDownloadedOffset() == offset + size);
chassert(getFirstNonDownloadedOffset(false) == offset + size);
}
FileSegment::State FileSegment::wait()
@ -420,7 +409,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock);
expected_downloaded_size = getDownloadedSizeUnlocked(lock);
expected_downloaded_size = getDownloadedSize(false);
is_file_segment_size_exceeded = expected_downloaded_size + size_to_reserve > range().size();
if (is_file_segment_size_exceeded && !is_unbound)
@ -452,7 +441,7 @@ bool FileSegment::reserve(size_t size_to_reserve)
if (is_unbound && is_file_segment_size_exceeded)
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
reserved = cache->tryReserve(key(), offset(), size_to_reserve);
reserved = cache->tryReserve(key(), offset(), size_to_reserve, key_metadata.lock());
if (reserved)
{
/// No lock is required because reserved size is always
@ -480,8 +469,8 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard:
remote_file_reader.reset();
}
assert(getDownloadedSizeUnlocked(lock) > 0);
assert(std::filesystem::file_size(file_path) > 0);
chassert(getDownloadedSize(false) > 0);
chassert(std::filesystem::file_size(file_path) > 0);
}
void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
@ -547,7 +536,7 @@ void FileSegment::completeUnlocked(LockedKey & locked_key, const CacheGuard::Loc
const bool is_downloader = isDownloaderUnlocked(segment_lock);
const bool is_last_holder = locked_key.isLastHolder(offset());
const size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
const size_t current_downloaded_size = getDownloadedSize(true);
SCOPE_EXIT({
if (is_downloader)
@ -587,8 +576,8 @@ void FileSegment::completeUnlocked(LockedKey & locked_key, const CacheGuard::Loc
{
case State::DOWNLOADED:
{
chassert(getDownloadedSizeUnlocked(segment_lock) == range().size());
chassert(getDownloadedSizeUnlocked(segment_lock) == std::filesystem::file_size(file_path));
chassert(getDownloadedSize(false) == range().size());
chassert(getDownloadedSize(false) == std::filesystem::file_size(file_path));
chassert(!cache_writer);
is_completed = true;
@ -648,17 +637,17 @@ String FileSegment::getInfoForLog() const
return getInfoForLogUnlocked(lock);
}
String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock & lock) const
String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const
{
WriteBufferFromOwnString info;
info << "File segment: " << range().toString() << ", ";
info << "key: " << key().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << getDownloadedSizeUnlocked(lock) << ", ";
info << "downloaded size: " << getDownloadedSize(false) << ", ";
info << "reserved size: " << reserved_size.load() << ", ";
info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", ";
info << "current write offset: " << getCurrentWriteOffsetUnlocked(lock) << ", ";
info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(lock) << ", ";
info << "current write offset: " << getCurrentWriteOffset(false) << ", ";
info << "first non-downloaded offset: " << getFirstNonDownloadedOffset(false) << ", ";
info << "caller id: " << getCallerId() << ", ";
info << "kind: " << toString(segment_kind);
@ -755,7 +744,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment)
CreateFileSegmentSettings(file_segment->getKind()));
snapshot->hits_count = file_segment->getHitsCount();
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(lock);
snapshot->downloaded_size = file_segment->getDownloadedSize(false);
snapshot->download_state = file_segment->download_state;
snapshot->ref_count = file_segment.use_count();

View File

@ -192,11 +192,11 @@ public:
void incrementHitsCount() { ++hits_count; }
size_t getCurrentWriteOffset() const;
size_t getCurrentWriteOffset(bool sync) const;
size_t getFirstNonDownloadedOffset() const;
size_t getFirstNonDownloadedOffset(bool sync) const;
size_t getDownloadedSize() const;
size_t getDownloadedSize(bool sync) const;
/// Now detached status can be used in the following cases:
/// 1. there is only 1 remaining file segment holder
@ -262,12 +262,7 @@ public:
size_t getReservedSize() const;
private:
size_t getFirstNonDownloadedOffsetUnlocked(const FileSegmentGuard::Lock &) const;
size_t getCurrentWriteOffsetUnlocked(const FileSegmentGuard::Lock &) const;
size_t getDownloadedSizeUnlocked(const FileSegmentGuard::Lock &) const;
String getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const;
String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const;
void resetDownloaderUnlocked(const FileSegmentGuard::Lock &);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);
@ -318,6 +313,7 @@ private:
/// downloaded_size should always be less or equal to reserved_size
std::atomic<size_t> downloaded_size = 0;
std::atomic<size_t> reserved_size = 0;
mutable std::mutex download_mutex;
mutable FileSegmentGuard segment_guard;
std::weak_ptr<KeyMetadata> key_metadata;

View File

@ -7,8 +7,42 @@
namespace DB
{
/**
* Priority of locking:
* Cache priority queue guard > key prefix guard > file segment guard.
* FileCache::get/getOrSet/set
* 1. CacheMetadataGuard::Lock (take key lock and relase metadata lock)
* 2. KeyGuard::Lock (hold till the end of the method)
*
* FileCache::tryReserve
* 1. CacheGuard::Lock
* 2. KeyGuard::Lock (taken without metadata lock)
* 3. any number of KeyGuard::Lock's for files which are going to be evicted (taken via metadata lock)
*
* FileCache::removeIfExists
* 1. CacheGuard::Lock
* 2. KeyGuard::Lock (taken via metadata lock)
* 3. FileSegmentGuard::Lock
*
* FileCache::removeAllReleasable
* 1. CacheGuard::Lock
* 2. any number of KeyGuard::Lock's locks (takken via metadata lock), but at a moment of time only one key lock can be hold
* 3. FileSegmentGuard::Lock
*
* FileCache::getSnapshot (for all cache)
* 1. metadata lock
* 2. any number of KeyGuard::Lock's locks (takken via metadata lock), but at a moment of time only one key lock can be hold
* 3. FileSegmentGuard::Lock
*
* FileCache::getSnapshot(key)
* 1. KeyGuard::Lock (taken via metadata lock)
* 2. FileSegmentGuard::Lock
*
* FileSegment::complete
* 1. CacheGuard::Lock
* 2. KeyGuard::Lock (taken without metadata lock)
* 3. FileSegmentGuard::Lock
*
* Rules:
* 1. Priority of locking: CacheGuard::Lock > CacheMetadataGuard::Lock > KeyGuard::Lock > FileSegmentGuard::Lock
* 2. If we take more than one key lock at a moment of time, we need to take CacheGuard::Lock (example: tryReserve())
*/
/**

View File

@ -50,7 +50,7 @@ void download(const std::string & cache_base_path, DB::FileSegment & file_segmen
fs::create_directories(subdir);
std::string data(size, '0');
file_segment.write(data.data(), size, file_segment.getCurrentWriteOffset());
file_segment.write(data.data(), size, file_segment.getCurrentWriteOffset(false));
}
using Range = FileSegment::Range;
@ -102,7 +102,7 @@ void download(FileSegment & file_segment)
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_EQ(file_segment.getDownloadedSize(false), 0);
ASSERT_TRUE(file_segment.reserve(file_segment.range().size()));
download(cache_base_path, file_segment);
@ -115,7 +115,7 @@ void download(FileSegment & file_segment)
void assertDownloadFails(FileSegment & file_segment)
{
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_EQ(file_segment.getDownloadedSize(false), 0);
ASSERT_FALSE(file_segment.reserve(file_segment.range().size()));
file_segment.complete();
}

View File

@ -61,7 +61,7 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
res_columns[6]->insert(FileSegment::stateToString(file_segment->state()));
res_columns[7]->insert(file_segment->getHitsCount());
res_columns[8]->insert(file_segment->getRefCount());
res_columns[9]->insert(file_segment->getDownloadedSize());
res_columns[9]->insert(file_segment->getDownloadedSize(false));
res_columns[10]->insert(file_segment->isPersistent());
res_columns[11]->insert(toString(file_segment->getKind()));
}