Hopefully fix assertion failure in CachedOnDiskReadBufferFromFile

This commit is contained in:
Michael Kolupaev 2023-04-12 01:36:25 +00:00
parent 87be78e6de
commit 473f212c82
7 changed files with 91 additions and 29 deletions

View File

@ -151,6 +151,11 @@ CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segm
/// Do not allow to use asynchronous version of LocalFSReadMethod.
local_read_settings.local_fs_method = LocalFSReadMethod::pread;
// The buffer will unnecessarily allocate a Memory of size local_fs_buffer_size, which will then
// most likely be unused because we're swap()ping our own internal_buffer into
// implementation_buffer before each read. But we can't just set local_fs_buffer_size = 0 here
// because some buffer implementations actually use that memory (e.g. for prefetching).
auto buf = createReadBufferFromFileBase(path, local_read_settings);
if (getFileSizeFromReadBuffer(*buf) == 0)
@ -176,7 +181,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment
* [___________] -- read_range_1 for query1
* [_______________] -- read_range_2 for query2
* ^___________^______^
* | segment1 | segment2
* | segment1 | segment2
*
* So query2 can reuse implementation buffer, which downloaded segment1.
* Implementation buffer from segment1 is passed to segment2 once segment1 is loaded.
@ -195,6 +200,10 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment
file_segment.setRemoteFileReader(remote_fs_segment_reader);
}
else
{
chassert(remote_fs_segment_reader->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset());
}
return remote_fs_segment_reader;
}
@ -447,8 +456,7 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se
{
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
assert(static_cast<size_t>(read_buffer_for_file_segment->getPosition()) == file_offset_of_buffer_end);
assert(static_cast<size_t>(read_buffer_for_file_segment->getFileOffsetOfBufferEnd()) == file_offset_of_buffer_end);
assert(read_buffer_for_file_segment->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end);
}
auto current_write_offset = file_segment->getCurrentWriteOffset();
@ -456,11 +464,12 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Buffer's offsets mismatch. Cached buffer offset: {}, current_write_offset: {}, implementation buffer offset: {}, file "
"segment info: {}",
"Buffer's offsets mismatch. Cached buffer offset: {}, current_write_offset: {}, implementation buffer position: {}, "
"implementation buffer end position: {}, file segment info: {}",
file_offset_of_buffer_end,
current_write_offset,
read_buffer_for_file_segment->getPosition(),
read_buffer_for_file_segment->getFileOffsetOfBufferEnd(),
file_segment->getInfoForLog());
}
@ -468,6 +477,8 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se
}
}
chassert(!read_buffer_for_file_segment->hasPendingData());
return read_buffer_for_file_segment;
}
@ -537,7 +548,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
/// download from offset a'' < a', but return buffer from offset a'.
LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId());
chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment->getCurrentWriteOffset());
chassert(static_cast<size_t>(implementation_buffer->getPosition()) == file_segment->getCurrentWriteOffset());
size_t current_offset = file_segment->getCurrentWriteOffset();
const auto & current_range = file_segment->range();
@ -794,6 +805,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
if (current_file_segment_it == file_segments_holder->file_segments.end())
return false;
bool implementation_buffer_can_be_reused = false;
SCOPE_EXIT({
try
{
@ -810,7 +822,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
bool need_complete_file_segment = file_segment->isDownloader();
if (need_complete_file_segment)
{
if (!implementation_buffer_can_be_reused)
file_segment->resetRemoteFileReader();
file_segment->completePartAndResetDownloader();
}
}
chassert(!file_segment->isDownloader());
@ -839,6 +856,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
chassert(!internal_buffer.empty());
// Pass a valid external buffer for implementation_buffer to read into.
// We then take it back with another swap() after reading is done.
// (If we get an exception in between, we'll be left with an invalid internal_buffer. That's ok, as long as
// the caller doesn't try to use this CachedOnDiskReadBufferFromFile after it threw an exception.)
swap(*implementation_buffer);
auto & file_segment = *current_file_segment_it;
@ -846,9 +867,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
LOG_TEST(
log,
"Current count: {}, position: {}, file segment: {}",
"Current count: {}, position: {}, buffer end: {}, file segment: {}",
implementation_buffer->count(),
implementation_buffer->getPosition(),
implementation_buffer->getFileOffsetOfBufferEnd(),
file_segment->getInfoForLog());
chassert(current_read_range.left <= file_offset_of_buffer_end);
@ -905,13 +927,16 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentReadMicroseconds, elapsed);
// We don't support implementation_buffer implementations that use nextimpl_working_buffer_offset.
chassert(implementation_buffer->position() == implementation_buffer->buffer().begin());
size = implementation_buffer->buffer().size();
LOG_TEST(
log,
"Read {} bytes, read type {}, position: {}, offset: {}",
"Read {} bytes, read type {}, position: {}, offset: {}, segment end: {}",
size, toString(read_type), implementation_buffer->getPosition(),
implementation_buffer->getFileOffsetOfBufferEnd());
implementation_buffer->getFileOffsetOfBufferEnd(), file_segment->range().right);
if (read_type == ReadType::CACHED)
{
@ -945,6 +970,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|| file_segment->getCurrentWriteOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
LOG_TEST(log, "Successfully written {} bytes", size);
// The implementation_buffer is valid and positioned correctly (at file_segment->getCurrentWriteOffset()).
// Later reads for this file segment can reuse it.
// (It's reusable even if we don't reach the swap(*implementation_buffer) below,
// because the reuser must assign implementation_buffer's buffer anyway.)
implementation_buffer_can_be_reused = true;
}
else
{
@ -981,12 +1012,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
}
file_offset_of_buffer_end += size;
chassert(file_offset_of_buffer_end <= read_until_position);
}
swap(*implementation_buffer);
current_file_segment_counters.increment(ProfileEvents::FileSegmentUsedBytes, available());
// Not necessary because of the SCOPE_EXIT above, but useful for logging below.
if (download_current_segment)
file_segment->completePartAndResetDownloader();
@ -1119,6 +1153,8 @@ off_t CachedOnDiskReadBufferFromFile::seek(off_t offset, int whence)
implementation_buffer.reset();
initialized = false;
LOG_TEST(log, "Reset state for seek to position {}", new_pos);
return new_pos;
}
@ -1153,6 +1189,8 @@ void CachedOnDiskReadBufferFromFile::setReadUntilPosition(size_t position)
initialized = false;
read_until_position = position;
LOG_TEST(log, "Set read_until_position to {}", read_until_position);
}
void CachedOnDiskReadBufferFromFile::setReadUntilEnd()
@ -1197,11 +1235,13 @@ String CachedOnDiskReadBufferFromFile::getInfoForLog()
current_file_segment_info = "None";
return fmt::format(
"Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, "
"read_type: {}, last caller: {}, file segment info: {}",
"Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, read_until_position: {}, "
"internal buffer end: {}, read_type: {}, last caller: {}, file segment info: {}",
source_file_path,
getHexUIntLowercase(cache_key),
file_offset_of_buffer_end,
read_until_position,
implementation_buffer ? std::to_string(implementation_buffer->getFileOffsetOfBufferEnd()) : "None",
toString(read_type),
last_caller_id,
current_file_segment_info);

View File

@ -54,8 +54,12 @@ public:
// Assigns the memory region starting at `ptr` with length `size` to this buffer by forwarding
// to `BufferBase::set(ptr, size, 0)`, then resizes `working_buffer` to 0.
// NOTE(review): presumably the resize makes the buffer look empty until the next read fills it - confirm.
// FIXME: behavior differs greatly from `BufferBase::set()` and it's very confusing.
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }
/** read next data and fill a buffer with it; set position to the beginning;
* return `false` in case of end, `true` otherwise; throw an exception, if something is wrong
/** read next data and fill a buffer with it; set position to the beginning of the new data
* (but not necessarily to the beginning of working_buffer!);
* return `false` in case of end, `true` otherwise; throw an exception, if something is wrong;
*
* if an exception was thrown, is the ReadBuffer left in a usable state? this varies across implementations;
* can the caller retry next() after an exception, or call other methods? not recommended
*/
bool next()
{

View File

@ -125,8 +125,8 @@ struct ReadSettings
/// Returns a copy of these settings whose local and remote buffer sizes are capped by `file_size`,
/// so that reading a small file does not request a buffer larger than the file itself.
/// The cap is never below 1 byte: an empty file (`file_size == 0`) must not yield a zero-sized buffer.
/// Buffer sizes that are already smaller than the cap are left unchanged.
ReadSettings adjustBufferSize(size_t file_size) const
{
    /// Explicit std::max<size_t> instead of a `1ul` literal: `unsigned long` is not the same type
    /// as size_t on every platform, and std::max cannot deduce its argument type from mixed types.
    const size_t size_cap = std::max<size_t>(1, file_size);

    ReadSettings res = *this;
    res.local_fs_buffer_size = std::min(size_cap, local_fs_buffer_size);
    res.remote_fs_buffer_size = std::min(size_cap, remote_fs_buffer_size);
    return res;
}
};

View File

@ -55,7 +55,7 @@ public:
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* As long as pointers to returned file segments are hold
* As long as pointers to returned file segments are held
* it is guaranteed that these file segments are not removed from cache.
*/
FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);

View File

@ -228,7 +228,7 @@ void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, std:
{
auto caller = getCallerId();
auto current_downloader = getDownloaderUnlocked(segment_lock);
LOG_TEST(log, "Downloader id: {}, caller id: {}", current_downloader, caller);
LOG_TEST(log, "Downloader id: {}, caller id: {}, operation: {}", current_downloader, caller, operation);
if (caller != current_downloader)
{
@ -289,9 +289,6 @@ void FileSegment::resetRemoteFileReader()
std::unique_lock segment_lock(mutex);
assertIsDownloaderUnlocked("resetRemoteFileReader", segment_lock);
if (!remote_file_reader)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist");
remote_file_reader.reset();
}
@ -358,7 +355,21 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer was detached");
auto download_path = getPathInLocalCache();
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
try
{
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
}
catch (Exception & e)
{
wrapWithCacheInfo(e, "while opening file in local cache", segment_lock);
setDownloadFailedUnlocked(segment_lock);
cv.notify_all();
throw;
}
}
}
@ -498,7 +509,7 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock<std::m
void FileSegment::setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segment_lock)
{
LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(segment_lock));
LOG_INFO(log, "Setting download as failed: {}", getInfoForLogUnlocked(segment_lock));
setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
resetDownloaderUnlocked(segment_lock);
@ -507,8 +518,9 @@ void FileSegment::setDownloadFailedUnlocked(std::unique_lock<std::mutex> & segme
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
remote_file_reader.reset();
}
void FileSegment::completePartAndResetDownloader()
@ -591,10 +603,13 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
resetDownloaderUnlocked(segment_lock);
}
if (cache_writer && (is_downloader || is_last_holder))
if (is_downloader || is_last_holder)
{
cache_writer->finalize();
cache_writer.reset();
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
}
remote_file_reader.reset();
}

View File

@ -221,7 +221,7 @@ public:
* ========== Methods for _only_ file segment's `downloader` ==================
*/
/// Try to reserve exactly `size` bytes.
/// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
/// Returns true if reservation was successful, false otherwise.
bool reserve(size_t size_to_reserve);
@ -243,6 +243,11 @@ public:
void resetDownloader();
// Invariant: if state() != DOWNLOADING and remote file reader is present, the reader's
// available() == 0, and getFileOffsetOfBufferEnd() == our getCurrentWriteOffset().
//
// The reader typically requires its internal_buffer to be assigned from the outside before
// calling next().
RemoteFileReaderPtr getRemoteFileReader();
RemoteFileReaderPtr extractRemoteFileReader();

View File

@ -639,8 +639,6 @@ StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const S
return {.buf = createAsyncS3ReadBuffer(key, read_settings, object_size)};
}
chassert(object_size > 0);
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, version_id, object_size, request_settings, read_settings);
return {.buf_factory = std::move(factory)};