Merge pull request #36253 from kssenii/fix-s3-tests

Fix exception "File segment can be completed only by downloader" in tests
This commit is contained in:
Kseniia Sumarokova 2022-04-16 01:24:39 +02:00 committed by GitHub
commit 0abc6290b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 8 deletions

View File

@ -163,6 +163,11 @@ bool FileSegment::isDownloader() const
return getCallerId() == downloader_id;
}
bool FileSegment::isDownloaderImpl(std::lock_guard<std::mutex> & /* segment_lock */) const
{
return getCallerId() == downloader_id;
}
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
if (!isDownloader())
@ -397,6 +402,9 @@ bool FileSegment::reserve(size_t size)
void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */)
{
if (is_downloaded)
return;
download_state = State::DOWNLOADED;
is_downloaded = true;
downloader_id.clear();
@ -426,8 +434,7 @@ void FileSegment::completeBatchAndResetDownloader()
{
std::lock_guard segment_lock(mutex);
bool is_downloader = downloader_id == getCallerId();
if (!is_downloader)
if (!isDownloaderImpl(segment_lock))
{
cv.notify_all();
throw Exception(
@ -448,7 +455,7 @@ void FileSegment::complete(State state)
std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex);
bool is_downloader = downloader_id == getCallerId();
bool is_downloader = isDownloaderImpl(segment_lock);
if (!is_downloader)
{
cv.notify_all();
@ -465,6 +472,9 @@ void FileSegment::complete(State state)
"Cannot complete file segment with state: {}", stateToString(state));
}
if (state == State::DOWNLOADED)
setDownloaded(segment_lock);
download_state = state;
assertNotDetached();
@ -475,7 +485,7 @@ void FileSegment::complete(State state)
}
catch (...)
{
if (!downloader_id.empty() && downloader_id == getCallerIdImpl())
if (!downloader_id.empty() && is_downloader)
downloader_id.clear();
cv.notify_all();
@ -492,8 +502,12 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
if (download_state == State::SKIP_CACHE || detached)
return;
if (download_state != State::DOWNLOADED && getDownloadedSize(segment_lock) == range().size())
if (isDownloaderImpl(segment_lock)
&& download_state != State::DOWNLOADED
&& getDownloadedSize(segment_lock) == range().size())
{
setDownloaded(segment_lock);
}
assertNotDetached();
@ -502,7 +516,7 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
/// downloader or the only owner of the segment.
bool can_update_segment_state = downloader_id == getCallerIdImpl()
bool can_update_segment_state = isDownloaderImpl(segment_lock)
|| cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
if (can_update_segment_state)
@ -515,7 +529,7 @@ void FileSegment::complete(std::lock_guard<std::mutex> & cache_lock)
}
catch (...)
{
if (!downloader_id.empty() && downloader_id == getCallerIdImpl())
if (!downloader_id.empty() && isDownloaderImpl(segment_lock))
downloader_id.clear();
cv.notify_all();
@ -561,7 +575,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
}
}
if (!downloader_id.empty() && (downloader_id == getCallerIdImpl() || is_last_holder))
if (!downloader_id.empty() && (isDownloaderImpl(segment_lock) || is_last_holder))
{
LOG_TEST(log, "Clearing downloader id: {}, current state: {}", downloader_id, stateToString(download_state));
downloader_id.clear();

View File

@ -154,6 +154,7 @@ private:
void setDownloaded(std::lock_guard<std::mutex> & segment_lock);
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
bool isDownloaderImpl(std::lock_guard<std::mutex> & segment_lock) const;
void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard<std::mutex> & segment_lock) const;

View File

@ -613,7 +613,10 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
{
LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader());
file_segment->completeBatchAndResetDownloader();
}
}
catch (...)
{