diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 9cc31df0b43..3f83f71a6de 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1121,7 +1121,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des if (need_render_progress && have_data_in_stdin) { /// Set total_bytes_to_read for current fd. - FileProgress file_progress(0, std_in.size()); + FileProgress file_progress(0, std_in.getFileSize()); progress_indication.updateProgress(Progress(file_progress)); /// Set callback to be called on file progress. diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 6deee57bf43..0070f678b67 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -107,6 +107,13 @@ size_t FileSegment::getDownloadedSize(std::lock_guard & /* segment_l return downloaded_size; } +void FileSegment::resizeToDownloadedSize( + std::lock_guard & /* segment_lock */, std::lock_guard & /* cache_lock */) +{ + LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size); + segment_range = Range(segment_range.left, segment_range.left + downloaded_size - 1); +} + String FileSegment::getCallerId() { if (!CurrentThread::isInitialized() @@ -420,7 +427,7 @@ void FileSegment::completeBatchAndResetDownloader() cv.notify_all(); } -void FileSegment::complete(State state) +void FileSegment::complete(State state, bool auto_resize) { std::lock_guard cache_lock(cache->mutex); std::lock_guard segment_lock(mutex); @@ -445,8 +452,22 @@ void FileSegment::complete(State state) } if (state == State::DOWNLOADED) + { + if (auto_resize && downloaded_size != range().size()) + { + resizeToDownloadedSize(segment_lock, cache_lock); + } + + /// Update states and finalize cache write buffer. setDownloaded(segment_lock); + if (downloaded_size != range().size()) + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Cannot complete file segment as DOWNLOADED, because downloaded size ({}) does not match expected size ({})", + downloaded_size, range().size()); + } + download_state = state; try @@ -539,8 +560,7 @@ void FileSegment::completeImpl(std::lock_guard & cache_lock, std::lo * it only when nobody needs it. */ download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; - LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size); - cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock); + resizeToDownloadedSize(segment_lock, cache_lock); } markAsDetached(segment_lock); @@ -819,7 +839,7 @@ bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool if ((*current_file_segment_it)->getAvailableSize() == 0) { - (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED); + (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED, true); on_complete_file_segment_func(*current_file_segment_it); current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); } @@ -873,7 +893,7 @@ void FileSegmentRangeWriter::finalize() if ((*current_file_segment_it)->getDownloadedSize() > 0) { - (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED); + (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED, true); on_complete_file_segment_func(*current_file_segment_it); } else diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index 29ce8a4991e..e08a72580b8 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -133,7 +133,7 @@ public: void completeBatchAndResetDownloader(); - void complete(State state); + void complete(State state, bool auto_resize = false); String getInfoForLog() const; @@ -166,6 +166,7 @@ private: bool isDetached(std::lock_guard & /* segment_lock */) const { return is_detached; } void markAsDetached(std::lock_guard & segment_lock); [[noreturn]] void throwIfDetachedUnlocked(std::lock_guard & segment_lock) const; + void resizeToDownloadedSize(std::lock_guard & segment_lock, std::lock_guard & cache_lock); void assertDetachedStatus(std::lock_guard & segment_lock) const; void assertNotDetached(std::lock_guard & segment_lock) const; @@ -191,7 +192,7 @@ private: void resetDownloaderImpl(std::lock_guard & segment_lock); - const Range segment_range; + Range segment_range; State download_state; String downloader_id; diff --git a/src/Common/IFileCache.h b/src/Common/IFileCache.h index 1befd050781..135a9eb9e9d 100644 --- a/src/Common/IFileCache.h +++ b/src/Common/IFileCache.h @@ -34,7 +34,6 @@ public: Key() = default; explicit Key(const UInt128 & key_) : key(key_) {} - bool operator<(const Key & other) const { return key < other.key; } bool operator==(const Key & other) const { return key == other.key; } }; @@ -123,13 +122,6 @@ protected: std::lock_guard & cache_lock, std::lock_guard & segment_lock) = 0; - /// If file segment was partially downloaded and then space reservation fails (because of no - /// space left), then update corresponding cache cell metadata (file segment size). - virtual void reduceSizeToDownloaded( - const Key & key, size_t offset, - std::lock_guard & cache_lock, - std::lock_guard & segment_lock) = 0; - virtual FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent, std::lock_guard & cache_lock) = 0; void assertInitialized() const; diff --git a/src/Common/LRUFileCache.cpp b/src/Common/LRUFileCache.cpp index cef2b824d20..151c50e07a7 100644 --- a/src/Common/LRUFileCache.cpp +++ b/src/Common/LRUFileCache.cpp @@ -742,39 +742,6 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_l #endif } -void LRUFileCache::reduceSizeToDownloaded( - const Key & key, size_t offset, - std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) -{ - /** - * In case file was partially downloaded and it's download cannot be continued - * because of no space left in cache, we need to be able to cut cell's size to downloaded_size. - */ - - auto * cell = getCell(key, offset, cache_lock); - - if (!cell) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "No cell found for key: {}, offset: {}", - key.toString(), offset); - } - - auto file_segment = cell->file_segment; - - size_t downloaded_size = file_segment->downloaded_size; - if (downloaded_size == file_segment->range().size()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Nothing to reduce, file segment fully downloaded, key: {}, offset: {}", - key.toString(), offset); - } - - cell->file_segment = std::make_shared(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED, file_segment->isPersistent()); -} - bool LRUFileCache::isLastFileSegmentHolder( const Key & key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) diff --git a/src/Common/LRUFileCache.h b/src/Common/LRUFileCache.h index 01f70a3ceed..bacce8a351e 100644 --- a/src/Common/LRUFileCache.h +++ b/src/Common/LRUFileCache.h @@ -142,11 +142,6 @@ private: std::lock_guard & cache_lock, std::lock_guard & segment_lock) override; - void reduceSizeToDownloaded( - const Key & key, size_t offset, - std::lock_guard & cache_lock, - std::lock_guard & segment_lock) override; - size_t getAvailableCacheSize() const; void loadCacheInfoIntoMemory(std::lock_guard & cache_lock); diff --git a/src/Common/filesystemHelpers.cpp b/src/Common/filesystemHelpers.cpp index ca06b21ab3a..6007b824100 100644 --- a/src/Common/filesystemHelpers.cpp +++ b/src/Common/filesystemHelpers.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -30,6 +31,7 @@ namespace ErrorCodes extern const int SYSTEM_ERROR; extern const int NOT_IMPLEMENTED; extern const int CANNOT_STAT; + extern const int CANNOT_FSTAT; extern const int CANNOT_STATVFS; extern const int PATH_ACCESS_DENIED; extern const int CANNOT_CREATE_FILE; @@ -216,6 +218,20 @@ bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path return fileOrSymlinkPathStartsWith(filesystem_path, filesystem_prefix_path); } +size_t getSizeFromFileDescriptor(int fd, const String & file_name) +{ + struct stat buf; + int res = fstat(fd, &buf); + if (-1 == res) + { + throwFromErrnoWithPath( + "Cannot execute fstat" + (file_name.empty() ? "" : " file: " + file_name), + file_name, + ErrorCodes::CANNOT_FSTAT); + } + return buf.st_size; +} + } diff --git a/src/Common/filesystemHelpers.h b/src/Common/filesystemHelpers.h index b15073796a0..6ba785ccf53 100644 --- a/src/Common/filesystemHelpers.h +++ b/src/Common/filesystemHelpers.h @@ -64,6 +64,8 @@ bool pathStartsWith(const String & path, const String & prefix_path); /// (Path is made absolute and normalized.) bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path); +size_t getSizeFromFileDescriptor(int fd, const String & file_name = ""); + } namespace FS diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 5f2e19162f4..d2b999aa635 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -67,7 +67,7 @@ String AsynchronousReadIndirectBufferFromRemoteFS::getInfoForLog() return impl->getInfoForLog(); } -std::optional AsynchronousReadIndirectBufferFromRemoteFS::getFileSize() +size_t AsynchronousReadIndirectBufferFromRemoteFS::getFileSize() { return impl->getFileSize(); } diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index e89c1cc2ec4..251110653ba 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -27,7 +27,7 @@ struct ReadSettings; * * We pass either `memory` or `prefetch_buffer` through all this chain and return it back. */ -class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase, public WithFileSize +class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase { public: explicit AsynchronousReadIndirectBufferFromRemoteFS( @@ -51,7 +51,7 @@ public: String getInfoForLog() override; - std::optional getFileSize() override; + size_t getFileSize() override; bool isIntegratedWithFilesystemCache() const override { return true; } diff --git a/src/Disks/IO/CachedReadBufferFromFile.cpp b/src/Disks/IO/CachedReadBufferFromFile.cpp index 8a57bd7f1f8..91207d0d5e9 100644 --- a/src/Disks/IO/CachedReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedReadBufferFromFile.cpp @@ -40,8 +40,9 @@ CachedReadBufferFromFile::CachedReadBufferFromFile( RemoteFSFileReaderCreator remote_file_reader_creator_, const ReadSettings & settings_, const String & query_id_, - size_t read_until_position_) - : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) + size_t file_size_, + std::optional read_until_position_) + : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_) #ifndef NDEBUG , log(&Poco::Logger::get("CachedReadBufferFromFile(" + source_file_path_ + ")")) #else @@ -51,7 +52,7 @@ CachedReadBufferFromFile::CachedReadBufferFromFile( , source_file_path(source_file_path_) , cache(cache_) , settings(settings_) - , read_until_position(read_until_position_) + , read_until_position(read_until_position_ ? *read_until_position_ : file_size_) , remote_file_reader_creator(remote_file_reader_creator_) , is_persistent(settings_.cache_file_as_persistent) , query_id(query_id_) @@ -128,8 +129,8 @@ SeekableReadBufferPtr CachedReadBufferFromFile::getCacheReadBuffer(size_t offset local_read_settings.local_fs_method = LocalFSReadMethod::pread; auto buf = createReadBufferFromFileBase(path, local_read_settings); - auto * from_fd = dynamic_cast(buf.get()); - if (from_fd && from_fd->size() == 0) + auto * from_fd = dynamic_cast(buf.get()); + if (from_fd && from_fd->getFileSize() == 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path); return buf; @@ -407,8 +408,8 @@ SeekableReadBufferPtr CachedReadBufferFromFile::getImplementationBuffer(FileSegm case ReadType::CACHED: { #ifndef NDEBUG - auto * file_reader = dynamic_cast(read_buffer_for_file_segment.get()); - size_t file_size = file_reader->size(); + auto * file_reader = dynamic_cast(read_buffer_for_file_segment.get()); + size_t file_size = file_reader->getFileSize(); if (file_size == 0 || range.left + file_size <= file_offset_of_buffer_end) throw Exception( @@ -835,9 +836,9 @@ bool CachedReadBufferFromFile::nextImplStep() if (!result) { #ifndef NDEBUG - if (auto * cache_file_reader = dynamic_cast(implementation_buffer.get())) + if (auto * cache_file_reader = dynamic_cast(implementation_buffer.get())) { - auto cache_file_size = cache_file_reader->size(); + auto cache_file_size = cache_file_reader->getFileSize(); if (cache_file_size == 0) throw Exception( ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {} (just before actual read)", cache_file_size); @@ -950,8 +951,8 @@ bool CachedReadBufferFromFile::nextImplStep() if (size == 0 && file_offset_of_buffer_end < read_until_position) { std::optional cache_file_size; - if (auto * cache_file_reader = dynamic_cast(implementation_buffer.get())) - cache_file_size = cache_file_reader->size(); + if (auto * cache_file_reader = dynamic_cast(implementation_buffer.get())) + cache_file_size = cache_file_reader->getFileSize(); throw Exception( ErrorCodes::LOGICAL_ERROR, diff --git a/src/Disks/IO/CachedReadBufferFromFile.h b/src/Disks/IO/CachedReadBufferFromFile.h index 080d5ca365b..be1854a00e5 100644 --- a/src/Disks/IO/CachedReadBufferFromFile.h +++ b/src/Disks/IO/CachedReadBufferFromFile.h @@ -29,7 +29,8 @@ public: RemoteFSFileReaderCreator remote_file_reader_creator_, const ReadSettings & settings_, const String & query_id_, - size_t read_until_position_); + size_t file_size_, + std::optional read_until_position_ = std::nullopt); ~CachedReadBufferFromFile() override; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 7984bbfcf5c..e60f649e215 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -63,7 +63,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(con { return std::make_shared( remote_path, settings.remote_fs_cache, remote_file_reader_creator, - settings, query_id, read_until_position ? read_until_position : file_size); + settings, query_id, file_size, read_until_position ? std::optional(read_until_position) : std::nullopt); } return remote_file_reader_creator(); diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index 699f8380cb8..3f7b378dee4 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -19,6 +19,10 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( { } +size_t ReadIndirectBufferFromRemoteFS::getFileSize() +{ + return impl->getFileSize(); +} off_t ReadIndirectBufferFromRemoteFS::getPosition() { diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.h b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.h index b7a1e5bf2e4..648424782ab 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.h @@ -33,6 +33,8 @@ public: bool isIntegratedWithFilesystemCache() const override { return true; } + size_t getFileSize() override; + private: bool nextImpl() override; diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 5b97bf64d22..59f5d7bbf79 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -7,6 +7,7 @@ #include #include #include +#include namespace ProfileEvents @@ -242,4 +243,9 @@ void AsynchronousReadBufferFromFileDescriptor::rewind() file_offset_of_buffer_end = 0; } +size_t AsynchronousReadBufferFromFileDescriptor::getFileSize() +{ + return getSizeFromFileDescriptor(fd, getFileName()); +} + } diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.h b/src/IO/AsynchronousReadBufferFromFileDescriptor.h index a74f24d62a0..c3d53afa749 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.h +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.h @@ -64,9 +64,10 @@ public: /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read. void rewind(); + size_t getFileSize() override; + private: std::future readInto(char * data, size_t size); }; } - diff --git a/src/IO/ConcatSeekableReadBuffer.h b/src/IO/ConcatSeekableReadBuffer.h index fd9417cef8a..5d7dca82524 100644 --- a/src/IO/ConcatSeekableReadBuffer.h +++ b/src/IO/ConcatSeekableReadBuffer.h @@ -21,7 +21,7 @@ public: off_t seek(off_t off, int whence) override; off_t getPosition() override; - std::optional getFileSize() override { return total_size; } + size_t getFileSize() override { return total_size; } private: bool nextImpl() override; diff --git a/src/IO/MMapReadBufferFromFileDescriptor.cpp b/src/IO/MMapReadBufferFromFileDescriptor.cpp index 463252ca78d..5a636971fa0 100644 --- a/src/IO/MMapReadBufferFromFileDescriptor.cpp +++ b/src/IO/MMapReadBufferFromFileDescriptor.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -86,4 +87,8 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence) return new_pos; } +size_t MMapReadBufferFromFileDescriptor::getFileSize() +{ + return getSizeFromFileDescriptor(getFD(), getFileName()); +} } diff --git a/src/IO/MMapReadBufferFromFileDescriptor.h b/src/IO/MMapReadBufferFromFileDescriptor.h index 1715c2200fb..1a4bcd4f3ed 100644 --- a/src/IO/MMapReadBufferFromFileDescriptor.h +++ b/src/IO/MMapReadBufferFromFileDescriptor.h @@ -33,9 +33,12 @@ public: void finish(); off_t getPosition() override; + std::string getFileName() const override; + int getFD() const; + + size_t getFileSize() override; }; } - diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index 512f1c856b7..bfe4e79345d 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -142,7 +142,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence) return offset; } -std::optional ParallelReadBuffer::getFileSize() +size_t ParallelReadBuffer::getFileSize() { return reader_factory->getFileSize(); } diff --git a/src/IO/ParallelReadBuffer.h b/src/IO/ParallelReadBuffer.h index 83b978848f8..9881d463ed4 100644 --- a/src/IO/ParallelReadBuffer.h +++ b/src/IO/ParallelReadBuffer.h @@ -43,7 +43,7 @@ public: ~ParallelReadBuffer() override { finishAndWait(); } off_t seek(off_t off, int whence) override; - std::optional getFileSize(); + size_t getFileSize(); off_t getPosition() override; const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; } diff --git a/src/IO/ReadBufferFromEmptyFile.h b/src/IO/ReadBufferFromEmptyFile.h index 0a14c07dd5c..f21f2f507dc 100644 --- a/src/IO/ReadBufferFromEmptyFile.h +++ b/src/IO/ReadBufferFromEmptyFile.h @@ -19,6 +19,7 @@ private: std::string getFileName() const override { return ""; } off_t seek(off_t /*off*/, int /*whence*/) override { return 0; } off_t getPosition() override { return 0; } + size_t getFileSize() override { return 0; } }; } diff --git a/src/IO/ReadBufferFromEncryptedFile.h b/src/IO/ReadBufferFromEncryptedFile.h index 05854c0d20c..267477b3b98 100644 --- a/src/IO/ReadBufferFromEncryptedFile.h +++ b/src/IO/ReadBufferFromEncryptedFile.h @@ -30,6 +30,8 @@ public: void setReadUntilEnd() override { in->setReadUntilEnd(); } + size_t getFileSize() override { return in->getFileSize(); } + private: bool nextImpl() override; diff --git a/src/IO/ReadBufferFromFileBase.cpp b/src/IO/ReadBufferFromFileBase.cpp index 4db64755abf..7b1a0ead6fd 100644 --- a/src/IO/ReadBufferFromFileBase.cpp +++ b/src/IO/ReadBufferFromFileBase.cpp @@ -3,6 +3,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory(0) { } @@ -19,4 +24,11 @@ ReadBufferFromFileBase::ReadBufferFromFileBase( ReadBufferFromFileBase::~ReadBufferFromFileBase() = default; +size_t ReadBufferFromFileBase::getFileSize() +{ + if (file_size) + return *file_size; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getFileSize() is not implemented for read buffer"); +} + } diff --git a/src/IO/ReadBufferFromFileBase.h b/src/IO/ReadBufferFromFileBase.h index b076510a0d5..d28be034eb5 100644 --- a/src/IO/ReadBufferFromFileBase.h +++ b/src/IO/ReadBufferFromFileBase.h @@ -20,7 +20,8 @@ namespace DB { -class ReadBufferFromFileBase : public BufferWithOwnMemory, public WithFileName + +class ReadBufferFromFileBase : public BufferWithOwnMemory, public WithFileName, public WithFileSize { public: ReadBufferFromFileBase(); @@ -48,6 +49,8 @@ public: clock_type = clock_type_; } + size_t getFileSize() override; + protected: std::optional file_size; ProfileCallback profile_callback; diff --git a/src/IO/ReadBufferFromFileDecorator.cpp b/src/IO/ReadBufferFromFileDecorator.cpp index f4a996fc278..6e803586cd6 100644 --- a/src/IO/ReadBufferFromFileDecorator.cpp +++ b/src/IO/ReadBufferFromFileDecorator.cpp @@ -53,4 +53,9 @@ bool ReadBufferFromFileDecorator::nextImpl() return result; } +size_t ReadBufferFromFileDecorator::getFileSize() +{ + return getFileSizeFromReadBuffer(*impl); +} + } diff --git a/src/IO/ReadBufferFromFileDecorator.h b/src/IO/ReadBufferFromFileDecorator.h index 96d4d0c26d4..1d035e8d74b 100644 --- a/src/IO/ReadBufferFromFileDecorator.h +++ b/src/IO/ReadBufferFromFileDecorator.h @@ -29,6 +29,8 @@ public: bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); } + size_t getFileSize() override; + protected: std::unique_ptr impl; String file_name; diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index 78881005e2b..b14b0f6d412 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -246,13 +247,9 @@ bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds) } -off_t ReadBufferFromFileDescriptor::size() +size_t ReadBufferFromFileDescriptor::getFileSize() { - struct stat buf; - int res = fstat(fd, &buf); - if (-1 == res) - throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT); - return buf.st_size; + return getSizeFromFileDescriptor(fd, getFileName()); } diff --git a/src/IO/ReadBufferFromFileDescriptor.h b/src/IO/ReadBufferFromFileDescriptor.h index 66f85e77262..4c9b8b02c61 100644 --- a/src/IO/ReadBufferFromFileDescriptor.h +++ b/src/IO/ReadBufferFromFileDescriptor.h @@ -58,7 +58,7 @@ public: /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read. void rewind(); - off_t size(); + size_t getFileSize() override; void setProgressCallback(ContextPtr context); diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index c1b2ec7db0f..cf8db260b28 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -217,20 +217,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence) return offset; } -std::optional ReadBufferFromS3::getFileSize() +size_t ReadBufferFromS3::getFileSize() { if (file_size) - return file_size; + return *file_size; auto object_size = S3::getObjectSize(client_ptr, bucket, key, version_id, false); - if (!object_size) - { - return std::nullopt; - } - file_size = object_size; - return file_size; + return *file_size; } off_t ReadBufferFromS3::getPosition() @@ -334,7 +329,7 @@ off_t ReadBufferS3Factory::seek(off_t off, [[maybe_unused]] int whence) return off; } -std::optional ReadBufferS3Factory::getFileSize() +size_t ReadBufferS3Factory::getFileSize() { return object_size; } diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index 87efe5f3504..6d9c0925e68 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -65,7 +65,7 @@ public: off_t getPosition() override; - std::optional getFileSize() override; + size_t getFileSize() override; void setReadUntilPosition(size_t position) override; @@ -122,7 +122,7 @@ public: off_t seek(off_t off, [[maybe_unused]] int whence) override; - std::optional getFileSize() override; + size_t getFileSize() override; String getFileName() const override { return bucket + "/" + key; } diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 2bb56bfe4fa..c450ffe1747 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -199,7 +199,7 @@ namespace detail } } - std::optional getFileSize() override + size_t getFileSize() override { if (read_range.end) return *read_range.end - getRangeBegin(); @@ -221,7 +221,7 @@ namespace detail if (response.hasContentLength()) read_range.end = getRangeBegin() + response.getContentLength(); - return read_range.end; + return *read_range.end; } String getFileName() const override { return uri.toString(); } @@ -749,7 +749,7 @@ public: return off; } - std::optional getFileSize() override { return total_object_size; } + size_t getFileSize() override { return total_object_size; } String getFileName() const override { return uri.toString(); } diff --git a/src/IO/WithFileSize.cpp b/src/IO/WithFileSize.cpp index c05a32291e3..28542db7a73 100644 --- a/src/IO/WithFileSize.cpp +++ b/src/IO/WithFileSize.cpp @@ -7,18 +7,23 @@ namespace DB { +namespace ErrorCodes +{ + extern const int UNKNOWN_FILE_SIZE; +} + template -static std::optional getFileSize(T & in) +static size_t getFileSize(T & in) { if (auto * with_file_size = dynamic_cast(&in)) { return with_file_size->getFileSize(); } - return std::nullopt; + throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size"); } -std::optional getFileSizeFromReadBuffer(ReadBuffer & in) +size_t getFileSizeFromReadBuffer(ReadBuffer & in) { if (auto * delegate = dynamic_cast(&in)) { diff --git a/src/IO/WithFileSize.h b/src/IO/WithFileSize.h index b0d0517b23a..060626faed2 100644 --- a/src/IO/WithFileSize.h +++ b/src/IO/WithFileSize.h @@ -10,12 +10,12 @@ class ReadBuffer; class WithFileSize { public: - virtual std::optional getFileSize() = 0; + virtual size_t getFileSize() = 0; virtual ~WithFileSize() = default; }; bool isBufferWithFileSize(const ReadBuffer & in); -std::optional getFileSizeFromReadBuffer(ReadBuffer & in); +size_t getFileSizeFromReadBuffer(ReadBuffer & in); } diff --git a/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp b/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp index 85da576b417..459f486af18 100644 --- a/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp +++ b/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp @@ -179,7 +179,7 @@ void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock() bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock() { ReadBufferFromFile reader(out->getFileName()); - auto fsize = reader.size(); + auto fsize = reader.getFileSize(); if (fsize > 3) { std::array result; diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp index c8e8cf900f4..a30d6002954 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp @@ -64,8 +64,6 @@ arrow::Result RandomAccessFileFromSeekableReadBuffer::GetSize() { if (isBufferWithFileSize(in)) file_size = getFileSizeFromReadBuffer(in); - if (!file_size) - throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out size of file"); } return arrow::Result(*file_size); } diff --git a/src/Storages/Cache/ExternalDataSourceCache.h b/src/Storages/Cache/ExternalDataSourceCache.h index 18d3d5ca699..937801c4767 100644 --- a/src/Storages/Cache/ExternalDataSourceCache.h +++ b/src/Storages/Cache/ExternalDataSourceCache.h @@ -53,7 +53,7 @@ public: bool nextImpl() override; off_t seek(off_t off, int whence) override; off_t getPosition() override; - std::optional getFileSize() override { return remote_file_size; } + size_t getFileSize() override { return remote_file_size; } private: std::unique_ptr local_file_holder; diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp index 45905fdcaf4..d22e9efaa90 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -16,6 +16,7 @@ namespace ErrorCodes extern const int CANNOT_SEEK_THROUGH_FILE; extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int LOGICAL_ERROR; + extern const int UNKNOWN_FILE_SIZE; } ReadBufferFromHDFS::~ReadBufferFromHDFS() = default; @@ -58,11 +59,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory getFileSize() const + size_t getFileSize() const { auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str()); if (!file_info) - return std::nullopt; + throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path); return file_info->mSize; } @@ -130,7 +131,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS( { } -std::optional ReadBufferFromHDFS::getFileSize() +size_t ReadBufferFromHDFS::getFileSize() { return impl->getFileSize(); } diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h index 0eeec221ff8..be278600fab 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/HDFS/ReadBufferFromHDFS.h @@ -37,7 +37,7 @@ public: off_t getPosition() override; - std::optional getFileSize() override; + size_t getFileSize() override; size_t getFileOffsetOfBufferEnd() const override;