This commit is contained in:
kssenii 2023-04-17 15:18:10 +02:00
parent f12e11cd5c
commit 53181c938a
7 changed files with 92 additions and 102 deletions

View File

@ -598,8 +598,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
chassert(file_segment.getCurrentWriteOffset(false) == static_cast<size_t>(implementation_buffer->getPosition()));
bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, file_segment);
if (success)
continue_predownload = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, file_segment);
if (continue_predownload)
{
current_offset += current_predownload_size;
@ -609,7 +609,6 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
else
{
LOG_TEST(log, "Bypassing cache because writeCache (in predownload) method failed");
continue_predownload = false;
}
}
@ -631,7 +630,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
/// TODO: allow seek more than once with seek avoiding.
bytes_to_predownload = 0;
file_segment.setBroken();
file_segment.completePartAndResetDownloader();
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because for {}", file_segment.getInfoForLog());
@ -945,13 +944,6 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size);
}
if (!success)
{
file_segment.setBroken();
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
download_current_segment = false;
}
}
/// - If last file segment was read from remote fs, then we read up to segment->range().right,

View File

@ -88,7 +88,6 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
bool reserved = file_segment->reserve(size_to_write);
if (!reserved)
{
file_segment->setBroken();
appendFilesystemCacheLog(*file_segment);
LOG_DEBUG(

View File

@ -558,7 +558,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size)
auto iterate_func = [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
{
chassert(segment_metadata->file_segment->getQueueIterator());
chassert(segment_metadata->file_segment->assertCorrectness());
const bool is_persistent = allow_persistent_files && segment_metadata->file_segment->isPersistent();
const bool releasable = segment_metadata->releasable() && !is_persistent;
@ -821,13 +822,13 @@ void FileCache::loadMetadata()
auto file_segment_metadata_it = addFileSegment(
*locked_key, offset, size, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings(segment_kind), &lock);
chassert(file_segment_metadata_it->second->file_segment->getQueueIterator());
chassert(file_segment_metadata_it->second->size() == size);
const auto & file_segment_metadata = file_segment_metadata_it->second;
chassert(file_segment_metadata->file_segment->assertCorrectness());
total_size += size;
queue_entries.emplace_back(
file_segment_metadata_it->second->file_segment->getQueueIterator(),
file_segment_metadata_it->second->file_segment);
file_segment_metadata->getQueueIterator(),
file_segment_metadata->file_segment);
}
else
{
@ -965,22 +966,14 @@ size_t FileCache::getFileSegmentsNum() const
void FileCache::assertCacheCorrectness()
{
metadata.iterate([&](const LockedKey & locked_key)
auto lock = cache_guard.lock();
main_priority->iterate([&](LockedKey &, FileSegmentMetadataPtr segment_metadata)
{
for (const auto & [offset, file_segment_metadata] : locked_key)
{
const auto & file_segment = *file_segment_metadata->file_segment;
if (file_segment.key() != locked_key.getKey())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected {} = {}", file_segment.key(), locked_key.getKey());
}
file_segment.assertCorrectness();
}
});
const auto & file_segment = *segment_metadata->file_segment;
UNUSED(file_segment);
chassert(file_segment.assertCorrectness());
return PriorityIterationResult::CONTINUE;
}, lock);
}
FileCache::QueryContextHolder::QueryContextHolder(

View File

@ -32,7 +32,7 @@ FileSegment::FileSegment(
const CreateFileSegmentSettings & settings,
FileCache * cache_,
std::weak_ptr<KeyMetadata> key_metadata_,
CachePriorityIterator queue_iterator_)
Priority::Iterator queue_iterator_)
: file_key(key_)
, segment_range(offset_, offset_ + size_ - 1)
, segment_kind(settings.kind)
@ -101,13 +101,13 @@ size_t FileSegment::getReservedSize() const
return reserved_size;
}
FileSegment::CachePriorityIterator FileSegment::getQueueIterator() const
FileSegment::Priority::Iterator FileSegment::getQueueIterator() const
{
auto lock = segment_guard.lock();
return queue_iterator;
}
void FileSegment::setQueueIterator(CachePriorityIterator iterator)
void FileSegment::setQueueIterator(Priority::Iterator iterator)
{
auto lock = segment_guard.lock();
if (queue_iterator)
@ -355,8 +355,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset)
setDownloadFailedUnlocked(lock);
cv.notify_all();
throw;
}
@ -384,7 +382,7 @@ FileSegment::State FileSegment::wait(size_t offset)
{
return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true);
});
chassert(ok);
/// chassert(ok);
}
return download_state;
@ -457,18 +455,20 @@ bool FileSegment::reserve(size_t size_to_reserve)
size_t already_reserved_size = reserved_size - expected_downloaded_size;
bool reserved = already_reserved_size >= size_to_reserve;
if (reserved)
return reserved;
size_to_reserve = size_to_reserve - already_reserved_size;
/// This (resizable file segments) is allowed only for single threaded use of file segment.
/// Currently it is used only for temporary files through cache.
if (is_unbound && is_file_segment_size_exceeded)
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
reserved = cache->tryReserve(*this, size_to_reserve);
if (!reserved)
{
size_to_reserve = size_to_reserve - already_reserved_size;
/// This (resizable file segments) is allowed only for single threaded use of file segment.
/// Currently it is used only for temporary files through cache.
if (is_unbound && is_file_segment_size_exceeded)
segment_range.right = range().left + expected_downloaded_size + size_to_reserve;
reserved = cache->tryReserve(*this, size_to_reserve);
chassert(assertCorrectness());
}
setDownloadFailedUnlocked(segment_guard.lock());
return reserved;
}
@ -516,27 +516,15 @@ void FileSegment::completePartAndResetDownloader()
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("completePartAndResetDownloader", lock);
resetDownloadingStateUnlocked(lock);
resetDownloaderUnlocked(lock);
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock));
}
void FileSegment::setBroken()
{
auto lock = segment_guard.lock();
SCOPE_EXIT({ cv.notify_all(); });
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("setBroken", lock);
chassert(download_state == State::DOWNLOADING
|| download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
if (download_state == State::DOWNLOADING)
resetDownloadingStateUnlocked(lock);
if (download_state != State::DOWNLOADED)
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
resetDownloaderUnlocked(lock);
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock));
}
void FileSegment::complete()
@ -700,24 +688,48 @@ String FileSegment::stateToString(FileSegment::State state)
bool FileSegment::assertCorrectness() const
{
auto lock = segment_guard.lock();
return assertCorrectnessUnlocked(segment_guard.lock());
}
auto current_downloader = getDownloaderUnlocked(lock);
chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING));
chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING));
chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0);
chassert(reserved_size == 0 || queue_iterator);
if (queue_iterator)
bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const
{
auto check_iterator = [this](const Priority::Iterator & it)
{
const auto & entry = queue_iterator->getEntry();
if (isCompleted(false))
chassert(reserved_size == entry.size);
else
/// We cannot check == here because reserved_size is not
/// guarded by any mutex, it is just an atomic.
chassert(reserved_size <= entry.size);
if (!it)
return;
const auto entry = it->getEntry();
UNUSED(entry);
chassert(entry.size == reserved_size);
chassert(entry.key == key());
chassert(entry.offset == offset());
};
if (download_state == State::DOWNLOADED)
{
chassert(downloader_id.empty());
chassert(downloaded_size == reserved_size);
chassert(std::filesystem::file_size(getPathInLocalCache()) > 0);
chassert(queue_iterator);
check_iterator(queue_iterator);
}
else
{
if (download_state == State::DOWNLOADED)
{
chassert(!downloader_id.empty());
}
else if (download_state == State::PARTIALLY_DOWNLOADED
|| download_state == State::EMPTY)
{
chassert(downloader_id.empty());
}
chassert(reserved_size >= downloaded_size);
chassert((reserved_size == 0) || queue_iterator);
check_iterator(queue_iterator);
}
return true;
}

View File

@ -72,7 +72,7 @@ public:
using LocalCacheWriterPtr = std::unique_ptr<WriteBufferFromFile>;
using Downloader = std::string;
using DownloaderId = std::string;
using CachePriorityIterator = IFileCachePriority::Iterator;
using Priority = IFileCachePriority;
enum class State
{
@ -116,7 +116,7 @@ public:
const CreateFileSegmentSettings & create_settings = {},
FileCache * cache_ = nullptr,
std::weak_ptr<KeyMetadata> key_metadata_ = std::weak_ptr<KeyMetadata>(),
CachePriorityIterator queue_iterator_ = CachePriorityIterator{});
Priority::Iterator queue_iterator_ = Priority::Iterator{});
~FileSegment() = default;
@ -222,9 +222,9 @@ public:
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
CachePriorityIterator getQueueIterator() const;
Priority::Iterator getQueueIterator() const;
void setQueueIterator(CachePriorityIterator iterator);
void setQueueIterator(Priority::Iterator iterator);
KeyMetadataPtr tryGetKeyMetadata() const;
@ -236,8 +236,6 @@ public:
* ========== Methods that must do cv.notify() ==================
*/
void setBroken();
void complete();
void completePartAndResetDownloader();
@ -285,6 +283,7 @@ private:
void assertNotDetached() const;
void assertNotDetachedUnlocked(const FileSegmentGuard::Lock &) const;
void assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock &) const;
bool assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const;
LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const;
@ -308,7 +307,7 @@ private:
mutable FileSegmentGuard segment_guard;
std::weak_ptr<KeyMetadata> key_metadata;
mutable CachePriorityIterator queue_iterator; /// Iterator is put here on first reservation attempt, if successful.
mutable Priority::Iterator queue_iterator; /// Iterator is put here on first reservation attempt, if successful.
FileCache * cache;
std::condition_variable cv;

View File

@ -353,13 +353,10 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
auto metadata = getByOffset(offset);
const auto & file_segment = metadata->file_segment;
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
const size_t downloaded_size = file_segment->getDownloadedSize(false);
const size_t full_size = file_segment->range().size();
chassert(downloaded_size <= file_segment->reserved_size);
if (downloaded_size == full_size)
if (downloaded_size == file_segment->range().size())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -367,20 +364,17 @@ void LockedKey::shrinkFileSegmentToDownloadedSize(
file_segment->getInfoForLogUnlocked(segment_lock));
}
CreateFileSegmentSettings create_settings(file_segment->getKind());
auto queue_iterator = file_segment->queue_iterator;
ssize_t diff = file_segment->reserved_size - downloaded_size;
metadata->file_segment = std::make_shared<FileSegment>(
getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, create_settings,
file_segment->cache, key_metadata, queue_iterator);
getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED,
CreateFileSegmentSettings(file_segment->getKind()),
file_segment->cache, key_metadata, file_segment->queue_iterator);
chassert(queue_iterator->getEntry().size == file_segment->reserved_size);
ssize_t diff = file_segment->reserved_size - file_segment->downloaded_size;
if (diff)
queue_iterator->updateSize(-diff);
metadata->getQueueIterator()->updateSize(-diff);
chassert(file_segment->reserved_size == downloaded_size);
chassert(metadata->size() == queue_iterator->getEntry().size);
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
}
std::shared_ptr<const FileSegmentMetadata> LockedKey::getByOffset(size_t offset) const

View File

@ -15,7 +15,6 @@ using CleanupQueuePtr = std::shared_ptr<CleanupQueue>;
struct FileSegmentMetadata : private boost::noncopyable
{
using Priority = IFileCachePriority;
using PriorityIterator = IFileCachePriority::Iterator;
explicit FileSegmentMetadata(FileSegmentPtr && file_segment_);
@ -25,6 +24,8 @@ struct FileSegmentMetadata : private boost::noncopyable
bool valid() const { return !removal_candidate.load(); }
Priority::Iterator getQueueIterator() { return file_segment->getQueueIterator(); }
FileSegmentPtr file_segment;
std::atomic<bool> removal_candidate{false};
};