diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp
index 764a605fdee..7dedf1c84a1 100644
--- a/src/Common/FileCache.cpp
+++ b/src/Common/FileCache.cpp
@@ -412,21 +412,6 @@ LRUFileCache::FileSegmentCell * LRUFileCache::addCell(
     return &(it->second);
 }
 
-FileSegmentsHolder LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size)
-{
-    std::lock_guard cache_lock(mutex);
-
-    auto * cell = getCell(key, offset, cache_lock);
-    if (cell)
-        throw Exception(
-            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
-            "Cache cell already exists for key `{}` and offset {}",
-            keyToStr(key), offset);
-
-    auto file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
-    return FileSegmentsHolder(std::move(file_segments));
-}
-
 bool LRUFileCache::tryReserve(
     const Key & key_, size_t offset_, size_t size, std::lock_guard<std::mutex> & cache_lock)
 {
@@ -799,6 +784,23 @@ std::vector<String> LRUFileCache::tryGetCachePaths(const Key & key)
     return cache_paths;
 }
 
+FileSegmentPtr LRUFileCache::setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock)
+{
+    auto * cell = getCell(key, offset, cache_lock);
+    if (cell)
+        throw Exception(
+            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
+            "Cache cell already exists for key `{}` and offset {}",
+            keyToStr(key), offset);
+
+    cell = addCell(key, offset, size, FileSegment::State::DOWNLOADING, cache_lock);
+
+    if (!cell)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to add a new cell for download");
+
+    return cell->file_segment;
+}
+
 LRUFileCache::FileSegmentCell::FileSegmentCell(FileSegmentPtr file_segment_, LRUQueue & queue_)
     : file_segment(file_segment_)
 {
diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h
index 6a50dbab6d8..0e61162b917 100644
--- a/src/Common/FileCache.h
+++ b/src/Common/FileCache.h
@@ -26,6 +26,7 @@ class IFileCache : private boost::noncopyable
 {
 friend class FileSegment;
 friend struct FileSegmentsHolder;
+friend class FileSegmentRangeWriter;
 
 public:
     using Key = UInt128;
@@ -83,8 +84,6 @@ public:
      */
     virtual FileSegmentsHolder get(const Key & key, size_t offset, size_t size) = 0;
 
-    virtual FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) = 0;
-
     virtual FileSegments getSnapshot() const = 0;
 
     /// For debug.
@@ -121,6 +120,8 @@ protected:
         std::lock_guard<std::mutex> & cache_lock,
         std::lock_guard<std::mutex> & segment_lock) = 0;
 
+    virtual FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) = 0;
+
     void assertInitialized() const;
 };
 
@@ -139,8 +140,6 @@ public:
 
     FileSegments getSnapshot() const override;
 
-    FileSegmentsHolder setDownloading(const Key & key, size_t offset, size_t size) override;
-
     void initialize() override;
 
     void remove(const Key & key) override;
@@ -229,6 +228,8 @@ private:
     void fillHolesWithEmptyFileSegments(
         FileSegments & file_segments, const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, std::lock_guard<std::mutex> & cache_lock);
 
+    FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, std::lock_guard<std::mutex> & cache_lock) override;
+
 public:
     struct Stat
     {
diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp
index b5faa88fc92..5d77d38e947 100644
--- a/src/Common/FileSegment.cpp
+++ b/src/Common/FileSegment.cpp
@@ -51,6 +51,8 @@ FileSegment::FileSegment(
         /// needed, downloader is set on file segment creation).
         case (State::DOWNLOADING):
         {
+            /// On write-through cache we do not check downloader id.
+            is_write_through_cache = true;
             downloader_id = getCallerId();
             break;
         }
@@ -79,6 +81,12 @@ size_t FileSegment::getDownloadedSize() const
     return getDownloadedSize(segment_lock);
 }
 
+size_t FileSegment::getAvailableSize() const
+{
+    std::lock_guard segment_lock(mutex);
+    return range().size() - downloaded_size;
+}
+
 size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
 {
     if (download_state == State::DOWNLOADED)
@@ -208,7 +216,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
             ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
             "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
 
-    if (!isDownloader())
+    if (!isDownloader() && !is_write_through_cache)
         throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                         "Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
                         getCallerId(), downloader_id);
@@ -266,81 +274,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
     assert(getDownloadOffset() == offset_ + size);
 }
 
-void FileSegment::writeInMemory(const char * from, size_t size)
-{
-    if (!size)
-        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Attempt to write zero size cache file");
-
-    if (availableSize() < size)
-        throw Exception(
-            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
-            "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
-
-    std::lock_guard segment_lock(mutex);
-
-    assertNotDetached(segment_lock);
-
-    if (cache_writer)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized");
-
-    auto download_path = cache->getPathInLocalCache(key(), offset());
-    cache_writer = std::make_unique<WriteBufferFromFile>(download_path, size + 1);
-
-    try
-    {
-        cache_writer->write(from, size);
-    }
-    catch (Exception & e)
-    {
-        wrapWithCacheInfo(e, "while writing into cache", segment_lock);
-
-        setDownloadFailed(segment_lock);
-
-        cv.notify_all();
-
-        throw;
-    }
-}
-
-size_t FileSegment::finalizeWrite()
-{
-    std::lock_guard segment_lock(mutex);
-
-    if (!cache_writer)
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer not initialized");
-
-    size_t size = cache_writer->offset();
-
-    if (size == 0)
-        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");
-
-    assertNotDetached(segment_lock);
-
-    try
-    {
-        cache_writer->next();
-    }
-    catch (Exception & e)
-    {
-        wrapWithCacheInfo(e, "while writing into cache", segment_lock);
-
-        setDownloadFailed(segment_lock);
-
-        cv.notify_all();
-
-        throw;
-    }
-
-    downloaded_size += size;
-
-    if (downloaded_size != range().size())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected downloaded size to equal file segment size ({} == {})", downloaded_size, range().size());
-
-    setDownloaded(segment_lock);
-
-    return size;
-}
-
 FileSegment::State FileSegment::wait()
 {
     std::unique_lock segment_lock(mutex);
@@ -375,7 +308,8 @@ bool FileSegment::reserve(size_t size)
     assertNotDetached(segment_lock);
 
     auto caller_id = getCallerId();
-    if (downloader_id != caller_id)
+    bool is_downloader = caller_id == downloader_id;
+    if (!is_downloader && !is_write_through_cache)
         throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
 
     if (downloaded_size + size > range().size())
@@ -461,7 +395,7 @@ void FileSegment::complete(State state)
     assertNotDetached(segment_lock);
 
     bool is_downloader = isDownloaderImpl(segment_lock);
-    if (!is_downloader)
+    if (!is_downloader && !is_write_through_cache)
     {
         cv.notify_all();
         throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
@@ -764,4 +698,116 @@ String FileSegmentsHolder::toString()
 }
 
 
+FileSegmentRangeWriter::FileSegmentRangeWriter(
+    IFileCache * cache_,
+    const FileSegment::Key & key_,
+    size_t max_file_segment_size_)
+    : cache(cache_)
+    , key(key_)
+    , max_file_segment_size(max_file_segment_size_)
+{
+}
+
+void FileSegmentRangeWriter::allocateFileSegment(size_t offset)
+{
+    std::lock_guard cache_lock(cache->mutex);
+
+    auto file_segment = cache->setDownloading(key, offset, max_file_segment_size, cache_lock);
+    current_file_segment = file_segment;
+
+    file_segments_holder.add(std::move(file_segment));
+}
+
+bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset)
+{
+    /**
+     * The total size to be written is not known until the very end,
+     * so we allocate file segments lazily. Each file segment is assigned a capacity
+     * of max_file_segment_size, but reserved_size remains 0 until tryReserve() is called.
+     * Once the current file segment is full (has reached max_file_segment_size), we allocate
+     * a new file segment. All allocated file segments reside in the file segments holder.
+     * If the last file segment is not full at the end of all writes, it is resized.
+     */
+
+    std::lock_guard lock(mutex);
+
+    if (finalized)
+        return false;
+
+    if (current_file_segment.expired())
+        allocateFileSegment(current_file_segment_start_offset);
+
+    auto file_segment = current_file_segment.lock();
+    if (file_segment->getAvailableSize() == 0)
+    {
+        file_segment->complete(FileSegment::State::DOWNLOADED);
+
+        allocateFileSegment(current_file_segment_start_offset);
+        file_segment = current_file_segment.lock();
+    }
+
+    bool reserved = file_segment->reserve(size);
+    if (!reserved)
+        return false;
+
+    file_segment->write(data, size, offset);
+    current_file_segment_start_offset += size;
+
+    return true;
+}
+
+void FileSegmentRangeWriter::finalize()
+{
+    std::lock_guard lock(mutex);
+
+    if (finalized)
+        return;
+
+    if (file_segments_holder.file_segments.empty() || current_file_segment.expired())
+        return;
+
+    auto file_segment = current_file_segment.lock();
+
+    std::lock_guard cache_lock(cache->mutex);
+    file_segment->complete(cache_lock);
+
+    finalized = true;
+}
+
+FileSegmentRangeWriter::~FileSegmentRangeWriter()
+{
+    try
+    {
+        if (!finalized)
+            finalize();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+}
+
+void FileSegmentRangeWriter::clearDownloaded()
+{
+    std::lock_guard lock(mutex);
+
+    current_file_segment.reset();
+
+    auto & file_segments = file_segments_holder.file_segments;
+    if (file_segments.empty())
+        return;
+
+    std::lock_guard cache_lock(cache->mutex);
+
+    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it)
+    {
+        auto file_segment = *file_segment_it;
+
+        std::lock_guard segment_lock(file_segment->mutex);
+        cache->remove(key, file_segment->offset(), cache_lock, segment_lock);
+    }
+
+    file_segments.clear();
+}
+
 }
diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h
index 2faab5eba51..4aa9a934333 100644
--- a/src/Common/FileSegment.h
+++ b/src/Common/FileSegment.h
@@ -23,6 +23,7 @@ class FileSegment : boost::noncopyable
 
 friend class LRUFileCache;
 friend struct FileSegmentsHolder;
+friend class FileSegmentRangeWriter;
 
 public:
     using Key = UInt128;
@@ -105,8 +106,6 @@ public:
      */
     void writeInMemory(const char * from, size_t size);
 
-    size_t finalizeWrite();
-
     RemoteFileReaderPtr getRemoteFileReader();
 
     void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);
@@ -129,6 +128,8 @@ public:
 
     size_t getDownloadedSize() const;
 
+    size_t getAvailableSize() const;
+
     void completeBatchAndResetDownloader();
 
     void complete(State state);
@@ -215,18 +216,62 @@ private:
     std::atomic<bool> is_downloaded{false};
     std::atomic<size_t> hits_count = 0; /// cache hits.
     std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state
+
+    bool is_write_through_cache = false;
 };
 
 struct FileSegmentsHolder : private boost::noncopyable
 {
+    FileSegmentsHolder() = default;
+
     explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
-    FileSegmentsHolder(FileSegmentsHolder && other) : file_segments(std::move(other.file_segments)) {}
+
+    FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
 
     ~FileSegmentsHolder();
 
+    void add(FileSegmentPtr && file_segment)
+    {
+        file_segments.push_back(file_segment);
+    }
+
     FileSegments file_segments{};
 
     String toString();
 };
 
+class FileSegmentRangeWriter
+{
+public:
+    FileSegmentRangeWriter(
+        IFileCache * cache_,
+        const FileSegment::Key & key_,
+        size_t max_file_segment_size_);
+
+    ~FileSegmentRangeWriter();
+
+    bool write(char * data, size_t size, size_t offset);
+
+    void finalize();
+
+    /// If an exception happened during the remote fs write, we consider the cached data invalid.
+    void clearDownloaded();
+
+private:
+    void allocateFileSegment(size_t offset);
+
+    IFileCache * cache;
+    FileSegment::Key key;
+    size_t max_file_segment_size;
+
+    FileSegmentsHolder file_segments_holder;
+
+    std::weak_ptr<FileSegment> current_file_segment;
+    size_t current_file_segment_start_offset = 0;
+
+    bool finalized = false;
+
+    std::mutex mutex;
+};
+
 }
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index 514434bc3ed..345994f2a01 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -72,6 +72,7 @@ WriteBufferFromS3::WriteBufferFromS3(
     , s3_settings(s3_settings_)
     , schedule(std::move(schedule_))
     , cache(cache_)
+    , cache_writer(cache_.get(), cache_->hash(key), /* max_file_segment_size */s3_settings.max_single_part_upload_size)
 {
     allocateBuffer();
 }
@@ -88,46 +89,14 @@ void WriteBufferFromS3::nextImpl()
     size_t size = offset();
     temporary_buffer->write(working_buffer.begin(), size);
 
+    if (size && cacheEnabled())
+        cache_writer.write(working_buffer.begin(), size, current_cache_write_offset);
+    current_cache_write_offset += size;
+
     ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
             ? CurrentThread::get().getThreadGroup()
             : MainThreadStatus::getInstance().getThreadGroup();
 
-    if (cacheEnabled())
-    {
-        auto cache_key = cache->hash(key);
-
-        file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size));
-        current_download_offset += size;
-
-        size_t remaining_size = size;
-        auto & file_segments = file_segments_holder->file_segments;
-        for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end(); ++file_segment_it)
-        {
-            auto & file_segment = *file_segment_it;
-            size_t current_size = std::min(file_segment->range().size(), remaining_size);
-            remaining_size -= current_size;
-
-            if (file_segment->reserve(current_size))
-            {
-                file_segment->writeInMemory(working_buffer.begin(), current_size);
-            }
-            else
-            {
-                size_t upper_bound = file_segments.back()->range().right;
-                LOG_TRACE(
-                    log,
-                    "Space reservation failed, will skip caching for range: [{}, {}], current full range is [{}, {}]",
-                    file_segment->range().left, upper_bound, file_segments.front()->range().right, upper_bound);
-
-                for (auto reset_segment_it = file_segment_it; reset_segment_it != file_segments.end(); ++reset_segment_it)
-                    (*reset_segment_it)->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
-                file_segments.erase(file_segment_it, file_segments.end());
-
-                break;
-            }
-        }
-    }
-
     ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
 
     last_part_size += offset();
@@ -143,8 +112,6 @@ void WriteBufferFromS3::nextImpl()
         allocateBuffer();
     }
 
-    file_segments_holder.reset();
-
     waitForReadyBackGroundTasks();
 }
 
@@ -263,12 +230,6 @@ void WriteBufferFromS3::writePart()
 
         fillUploadRequest(task->req, part_number);
 
-        if (file_segments_holder)
-        {
-            task->cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
-
         schedule([this, task]()
         {
             try
@@ -280,8 +241,6 @@ void WriteBufferFromS3::writePart()
                 task->exception = std::current_exception();
             }
 
-            finalizeCacheIfNeeded(task->cache_files);
-
             {
                 std::lock_guard lock(bg_tasks_mutex);
                 task->is_finised = true;
@@ -298,14 +257,8 @@ void WriteBufferFromS3::writePart()
     {
        UploadPartTask task;
        fillUploadRequest(task.req, part_tags.size() + 1);
-        if (file_segments_holder)
-        {
-            task.cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
        processUploadRequest(task);
        part_tags.push_back(task.tag);
-        finalizeCacheIfNeeded(task.cache_files);
     }
 }
 
@@ -394,11 +347,6 @@ void WriteBufferFromS3::makeSinglepartUpload()
 
         put_object_task = std::make_unique<PutObjectTask>();
         fillPutRequest(put_object_task->req);
-        if (file_segments_holder)
-        {
-            put_object_task->cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
 
         schedule([this]()
         {
@@ -411,8 +359,6 @@ void WriteBufferFromS3::makeSinglepartUpload()
                 put_object_task->exception = std::current_exception();
             }
 
-            finalizeCacheIfNeeded(put_object_task->cache_files);
-
             {
                 std::lock_guard lock(bg_tasks_mutex);
                 put_object_task->is_finised = true;
@@ -428,13 +374,7 @@ void WriteBufferFromS3::makeSinglepartUpload()
     {
         PutObjectTask task;
         fillPutRequest(task.req);
-        if (file_segments_holder)
-        {
-            task.cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
         processPutRequest(task);
-        finalizeCacheIfNeeded(task.cache_files);
     }
 }
 
@@ -462,25 +402,18 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }
 
-void WriteBufferFromS3::finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> & file_segments_holder)
+void WriteBufferFromS3::clearCache()
 {
-    if (!file_segments_holder)
+    if (!cacheEnabled())
         return;
 
-    auto & file_segments = file_segments_holder->file_segments;
-    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
+    try
     {
-        try
-        {
-            size_t size = (*file_segment_it)->finalizeWrite();
-            file_segment_it = file_segments.erase(file_segment_it);
-
-            ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
+        cache_writer.clearDownloaded();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
     }
 }
 
@@ -500,6 +433,7 @@ void WriteBufferFromS3::waitForReadyBackGroundTasks()
             if (exception)
             {
                 waitForAllBackGroundTasks();
+                clearCache();
                 std::rethrow_exception(exception);
             }
 
@@ -520,7 +454,10 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
         {
             auto & task = upload_object_tasks.front();
             if (task.exception)
+            {
+                clearCache();
                 std::rethrow_exception(task.exception);
+            }
 
             part_tags.push_back(task.tag);
 
@@ -531,7 +468,10 @@ void WriteBufferFromS3::waitForAllBackGroundTasks()
         {
             bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
             if (put_object_task->exception)
+            {
+                clearCache();
                 std::rethrow_exception(put_object_task->exception);
+            }
         }
     }
 }
diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h
index 19faf0b1488..0b22b234cc8 100644
--- a/src/IO/WriteBufferFromS3.h
+++ b/src/IO/WriteBufferFromS3.h
@@ -119,9 +119,10 @@ private:
     Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
 
     FileCachePtr cache;
-    size_t current_download_offset = 0;
-    std::optional<FileSegmentsHolder> file_segments_holder;
-    static void finalizeCacheIfNeeded(std::optional<FileSegmentsHolder> &);
+    FileSegmentRangeWriter cache_writer;
+    void clearCache();
+
+    size_t current_cache_write_offset = 0;
 };
 
 }
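
Note (not part of the patch): a minimal sketch of how a caller is expected to drive the new FileSegmentRangeWriter, mirroring what WriteBufferFromS3::nextImpl() and clearCache() do above. The helper name cacheUploadedData and its parameters are illustrative assumptions only.

#include <Common/FileCache.h>
#include <Common/FileSegment.h>

#include <algorithm>

namespace DB
{

/// Hypothetical caller: write `size` bytes of `data` through the cache for `key`,
/// feeding the writer chunk by chunk the same way WriteBufferFromS3 feeds its working buffer.
void cacheUploadedData(IFileCache * cache, const FileSegment::Key & key,
                       char * data, size_t size, size_t max_file_segment_size)
{
    FileSegmentRangeWriter cache_writer(cache, key, max_file_segment_size);

    size_t offset = 0;
    while (offset < size)
    {
        size_t part = std::min(max_file_segment_size, size - offset);

        /// write() lazily allocates DOWNLOADING file segments and returns false
        /// when space cannot be reserved; in that case we simply stop caching.
        if (!cache_writer.write(data + offset, part, offset))
            return;

        offset += part;
    }

    /// Completes the last (possibly partial) file segment.
    cache_writer.finalize();

    /// If the remote write had failed instead, the caller would drop the cached data:
    ///     cache_writer.clearDownloaded();
}

}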