dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-04-07 14:43:23 +03:00
parent 52e6f5d428
commit ec556cddac
3 changed files with 23 additions and 43 deletions

View File

@ -45,10 +45,10 @@ private:
void skip(); void skip();
/// Ждать окончания текущей асинхронной задачи. /// Ждать окончания текущей асинхронной задачи.
bool waitForAIOCompletion(); bool waitForAIOCompletion();
/// Подготовить асинхронный запрос. /// Подготовить запрос.
void prepare(); void prepare();
/// Подготовить к чтению дублирующий буфер содержащий данные от /// Подготовить к чтению дублирующий буфер содержащий данные от
/// последнего асинхронного запроса. /// последнего запроса.
void publish(); void publish();
private: private:

View File

@ -9,7 +9,6 @@
#include <string> #include <string>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/uio.h>
namespace DB namespace DB
{ {
@ -33,15 +32,15 @@ public:
int getFD() const noexcept override { return fd; } int getFD() const noexcept override { return fd; }
private: private:
///
void nextImpl() override;
/// ///
off_t doSeek(off_t off, int whence) override; off_t doSeek(off_t off, int whence) override;
/// Если в буфере ещё остались данные - запишем их. /// Если в буфере ещё остались данные - запишем их.
void flush(); void flush();
///
void nextImpl() override;
/// Ждать окончания текущей асинхронной задачи. /// Ждать окончания текущей асинхронной задачи.
bool waitForAIOCompletion(); bool waitForAIOCompletion();
/// /// Подготовить асинхронный запрос.
void prepare(); void prepare();
/// ///
void finalize(); void finalize();
@ -57,13 +56,12 @@ private:
AIOContext aio_context{1}; AIOContext aio_context{1};
iovec iov[3];
const std::string filename; const std::string filename;
/// Количество байтов, которые будут записаны на диск. /// Количество байтов, которые будут записаны на диск.
off_t bytes_to_write = 0; off_t bytes_to_write = 0;
/// Количество записанных байтов при последнем запросе.
off_t bytes_written = 0;
/// Количество нулевых байтов, которые надо отрезать c конца файла /// Количество нулевых байтов, которые надо отрезать c конца файла
/// после завершения операции записи данных. /// после завершения операции записи данных.
off_t truncation_count = 0; off_t truncation_count = 0;
@ -73,16 +71,19 @@ private:
/// Максимальная достигнутая позиция в файле. /// Максимальная достигнутая позиция в файле.
off_t max_pos_in_file = 0; off_t max_pos_in_file = 0;
Position buffer_begin = nullptr; /// Начальная позиция выровненного региона диска, в который записываются данные.
size_t region_aligned_size = 0;
off_t region_aligned_begin = 0; off_t region_aligned_begin = 0;
off_t bytes_written = 0; /// Размер выровненного региона диска.
size_t region_aligned_size = 0;
/// Файловый дескриптор для записи. /// Файловый дескриптор для записи.
int fd = -1; int fd = -1;
/// Файловый дескриптор для чтения. Употребляется для невыровненных записей. /// Файловый дескриптор для чтения. Употребляется для невыровненных записей.
int fd2 = -1; int fd2 = -1;
/// Буфер данных, которые хотим записать на диск.
Position buffer_begin = nullptr;
/// Асинхронная операция записи ещё не завершилась? /// Асинхронная операция записи ещё не завершилась?
bool is_pending_write = false; bool is_pending_write = false;
/// Было получено исключение? /// Было получено исключение?

View File

@ -17,9 +17,9 @@ WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_) filename(filename_)
{ {
/// Исправить информацию о размере буферов, чтобы дополнительные страницы не касались базового класса BufferBase.
this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
this->internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); this->internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
flush_buffer.buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); flush_buffer.buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
flush_buffer.internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE); flush_buffer.internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
@ -111,15 +111,10 @@ void WriteBufferAIO::nextImpl()
/// Создать запрос на асинхронную запись. /// Создать запрос на асинхронную запись.
iov[0].iov_base = buffer_begin; request.aio_lio_opcode = IOCB_CMD_PWRITE;
iov[0].iov_len = region_aligned_size;
bytes_to_write = region_aligned_size;
request.aio_lio_opcode = IOCB_CMD_PWRITEV;
request.aio_fildes = fd; request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(iov); request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
request.aio_nbytes = 1; request.aio_nbytes = region_aligned_size;
request.aio_offset = region_aligned_begin; request.aio_offset = region_aligned_begin;
/// Отправить запрос. /// Отправить запрос.
@ -276,6 +271,8 @@ void WriteBufferAIO::prepare()
const off_t region_aligned_end = region_end + region_right_padding; const off_t region_aligned_end = region_end + region_right_padding;
region_aligned_size = region_aligned_end - region_aligned_begin; region_aligned_size = region_aligned_end - region_aligned_begin;
bytes_to_write = region_aligned_size;
/* /*
Представление данных в буфере до обработки Представление данных в буфере до обработки
@ -297,11 +294,6 @@ void WriteBufferAIO::prepare()
: :
: :
buffer_size buffer_size
<--------------------------------------------------------------->
:
:
buffer_capacity
*/ */
/// Буфер данных, которые хотим записать на диск. /// Буфер данных, которые хотим записать на диск.
@ -317,7 +309,7 @@ void WriteBufferAIO::prepare()
XXX : данные, которые хотим записать XXX : данные, которые хотим записать
ZZZ : данные из диска или нули, если отсутствуют данные ZZZ : данные из диска или нули, если отсутствуют данные
buffer_begin buffer_end memory_page buffer_begin buffer_end дополнительная страница
: : : : : :
: : : : : :
+---:-----------+---------------+---------------+---------------+--:------------+ +---:-----------+---------------+---------------+---------------+--:------------+
@ -329,11 +321,6 @@ void WriteBufferAIO::prepare()
| | | | | | | | | | | |
+---------------+---------------+---------------+---------------+---------------+ +---------------+---------------+---------------+---------------+---------------+
<--------------------------------------------------------------->
:
:
buffer_capacity
<--><--------------------------------------------------------------><-----------> <--><--------------------------------------------------------------><----------->
: : : : : :
: : : : : :
@ -348,9 +335,6 @@ void WriteBufferAIO::prepare()
if (region_left_padding > 0) if (region_left_padding > 0)
{ {
/// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска. /// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска.
/// Копировать данные, которые не влезают в буфер, в дополнительный буфер
/// размером со страницу.
buffer_size += region_left_padding; buffer_size += region_left_padding;
buffer_end = buffer_begin + buffer_size; buffer_end = buffer_begin + buffer_size;
@ -368,11 +352,8 @@ void WriteBufferAIO::prepare()
if (region_right_padding > 0) if (region_right_padding > 0)
{ {
/// При необходимости дополнить конец буфера данными из диска. /// Дополнить конец буфера данными из диска.
ssize_t read_count = ::pread(fd2, buffer_end, region_right_padding, region_end);
Position from = buffer_end;
ssize_t read_count = ::pread(fd2, from, region_right_padding, region_end);
if (read_count < 0) if (read_count < 0)
{ {
got_exception = true; got_exception = true;
@ -380,9 +361,7 @@ void WriteBufferAIO::prepare()
} }
truncation_count = region_right_padding - read_count; truncation_count = region_right_padding - read_count;
::memset(buffer_end + read_count, 0, truncation_count);
if (from == buffer_end)
::memset(from + read_count, 0, truncation_count);
} }
} }