Continuation

This commit is contained in:
kssenii 2022-05-25 16:49:40 +02:00
parent 35d2dec8d5
commit 0556237b68
40 changed files with 147 additions and 110 deletions

View File

@ -1121,7 +1121,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
if (need_render_progress && have_data_in_stdin) if (need_render_progress && have_data_in_stdin)
{ {
/// Set total_bytes_to_read for current fd. /// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.size()); FileProgress file_progress(0, std_in.getFileSize());
progress_indication.updateProgress(Progress(file_progress)); progress_indication.updateProgress(Progress(file_progress));
/// Set callback to be called on file progress. /// Set callback to be called on file progress.

View File

@ -107,6 +107,13 @@ size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_l
return downloaded_size; return downloaded_size;
} }
void FileSegment::resizeToDownloadedSize(
std::lock_guard<std::mutex> & /* segment_lock */, std::lock_guard<std::mutex> & /* cache_lock */)
{
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
segment_range = Range(segment_range.left, segment_range.left + downloaded_size - 1);
}
String FileSegment::getCallerId() String FileSegment::getCallerId()
{ {
if (!CurrentThread::isInitialized() if (!CurrentThread::isInitialized()
@ -420,7 +427,7 @@ void FileSegment::completeBatchAndResetDownloader()
cv.notify_all(); cv.notify_all();
} }
void FileSegment::complete(State state) void FileSegment::complete(State state, bool auto_resize)
{ {
std::lock_guard cache_lock(cache->mutex); std::lock_guard cache_lock(cache->mutex);
std::lock_guard segment_lock(mutex); std::lock_guard segment_lock(mutex);
@ -445,8 +452,22 @@ void FileSegment::complete(State state)
} }
if (state == State::DOWNLOADED) if (state == State::DOWNLOADED)
{
if (auto_resize && downloaded_size != range().size())
{
resizeToDownloadedSize(segment_lock, cache_lock);
}
/// Update states and finalize cache write buffer.
setDownloaded(segment_lock); setDownloaded(segment_lock);
if (downloaded_size != range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Cannot complete file segment as DOWNLOADED, because downloaded size ({}) does not match expected size ({})",
downloaded_size, range().size());
}
download_state = state; download_state = state;
try try
@ -539,8 +560,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & cache_lock, std::lo
* it only when nobody needs it. * it only when nobody needs it.
*/ */
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), current_downloaded_size); resizeToDownloadedSize(segment_lock, cache_lock);
cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock);
} }
markAsDetached(segment_lock); markAsDetached(segment_lock);
@ -819,7 +839,7 @@ bool FileSegmentRangeWriter::write(char * data, size_t size, size_t offset, bool
if ((*current_file_segment_it)->getAvailableSize() == 0) if ((*current_file_segment_it)->getAvailableSize() == 0)
{ {
(*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED); (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED, true);
on_complete_file_segment_func(*current_file_segment_it); on_complete_file_segment_func(*current_file_segment_it);
current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent);
} }
@ -873,7 +893,7 @@ void FileSegmentRangeWriter::finalize()
if ((*current_file_segment_it)->getDownloadedSize() > 0) if ((*current_file_segment_it)->getDownloadedSize() > 0)
{ {
(*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED); (*current_file_segment_it)->complete(FileSegment::State::DOWNLOADED, true);
on_complete_file_segment_func(*current_file_segment_it); on_complete_file_segment_func(*current_file_segment_it);
} }
else else

View File

@ -133,7 +133,7 @@ public:
void completeBatchAndResetDownloader(); void completeBatchAndResetDownloader();
void complete(State state); void complete(State state, bool auto_resize = false);
String getInfoForLog() const; String getInfoForLog() const;
@ -166,6 +166,7 @@ private:
bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return is_detached; } bool isDetached(std::lock_guard<std::mutex> & /* segment_lock */) const { return is_detached; }
void markAsDetached(std::lock_guard<std::mutex> & segment_lock); void markAsDetached(std::lock_guard<std::mutex> & segment_lock);
[[noreturn]] void throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const; [[noreturn]] void throwIfDetachedUnlocked(std::lock_guard<std::mutex> & segment_lock) const;
void resizeToDownloadedSize(std::lock_guard<std::mutex> & segment_lock, std::lock_guard<std::mutex> & cache_lock);
void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const; void assertDetachedStatus(std::lock_guard<std::mutex> & segment_lock) const;
void assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const; void assertNotDetached(std::lock_guard<std::mutex> & segment_lock) const;
@ -191,7 +192,7 @@ private:
void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock); void resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock);
const Range segment_range; Range segment_range;
State download_state; State download_state;
String downloader_id; String downloader_id;

View File

@ -34,7 +34,6 @@ public:
Key() = default; Key() = default;
explicit Key(const UInt128 & key_) : key(key_) {} explicit Key(const UInt128 & key_) : key(key_) {}
bool operator<(const Key & other) const { return key < other.key; }
bool operator==(const Key & other) const { return key == other.key; } bool operator==(const Key & other) const { return key == other.key; }
}; };
@ -123,13 +122,6 @@ protected:
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0; std::lock_guard<std::mutex> & segment_lock) = 0;
/// If file segment was partially downloaded and then space reservation fails (because of no
/// space left), then update corresponding cache cell metadata (file segment size).
virtual void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) = 0;
virtual FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent, std::lock_guard<std::mutex> & cache_lock) = 0; virtual FileSegmentPtr setDownloading(const Key & key, size_t offset, size_t size, bool is_persistent, std::lock_guard<std::mutex> & cache_lock) = 0;
void assertInitialized() const; void assertInitialized() const;

View File

@ -742,39 +742,6 @@ void LRUFileCache::loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_l
#endif #endif
} }
void LRUFileCache::reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
/**
* In case file was partially downloaded and it's download cannot be continued
* because of no space left in cache, we need to be able to cut cell's size to downloaded_size.
*/
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No cell found for key: {}, offset: {}",
key.toString(), offset);
}
auto file_segment = cell->file_segment;
size_t downloaded_size = file_segment->downloaded_size;
if (downloaded_size == file_segment->range().size())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Nothing to reduce, file segment fully downloaded, key: {}, offset: {}",
key.toString(), offset);
}
cell->file_segment = std::make_shared<FileSegment>(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED, file_segment->isPersistent());
}
bool LRUFileCache::isLastFileSegmentHolder( bool LRUFileCache::isLastFileSegmentHolder(
const Key & key, size_t offset, const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */) std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)

View File

@ -142,11 +142,6 @@ private:
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) override; std::lock_guard<std::mutex> & segment_lock) override;
void reduceSizeToDownloaded(
const Key & key, size_t offset,
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock) override;
size_t getAvailableCacheSize() const; size_t getAvailableCacheSize() const;
void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock); void loadCacheInfoIntoMemory(std::lock_guard<std::mutex> & cache_lock);

View File

@ -18,6 +18,7 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -30,6 +31,7 @@ namespace ErrorCodes
extern const int SYSTEM_ERROR; extern const int SYSTEM_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int CANNOT_STAT; extern const int CANNOT_STAT;
extern const int CANNOT_FSTAT;
extern const int CANNOT_STATVFS; extern const int CANNOT_STATVFS;
extern const int PATH_ACCESS_DENIED; extern const int PATH_ACCESS_DENIED;
extern const int CANNOT_CREATE_FILE; extern const int CANNOT_CREATE_FILE;
@ -216,6 +218,20 @@ bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path
return fileOrSymlinkPathStartsWith(filesystem_path, filesystem_prefix_path); return fileOrSymlinkPathStartsWith(filesystem_path, filesystem_prefix_path);
} }
size_t getSizeFromFileDescriptor(int fd, const String & file_name)
{
struct stat buf;
int res = fstat(fd, &buf);
if (-1 == res)
{
throwFromErrnoWithPath(
"Cannot execute fstat" + (file_name.empty() ? "" : " file: " + file_name),
file_name,
ErrorCodes::CANNOT_FSTAT);
}
return buf.st_size;
}
} }

View File

@ -64,6 +64,8 @@ bool pathStartsWith(const String & path, const String & prefix_path);
/// (Path is made absolute and normalized.) /// (Path is made absolute and normalized.)
bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path); bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path);
size_t getSizeFromFileDescriptor(int fd, const String & file_name = "");
} }
namespace FS namespace FS

View File

@ -67,7 +67,7 @@ String AsynchronousReadIndirectBufferFromRemoteFS::getInfoForLog()
return impl->getInfoForLog(); return impl->getInfoForLog();
} }
std::optional<size_t> AsynchronousReadIndirectBufferFromRemoteFS::getFileSize() size_t AsynchronousReadIndirectBufferFromRemoteFS::getFileSize()
{ {
return impl->getFileSize(); return impl->getFileSize();
} }

View File

@ -27,7 +27,7 @@ struct ReadSettings;
* *
* We pass either `memory` or `prefetch_buffer` through all this chain and return it back. * We pass either `memory` or `prefetch_buffer` through all this chain and return it back.
*/ */
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase, public WithFileSize class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{ {
public: public:
explicit AsynchronousReadIndirectBufferFromRemoteFS( explicit AsynchronousReadIndirectBufferFromRemoteFS(
@ -51,7 +51,7 @@ public:
String getInfoForLog() override; String getInfoForLog() override;
std::optional<size_t> getFileSize() override; size_t getFileSize() override;
bool isIntegratedWithFilesystemCache() const override { return true; } bool isIntegratedWithFilesystemCache() const override { return true; }

View File

@ -40,8 +40,9 @@ CachedReadBufferFromFile::CachedReadBufferFromFile(
RemoteFSFileReaderCreator remote_file_reader_creator_, RemoteFSFileReaderCreator remote_file_reader_creator_,
const ReadSettings & settings_, const ReadSettings & settings_,
const String & query_id_, const String & query_id_,
size_t read_until_position_) size_t file_size_,
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) std::optional<size_t> read_until_position_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG #ifndef NDEBUG
, log(&Poco::Logger::get("CachedReadBufferFromFile(" + source_file_path_ + ")")) , log(&Poco::Logger::get("CachedReadBufferFromFile(" + source_file_path_ + ")"))
#else #else
@ -51,7 +52,7 @@ CachedReadBufferFromFile::CachedReadBufferFromFile(
, source_file_path(source_file_path_) , source_file_path(source_file_path_)
, cache(cache_) , cache(cache_)
, settings(settings_) , settings(settings_)
, read_until_position(read_until_position_) , read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, remote_file_reader_creator(remote_file_reader_creator_) , remote_file_reader_creator(remote_file_reader_creator_)
, is_persistent(settings_.cache_file_as_persistent) , is_persistent(settings_.cache_file_as_persistent)
, query_id(query_id_) , query_id(query_id_)
@ -128,8 +129,8 @@ SeekableReadBufferPtr CachedReadBufferFromFile::getCacheReadBuffer(size_t offset
local_read_settings.local_fs_method = LocalFSReadMethod::pread; local_read_settings.local_fs_method = LocalFSReadMethod::pread;
auto buf = createReadBufferFromFileBase(path, local_read_settings); auto buf = createReadBufferFromFileBase(path, local_read_settings);
auto * from_fd = dynamic_cast<ReadBufferFromFileDescriptor*>(buf.get()); auto * from_fd = dynamic_cast<ReadBufferFromFileBase*>(buf.get());
if (from_fd && from_fd->size() == 0) if (from_fd && from_fd->getFileSize() == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path); throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {}", path);
return buf; return buf;
@ -407,8 +408,8 @@ SeekableReadBufferPtr CachedReadBufferFromFile::getImplementationBuffer(FileSegm
case ReadType::CACHED: case ReadType::CACHED:
{ {
#ifndef NDEBUG #ifndef NDEBUG
auto * file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(read_buffer_for_file_segment.get()); auto * file_reader = dynamic_cast<ReadBufferFromFileBase *>(read_buffer_for_file_segment.get());
size_t file_size = file_reader->size(); size_t file_size = file_reader->getFileSize();
if (file_size == 0 || range.left + file_size <= file_offset_of_buffer_end) if (file_size == 0 || range.left + file_size <= file_offset_of_buffer_end)
throw Exception( throw Exception(
@ -835,9 +836,9 @@ bool CachedReadBufferFromFile::nextImplStep()
if (!result) if (!result)
{ {
#ifndef NDEBUG #ifndef NDEBUG
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(implementation_buffer.get())) if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileBase *>(implementation_buffer.get()))
{ {
auto cache_file_size = cache_file_reader->size(); auto cache_file_size = cache_file_reader->getFileSize();
if (cache_file_size == 0) if (cache_file_size == 0)
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {} (just before actual read)", cache_file_size); ErrorCodes::LOGICAL_ERROR, "Attempt to read from an empty cache file: {} (just before actual read)", cache_file_size);
@ -950,8 +951,8 @@ bool CachedReadBufferFromFile::nextImplStep()
if (size == 0 && file_offset_of_buffer_end < read_until_position) if (size == 0 && file_offset_of_buffer_end < read_until_position)
{ {
std::optional<size_t> cache_file_size; std::optional<size_t> cache_file_size;
if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileDescriptor *>(implementation_buffer.get())) if (auto * cache_file_reader = dynamic_cast<ReadBufferFromFileBase *>(implementation_buffer.get()))
cache_file_size = cache_file_reader->size(); cache_file_size = cache_file_reader->getFileSize();
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,

View File

@ -29,7 +29,8 @@ public:
RemoteFSFileReaderCreator remote_file_reader_creator_, RemoteFSFileReaderCreator remote_file_reader_creator_,
const ReadSettings & settings_, const ReadSettings & settings_,
const String & query_id_, const String & query_id_,
size_t read_until_position_); size_t file_size_,
std::optional<size_t> read_until_position_ = std::nullopt);
~CachedReadBufferFromFile() override; ~CachedReadBufferFromFile() override;

View File

@ -63,7 +63,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(con
{ {
return std::make_shared<CachedReadBufferFromFile>( return std::make_shared<CachedReadBufferFromFile>(
remote_path, settings.remote_fs_cache, remote_file_reader_creator, remote_path, settings.remote_fs_cache, remote_file_reader_creator,
settings, query_id, read_until_position ? read_until_position : file_size); settings, query_id, file_size, read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
} }
return remote_file_reader_creator(); return remote_file_reader_creator();

View File

@ -19,6 +19,10 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
{ {
} }
size_t ReadIndirectBufferFromRemoteFS::getFileSize()
{
return impl->getFileSize();
}
off_t ReadIndirectBufferFromRemoteFS::getPosition() off_t ReadIndirectBufferFromRemoteFS::getPosition()
{ {

View File

@ -33,6 +33,8 @@ public:
bool isIntegratedWithFilesystemCache() const override { return true; } bool isIntegratedWithFilesystemCache() const override { return true; }
size_t getFileSize() override;
private: private:
bool nextImpl() override; bool nextImpl() override;

View File

@ -7,6 +7,7 @@
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <IO/AsynchronousReadBufferFromFileDescriptor.h> #include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/filesystemHelpers.h>
namespace ProfileEvents namespace ProfileEvents
@ -242,4 +243,9 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
file_offset_of_buffer_end = 0; file_offset_of_buffer_end = 0;
} }
size_t AsynchronousReadBufferFromFileDescriptor::getFileSize()
{
return getSizeFromFileDescriptor(fd, getFileName());
}
} }

View File

@ -64,9 +64,10 @@ public:
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read. /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind(); void rewind();
size_t getFileSize() override;
private: private:
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size); std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
}; };
} }

View File

@ -21,7 +21,7 @@ public:
off_t seek(off_t off, int whence) override; off_t seek(off_t off, int whence) override;
off_t getPosition() override; off_t getPosition() override;
std::optional<size_t> getFileSize() override { return total_size; } size_t getFileSize() override { return total_size; }
private: private:
bool nextImpl() override; bool nextImpl() override;

View File

@ -6,6 +6,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/filesystemHelpers.h>
#include <base/getPageSize.h> #include <base/getPageSize.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/MMapReadBufferFromFileDescriptor.h> #include <IO/MMapReadBufferFromFileDescriptor.h>
@ -86,4 +87,8 @@ off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence)
return new_pos; return new_pos;
} }
size_t MMapReadBufferFromFileDescriptor::getFileSize()
{
return getSizeFromFileDescriptor(getFD(), getFileName());
}
} }

View File

@ -33,9 +33,12 @@ public:
void finish(); void finish();
off_t getPosition() override; off_t getPosition() override;
std::string getFileName() const override; std::string getFileName() const override;
int getFD() const; int getFD() const;
size_t getFileSize() override;
}; };
} }

View File

@ -142,7 +142,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
return offset; return offset;
} }
std::optional<size_t> ParallelReadBuffer::getFileSize() size_t ParallelReadBuffer::getFileSize()
{ {
return reader_factory->getFileSize(); return reader_factory->getFileSize();
} }

View File

@ -43,7 +43,7 @@ public:
~ParallelReadBuffer() override { finishAndWait(); } ~ParallelReadBuffer() override { finishAndWait(); }
off_t seek(off_t off, int whence) override; off_t seek(off_t off, int whence) override;
std::optional<size_t> getFileSize(); size_t getFileSize();
off_t getPosition() override; off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; } const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }

View File

@ -19,6 +19,7 @@ private:
std::string getFileName() const override { return "<empty>"; } std::string getFileName() const override { return "<empty>"; }
off_t seek(off_t /*off*/, int /*whence*/) override { return 0; } off_t seek(off_t /*off*/, int /*whence*/) override { return 0; }
off_t getPosition() override { return 0; } off_t getPosition() override { return 0; }
size_t getFileSize() override { return 0; }
}; };
} }

View File

@ -30,6 +30,8 @@ public:
void setReadUntilEnd() override { in->setReadUntilEnd(); } void setReadUntilEnd() override { in->setReadUntilEnd(); }
size_t getFileSize() override { return in->getFileSize(); }
private: private:
bool nextImpl() override; bool nextImpl() override;

View File

@ -3,6 +3,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0) ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableReadBuffer>(0)
{ {
} }
@ -19,4 +24,11 @@ ReadBufferFromFileBase::ReadBufferFromFileBase(
ReadBufferFromFileBase::~ReadBufferFromFileBase() = default; ReadBufferFromFileBase::~ReadBufferFromFileBase() = default;
size_t ReadBufferFromFileBase::getFileSize()
{
if (file_size)
return *file_size;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getFileSize() is not implemented for read buffer");
}
} }

View File

@ -20,7 +20,8 @@
namespace DB namespace DB
{ {
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName
class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName, public WithFileSize
{ {
public: public:
ReadBufferFromFileBase(); ReadBufferFromFileBase();
@ -48,6 +49,8 @@ public:
clock_type = clock_type_; clock_type = clock_type_;
} }
size_t getFileSize() override;
protected: protected:
std::optional<size_t> file_size; std::optional<size_t> file_size;
ProfileCallback profile_callback; ProfileCallback profile_callback;

View File

@ -53,4 +53,9 @@ bool ReadBufferFromFileDecorator::nextImpl()
return result; return result;
} }
size_t ReadBufferFromFileDecorator::getFileSize()
{
return getFileSizeFromReadBuffer(*impl);
}
} }

View File

@ -29,6 +29,8 @@ public:
bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); } bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); }
size_t getFileSize() override;
protected: protected:
std::unique_ptr<SeekableReadBuffer> impl; std::unique_ptr<SeekableReadBuffer> impl;
String file_name; String file_name;

View File

@ -9,6 +9,7 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/Progress.h> #include <IO/Progress.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -246,13 +247,9 @@ bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds)
} }
off_t ReadBufferFromFileDescriptor::size() size_t ReadBufferFromFileDescriptor::getFileSize()
{ {
struct stat buf; return getSizeFromFileDescriptor(fd, getFileName());
int res = fstat(fd, &buf);
if (-1 == res)
throwFromErrnoWithPath("Cannot execute fstat " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSTAT);
return buf.st_size;
} }

View File

@ -58,7 +58,7 @@ public:
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read. /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind(); void rewind();
off_t size(); size_t getFileSize() override;
void setProgressCallback(ContextPtr context); void setProgressCallback(ContextPtr context);

View File

@ -217,20 +217,15 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset; return offset;
} }
std::optional<size_t> ReadBufferFromS3::getFileSize() size_t ReadBufferFromS3::getFileSize()
{ {
if (file_size) if (file_size)
return file_size; return *file_size;
auto object_size = S3::getObjectSize(client_ptr, bucket, key, version_id, false); auto object_size = S3::getObjectSize(client_ptr, bucket, key, version_id, false);
if (!object_size)
{
return std::nullopt;
}
file_size = object_size; file_size = object_size;
return file_size; return *file_size;
} }
off_t ReadBufferFromS3::getPosition() off_t ReadBufferFromS3::getPosition()
@ -334,7 +329,7 @@ off_t ReadBufferS3Factory::seek(off_t off, [[maybe_unused]] int whence)
return off; return off;
} }
std::optional<size_t> ReadBufferS3Factory::getFileSize() size_t ReadBufferS3Factory::getFileSize()
{ {
return object_size; return object_size;
} }

View File

@ -65,7 +65,7 @@ public:
off_t getPosition() override; off_t getPosition() override;
std::optional<size_t> getFileSize() override; size_t getFileSize() override;
void setReadUntilPosition(size_t position) override; void setReadUntilPosition(size_t position) override;
@ -122,7 +122,7 @@ public:
off_t seek(off_t off, [[maybe_unused]] int whence) override; off_t seek(off_t off, [[maybe_unused]] int whence) override;
std::optional<size_t> getFileSize() override; size_t getFileSize() override;
String getFileName() const override { return bucket + "/" + key; } String getFileName() const override { return bucket + "/" + key; }

View File

@ -199,7 +199,7 @@ namespace detail
} }
} }
std::optional<size_t> getFileSize() override size_t getFileSize() override
{ {
if (read_range.end) if (read_range.end)
return *read_range.end - getRangeBegin(); return *read_range.end - getRangeBegin();
@ -221,7 +221,7 @@ namespace detail
if (response.hasContentLength()) if (response.hasContentLength())
read_range.end = getRangeBegin() + response.getContentLength(); read_range.end = getRangeBegin() + response.getContentLength();
return read_range.end; return *read_range.end;
} }
String getFileName() const override { return uri.toString(); } String getFileName() const override { return uri.toString(); }
@ -749,7 +749,7 @@ public:
return off; return off;
} }
std::optional<size_t> getFileSize() override { return total_object_size; } size_t getFileSize() override { return total_object_size; }
String getFileName() const override { return uri.toString(); } String getFileName() const override { return uri.toString(); }

View File

@ -7,18 +7,23 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int UNKNOWN_FILE_SIZE;
}
template <typename T> template <typename T>
static std::optional<size_t> getFileSize(T & in) static size_t getFileSize(T & in)
{ {
if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in)) if (auto * with_file_size = dynamic_cast<WithFileSize *>(&in))
{ {
return with_file_size->getFileSize(); return with_file_size->getFileSize();
} }
return std::nullopt; throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size");
} }
std::optional<size_t> getFileSizeFromReadBuffer(ReadBuffer & in) size_t getFileSizeFromReadBuffer(ReadBuffer & in)
{ {
if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in)) if (auto * delegate = dynamic_cast<ReadBufferFromFileDecorator *>(&in))
{ {

View File

@ -10,12 +10,12 @@ class ReadBuffer;
class WithFileSize class WithFileSize
{ {
public: public:
virtual std::optional<size_t> getFileSize() = 0; virtual size_t getFileSize() = 0;
virtual ~WithFileSize() = default; virtual ~WithFileSize() = default;
}; };
bool isBufferWithFileSize(const ReadBuffer & in); bool isBufferWithFileSize(const ReadBuffer & in);
std::optional<size_t> getFileSizeFromReadBuffer(ReadBuffer & in); size_t getFileSizeFromReadBuffer(ReadBuffer & in);
} }

View File

@ -179,7 +179,7 @@ void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock() bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
{ {
ReadBufferFromFile reader(out->getFileName()); ReadBufferFromFile reader(out->getFileName());
auto fsize = reader.size(); auto fsize = reader.getFileSize();
if (fsize > 3) if (fsize > 3)
{ {
std::array<char, 3> result; std::array<char, 3> result;

View File

@ -64,8 +64,6 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
{ {
if (isBufferWithFileSize(in)) if (isBufferWithFileSize(in))
file_size = getFileSizeFromReadBuffer(in); file_size = getFileSizeFromReadBuffer(in);
if (!file_size)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out size of file");
} }
return arrow::Result<int64_t>(*file_size); return arrow::Result<int64_t>(*file_size);
} }

View File

@ -53,7 +53,7 @@ public:
bool nextImpl() override; bool nextImpl() override;
off_t seek(off_t off, int whence) override; off_t seek(off_t off, int whence) override;
off_t getPosition() override; off_t getPosition() override;
std::optional<size_t> getFileSize() override { return remote_file_size; } size_t getFileSize() override { return remote_file_size; }
private: private:
std::unique_ptr<LocalFileHolder> local_file_holder; std::unique_ptr<LocalFileHolder> local_file_holder;

View File

@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE; extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FILE_SIZE;
} }
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default; ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
@ -58,11 +59,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin); hdfsCloseFile(fs.get(), fin);
} }
std::optional<size_t> getFileSize() const size_t getFileSize() const
{ {
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str()); auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info) if (!file_info)
return std::nullopt; throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
return file_info->mSize; return file_info->mSize;
} }
@ -130,7 +131,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
{ {
} }
std::optional<size_t> ReadBufferFromHDFS::getFileSize() size_t ReadBufferFromHDFS::getFileSize()
{ {
return impl->getFileSize(); return impl->getFileSize();
} }

View File

@ -37,7 +37,7 @@ public:
off_t getPosition() override; off_t getPosition() override;
std::optional<size_t> getFileSize() override; size_t getFileSize() override;
size_t getFileOffsetOfBufferEnd() const override; size_t getFileOffsetOfBufferEnd() const override;