diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index d135e30cbcd..238541cc1f7 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -115,8 +115,7 @@ String FileSegment::getOrSetDownloader() { std::lock_guard segment_lock(mutex); - if (detached) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot set downloader for a detached file segment"); + assertNotDetached(segment_lock); if (downloader_id.empty()) { @@ -140,6 +139,8 @@ void FileSegment::resetDownloader() { std::lock_guard segment_lock(mutex); + assertNotDetached(segment_lock); + if (downloader_id.empty()) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader"); @@ -168,6 +169,7 @@ String FileSegment::getDownloader() const bool FileSegment::isDownloader() const { std::lock_guard segment_lock(mutex); + return getCallerId() == downloader_id; } @@ -232,8 +234,12 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) "Attempt to write {} bytes to offset: {}, but current download offset is {}", size, offset_, download_offset); + std::unique_lock detach_lock(detach_mutex, std::defer_lock); + { std::lock_guard segment_lock(mutex); + detach_lock.lock(); + assertNotDetached(segment_lock); } @@ -278,6 +284,9 @@ FileSegment::State FileSegment::wait() { std::unique_lock segment_lock(mutex); + if (detached) + throwDetached(); + if (downloader_id.empty()) return download_state; @@ -302,8 +311,11 @@ bool FileSegment::reserve(size_t size) if (!size) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed"); + std::unique_lock detach_lock(detach_mutex, std::defer_lock); + { std::lock_guard segment_lock(mutex); + detach_lock.lock(); assertNotDetached(segment_lock); @@ -437,6 +449,9 @@ void FileSegment::complete(State state) void FileSegment::complete(std::lock_guard & cache_lock) { std::lock_guard segment_lock(mutex); + + assertNotDetached(segment_lock); + completeUnlocked(cache_lock, segment_lock); } @@ -585,16 +600,19 @@ void FileSegment::assertCorrectnessImpl(std::lock_guard & /* segment assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(cache->getPathInLocalCache(key(), offset())) > 0); } +void FileSegment::throwDetached() +{ + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Cache file segment is in detached state, operation not allowed. " + "It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. " + "Please, retry"); +} + void FileSegment::assertNotDetached(std::lock_guard & /* segment_lock */) const { if (detached) - { - throw Exception( - ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Cache file segment is in detached state, operation not allowed. " - "It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. " - "Please, retry"); - } + throwDetached(); } void FileSegment::assertDetachedStatus(std::lock_guard & segment_lock) const @@ -635,6 +653,8 @@ void FileSegment::detach(std::lock_guard & /* cache_lock */, std::lo if (detached) return; + std::lock_guard detach_lock(detach_mutex); + detached = true; download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; downloader_id.clear(); @@ -658,14 +678,17 @@ FileSegmentsHolder::~FileSegmentsHolder() if (!cache) cache = file_segment->cache; + try { bool detached = false; + { std::lock_guard segment_lock(file_segment->mutex); detached = file_segment->isDetached(segment_lock); if (detached) file_segment->assertDetachedStatus(segment_lock); } + if (detached) { /// This file segment is not owned by cache, so it will be destructed @@ -674,10 +697,6 @@ FileSegmentsHolder::~FileSegmentsHolder() continue; } - } - - try - { /// File segment pointer must be reset right after calling complete() and /// under the same mutex, because complete() checks for segment pointers. std::lock_guard cache_lock(cache->mutex); @@ -689,7 +708,6 @@ FileSegmentsHolder::~FileSegmentsHolder() catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); - assert(false); } } } diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index 4aa9a934333..15a9a8871b3 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -158,6 +158,7 @@ private: void assertDetachedStatus(std::lock_guard & segment_lock) const; bool hasFinalizedState() const; bool isDetached(std::lock_guard & /* segment_lock */) const { return detached; } + [[noreturn]] static void throwDetached(); void setDownloaded(std::lock_guard & segment_lock); void setDownloadFailed(std::lock_guard & segment_lock); @@ -218,6 +219,7 @@ private: std::atomic ref_count = 0; /// Used for getting snapshot state bool is_write_through_cache = false; + std::mutex detach_mutex; }; struct FileSegmentsHolder : private boost::noncopyable diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index ebc43801c39..18431d4ca1f 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -171,6 +171,9 @@ void WriteBufferFromS3::finalizeImpl() if (!multipart_upload_id.empty()) completeMultipartUpload(); + + if (cacheEnabled()) + cache_writer.finalize(); } void WriteBufferFromS3::createMultipartUpload()