This commit is contained in:
kssenii 2022-05-03 13:15:27 +02:00
parent a3c9ff7fe8
commit b0d43e3f40
3 changed files with 37 additions and 14 deletions

View File

@ -115,8 +115,7 @@ String FileSegment::getOrSetDownloader()
{
std::lock_guard segment_lock(mutex);
if (detached)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot set downloader for a detached file segment");
assertNotDetached(segment_lock);
if (downloader_id.empty())
{
@ -140,6 +139,8 @@ void FileSegment::resetDownloader()
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (downloader_id.empty())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader");
@ -168,6 +169,7 @@ String FileSegment::getDownloader() const
bool FileSegment::isDownloader() const
{
std::lock_guard segment_lock(mutex);
return getCallerId() == downloader_id;
}
@ -232,8 +234,12 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Attempt to write {} bytes to offset: {}, but current download offset is {}",
size, offset_, download_offset);
std::unique_lock detach_lock(detach_mutex, std::defer_lock);
{
std::lock_guard segment_lock(mutex);
detach_lock.lock();
assertNotDetached(segment_lock);
}
@ -278,6 +284,9 @@ FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
if (detached)
throwDetached();
if (downloader_id.empty())
return download_state;
@ -302,8 +311,11 @@ bool FileSegment::reserve(size_t size)
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
std::unique_lock detach_lock(detach_mutex, std::defer_lock);
{
std::lock_guard segment_lock(mutex);
detach_lock.lock();
assertNotDetached(segment_lock);
@ -437,6 +449,9 @@ void FileSegment::complete(State state)
void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
completeUnlocked(cache_lock, segment_lock);
}
@ -585,16 +600,19 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard<std::mutex> & /* segment
assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0);
}
void FileSegment::throwDetached()
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry");
}
void FileSegment::assertNotDetached(std::lock_guard<std::mutex> & /* segment_lock */) const
{
if (detached)
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache file segment is in detached state, operation not allowed. "
"It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. "
"Please, retry");
}
throwDetached();
}
void FileSegment::assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const
@ -635,6 +653,8 @@ void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::lo
if (detached)
return;
std::lock_guard detach_lock(detach_mutex);
detached = true;
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
downloader_id.clear();
@ -658,14 +678,17 @@ FileSegmentsHolder::~FileSegmentsHolder()
if (!cache)
cache = file_segment->cache;
try
{
bool detached = false;
{
std::lock_guard segment_lock(file_segment->mutex);
detached = file_segment->isDetached(segment_lock);
if (detached)
file_segment->assertDetachedStatus(segment_lock);
}
if (detached)
{
/// This file segment is not owned by cache, so it will be destructed
@ -674,10 +697,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
continue;
}
}
try
{
/// File segment pointer must be reset right after calling complete() and
/// under the same mutex, because complete() checks for segment pointers.
std::lock_guard cache_lock(cache->mutex);
@ -689,7 +708,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false);
}
}
}

View File

@ -158,6 +158,7 @@ private:
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return detached; }
[[noreturn]] static void throwDetached();
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
@ -218,6 +219,7 @@ private:
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
bool is_write_through_cache = false;
std::mutex detach_mutex;
};
struct FileSegmentsHolder : private boost::noncopyable

View File

@ -171,6 +171,9 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();
if (cacheEnabled())
cache_writer.finalize();
}
void WriteBufferFromS3::createMultipartUpload()