This commit is contained in:
kssenii 2022-03-06 20:33:07 +01:00
parent 5a8606e10b
commit 2dcfe66236
6 changed files with 102 additions and 63 deletions

View File

@ -70,16 +70,27 @@ String FileSegment::getOrSetDownloader()
download_state);
downloader_id = getCallerId();
LOG_TEST(log, "Set downloader: {}, prev state: {}", downloader_id, stateToString(download_state));
download_state = State::DOWNLOADING;
}
else if (downloader_id == getCallerId())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to set the same downloader for segment {} for the second time", range().toString());
LOG_TEST(log, "Returning with downloader: {} and state: {}", downloader_id, stateToString(download_state));
return downloader_id;
}
void FileSegment::resetDownloader()
{
std::lock_guard segment_lock(mutex);
if (downloader_id.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no downloader");
if (getCallerId() != downloader_id)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Downloader can be reset only by downloader");
downloader_id.clear();
}
String FileSegment::getDownloader() const
{
std::lock_guard segment_lock(mutex);
@ -158,6 +169,9 @@ FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
if (downloader_id.empty())
return download_state;
if (download_state == State::EMPTY)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cannot wait on a file segment with empty state");

View File

@ -105,6 +105,8 @@ public:
String getDownloader() const;
void resetDownloader();
bool isDownloader() const;
bool isDownloaded() const { return is_downloaded.load(); }

View File

@ -4,6 +4,7 @@
#include <Common/hex.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h>
namespace ProfileEvents
@ -65,7 +66,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getCacheReadBuffer(size_t of
return std::make_shared<ReadBufferFromFile>(cache->getPathInLocalCache(cache_key, offset), settings.local_fs_buffer_size);
}
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSegmentPtr file_segment, ReadType read_type_)
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_)
{
switch (read_type_)
{
@ -114,7 +115,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
}
}
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(FileSegmentPtr file_segment)
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
{
auto range = file_segment->range();
@ -184,28 +185,29 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
}
case FileSegment::State::PARTIALLY_DOWNLOADED:
{
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
if (can_start_from_cache)
{
/// segment{k}
/// cache: [______|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
}
LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end);
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
size_t download_offset = file_segment->getDownloadOffset();
bool can_start_from_cache = download_offset > file_offset_of_buffer_end;
LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end);
if (can_start_from_cache)
{
/// segment{k}
/// cache: [______|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
file_segment->resetDownloader();
return getCacheReadBuffer(range.left);
}
if (download_offset < file_offset_of_buffer_end)
{
/// segment{1}
@ -220,6 +222,10 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
bytes_to_predownload = file_offset_of_buffer_end - file_segment->getDownloadOffset();
}
download_offset = file_segment->getDownloadOffset();
can_start_from_cache = download_offset > file_offset_of_buffer_end;
assert(!can_start_from_cache);
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
return getRemoteFSReadBuffer(file_segment, read_type);
}
@ -247,7 +253,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
}
}
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(FileSegmentPtr file_segment)
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(FileSegmentPtr & file_segment)
{
assert(!file_segment->isDownloader());
assert(file_offset_of_buffer_end >= file_segment->range().left);
@ -266,9 +272,21 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
LOG_TEST(log, "Current file segment: {}, read type: {}, current file offset: {}",
range.toString(), toString(read_type), file_offset_of_buffer_end);
// auto last_non_downloaded_offset = getLastNonDownloadedOffset();
// read_buffer_for_file_segment->setReadUntilPosition(last_non_downloaded_offset ? *last_non_downloaded_offset : range.right + 1); /// [..., range.right]
read_buffer_for_file_segment->setReadUntilPosition(range.right + 1); /// [..., range.right]
if (read_type != ReadType::CACHED)
{
// auto last_non_downloaded_offset = getLastNonDownloadedOffset();
// read_buffer_for_file_segment->setReadUntilPosition(last_non_downloaded_offset ? *last_non_downloaded_offset : range.right + 1); /// [..., range.right]
read_buffer_for_file_segment->setReadUntilPosition(range.right + 1); /// [..., range.right]
auto current_segment_file_offset = read_buffer_for_file_segment->getFileOffsetOfBufferEnd();
auto current_segment_file_read_position = current_segment_file_offset - read_buffer_for_file_segment->available();
if (read_buffer_for_file_segment->hasPendingData()
&& file_offset_of_buffer_end < current_segment_file_offset
&& file_offset_of_buffer_end >= current_segment_file_read_position)
{
}
}
switch (read_type)
{
@ -282,7 +300,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
auto state = file_segment->state();
LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}, file segment state: {}, download offset: {}",
file_reader->getFileName(), seek_offset, file_size, state, file_segment->getDownloadOffset());
file_reader->getFileName(), seek_offset, file_size, state, file_segment->getDownloadOffset());
assert(file_size > 0);
break;
@ -308,12 +326,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
auto impl_range = read_buffer_for_file_segment->getRemainingReadRange();
auto download_offset = file_segment->getDownloadOffset();
if (download_offset != impl_range.left)
if (download_offset != static_cast<size_t>(read_buffer_for_file_segment->getPosition()))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Buffer's offsets mismatch; cached buffer offset: {}, implementation buffer offset: {}, "
"Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, position: {}, implementation buffer offset: {}, "
"implementation buffer reading until: {}, file segment info: {}",
file_offset_of_buffer_end, impl_range.left, *impl_range.right, file_segment->getInfoForLog());
file_offset_of_buffer_end, download_offset, read_buffer_for_file_segment->getPosition(),
impl_range.left, *impl_range.right, file_segment->getInfoForLog());
break;
}
@ -348,7 +367,7 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
return true;
}
void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr file_segment)
void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
{
if (bytes_to_predownload)
{
@ -428,11 +447,28 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false;
// SCOPE_EXIT({
// if (current_file_segment_it == file_segments_holder->file_segments.end())
// return;
// auto & file_segment = *current_file_segment_it;
// bool download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
// if (download_current_segment)
// {
// bool file_segment_already_completed = !file_segment->isDownloader();
// if (!file_segment_already_completed)
// file_segment->completeBatchAndResetDownloader();
// }
// assert(!file_segment->isDownloader());
// });
bytes_to_predownload = 0;
if (implementation_buffer)
{
auto file_segment = *current_file_segment_it;
auto & file_segment = *current_file_segment_it;
auto current_read_range = file_segment->range();
auto current_state = file_segment->state();
@ -524,26 +560,10 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type), file_segment->getCallerId(), file_segment->getDownloader(), file_segment->state());
try
if (!result)
{
if (!result)
{
result = implementation_buffer->next();
size = implementation_buffer->buffer().size();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (download_current_segment)
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED, true);
/// Note: If exception happens in another place -- out of scope of this buffer, then
/// downloader's FileSegmentsHolder is responsible to call file_segment->complete().
/// (download_path (if exists) is removed from inside cache)
throw;
result = implementation_buffer->next();
size = implementation_buffer->buffer().size();
}
if (result)

View File

@ -34,15 +34,15 @@ public:
private:
void initialize(size_t offset, size_t size);
SeekableReadBufferPtr getImplementationBuffer(FileSegmentPtr file_segment);
SeekableReadBufferPtr getImplementationBuffer(FileSegmentPtr & file_segment);
SeekableReadBufferPtr getReadBufferForFileSegment(FileSegmentPtr file_segment);
SeekableReadBufferPtr getReadBufferForFileSegment(FileSegmentPtr & file_segment);
SeekableReadBufferPtr getCacheReadBuffer(size_t offset) const;
std::optional<size_t> getLastNonDownloadedOffset() const;
void predownload(FileSegmentPtr file_segment);
void predownload(FileSegmentPtr & file_segment);
enum class ReadType
{
@ -51,7 +51,7 @@ private:
REMOTE_FS_READ_AND_PUT_IN_CACHE,
};
SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr file_segment, ReadType read_type_);
SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
size_t getTotalSizeToRead();
bool completeFileSegmentAndGetNext();

View File

@ -37,15 +37,17 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
{
current_path = path;
auto cache = settings.remote_fs_cache;
bool with_cache = cache && settings.remote_fs_enable_cache && !IFileCache::shouldBypassCache();
auto remote_file_reader_creator = [=, this]()
{
return std::make_unique<ReadBufferFromS3>(
client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries,
settings, /* use_external_buffer */true, read_until_position, true);
settings, /* use_external_buffer */true, read_until_position, /* restricted_seek */true);
};
auto cache = settings.remote_fs_cache;
if (cache && settings.remote_fs_enable_cache && !IFileCache::shouldBypassCache())
if (with_cache)
{
return std::make_shared<CachedReadBufferFromRemoteFS>(
path, cache, remote_file_reader_creator, settings, read_until_position ? read_until_position : file_size);

View File

@ -158,10 +158,11 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset;
if (impl && restricted_seek)
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer (current offset: {}, new offset: {}, reading until position: {})",
offset, offset_, read_until_position);
impl.reset(); /// TODO: Fix this
// throw Exception(
// ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
// "Seek is allowed only before first read attempt from the buffer (current offset: {}, new offset: {}, reading until position: {}, available: {})",
// offset, offset_, read_until_position, available());
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);