Pass size of the file to the readers

v2: s/size/file_size/g
This commit is contained in:
Azat Khuzhin 2022-01-03 19:05:46 +03:00
parent b5ee8cb694
commit e548dae8d9
9 changed files with 111 additions and 47 deletions

View File

@ -30,8 +30,10 @@ AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
size_t buf_size,
int flags,
char * existing_memory,
size_t alignment)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment), file_name(file_name_)
size_t alignment,
std::optional<size_t> file_size_)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment, file_size_)
, file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
@ -62,10 +64,10 @@ AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
size_t alignment)
:
AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
size_t alignment,
std::optional<size_t> file_size_)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, fd_, buf_size, existing_memory, alignment, file_size_)
, file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}

View File

@ -14,17 +14,25 @@ protected:
public:
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0);
AsynchronousReaderPtr reader_,
Int32 priority_,
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt);
/// Use pre-opened file descriptor.
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
AsynchronousReaderPtr reader_,
Int32 priority_,
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt);
~AsynchronousReadBufferFromFile() override;
@ -48,11 +56,16 @@ private:
public:
AsynchronousReadBufferFromFileWithDescriptorsCache(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment),
file_name(file_name_)
AsynchronousReaderPtr reader_,
Int32 priority_,
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment, file_size_)
, file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();

View File

@ -35,10 +35,18 @@ protected:
public:
AsynchronousReadBufferFromFileDescriptor(
AsynchronousReaderPtr reader_, Int32 priority_,
int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
AsynchronousReaderPtr reader_,
Int32 priority_,
int fd_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, reader(std::move(reader_))
, priority(priority_)
, required_alignment(alignment)
, fd(fd_)
{
prefetch_buffer.alignment = alignment;
}
@ -64,7 +72,7 @@ public:
void rewind();
private:
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
std::future<IAsynchronousReader::Result> readInto(char * data, size_t file_size_);
};
}

View File

@ -28,8 +28,9 @@ ReadBufferFromFile::ReadBufferFromFile(
size_t buf_size,
int flags,
char * existing_memory,
size_t alignment)
: ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment), file_name(file_name_)
size_t alignment,
std::optional<size_t> file_size_)
: ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment, file_size_), file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
@ -58,10 +59,10 @@ ReadBufferFromFile::ReadBufferFromFile(
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
size_t alignment)
:
ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
size_t alignment,
std::optional<size_t> file_size_)
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_)
, file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}

View File

@ -23,15 +23,22 @@ protected:
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
public:
explicit ReadBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0);
explicit ReadBufferFromFile(
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt);
/// Use pre-opened file descriptor.
explicit ReadBufferFromFile(
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt);
~ReadBufferFromFile() override;
@ -50,9 +57,14 @@ public:
class ReadBufferFromFilePRead : public ReadBufferFromFile
{
public:
ReadBufferFromFilePRead(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFile(file_name_, buf_size, flags, existing_memory, alignment)
ReadBufferFromFilePRead(
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFile(file_name_, buf_size, flags, existing_memory, alignment, file_size_)
{
use_pread = true;
}
@ -68,10 +80,15 @@ private:
OpenedFileCache::OpenedFilePtr file;
public:
ReadBufferFromFilePReadWithDescriptorsCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment),
file_name(file_name_)
ReadBufferFromFilePReadWithDescriptorsCache(
const std::string & file_name_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment, file_size_)
, file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();

View File

@ -7,8 +7,13 @@ ReadBufferFromFileBase::ReadBufferFromFileBase() : BufferWithOwnMemory<SeekableR
{
}
ReadBufferFromFileBase::ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment)
ReadBufferFromFileBase::ReadBufferFromFileBase(
size_t buf_size,
char * existing_memory,
size_t alignment,
std::optional<size_t> file_size_)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size, existing_memory, alignment)
, file_size(file_size_)
{
}

View File

@ -5,6 +5,7 @@
#include <base/time.h>
#include <functional>
#include <utility>
#include <string>
#include <sys/stat.h>
@ -22,7 +23,11 @@ class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
ReadBufferFromFileBase();
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(
size_t buf_size,
char * existing_memory,
size_t alignment,
std::optional<size_t> file_size_ = std::nullopt);
~ReadBufferFromFileBase() override;
virtual std::string getFileName() const = 0;
@ -44,6 +49,7 @@ public:
}
protected:
std::optional<size_t> file_size;
ProfileCallback profile_callback;
clockid_t clock_type{};
};

View File

@ -27,8 +27,15 @@ protected:
std::string getFileName() const override;
public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), required_alignment(alignment), fd(fd_)
ReadBufferFromFileDescriptor(
int fd_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, required_alignment(alignment)
, fd(fd_)
{
}
@ -63,8 +70,13 @@ private:
class ReadBufferFromFileDescriptorPRead : public ReadBufferFromFileDescriptor
{
public:
ReadBufferFromFileDescriptorPRead(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment)
ReadBufferFromFileDescriptorPRead(
int fd_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt)
: ReadBufferFromFileDescriptor(fd_, buf_size, existing_memory, alignment, file_size_)
{
use_pread = true;
}

View File

@ -63,23 +63,23 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (settings.local_fs_method == LocalFSReadMethod::read)
{
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment);
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment, size);
}
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
{
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment);
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment, size);
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment);
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, size);
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment);
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, size);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");