Reimplement write-through cache

This commit is contained in:
kssenii 2022-05-02 17:56:05 +02:00
parent 8cf83a2daf
commit 70e2bb6264
6 changed files with 218 additions and 183 deletions

View File

@ -412,21 +412,6 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
return &(it->second);
}
FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size)
{
std::lock_guard cache_lock(mutex);
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache cell already exists for key `{}` and offset {}",
keyToStr(key), offset);
auto file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
return FileSegmentsHolder(std::move(file_segments));
}
bool LRUFileCache::tryReserve(
const Key & key_, size_t offset_, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
@ -799,6 +784,23 @@ std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
return cache_paths;
}
FileSegmentPtr LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
{
auto * cell = getCell(key, offset, cache_lock);
if (cell)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cache cell already exists for key `{}` and offset {}",
keyToStr(key), offset);
cell = addCell(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
if (!cell)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to add a new cell for download");
return cell->file_segment;
}
LRUFileCache::FileSegmentCell::FileSegmentCell(FileSegmentPtr file_segment_, LRUQueue & queue_)
: file_segment(file_segment_)
{

View File

@ -26,6 +26,7 @@ class IFileCache : private boost::noncopyable
{
friend class FileSegment;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -83,8 +84,6 @@ public:
*/
virtual FileSegmentsHolder get(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
virtual FileSegments getSnapshot() const = 0;
/// For debug.
@ -121,6 +120,8 @@ protected:
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) = 0;
void assertInitialized() const;
};
@ -139,8 +140,6 @@ public:
FileSegments getSnapshot() const override;
FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
void initialize() override;
void remove(const Key & key) override;
@ -229,6 +228,8 @@ private:
void fillHolesWithEmptyFileSegments(
FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) override;
public:
struct Stat
{

View File

@ -51,6 +51,8 @@ FileSegment::FileSegment(
/// needed, downloader is set on file segment creation).
case (State::DOWNLOADING):
{
/// On write-through cache we do not check downloader id.
is_write_through_cache = true;
downloader_id = getCallerId();
break;
}
@ -79,6 +81,12 @@ size_t FileSegment::getDownloadedSize() const
return getDownloadedSize(segment_lock);
}
size_t FileSegment::getAvailableSize() const
{
std::lock_guard segment_lock(mutex);
return range().size() - downloaded_size;
}
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
{
if (download_state == State::DOWNLOADED)
@ -208,7 +216,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
if (!isDownloader())
if (!isDownloader() && !is_write_through_cache)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
@ -266,81 +274,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
assert(getDownloadOffset() == offset_ + size);
}
void FileSegment::writeInMemory(const char * from, size_t size)
{
if (!size)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Attempt to write zero size cache file");
if (availableSize() < size)
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
std::lock_guard segment_lock(mutex);
assertNotDetached(segment_lock);
if (cache_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized");
auto download_path = cache->getPathInLocalCache(key(), offset());
cache_writer = std::make_unique<WriteBufferFromFile>(download_path, size + 1);
try
{
cache_writer->write(from, size);
}
catch (Exception & e)
{
wrapWithCacheInfo(e, "while writing into cache", segment_lock);
setDownloadFailed(segment_lock);
cv.notify_all();
throw;
}
}
size_t FileSegment::finalizeWrite()
{
std::lock_guard segment_lock(mutex);
if (!cache_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer not initialized");
size_t size = cache_writer->offset();
if (size == 0)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
assertNotDetached(segment_lock);
try
{
cache_writer->next();
}
catch (Exception & e)
{
wrapWithCacheInfo(e, "while writing into cache", segment_lock);
setDownloadFailed(segment_lock);
cv.notify_all();
throw;
}
downloaded_size += size;
if (downloaded_size != range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected downloaded size to equal file segment size ({} == {})", downloaded_size, range().size());
setDownloaded(segment_lock);
return size;
}
FileSegment::State FileSegment::wait()
{
std::unique_lock segment_lock(mutex);
@ -375,7 +308,8 @@ bool FileSegment::reserve(size_t size)
assertNotDetached(segment_lock);
auto caller_id = getCallerId();
if (downloader_id != caller_id)
bool is_downloader = caller_id == downloader_id;
if (!is_downloader && !is_write_through_cache)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
if (downloaded_size + size > range().size())
@ -461,7 +395,7 @@ void FileSegment::complete(State state)
assertNotDetached(segment_lock);
bool is_downloader = isDownloaderImpl(segment_lock);
if (!is_downloader)
if (!is_downloader && !is_write_through_cache)
{
cv.notify_all();
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
@ -764,4 +698,116 @@ String FileSegmentsHolder::toString()
}
FileSegmentRangeWriter::FileSegmentRangeWriter(
IFileCache * cache_,
const FileSegment::Key & key_,
size_t max_file_segment_size_)
: cache(cache_)
, key(key_)
, max_file_segment_size(max_file_segment_size_)
{
}
void FileSegmentRangeWriter::allocateFileSegment(size_t offset)
{
std::lock_guard cache_lock(cache->mutex);
auto file_segment = cache->setDownloading(key, offset, max_file_segment_size, cache_lock);
current_file_segment = file_segment;
file_segments_holder.add(std::move(file_segment));
}
bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset)
{
/**
* We want to write eventually some size, which is not known until the very end.
* Therefore we allocate file segments lazily. Each file segment is assigned capacity
* of max_file_segment_size, but reserved_size remains 0, until call to tryReserve().
* Once current file segment is full (reached max_file_segment_size), we allocate a
* new file segment. All allocated file segments resize in file segments holder.
* If at the end of all writes, the last file segment is not full, then it is resized.
*/
std::lock_guard lock(mutex);
if (finalized)
return false;
if (current_file_segment.expired())
allocateFileSegment(current_file_segment_start_offset);
auto file_segment = current_file_segment.lock();
if (file_segment->getAvailableSize() == 0)
{
file_segment->complete(FileSegment::State::DOWNLOADED);
allocateFileSegment(current_file_segment_start_offset);
file_segment = current_file_segment.lock();
}
bool reserved = file_segment->reserve(size);
if (!reserved)
return false;
file_segment->write(data, size, offset);
current_file_segment_start_offset += size;
return true;
}
void FileSegmentRangeWriter::finalize()
{
std::lock_guard lock(mutex);
if (finalized)
return;
if (file_segments_holder.file_segments.empty() || current_file_segment.expired())
return;
auto file_segment = current_file_segment.lock();
std::lock_guard cache_lock(cache->mutex);
file_segment->complete(cache_lock);
finalized = true;
}
FileSegmentRangeWriter::~FileSegmentRangeWriter()
{
try
{
if (!finalized)
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void FileSegmentRangeWriter::clearDownloaded()
{
std::lock_guard lock(mutex);
current_file_segment.reset();
auto & file_segments = file_segments_holder.file_segments;
if (file_segments.empty())
return;
std::lock_guard cache_lock(cache->mutex);
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
{
auto file_segment = *file_segment_it;
std::lock_guard segment_lock(file_segment->mutex);
cache->remove(key, file_segment->offset(), cache_lock, segment_lock);
}
file_segments.clear();
}
}

View File

@ -23,6 +23,7 @@ class FileSegment : boost::noncopyable
friend class LRUFileCache;
friend struct FileSegmentsHolder;
friend class FileSegmentRangeWriter;
public:
using Key = UInt128;
@ -105,8 +106,6 @@ public:
*/
void writeInMemory(const char * from, size_t size);
size_t finalizeWrite();
RemoteFileReaderPtr getRemoteFileReader();
void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);
@ -129,6 +128,8 @@ public:
size_t getDownloadedSize() const;
size_t getAvailableSize() const;
void completeBatchAndResetDownloader();
void complete(State state);
@ -215,18 +216,62 @@ private:
std::atomic<bool> is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
bool is_write_through_cache = false;
};
struct FileSegmentsHolder : private boost::noncopyable
{
FileSegmentsHolder() = default;
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
FileSegmentsHolder(FileSegmentsHolder && other) : file_segments(std::move(other.file_segments)) {}
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
~FileSegmentsHolder();
void add(FileSegmentPtr && file_segment)
{
file_segments.push_back(file_segment);
}
FileSegments file_segments{};
String toString();
};
class FileSegmentRangeWriter
{
public:
FileSegmentRangeWriter(
IFileCache * cache_,
const FileSegment::Key & key_,
size_t max_file_segment_size_);
~FileSegmentRangeWriter();
bool write(char * data, size_t size, size_t offset);
void finalize();
/// If exception happened on remote fs write, we consider current cache invalid.
void clearDownloaded();
private:
void allocateFileSegment(size_t offset);
IFileCache * cache;
FileSegment::Key key;
size_t max_file_segment_size;
FileSegmentsHolder file_segments_holder;
std::weak_ptr<FileSegment> current_file_segment;
size_t current_file_segment_start_offset = 0;
bool finalized = false;
std::mutex mutex;
};
}

View File

@ -72,6 +72,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, s3_settings(s3_settings_)
, schedule(std::move(schedule_))
, cache(cache_)
, cache_writer(cache_.get(), cache_->hash(key), /* max_file_segment_size */s3_settings.max_single_part_upload_size)
{
allocateBuffer();
}
@ -88,46 +89,14 @@ void WriteBufferFromS3::nextImpl()
size_t size = offset();
temporary_buffer->write(working_buffer.begin(), size);
if (size && cacheEnabled())
cache_writer.write(working_buffer.begin(), size, current_cache_write_offset);
current_cache_write_offset += size;
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (cacheEnabled())
{
auto cache_key = cache->hash(key);
file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size));
current_download_offset += size;
size_t remaining_size = size;
auto & file_segments = file_segments_holder->file_segments;
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it)
{
auto & file_segment = *file_segment_it;
size_t current_size = std::min(file_segment->range().size(), remaining_size);
remaining_size -= current_size;
if (file_segment->reserve(current_size))
{
file_segment->writeInMemory(working_buffer.begin(), current_size);
}
else
{
size_t upper_bound = file_segments.back()->range().right;
LOG_TRACE(
log,
"Space reservation failed, will skip caching for range: [{}, {}], current full range is [{}, {}]",
file_segment->range().left, upper_bound, file_segments.front()->range().right, upper_bound);
for (auto reset_segment_it = file_segment_it; reset_segment_it != file_segments.end(); ++reset_segment_it)
(*reset_segment_it)->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
file_segments.erase(file_segment_it, file_segments.end());
break;
}
}
}
ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
last_part_size += offset();
@ -143,8 +112,6 @@ void WriteBufferFromS3::nextImpl()
allocateBuffer();
}
file_segments_holder.reset();
waitForReadyBackGroundTasks();
}
@ -263,12 +230,6 @@ void WriteBufferFromS3::writePart()
fillUploadRequest(task->req, part_number);
if (file_segments_holder)
{
task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this, task]()
{
try
@ -280,8 +241,6 @@ void WriteBufferFromS3::writePart()
task->exception = std::current_exception();
}
finalizeCacheIfNeeded(task->cache_files);
{
std::lock_guard lock(bg_tasks_mutex);
task->is_finised = true;
@ -298,14 +257,8 @@ void WriteBufferFromS3::writePart()
{
UploadPartTask task;
fillUploadRequest(task.req, part_tags.size() + 1);
if (file_segments_holder)
{
task.cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
processUploadRequest(task);
part_tags.push_back(task.tag);
finalizeCacheIfNeeded(task.cache_files);
}
}
@ -394,11 +347,6 @@ void WriteBufferFromS3::makeSinglepartUpload()
put_object_task = std::make_unique<PutObjectTask>();
fillPutRequest(put_object_task->req);
if (file_segments_holder)
{
put_object_task->cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
schedule([this]()
{
@ -411,8 +359,6 @@ void WriteBufferFromS3::makeSinglepartUpload()
put_object_task->exception = std::current_exception();
}
finalizeCacheIfNeeded(put_object_task->cache_files);
{
std::lock_guard lock(bg_tasks_mutex);
put_object_task->is_finised = true;
@ -428,13 +374,7 @@ void WriteBufferFromS3::makeSinglepartUpload()
{
PutObjectTask task;
fillPutRequest(task.req);
if (file_segments_holder)
{
task.cache_files.emplace(std::move(*file_segments_holder));
file_segments_holder.reset();
}
processPutRequest(task);
finalizeCacheIfNeeded(task.cache_files);
}
}
@ -462,25 +402,18 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void WriteBufferFromS3::finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> & file_segments_holder)
void WriteBufferFromS3::clearCache()
{
if (!file_segments_holder)
if (!cacheEnabled())
return;
auto & file_segments = file_segments_holder->file_segments;
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
try
{
try
{
size_t size = (*file_segment_it)->finalizeWrite();
file_segment_it = file_segments.erase(file_segment_it);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
cache_writer.clearDownloaded();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -500,6 +433,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
if (exception)
{
waitForAllBackGroundTasks();
clearCache();
std::rethrow_exception(exception);
}
@ -520,7 +454,10 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
{
auto & task = upload_object_tasks.front();
if (task.exception)
{
clearCache();
std::rethrow_exception(task.exception);
}
part_tags.push_back(task.tag);
@ -531,7 +468,10 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
{
bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
if (put_object_task->exception)
{
clearCache();
std::rethrow_exception(put_object_task->exception);
}
}
}
}

View File

@ -119,9 +119,10 @@ private:
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
FileCachePtr cache;
size_t current_download_offset = 0;
std::optional<FileSegmentsHolder> file_segments_holder;
static void finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> &);
FileSegmentRangeWriter cache_writer;
void clearCache();
size_t current_cache_write_offset = 0;
};
}