dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-03-16 17:56:12 +03:00
parent ee22aac2fe
commit 9c9adf878b
11 changed files with 117 additions and 30 deletions

View File

@ -0,0 +1,20 @@
#pragma once
#include <string>
#include <fcntl.h>
#include <sys/types.h>
namespace DB
{
class IReadFileOperations
{
public:
virtual ~IReadFileOperations() = default;
virtual off_t seek(off_t off, int whence) = 0;
virtual off_t getPositionInFile() = 0;
virtual std::string getFileName() const noexcept = 0;
virtual int getFD() const noexcept = 0;
};
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <string>
#include <fcntl.h>
#include <sys/types.h>
namespace DB
{
class IWriteFileOperations
{
public:
virtual ~IWriteFileOperations() = default;
virtual off_t seek(off_t off, int whence) = 0;
virtual off_t getPositionInFile() = 0;
virtual void truncate(off_t length) = 0;
virtual void sync() = 0;
virtual std::string getFileName() const noexcept = 0;
virtual int getFD() const noexcept = 0;
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/IO/ReadBufferFromFileBase.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <statdaemons/AIO.h>
@ -15,7 +16,7 @@ namespace DB
/** Класс для асинхронного чтения данных.
* Все размеры и смещения должны быть кратны DEFAULT_AIO_FILE_BLOCK_SIZE байтам.
*/
class ReadBufferAIO : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferAIO : public ReadBufferFromFileBase
{
public:
ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
@ -26,10 +27,10 @@ public:
ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
void setMaxBytes(size_t max_bytes_read_);
off_t seek(off_t off, int whence = SEEK_SET);
off_t getPositionInFile();
std::string getFileName() const noexcept { return filename; }
int getFD() const noexcept { return fd; }
off_t seek(off_t off, int whence) override;
off_t getPositionInFile() override;
std::string getFileName() const noexcept override { return filename; }
int getFD() const noexcept override { return fd; }
private:
off_t getPositionInFileRelaxed() const noexcept;

View File

@ -33,7 +33,7 @@ public:
close(fd);
}
virtual std::string getFileName()
std::string getFileName() const noexcept override
{
return file_name;
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/IO/IReadFileOperations.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/ReadBuffer.h>
namespace DB
{
class ReadBufferFromFileBase : public IReadFileOperations, public BufferWithOwnMemory<ReadBuffer>
{
public:
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment) {}
virtual ~ReadBufferFromFileBase() = default;
};
}

View File

@ -8,6 +8,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBufferFromFileBase.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/BufferWithOwnMemory.h>
@ -18,7 +19,7 @@ namespace DB
/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл.
*/
class ReadBufferFromFileDescriptor : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
int fd;
@ -53,22 +54,22 @@ protected:
}
/// Имя или описание файла
virtual std::string getFileName()
virtual std::string getFileName() const noexcept override
{
return "(fd = " + toString(fd) + ")";
}
public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {}
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {}
int getFD()
int getFD() const noexcept override
{
return fd;
}
/// Если offset такой маленький, что мы не выйдем за пределы буфера, настоящий seek по файлу не делается.
off_t seek(off_t offset, int whence = SEEK_SET)
off_t seek(off_t offset, int whence = SEEK_CUR) override
{
off_t new_pos = offset;
if (whence == SEEK_CUR)
@ -99,7 +100,7 @@ public:
}
}
off_t getPositionInFile()
off_t getPositionInFile() override
{
return pos_in_file - (working_buffer.end() - pos);
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/IO/WriteBufferFromFileBase.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <statdaemons/AIO.h>
@ -14,7 +15,7 @@ namespace DB
/** Класс для асинхронной записи данных.
* Все размеры и смещения должны быть кратны DEFAULT_AIO_FILE_BLOCK_SIZE байтам.
*/
class WriteBufferAIO : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferAIO : public WriteBufferFromFileBase
{
public:
WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
@ -24,12 +25,12 @@ public:
WriteBufferAIO(const WriteBufferAIO &) = delete;
WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
off_t seek(off_t off, int whence = SEEK_SET);
off_t getPositionInFile();
void truncate(off_t length = 0);
void sync();
std::string getFileName() const noexcept { return filename; }
int getFD() const noexcept { return fd; }
off_t seek(off_t off, int whence = SEEK_SET) override;
off_t getPositionInFile() override;
void truncate(off_t length = 0) override;
void sync() override;
std::string getFileName() const noexcept override { return filename; }
int getFD() const noexcept override { return fd; }
private:
/// Если в буфере ещё остались данные - запишем их.

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/IO/IWriteFileOperations.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
class WriteBufferFromFileBase : public IWriteFileOperations, public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment) {}
virtual ~WriteBufferFromFileBase() = default;
};
}

View File

@ -6,6 +6,7 @@
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteBufferFromFileBase.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/BufferWithOwnMemory.h>
@ -16,7 +17,7 @@ namespace DB
/** Работает с готовым файловым дескриптором. Не открывает и не закрывает файл.
*/
class WriteBufferFromFileDescriptor : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromFileDescriptor : public WriteBufferFromFileBase
{
protected:
int fd;
@ -40,14 +41,14 @@ protected:
}
/// Имя или описание файла
virtual std::string getFileName()
virtual std::string getFileName() const noexcept override
{
return "(fd = " + toString(fd) + ")";
}
public:
WriteBufferFromFileDescriptor(int fd_ = -1, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), fd(fd_) {}
: WriteBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_) {}
/** Можно вызывать для инициализации, если нужный fd не был передан в конструктор.
* Менять fd во время работы нельзя.
@ -69,12 +70,12 @@ public:
}
}
int getFD()
int getFD() const noexcept override
{
return fd;
}
off_t seek(off_t offset, int whence = SEEK_SET)
off_t seek(off_t offset, int whence = SEEK_SET) override
{
off_t res = lseek(fd, offset, whence);
if (-1 == res)
@ -82,14 +83,19 @@ public:
return res;
}
void truncate(off_t length = 0)
off_t getPositionInFile() override
{
return seek(0, SEEK_CUR);
}
void truncate(off_t length = 0) override
{
int res = ftruncate(fd, length);
if (-1 == res)
throwFromErrno("Cannot truncate file " + getFileName(), ErrorCodes::CANNOT_TRUNCATE_FILE);
}
void sync()
void sync() override
{
/// Если в буфере ещё остались данные - запишем их.
next();

View File

@ -11,8 +11,8 @@ namespace DB
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
: ReadBufferFromFileBase(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory<ReadBuffer>(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
@ -64,7 +64,7 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
max_bytes_read = max_bytes_read_;
}
off_t ReadBufferAIO::seek(off_t off, int whence)
off_t ReadBufferAIO::seek(off_t off, int whence = SEEK_CUR)
{
if ((off % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid offset for ReadBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);

View File

@ -12,7 +12,7 @@ namespace DB
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
: WriteBufferFromFileBase(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{