mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Self-review fixes
This commit is contained in:
parent
9c2bbcf4a6
commit
413fbb6507
@ -206,7 +206,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment
|
||||
return remote_file_reader;
|
||||
|
||||
auto remote_fs_segment_reader = file_segment.extractRemoteFileReader();
|
||||
if (remote_fs_segment_reader && file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd())
|
||||
if (remote_fs_segment_reader && file_offset_of_buffer_end == remote_file_reader->getFileOffsetOfBufferEnd())
|
||||
remote_file_reader = remote_fs_segment_reader;
|
||||
else
|
||||
remote_file_reader = implementation_buffer_creator();
|
||||
@ -294,7 +294,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
|
||||
case FileSegment::State::EMPTY:
|
||||
case FileSegment::State::PARTIALLY_DOWNLOADED:
|
||||
{
|
||||
if (file_segment->getFirstNonDownloadedOffset() > file_offset_of_buffer_end)
|
||||
if (canStartFromCache(file_offset_of_buffer_end, *file_segment))
|
||||
{
|
||||
/// segment{k} state: PARTIALLY_DOWNLOADED
|
||||
/// cache: [______|___________
|
||||
|
@ -125,13 +125,6 @@ size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & /*
|
||||
return downloaded_size;
|
||||
}
|
||||
|
||||
size_t FileSegment::getAvailableSizeUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
||||
{
|
||||
auto current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
||||
chassert(reserved_size >= current_downloaded_size);
|
||||
return reserved_size - current_downloaded_size;
|
||||
}
|
||||
|
||||
size_t FileSegment::getRemainingSizeToDownload() const
|
||||
{
|
||||
std::unique_lock segment_lock(mutex);
|
||||
@ -177,9 +170,7 @@ String FileSegment::getOrSetDownloader()
|
||||
{
|
||||
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
|
||||
if (!allow_new_downloader)
|
||||
return "None";
|
||||
|
||||
chassert(download_state != State::DOWNLOADING);
|
||||
return "notAllowed:" + stateToString(download_state);
|
||||
|
||||
current_downloader = downloader_id = getCallerId();
|
||||
setDownloadState(State::DOWNLOADING);
|
||||
@ -313,27 +304,29 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
|
||||
"Attempt to write {} bytes to offset: {}, but current write offset is {}",
|
||||
size, offset, first_non_downloaded_offset);
|
||||
|
||||
size_t available_size = getAvailableSizeUnlocked(segment_lock);
|
||||
if (available_size < size)
|
||||
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
|
||||
chassert(reserved_size >= current_downloaded_size);
|
||||
size_t free_reserved_size = reserved_size - current_downloaded_size;
|
||||
|
||||
if (free_reserved_size < size)
|
||||
throw Exception(
|
||||
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
|
||||
"Not enough space is reserved. Available: {}, expected: {}", available_size, size);
|
||||
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
|
||||
|
||||
if (getDownloadedSizeUnlocked(segment_lock) == range().size())
|
||||
if (current_downloaded_size == range().size())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
|
||||
}
|
||||
|
||||
if (!cache_writer)
|
||||
{
|
||||
auto current_downloaded_size = getDownloadedSize();
|
||||
if (current_downloaded_size > 0)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache writer was finalized (downloaded size: {}, state: {})",
|
||||
current_downloaded_size, stateToString(download_state));
|
||||
if (!cache_writer)
|
||||
{
|
||||
if (current_downloaded_size > 0)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Cache writer was finalized (downloaded size: {}, state: {})",
|
||||
current_downloaded_size, stateToString(download_state));
|
||||
|
||||
auto download_path = getPathInLocalCache();
|
||||
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
|
||||
auto download_path = getPathInLocalCache();
|
||||
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
@ -360,8 +353,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
std::unique_lock segment_lock(mutex);
|
||||
chassert(getFirstNonDownloadedOffsetUnlocked(segment_lock) == offset + size);
|
||||
chassert(getFirstNonDownloadedOffset() == offset + size);
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -679,11 +671,10 @@ void FileSegment::assertCorrectness() const
|
||||
|
||||
void FileSegment::assertCorrectnessUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
||||
{
|
||||
// auto current_downloader = getDownloaderUnlocked(false, segment_lock);
|
||||
LOG_TEST(log, "Checking correctness: {}", getInfoForLogUnlocked(segment_lock));
|
||||
// chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING));
|
||||
// chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING));
|
||||
// chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
|
||||
auto current_downloader = getDownloaderUnlocked(segment_lock);
|
||||
chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING));
|
||||
chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING));
|
||||
chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
|
||||
}
|
||||
|
||||
void FileSegment::throwIfDetachedUnlocked(std::unique_lock<std::mutex> & segment_lock) const
|
||||
@ -760,9 +751,12 @@ void FileSegment::detach(std::lock_guard<std::mutex> & /* cache_lock */, std::un
|
||||
if (is_detached)
|
||||
return;
|
||||
|
||||
resetDownloaderUnlocked(segment_lock);
|
||||
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
if (download_state == State::DOWNLOADING)
|
||||
resetDownloadingStateUnlocked(segment_lock);
|
||||
else
|
||||
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
|
||||
resetDownloaderUnlocked(segment_lock);
|
||||
detachAssumeStateFinalized(segment_lock);
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,6 @@ private:
|
||||
size_t getFirstNonDownloadedOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
|
||||
size_t getCurrentWriteOffsetUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
|
||||
size_t getDownloadedSizeUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
|
||||
size_t getAvailableSizeUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
|
||||
|
||||
String getInfoForLogUnlocked(std::unique_lock<std::mutex> & segment_lock) const;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user