Less seeks

This commit is contained in:
kssenii 2022-02-15 11:27:44 +01:00
parent 690dcea8be
commit 886b300b8d
4 changed files with 72 additions and 54 deletions

View File

@ -274,7 +274,7 @@ void FileSegment::complete()
{
std::lock_guard segment_lock(mutex);
if (download_state == State::SKIP_CACHE)
if (download_state == State::SKIP_CACHE || detached)
return;
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
@ -295,13 +295,13 @@ void FileSegment::complete()
void FileSegment::completeImpl(std::lock_guard<std::mutex> & /* segment_lock */)
{
std::lock_guard cache_lock(cache->mutex);
bool download_can_continue = false;
if (download_state == State::PARTIALLY_DOWNLOADED
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
{
std::lock_guard cache_lock(cache->mutex);
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock);
download_can_continue = !is_last_holder && download_state == State::PARTIALLY_DOWNLOADED;
@ -312,6 +312,8 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & /* segment_lock */)
download_state = State::SKIP_CACHE;
LOG_TEST(log, "Remove cell {} (downloaded: {})", range().toString(), downloaded_size);
cache->remove(key(), offset(), cache_lock);
detached = true;
}
else if (is_last_holder)
{
@ -323,6 +325,8 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & /* segment_lock */)
*/
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
detached = true;
}
}
}

View File

@ -141,6 +141,8 @@ private:
FileCache * cache;
Poco::Logger * log;
bool detached = false;
};
struct FileSegmentsHolder : boost::noncopyable

View File

@ -80,18 +80,16 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer([[mayb
* Implementation buffer from segment1 is passed to segment2 once segment1 is loaded.
*/
// auto remote_fs_segment_reader = file_segment->getRemoteFileReader();
auto remote_fs_segment_reader = file_segment->getRemoteFileReader();
// if (remote_fs_segment_reader)
// return remote_fs_segment_reader;
if (remote_fs_segment_reader)
return remote_fs_segment_reader;
// remote_fs_segment_reader = remote_file_reader_creator();
// file_segment->setRemoteFileReader(remote_fs_segment_reader);
remote_fs_segment_reader = remote_file_reader_creator();
file_segment->setRemoteFileReader(remote_fs_segment_reader);
// return remote_fs_segment_reader;
remote_file_reader = remote_file_reader_creator();
return remote_file_reader;
///TODO: add check for pending data
return remote_fs_segment_reader;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
@ -190,19 +188,6 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
break;
}
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
/// If downloader failed before downloading anything, it is determined
/// whether continuation is possible. In case of no continuation and
/// downloaded_size == 0 - cache cell is removed and state is switched to SKIP_CACHE.
// assert(file_segment->downloadOffset() > 0);
read_type = ReadType::CACHED;
implementation_buffer = getCacheReadBuffer(range.left);
break;
}
case FileSegment::State::PARTIALLY_DOWNLOADED:
{
size_t download_offset = file_segment->downloadOffset();
@ -225,7 +210,14 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
}
LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end);
if (download_offset + 1 < file_offset_of_buffer_end)
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
if (download_offset < file_offset_of_buffer_end)
{
/// segment{1}
/// cache: [_____|___________
@ -235,28 +227,34 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
/// ^
/// file_offset_of_buffer_end
/// TODO: optimize this with predownloading
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
// assert(first_segment_read_in_range);
// bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1;
break;
assert(file_offset_of_buffer_end > file_segment->downloadOffset());
bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset();
}
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
break;
}
download_state = file_segment->state();
continue;
}
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
{
size_t download_offset = file_segment->downloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
if (can_start_from_cache)
{
read_type = ReadType::CACHED;
implementation_buffer = getCacheReadBuffer(range.left);
}
else
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
}
break;
}
}
break;
@ -273,6 +271,9 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
/// TODO: For remote FS read need to set maximum possible right offset -- of
/// the last empty segment and in s3 buffer check > instead of !=.
// auto last_non_downloaded_offset = getLastNonDownloadedOffset();
// implementation_buffer->setReadUntilPosition(last_non_downloaded_offset ? *last_non_downloaded_offset : range.right + 1); /// [..., range.right]
implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right]
switch (read_type)
@ -305,17 +306,9 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
{
size_t download_offset = file_segment->downloadOffset();
implementation_buffer->seek(download_offset, SEEK_SET);
LOG_TEST(log, "Impl buffer seek to download offset: {}", download_offset);
}
else
{
LOG_TEST(log, "Impl buffer seek to : {}, buffer reading from {} to {}, file segment info: {}",
file_offset_of_buffer_end,
implementation_buffer->getRemainingReadRange().left,
*implementation_buffer->getRemainingReadRange().right,
file_segment->getInfoForLog());
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
}
@ -575,6 +568,7 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
}
else
{
download_current_segment = false;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
}
@ -600,10 +594,10 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
}
}
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
/// Local filesystem does not support bounded reads.
if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
size = std::min(size, remaining_size_to_read);
impl->buffer().resize(size);
}
@ -675,6 +669,22 @@ off_t CachedReadBufferFromRemoteFS::getPosition()
return file_offset_of_buffer_end - available();
}
std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset() const
{
if (!file_segments_holder)
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segments holder not initialized");
const auto & file_segments = file_segments_holder->file_segments;
for (auto it = file_segments.rbegin(); it != file_segments.rend(); ++it)
{
const auto & file_segment = *it;
if (file_segment->state() != FileSegment::State::DOWNLOADED)
return file_segment->range().right;
}
return std::nullopt;
}
String CachedReadBufferFromRemoteFS::getInfoForLog()
{
return fmt::format("Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, file segment info: {}",

View File

@ -38,6 +38,8 @@ private:
SeekableReadBufferPtr getCacheReadBuffer(size_t offset) const;
std::optional<size_t> getLastNonDownloadedOffset() const;
enum class ReadType
{
CACHED,