Finally fixed

This commit is contained in:
kssenii 2022-03-14 17:33:29 +01:00
parent 59c0fb0244
commit 1b6e7fea2f
6 changed files with 83 additions and 37 deletions

View File

@ -325,7 +325,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
if (files[key].contains(offset))
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache already exists for key: `{}`, offset: {}, size: {}, current cache structure: {}",
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
keyToStr(key), offset, size, dumpStructureImpl(key, cache_lock));
auto file_segment = std::make_shared<FileSegment>(offset, size, key, this, state);

View File

@ -46,7 +46,16 @@ FileSegment::State FileSegment::state() const
size_t FileSegment::getDownloadOffset() const
{
std::lock_guard segment_lock(mutex);
return range().left + downloaded_size;
return range().left + getDownloadedSize(segment_lock);
}
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
{
if (download_state == State::DOWNLOADED)
return downloaded_size;
std::lock_guard download_lock(download_mutex);
return downloaded_size;
}
String FileSegment::getCallerId()
@ -165,17 +174,17 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
{
std::lock_guard segment_lock(mutex);
auto download_offset = range().left + downloaded_size;
if (offset_ != download_offset)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to write {} bytes to offset: {}, but current download offset is {} ({})",
size, offset_, download_offset, getInfoForLogImpl(segment_lock));
}
auto download_offset = range().left + downloaded_size;
if (offset_ != download_offset)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to write {} bytes to offset: {}, but current download offset is {} ({})",
size, offset_, download_offset);
if (!cache_writer)
{
if (downloaded_size > 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer should be finalized (downloaded size: {})", downloaded_size);
auto download_path = cache->getPathInLocalCache(key(), offset());
cache_writer = std::make_unique<WriteBufferFromFile>(download_path);
}
@ -183,12 +192,15 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
try
{
cache_writer->write(from, size);
std::lock_guard download_lock(download_mutex);
cache_writer->next();
downloaded_size += size;
}
catch (...)
{
std::lock_guard segment_lock(mutex);
LOG_ERROR(log, "Failed to write to cache. File segment info: {}", getInfoForLog());
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
@ -201,8 +213,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
throw;
}
std::lock_guard segment_lock(mutex);
downloaded_size += size;
assert(getDownloadOffset() == offset_ + size);
}
FileSegment::State FileSegment::wait()
@ -347,7 +358,7 @@ void FileSegment::complete()
if (download_state == State::SKIP_CACHE || detached)
return;
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
if (download_state != State::DOWNLOADED && getDownloadedSize(segment_lock) == range().size())
setDownloaded(segment_lock);
}
@ -384,10 +395,11 @@ void FileSegment::completeImpl(bool allow_non_strict_checking)
if (!download_can_continue)
{
if (!downloaded_size)
size_t current_downloaded_size = getDownloadedSize(segment_lock);
if (current_downloaded_size == 0)
{
download_state = State::SKIP_CACHE;
LOG_TEST(log, "Remove cell {} (downloaded: {})", range().toString(), downloaded_size);
LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString());
cache->remove(key(), offset(), cache_lock, segment_lock);
detached = true;
@ -400,7 +412,7 @@ void FileSegment::completeImpl(bool allow_non_strict_checking)
* in FileSegmentsHolder represent a contiguous range, so we can resize
* it only when nobody needs it.
*/
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size);
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
detached = true;
@ -430,12 +442,12 @@ String FileSegment::getInfoForLog() const
return getInfoForLogImpl(segment_lock);
}
String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & /* segment_lock */) const
String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const
{
WriteBufferFromOwnString info;
info << "File segment: " << range().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << downloaded_size << ", ";
info << "downloaded size: " << getDownloadedSize(segment_lock) << ", ";
info << "downloader id: " << downloader_id << ", ";
info << "caller id: " << getCallerId();

View File

@ -130,6 +130,7 @@ private:
static String getCallerIdImpl(bool allow_non_strict_checking = false);
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
const Range segment_range;
@ -145,6 +146,14 @@ private:
mutable std::mutex mutex;
std::condition_variable cv;
/// Protects downloaded_size access with actual write into fs.
/// downloaded_size is not protected by download_mutex in methods which
/// can never be run in parallel to FileSegment::write() method
/// as downloaded_size is updated only in FileSegment::write() method.
/// Such methods are identified by isDownloader() check at their start,
/// e.g. they are executed strictly by the same thread, sequentially.
mutable std::mutex download_mutex;
Key file_key;
IFileCache * cache;

View File

@ -242,6 +242,9 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(
assert(file_offset_of_buffer_end > file_segment->getDownloadOffset());
bytes_to_predownload = file_offset_of_buffer_end - file_segment->getDownloadOffset();
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
return getRemoteFSReadBuffer(file_segment, read_type);
}
download_offset = file_segment->getDownloadOffset();
@ -300,6 +303,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
{
case ReadType::CACHED:
{
#ifdef NDEBUG
auto * file_reader = assert_cast<ReadBufferFromFile *>(read_buffer_for_file_segment.get());
size_t file_size = file_reader->size();
@ -309,14 +313,11 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
"Unexpected state of cache file. Cache file size: {}, cache file offset: {}, "
"expected file size to be non-zero and file downloaded size to exceed current file read offset (expected: {} > {})",
file_size, range.left, range.left + file_size, file_offset_of_buffer_end);
#endif
size_t seek_offset = file_offset_of_buffer_end - range.left;
read_buffer_for_file_segment->seek(seek_offset, SEEK_SET);
auto state = file_segment->state();
LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}, file segment state: {}, download offset: {}",
file_reader->getFileName(), seek_offset, file_size, state, file_segment->getDownloadOffset());
break;
}
case ReadType::REMOTE_FS_READ_BYPASS_CACHE:
@ -424,12 +425,23 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
}
size_t current_predownload_size = std::min(implementation_buffer->buffer().size(), bytes_to_predownload);
if (std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(file_segment->range().right, read_until_position - 1) - file_offset_of_buffer_end + 1;
current_predownload_size = std::min(current_predownload_size, remaining_size_to_read);
}
if (file_segment->reserve(current_predownload_size))
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, implementation_buffer->buffer().size());
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
assert(file_segment->getDownloadOffset() <= std::min(file_segment->range().right + 1, read_until_position));
assert(std::next(current_file_segment_it) == file_segments_holder->file_segments.end() || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
current_offset += current_predownload_size;
bytes_to_predownload -= current_predownload_size;
@ -551,6 +563,8 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
bool CachedReadBufferFromRemoteFS::nextImplStep()
{
last_caller_id = FileSegment::getCallerId();
if (IFileCache::shouldBypassCache())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Using cache when not allowed");
@ -626,15 +640,14 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
if (download_current_segment != file_segment->isDownloader())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}",
toString(read_type), file_segment->getCallerId(), file_segment->getDownloader(), file_segment->state());
}
if (!result)
{
#ifdef NDEBUG
if (auto * cache_file_reader = typeid_cast<ReadBufferFromFile *>(implementation_buffer.get()))
{
auto cache_file_size = cache_file_reader->size();
@ -644,6 +657,7 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
"Attempt to read from an empty cache file: {} (just before actual read)",
cache_file_size);
}
#endif
result = implementation_buffer->next();
size = implementation_buffer->buffer().size();
@ -651,13 +665,26 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
if (result)
{
if (std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
size = std::min(size, remaining_size_to_read);
assert(implementation_buffer->buffer().size() <= nextimpl_working_buffer_offset + size);
implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size);
}
if (download_current_segment)
{
assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
if (file_segment->reserve(size))
{
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
file_segment->write(needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(), size, file_offset_of_buffer_end);
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(std::next(current_file_segment_it) == file_segments_holder->file_segments.end() || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
}
else
{
@ -687,13 +714,6 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
}
}
if (std::next(current_file_segment_it) == file_segments_holder->file_segments.end())
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
size = std::min(size, remaining_size_to_read);
implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size);
}
file_offset_of_buffer_end += size;
}
@ -796,12 +816,13 @@ String CachedReadBufferFromRemoteFS::getInfoForLog()
auto current_file_segment_info = current_file_segment_it == file_segments_holder->file_segments.end() ? "None" : (*current_file_segment_it)->getInfoForLog();
return fmt::format("Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, "
"read_type: {}, file segment info: {}",
"read_type: {}, last caller: {}, file segment info: {}",
remote_fs_object_path,
getHexUIntLowercase(cache_key),
file_offset_of_buffer_end,
implementation_buffer_read_range_str,
toString(read_type),
last_caller_id,
current_file_segment_info);
}

View File

@ -100,6 +100,7 @@ private:
size_t first_offset = 0;
String nextimpl_step_log_info;
String last_caller_id;
};
}

View File

@ -1,5 +1,6 @@
#include "ThreadPoolRemoteFSReader.h"
#include <base/scope_guard_safe.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
@ -62,6 +63,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
if (running_group)
thread_status.attachQuery(running_group);
SCOPE_EXIT_SAFE({
if (running_group)
CurrentThread::detachQuery();
});
setThreadName("VFSRead");
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
@ -74,9 +80,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read);
if (running_group)
thread_status.detachQuery();
return Result{ .size = bytes_read, .offset = offset };
});