diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 50a1ecb4a6c..9fe95c0d8cd 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -163,6 +163,11 @@ bool FileSegment::isDownloader() const return getCallerId() == downloader_id; } +bool FileSegment::isDownloaderImpl(std::lock_guard & /* segment_lock */) const +{ + return getCallerId() == downloader_id; +} + FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader() { if (!isDownloader()) @@ -397,6 +402,9 @@ bool FileSegment::reserve(size_t size) void FileSegment::setDownloaded(std::lock_guard & /* segment_lock */) { + if (is_downloaded) + return; + download_state = State::DOWNLOADED; is_downloaded = true; downloader_id.clear(); @@ -426,8 +434,7 @@ void FileSegment::completeBatchAndResetDownloader() { std::lock_guard segment_lock(mutex); - bool is_downloader = downloader_id == getCallerId(); - if (!is_downloader) + if (!isDownloaderImpl(segment_lock)) { cv.notify_all(); throw Exception( @@ -448,7 +455,7 @@ void FileSegment::complete(State state) std::lock_guard cache_lock(cache->mutex); std::lock_guard segment_lock(mutex); - bool is_downloader = downloader_id == getCallerId(); + bool is_downloader = isDownloaderImpl(segment_lock); if (!is_downloader) { cv.notify_all(); @@ -465,6 +472,9 @@ void FileSegment::complete(State state) "Cannot complete file segment with state: {}", stateToString(state)); } + if (state == State::DOWNLOADED) + setDownloaded(segment_lock); + download_state = state; assertNotDetached(); @@ -475,7 +485,7 @@ void FileSegment::complete(State state) } catch (...) { - if (!downloader_id.empty() && downloader_id == getCallerIdImpl()) + if (!downloader_id.empty() && is_downloader) downloader_id.clear(); cv.notify_all(); @@ -492,8 +502,12 @@ void FileSegment::complete(std::lock_guard & cache_lock) if (download_state == State::SKIP_CACHE || detached) return; - if (download_state != State::DOWNLOADED && getDownloadedSize(segment_lock) == range().size()) + if (isDownloaderImpl(segment_lock) + && download_state != State::DOWNLOADED + && getDownloadedSize(segment_lock) == range().size()) + { setDownloaded(segment_lock); + } assertNotDetached(); @@ -502,7 +516,7 @@ void FileSegment::complete(std::lock_guard & cache_lock) /// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the /// downloader or the only owner of the segment. - bool can_update_segment_state = downloader_id == getCallerIdImpl() + bool can_update_segment_state = isDownloaderImpl(segment_lock) || cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); if (can_update_segment_state) @@ -515,7 +529,7 @@ void FileSegment::complete(std::lock_guard & cache_lock) } catch (...) { - if (!downloader_id.empty() && downloader_id == getCallerIdImpl()) + if (!downloader_id.empty() && isDownloaderImpl(segment_lock)) downloader_id.clear(); cv.notify_all(); @@ -561,7 +575,7 @@ void FileSegment::completeImpl(std::lock_guard & cache_lock, std::lo } } - if (!downloader_id.empty() && (downloader_id == getCallerIdImpl() || is_last_holder)) + if (!downloader_id.empty() && (isDownloaderImpl(segment_lock) || is_last_holder)) { LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state)); downloader_id.clear(); diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index 8439389fdeb..e3011eeb0fa 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -154,6 +154,7 @@ private: void setDownloaded(std::lock_guard & segment_lock); void setDownloadFailed(std::lock_guard & segment_lock); + bool isDownloaderImpl(std::lock_guard & segment_lock) const; void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const; diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index 90a1d4ad43e..e6188a96a33 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -613,7 +613,10 @@ bool CachedReadBufferFromRemoteFS::nextImplStep() { bool need_complete_file_segment = file_segment->isDownloader(); if (need_complete_file_segment) + { + LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader()); file_segment->completeBatchAndResetDownloader(); + } } catch (...) {