Merge pull request #36660 from kssenii/fix-stress-test

Fix stress test after 36639
This commit is contained in:
Kseniia Sumarokova 2022-04-29 12:56:25 +02:00 committed by GitHub
commit ca994e0861
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 14 deletions

View File

@ -576,7 +576,7 @@ void LRUFileCache::remove(bool force_remove_unreleasable)
if (file_segment) if (file_segment)
{ {
std::lock_guard<std::mutex> segment_lock(file_segment->mutex); std::lock_guard<std::mutex> segment_lock(file_segment->mutex);
file_segment->detached = true; file_segment->detach(cache_lock, segment_lock);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
} }
} }

View File

@ -498,7 +498,11 @@ void FileSegment::complete(State state)
void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock) void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
{ {
std::lock_guard segment_lock(mutex); std::lock_guard segment_lock(mutex);
completeUnlocked(cache_lock, segment_lock);
}
void FileSegment::completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
if (download_state == State::SKIP_CACHE || detached) if (download_state == State::SKIP_CACHE || detached)
return; return;
@ -647,11 +651,9 @@ void FileSegment::assertNotDetached() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Operation not allowed, file segment is detached"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Operation not allowed, file segment is detached");
} }
void FileSegment::assertDetachedStatus() const void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & /* segment_lock */) const
{ {
assert( assert(download_state == State::EMPTY || hasFinalizedState());
(download_state == State::EMPTY) || (download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|| (download_state == State::SKIP_CACHE));
} }
FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */) FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & /* cache_lock */)
@ -671,6 +673,26 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std
return snapshot; return snapshot;
} }
bool FileSegment::hasFinalizedState() const
{
return download_state == State::DOWNLOADED
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION
|| download_state == State::SKIP_CACHE;
}
void FileSegment::detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock)
{
if (detached)
return;
detached = true;
if (!hasFinalizedState())
{
completeUnlocked(cache_lock, segment_lock);
}
}
FileSegmentsHolder::~FileSegmentsHolder() FileSegmentsHolder::~FileSegmentsHolder()
{ {
/// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
@ -687,13 +709,22 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache) if (!cache)
cache = file_segment->cache; cache = file_segment->cache;
if (file_segment->detached)
{ {
/// This file segment is not owned by cache, so it will be destructed bool detached = false;
/// at this point, therefore no completion required. {
file_segment->assertDetachedStatus(); std::lock_guard segment_lock(file_segment->mutex);
file_segment_it = file_segments.erase(current_file_segment_it); detached = file_segment->isDetached(segment_lock);
continue; if (detached)
file_segment->assertDetachedStatus(segment_lock);
}
if (detached)
{
/// This file segment is not owned by cache, so it will be destructed
/// at this point, therefore no completion required.
file_segment_it = file_segments.erase(current_file_segment_it);
continue;
}
} }
try try

View File

@ -144,6 +144,8 @@ public:
static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & cache_lock); static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard<std::mutex> & cache_lock);
void detach(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
private: private:
size_t availableSize() const { return reserved_size - downloaded_size; } size_t availableSize() const { return reserved_size - downloaded_size; }
@ -151,8 +153,9 @@ private:
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const; String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const; void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached() const; void assertNotDetached() const;
void assertDetachedStatus() const; void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return detached; }
void setDownloaded(std::lock_guard<std::mutex> & segment_lock); void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock); void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -167,6 +170,7 @@ private:
/// is the last alive holder of the segment. Therefore, complete() and destruction /// is the last alive holder of the segment. Therefore, complete() and destruction
/// of the file segment pointer must be done under the same cache mutex. /// of the file segment pointer must be done under the same cache mutex.
void complete(std::lock_guard<std::mutex> & cache_lock); void complete(std::lock_guard<std::mutex> & cache_lock);
void completeUnlocked(std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & segment_lock);
void completeImpl( void completeImpl(
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & cache_lock,

View File

@ -46,7 +46,6 @@ CachedReadBufferFromRemoteFS::CachedReadBufferFromRemoteFS(
void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size) void CachedReadBufferFromRemoteFS::initialize(size_t offset, size_t size)
{ {
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache) if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{ {
file_segments_holder.emplace(cache->get(cache_key, offset, size)); file_segments_holder.emplace(cache->get(cache_key, offset, size));

View File

@ -95,6 +95,7 @@ void WriteBufferFromS3::nextImpl()
if (cacheEnabled()) if (cacheEnabled())
{ {
auto cache_key = cache->hash(key); auto cache_key = cache->hash(key);
file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size)); file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size));
current_download_offset += size; current_download_offset += size;
@ -133,6 +134,7 @@ void WriteBufferFromS3::nextImpl()
writePart(); writePart();
allocateBuffer(); allocateBuffer();
file_segments_holder.reset();
} }
waitForReadyBackGroundTasks(); waitForReadyBackGroundTasks();