#pragma once #include #include #include #include #include namespace Poco { class Logger; } namespace DB { class FileCache; class FileSegment; using FileSegmentPtr = std::shared_ptr; using FileSegments = std::list; class FileSegment : boost::noncopyable { friend class LRUFileCache; friend struct FileSegmentsHolder; public: using Key = UInt128; using RemoteFileReaderPtr = std::shared_ptr; using LocalCacheWriterPtr = std::unique_ptr; enum class State { DOWNLOADED, /** * When file segment is first created and returned to user, it has state EMPTY. * EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully * by any owner of EMPTY state file segment. */ EMPTY, /** * A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader * because each cache user might acquire multiple file segments and reads them one by one, * so only user which actually needs to read this segment earlier than others - becomes a downloader. */ DOWNLOADING, /** * Space reservation for a file segment is incremental, i.e. downaloder reads buffer_size bytes * from remote fs -> tries to reserve buffer_size bytes to put them to cache -> writes to cache * on successful reservation and stops cache write otherwise. Those, who waited for the same file * file segment, will read downloaded part from cache and remaining part directly from remote fs. */ PARTIALLY_DOWNLOADED_NO_CONTINUATION, /** * If downloader did not finish download of current file segment for any reason apart from running * out of cache space, then download can be continued by other owners of this file segment. */ PARTIALLY_DOWNLOADED, /** * If file segment cannot possibly be downloaded (first space reservation attempt failed), mark * this file segment as out of cache scope. */ SKIP_CACHE, }; FileSegment( size_t offset_, size_t size_, const Key & key_, FileCache * cache_, State download_state_); State state() const; static String stateToString(FileSegment::State state); /// Represents an interval [left, right] including both boundaries. struct Range { size_t left; size_t right; Range(size_t left_, size_t right_) : left(left_), right(right_) {} bool operator==(const Range & other) const { return left == other.left && right == other.right; } size_t size() const { return right - left + 1; } String toString() const { return '[' + std::to_string(left) + ',' + std::to_string(right) + ']'; } }; const Range & range() const { return segment_range; } const Key & key() const { return file_key; } size_t offset() const { return range().left; } State wait(); bool reserve(size_t size); void write(const char * from, size_t size); RemoteFileReaderPtr getRemoteFileReader(); void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_); String getOrSetDownloader(); String getDownloader() const; bool isDownloader() const; static String getCallerId(); size_t downloadOffset() const; void completeBatchAndResetDownloader(); void complete(State state, bool error = false); String getInfoForLog() const; private: size_t availableSize() const { return reserved_size - downloaded_size; } bool lastFileSegmentHolder() const; void complete(); void completeImpl(std::lock_guard & /* segment_lock */); const Range segment_range; State download_state; String downloader_id; RemoteFileReaderPtr remote_file_reader; LocalCacheWriterPtr cache_writer; size_t downloaded_size = 0; size_t reserved_size = 0; mutable std::mutex mutex; std::condition_variable cv; Key file_key; FileCache * cache; Poco::Logger * log; }; struct FileSegmentsHolder : boost::noncopyable { explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(file_segments_) {} FileSegmentsHolder(FileSegmentsHolder && other) : file_segments(std::move(other.file_segments)) {} ~FileSegmentsHolder() { /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from /// FileSegmentsHolder right after calling file_segment->complete(), so on destruction here /// remain only uncompleted file segments. for (auto & segment : file_segments) segment->complete(); } FileSegments file_segments{}; }; }