This commit is contained in:
kssenii 2022-02-01 22:10:56 +03:00
parent 218a633fec
commit 5cba11428a
5 changed files with 192 additions and 132 deletions

View File

@ -57,6 +57,12 @@ String FileSegment::getOrSetDownloader()
if (downloader_id.empty())
{
if (download_state != State::EMPTY
&& download_state != State::PARTIALLY_DOWNLOADED)
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
"Can set downloader only for file segment with state EMPTY or PARTIALLY_DOWNLOADED, but got: {}",
download_state);
downloader_id = getCallerId();
LOG_TEST(log, "Set downloader: {}, prev state: {}", downloader_id, stateToString(download_state));
download_state = State::DOWNLOADING;
@ -77,6 +83,7 @@ String FileSegment::getDownloader() const
/// Reports whether the caller (as identified by getCallerId()) is the one
/// currently recorded in downloader_id. The comparison is made while holding
/// the segment mutex; the caller id, downloader id and current download state
/// are also written to the test-level log.
bool FileSegment::isDownloader() const
{
    std::lock_guard guard(mutex);

    /// Resolve the caller id once and use it for both the log line and the comparison.
    const auto caller_id = getCallerId();
    LOG_TEST(log, "Checking for current downloader. Caller: {}, downloader: {}, current state: {}", caller_id, downloader_id, stateToString(download_state));

    return downloader_id == caller_id;
}
@ -121,6 +128,8 @@ void FileSegment::write(const char * from, size_t size)
}
cache_writer->write(from, size);
cache_writer->next();
downloaded_size += size;
}
@ -135,7 +144,8 @@ FileSegment::State FileSegment::wait()
{
LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id);
assert(!downloader_id.empty() && downloader_id != getCallerId());
assert(!downloader_id.empty());
assert(downloader_id != getCallerId());
#ifndef NDEBUG
{
@ -192,6 +202,8 @@ void FileSegment::completeBatchAndResetDownloader()
std::lock_guard segment_lock(mutex);
bool is_downloader = downloader_id == getCallerId();
std::cerr << "caller id: " << getCallerId() << "\n";
std::cerr << "downloader id: " << downloader_id << "\n";
if (!is_downloader)
{
cv.notify_all();
@ -214,7 +226,7 @@ void FileSegment::completeBatchAndResetDownloader()
cv.notify_all();
}
void FileSegment::complete(State state)
void FileSegment::complete(State state, bool error)
{
{
std::lock_guard segment_lock(mutex);
@ -226,6 +238,10 @@ void FileSegment::complete(State state)
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
"File segment can be completed only by downloader or downloader's FileSegmentsHodler");
}
else if (error)
{
remote_file_reader.reset();
}
if (state != State::DOWNLOADED
&& state != State::PARTIALLY_DOWNLOADED

View File

@ -113,7 +113,7 @@ public:
void completeBatchAndResetDownloader();
void complete(State state);
void complete(State state, bool error = false);
private:
size_t availableSize() const { return reserved_size - downloaded_size; }

View File

@ -64,7 +64,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
{
switch (read_type_)
{
case ReadType::REMOTE_FS_AND_PUT_IN_CACHE:
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
/**
* Implementation (s3, hdfs, web) buffer might be passed through file segments.
@ -106,6 +106,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(FileSegmentPtr file_segment)
{
assert(!file_segment->isDownloader());
auto range = file_segment->range();
[[maybe_unused]] bool first_segment_read_in_range = impl == nullptr;
bytes_to_predownload = 0;
@ -132,7 +134,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE;
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
break;
@ -177,61 +179,65 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
}
case FileSegment::State::PARTIALLY_DOWNLOADED:
{
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
size_t download_offset = file_segment->downloadOffset();
bool can_start_from_cache = download_offset && download_offset >= file_offset_of_buffer_end;
if (can_start_from_cache)
{
size_t download_offset = file_segment->downloadOffset();
bool can_start_from_cache = download_offset && download_offset >= file_offset_of_buffer_end;
/// segment{k}
/// cache: [______|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
if (can_start_from_cache)
{
/// segment{k}
/// cache: [______|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
implementation_buffer = getCacheReadBuffer(range.left);
}
else
{
read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end);
if (download_offset && download_offset + 1 < file_offset_of_buffer_end)
{
/// segment{1}
/// cache: [_____|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
assert(first_segment_read_in_range);
bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1;
LOG_TEST(log, "Bytes to predownload {} for {}", bytes_to_predownload, downloader_id);
}
}
read_type = ReadType::CACHED;
implementation_buffer = getCacheReadBuffer(range.left);
break;
}
else
LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end);
if (download_offset && download_offset + 1 < file_offset_of_buffer_end)
{
download_state = FileSegment::State::DOWNLOADING;
continue;
/// segment{1}
/// cache: [_____|___________
/// ^
/// download_offset
/// requested_range: [__________]
/// ^
/// file_offset_of_buffer_end
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
// assert(first_segment_read_in_range);
// bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1;
break;
}
auto downloader_id = file_segment->getOrSetDownloader();
if (downloader_id == file_segment->getCallerId())
{
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type);
break;
}
download_state = FileSegment::State::DOWNLOADING;
continue;
}
}
break;
}
[[maybe_unused]] auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
assert(download_current_segment == file_segment->isDownloader());
assert(file_segment->range() == range);
assert(file_offset_of_buffer_end >= range.left && file_offset_of_buffer_end <= range.right);
@ -250,8 +256,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
implementation_buffer->seek(seek_offset, SEEK_SET);
auto * file_reader = dynamic_cast<ReadBufferFromFile *>(implementation_buffer.get());
LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}",
file_reader->getFileName(), seek_offset, file_reader->size());
size_t file_size = file_reader->size();
auto state = file_segment->state();
LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}, file segment state: {}, download offset: {}",
file_reader->getFileName(), seek_offset, file_size, state, file_segment->downloadOffset());
assert(file_size > 0);
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
@ -259,8 +270,10 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
break;
}
case ReadType::REMOTE_FS_AND_PUT_IN_CACHE:
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
assert(file_segment->isDownloader());
if (bytes_to_predownload)
{
size_t download_offset = file_segment->downloadOffset();
@ -271,11 +284,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
}
else
{
assert(!first_segment_read_in_range || file_offset_of_buffer_end == range.left);
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
LOG_TEST(log, "Impl buffer seek to : {}, range: {}, download_offset: {}",
file_offset_of_buffer_end, range.toString(), file_segment->downloadOffset());
LOG_TEST(log, "Impl buffer seek to current offset: {}", file_offset_of_buffer_end);
// assert(!first_segment_read_in_range || file_offset_of_buffer_end == range.left);
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
}
break;
}
}
@ -338,7 +353,9 @@ bool CachedReadBufferFromRemoteFS::checkForPartialDownload()
if (file_offset_of_buffer_end > last_downloaded_offset)
{
if (file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED)
read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE;
{
impl = getReadBufferForFileSegment(*current_file_segment_it);
}
else
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
@ -349,29 +366,10 @@ bool CachedReadBufferFromRemoteFS::checkForPartialDownload()
LOG_TEST(
log, "Changing read buffer from cache to remote filesystem read for file segment: {}, starting from offset: {}",
file_segment->range().toString(), file_offset_of_buffer_end);
return true;
}
}
}
if (read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE)
{
/**
* ReadType::REMOTE_FS_AND_PUT_IN_CACHE means that on previous getReadBufferForFileSegment() call
* current buffer successfully called file_segment->getOrSetDownloader() and became a downloader
* for this file segment. However, the downloader's term has a lifespan of 1 nextImpl() call,
* e.g. downloader reads buffer_size byte and calls completeBatchAndResetDownloader() and some other
* thread can become a downloader if it calls getOrSetDownloader() faster.
*
* So downloader is committed to download only buffer_size bytes and then is not a downloader anymore,
* because there is no guarantee on a higher level, that current buffer will not disappear without
* being destructed till the end of query or without finishing the read range, which he was supposed
* to read by marks range given to him. Therefore, each nextImpl() call, in case of
* READ_AND_PUT_IN_CACHE, starts with getOrSetDownloader().
*/
impl = getReadBufferForFileSegment(*current_file_segment_it);
return true;
return true;
}
}
return false;
@ -389,6 +387,11 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
if (impl)
{
{
auto & file_segment = *current_file_segment_it;
LOG_TEST(log, "Prev init. current read type: {}, range: {}, state: {}", toString(read_type), file_segment->range().toString(), file_segment->state());
}
if (!use_external_buffer)
{
impl->position() = position();
@ -400,21 +403,44 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
assert(current_read_range.left <= file_offset_of_buffer_end);
bool new_buf = false;
if (file_offset_of_buffer_end > current_read_range.right)
{
if (!completeFileSegmentAndGetNext())
new_buf = completeFileSegmentAndGetNext();
if (!new_buf)
return false;
}
if (current_state == FileSegment::State::PARTIALLY_DOWNLOADED
|| current_state == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
{
checkForPartialDownload();
new_buf = checkForPartialDownload();
}
if (!new_buf && read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE)
{
/**
* ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE means that on the previous getReadBufferForFileSegment() call
* the current buffer successfully called file_segment->getOrSetDownloader() and became the downloader
* for this file segment. However, the downloader's term has a lifespan of one nextImpl() call,
* e.g. the downloader reads buffer_size bytes and calls completeBatchAndResetDownloader(), and some other
* thread can become the downloader if it calls getOrSetDownloader() faster.
*
* So the downloader is committed to downloading only buffer_size bytes and then is not the downloader anymore,
* because there is no guarantee at a higher level that the current buffer will not disappear without
* being destructed till the end of the query or without finishing the read range which it was supposed
* to read according to the marks range given to it. Therefore, each nextImpl() call, in the case of
* READ_AND_PUT_IN_CACHE, starts with getOrSetDownloader().
*/
impl = getReadBufferForFileSegment(*current_file_segment_it);
}
LOG_TEST(log, "Non-first initialization");
}
else
{
impl = getReadBufferForFileSegment(*current_file_segment_it);
LOG_TEST(log, "First initialization");
}
if (use_external_buffer)
@ -444,52 +470,66 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
/// download from offset a'' < a', but return buffer from offset a'.
LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId());
while (true)
{
if (bytes_to_predownload
&& file_segment->downloadOffset() + 1 != file_offset_of_buffer_end
&& !impl->eof())
{
result = impl->hasPendingData();
size = impl->available();
// while (true)
// {
// if (!bytes_to_predownload || impl->eof())
// {
// if (file_segment->downloadOffset() + 1 != file_offset_of_buffer_end)
// throw Exception(
// ErrorCodes::LOGICAL_ERROR,
// "Failed to predownload. Current file segment: {}, current download offset: {}, expected: {}, eof: {}",
// file_segment->range().toString(), file_segment->downloadOffset(), file_offset_of_buffer_end, impl->eof());
break;
}
// result = impl->hasPendingData();
// size = impl->available();
if (file_segment->reserve(impl->buffer().size()))
{
size_t size_to_cache = std::min(bytes_to_predownload, impl->buffer().size());
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, impl->buffer().size());
// size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
// remaining_size_to_read = std::min(impl->available(), remaining_size_to_read);
file_segment->write(impl->buffer().begin(), size_to_cache);
// if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
// {
// LOG_TEST(log, "Resize. Offset: {}, remaining size: {}, file offset: {}, range: {}",
// offset(), remaining_size_to_read, file_offset_of_buffer_end, file_segment->range().toString());
// impl->buffer().resize(offset() + remaining_size_to_read);
// }
bytes_to_predownload -= size_to_cache;
impl->position() += size_to_cache;
}
else
{
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
bytes_to_predownload = 0;
// break;
// }
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
impl = getRemoteFSReadBuffer(file_segment, read_type);
impl->seek(file_offset_of_buffer_end, SEEK_SET);
// if (file_segment->reserve(impl->buffer().size()))
// {
// size_t size_to_cache = std::min(bytes_to_predownload, impl->buffer().size());
// LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, impl->buffer().size());
LOG_TEST(
log, "Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}",
file_offset_of_buffer_end);
// file_segment->write(impl->buffer().begin(), size_to_cache);
break;
}
// bytes_to_predownload -= size_to_cache;
// impl->position() += size_to_cache;
// }
// else
// {
// file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
// bytes_to_predownload = 0;
if (file_segment->downloadOffset() + 1 != file_offset_of_buffer_end
&& read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Predownloading failed");
}
// read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
// impl = getRemoteFSReadBuffer(file_segment, read_type);
// impl->seek(file_offset_of_buffer_end, SEEK_SET);
// LOG_TEST(
// log, "Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}",
// file_offset_of_buffer_end);
// break;
// }
// }
}
auto download_current_segment = read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE;
assert(!download_current_segment || file_segment->isDownloader());
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment != file_segment->isDownloader())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type), file_segment->getCallerId(), file_segment->getDownloader(), file_segment->state());
try
{
@ -497,6 +537,14 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
{
result = impl->next();
size = impl->buffer().size();
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size = std::min(size, remaining_size_to_read);
impl->buffer().resize(size);
}
}
}
catch (...)
@ -504,7 +552,7 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
tryLogCurrentException(__PRETTY_FUNCTION__);
if (download_current_segment)
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED);
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED, true);
/// Note: If exception happens in another place -- out of scope of this buffer, then
/// downloader's FileSegmentsHolder is responsible to call file_segment->complete().
@ -515,6 +563,8 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
if (result)
{
file_offset_of_buffer_end += size;
if (download_current_segment)
{
if (file_segment->reserve(size))
@ -528,31 +578,22 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
}
}
/// just implement setReadUntilPosition() for local filesystem read buffer?
if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
impl->buffer().resize(std::min(impl->buffer().size(), remaining_size_to_read));
}
file_offset_of_buffer_end += impl->buffer().size();
switch (read_type)
{
case ReadType::CACHED:
{
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, working_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, size);
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
{
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
break;
}
case ReadType::REMOTE_FS_AND_PUT_IN_CACHE:
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
{
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, working_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
break;
}
}

View File

@ -38,7 +38,7 @@ private:
{
CACHED,
REMOTE_FS_READ_BYPASS_CACHE,
REMOTE_FS_AND_PUT_IN_CACHE,
REMOTE_FS_READ_AND_PUT_IN_CACHE,
};
SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr file_segment, ReadType read_type_);
@ -80,8 +80,8 @@ private:
return "CACHED";
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
return "REMOTE_FS_READ_BYPASS_CACHE";
case ReadType::REMOTE_FS_AND_PUT_IN_CACHE:
return "REMOTE_FS_AND_PUT_IN_CACHE";
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
return "REMOTE_FS_READ_AND_PUT_IN_CACHE";
}
}
};

View File

@ -160,7 +160,10 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset;
if (impl && restricted_seek)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer (current offset: {}, new offset: {})",
offset, offset_);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);