dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-03-10 20:14:07 +03:00
parent 882f902118
commit fde57c1a78
6 changed files with 42 additions and 26 deletions

View File

@ -280,7 +280,7 @@ namespace ErrorCodes
AIO_SUBMIT_ERROR,
AIO_COMPLETION_ERROR,
AIO_WRITE_ERROR,
AIO_UNALIGNED_BUFFER_ERROR,
AIO_UNALIGNED_SIZE_ERROR,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -5,12 +5,11 @@
namespace DB
{
/** Этот интерфейс определяет функции, которые ReadBufferAIO и WriteBufferAIO реализуют.
/** Этот интерфейс определяет функции, которые классы ReadBufferAIO и WriteBufferAIO реализуют.
*/
class IBufferAIO
{
public:
IBufferAIO() = default;
virtual ~IBufferAIO() = default;
virtual std::string getFileName() const noexcept = 0;
virtual int getFD() const noexcept = 0;
@ -19,7 +18,9 @@ public:
static const size_t BLOCK_SIZE = 512;
protected:
/// Ждать окончания текущей асинхронной задачи.
virtual void waitForCompletion() = 0;
/// Менять местами основной и дублирующий буферы.
virtual void swapBuffers() noexcept = 0;
};

View File

@ -13,9 +13,10 @@
namespace DB
{
using ReadBufferWithOwnMemory = BufferWithOwnMemory<ReadBuffer>;
class ReadBufferAIO : public IBufferAIO, public ReadBufferWithOwnMemory
/** Класс для асинхронной чтения данных.
* Все размеры и смещения должны быть кратны 512 байтам.
*/
class ReadBufferAIO : public IBufferAIO, public BufferWithOwnMemory<ReadBuffer>
{
public:
ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
@ -37,7 +38,8 @@ private:
void swapBuffers() noexcept override;
private:
ReadBufferWithOwnMemory fill_buffer; // buffer asynchronously read from disk
/// Буфер для асинхронных операций чтения данных.
BufferWithOwnMemory<ReadBuffer> fill_buffer;
const std::string filename;
AIOContext aio_context;
@ -45,14 +47,18 @@ private:
std::vector<iocb *> request_ptrs;
std::vector<io_event> events;
int fd = -1; // file descriptor
int fd = -1;
size_t max_bytes_read = std::numeric_limits<size_t>::max();
size_t total_bytes_read = 0;
off_t pos_in_file = 0;
/// Асинхронная операция чтения ещё не завершилась.
bool is_pending_read = false;
/// Было получено исключение.
bool got_exception = false;
/// Конец файла достигнут.
bool is_eof = false;
/// Был отправлен хоть один запрос на асинхронную операцию чтения.
bool is_started = false;
};

View File

@ -11,9 +11,10 @@
namespace DB
{
using WriteBufferWithOwnMemory = BufferWithOwnMemory<WriteBuffer>;
class WriteBufferAIO : public IBufferAIO, public WriteBufferWithOwnMemory
/** Класс для асинхронной записи данных.
* Все размеры и смещения должны быть кратны 512 байтам.
*/
class WriteBufferAIO : public IBufferAIO, public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
@ -35,18 +36,21 @@ private:
void swapBuffers() noexcept override;
private:
WriteBufferWithOwnMemory flush_buffer; // buffer asynchronously flushed to disk
const std::string filename; // name of the file to which we flush data.
/// Буфер для асинхронных операций записи данных.
BufferWithOwnMemory<WriteBuffer> flush_buffer;
const std::string filename;
AIOContext aio_context;
iocb cb;
std::vector<iocb *> request_ptrs;
std::vector<io_event> events;
int fd = -1; // file descriptor
int fd = -1;
size_t total_bytes_written = 0;
/// Асинхронная операция записи ещё не завершилась.
bool is_pending_write = false;
/// Было получено исключение.
bool got_exception = false;
};

View File

@ -2,7 +2,6 @@
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/ErrorCodes.h>
#include <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
@ -56,7 +55,7 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
if ((max_bytes_read_ % BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Invalid maximum number of bytes to read from file " + filename, ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Invalid maximum number of bytes to read from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
max_bytes_read = max_bytes_read_;
}
@ -65,7 +64,7 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
off_t ReadBufferAIO::seek(off_t off, int whence)
{
if ((off % DB::ReadBufferAIO::BLOCK_SIZE) != 0)
throw Exception("Invalid offset for ReadBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Invalid offset for ReadBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
waitForCompletion();
@ -117,7 +116,7 @@ bool ReadBufferAIO::nextImpl()
if (is_eof)
return true;
// Create request.
// Создать запрос.
::memset(&cb, 0, sizeof(cb));
cb.aio_lio_opcode = IOCB_CMD_PREAD;
@ -127,7 +126,7 @@ bool ReadBufferAIO::nextImpl()
cb.aio_offset = pos_in_file;
cb.aio_reqprio = 0;
// Submit request.
// Отправить запрос.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{
@ -162,7 +161,7 @@ void ReadBufferAIO::waitForCompletion()
if ((bytes_read % BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Received unaligned number of bytes from file " + filename, ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Received unaligned number of bytes from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
pos_in_file += bytes_read;

View File

@ -49,7 +49,7 @@ WriteBufferAIO::~WriteBufferAIO()
off_t WriteBufferAIO::seek(off_t off, int whence)
{
if ((off % DB::WriteBufferAIO::BLOCK_SIZE) != 0)
throw Exception("Invalid offset for WriteBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Invalid offset for WriteBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
waitForCompletion();
@ -65,7 +65,7 @@ off_t WriteBufferAIO::seek(off_t off, int whence)
void WriteBufferAIO::truncate(off_t length)
{
if ((length % DB::WriteBufferAIO::BLOCK_SIZE) != 0)
throw Exception("Invalid length for WriteBufferAIO::ftruncate", ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Invalid length for WriteBufferAIO::ftruncate", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
waitForCompletion();
@ -79,8 +79,14 @@ void WriteBufferAIO::truncate(off_t length)
void WriteBufferAIO::sync()
{
/// Если в буфере ещё остались данные - запишем их.
next();
waitForCompletion();
::fsync(fd);
/// Попросим ОС сбросить данные на диск.
int res = ::fsync(fd);
if (res == -1)
throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);
}
void WriteBufferAIO::nextImpl()
@ -91,7 +97,7 @@ void WriteBufferAIO::nextImpl()
waitForCompletion();
swapBuffers();
// Create request.
/// Создать запрос.
::memset(&cb, 0, sizeof(cb));
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
@ -104,10 +110,10 @@ void WriteBufferAIO::nextImpl()
if ((cb.aio_nbytes % BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Illegal attempt to write unaligned data to file " + filename, ErrorCodes::AIO_UNALIGNED_BUFFER_ERROR);
throw Exception("Illegal attempt to write unaligned data to file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
// Submit request.
/// Отправить запрос.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{