mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
dbms: Server: feature development. [#METR-15090]
This commit is contained in:
parent
59ebe53759
commit
52e6f5d428
@ -59,10 +59,6 @@ private:
|
|||||||
|
|
||||||
iovec iov[3];
|
iovec iov[3];
|
||||||
|
|
||||||
/// Дополнительный буфер размером со страницу. Содежрит те данные, которые
|
|
||||||
/// не влезают в основной буфер.
|
|
||||||
Memory memory_page{DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE};
|
|
||||||
|
|
||||||
const std::string filename;
|
const std::string filename;
|
||||||
|
|
||||||
/// Количество байтов, которые будут записаны на диск.
|
/// Количество байтов, которые будут записаны на диск.
|
||||||
@ -78,8 +74,6 @@ private:
|
|||||||
off_t max_pos_in_file = 0;
|
off_t max_pos_in_file = 0;
|
||||||
|
|
||||||
Position buffer_begin = nullptr;
|
Position buffer_begin = nullptr;
|
||||||
size_t excess_count = 0;
|
|
||||||
size_t buffer_capacity = 0;
|
|
||||||
size_t region_aligned_size = 0;
|
size_t region_aligned_size = 0;
|
||||||
off_t region_aligned_begin = 0;
|
off_t region_aligned_begin = 0;
|
||||||
off_t bytes_written = 0;
|
off_t bytes_written = 0;
|
||||||
|
@ -9,12 +9,20 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
/// Примечание: выделяется дополнительная страница, которая содежрит те данные, которые
|
||||||
|
/// не влезают в основной буфер.
|
||||||
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
|
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
|
||||||
char * existing_memory_)
|
char * existing_memory_)
|
||||||
: WriteBufferFromFileBase(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
|
: WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
|
||||||
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
|
flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
|
||||||
filename(filename_)
|
filename(filename_)
|
||||||
{
|
{
|
||||||
|
this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
this->internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
flush_buffer.buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
flush_buffer.internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::FileOpen);
|
ProfileEvents::increment(ProfileEvents::FileOpen);
|
||||||
|
|
||||||
int open_flags = (flags_ == -1) ? (O_WRONLY | O_TRUNC | O_CREAT) : flags_;
|
int open_flags = (flags_ == -1) ? (O_WRONLY | O_TRUNC | O_CREAT) : flags_;
|
||||||
@ -103,35 +111,15 @@ void WriteBufferAIO::nextImpl()
|
|||||||
|
|
||||||
/// Создать запрос на асинхронную запись.
|
/// Создать запрос на асинхронную запись.
|
||||||
|
|
||||||
size_t i = 0;
|
iov[0].iov_base = buffer_begin;
|
||||||
|
iov[0].iov_len = region_aligned_size;
|
||||||
|
|
||||||
iov[i].iov_base = buffer_begin;
|
bytes_to_write = region_aligned_size;
|
||||||
iov[i].iov_len = ((excess_count > 0) ? buffer_capacity : region_aligned_size);
|
|
||||||
++i;
|
|
||||||
|
|
||||||
if (excess_count > 0)
|
|
||||||
{
|
|
||||||
iov[i].iov_base = &memory_page[0];
|
|
||||||
iov[i].iov_len = memory_page.size();
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
|
|
||||||
bytes_to_write = 0;
|
|
||||||
for (size_t j = 0; j < i; ++j)
|
|
||||||
{
|
|
||||||
if ((iov[j].iov_len > std::numeric_limits<off_t>::max()) ||
|
|
||||||
(static_cast<off_t>(iov[j].iov_len) > (std::numeric_limits<off_t>::max() - bytes_to_write)))
|
|
||||||
{
|
|
||||||
got_exception = true;
|
|
||||||
throw Exception("Overflow on bytes to write", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
}
|
|
||||||
bytes_to_write += iov[j].iov_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
request.aio_lio_opcode = IOCB_CMD_PWRITEV;
|
request.aio_lio_opcode = IOCB_CMD_PWRITEV;
|
||||||
request.aio_fildes = fd;
|
request.aio_fildes = fd;
|
||||||
request.aio_buf = reinterpret_cast<UInt64>(iov);
|
request.aio_buf = reinterpret_cast<UInt64>(iov);
|
||||||
request.aio_nbytes = i;
|
request.aio_nbytes = 1;
|
||||||
request.aio_offset = region_aligned_begin;
|
request.aio_offset = region_aligned_begin;
|
||||||
|
|
||||||
/// Отправить запрос.
|
/// Отправить запрос.
|
||||||
@ -320,7 +308,6 @@ void WriteBufferAIO::prepare()
|
|||||||
buffer_begin = flush_buffer.buffer().begin();
|
buffer_begin = flush_buffer.buffer().begin();
|
||||||
Position buffer_end = buffer_begin + region_size;
|
Position buffer_end = buffer_begin + region_size;
|
||||||
size_t buffer_size = buffer_end - buffer_begin;
|
size_t buffer_size = buffer_end - buffer_begin;
|
||||||
buffer_capacity = flush_buffer.buffer().size();
|
|
||||||
|
|
||||||
/// Обработать буфер, чтобы он отражал структуру региона диска.
|
/// Обработать буфер, чтобы он отражал структуру региона диска.
|
||||||
|
|
||||||
@ -358,24 +345,13 @@ void WriteBufferAIO::prepare()
|
|||||||
region_aligned_size
|
region_aligned_size
|
||||||
*/
|
*/
|
||||||
|
|
||||||
excess_count = 0;
|
|
||||||
|
|
||||||
if (region_left_padding > 0)
|
if (region_left_padding > 0)
|
||||||
{
|
{
|
||||||
/// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска.
|
/// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска.
|
||||||
/// Копировать данные, которые не влезают в буфер, в дополнительный буфер
|
/// Копировать данные, которые не влезают в буфер, в дополнительный буфер
|
||||||
/// размером со страницу.
|
/// размером со страницу.
|
||||||
|
|
||||||
if ((region_left_padding + buffer_size) > buffer_capacity)
|
buffer_size += region_left_padding;
|
||||||
{
|
|
||||||
excess_count = region_left_padding + buffer_size - buffer_capacity;
|
|
||||||
::memcpy(&memory_page[0], buffer_end - excess_count, excess_count);
|
|
||||||
::memset(&memory_page[excess_count], 0, memory_page.size() - excess_count);
|
|
||||||
buffer_size = buffer_capacity;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
buffer_size += region_left_padding;
|
|
||||||
|
|
||||||
buffer_end = buffer_begin + buffer_size;
|
buffer_end = buffer_begin + buffer_size;
|
||||||
|
|
||||||
::memmove(buffer_begin + region_left_padding, buffer_begin, buffer_size - region_left_padding);
|
::memmove(buffer_begin + region_left_padding, buffer_begin, buffer_size - region_left_padding);
|
||||||
@ -394,11 +370,7 @@ void WriteBufferAIO::prepare()
|
|||||||
{
|
{
|
||||||
/// При необходимости дополнить конец буфера данными из диска.
|
/// При необходимости дополнить конец буфера данными из диска.
|
||||||
|
|
||||||
Position from;
|
Position from = buffer_end;
|
||||||
if (excess_count > 0)
|
|
||||||
from = &memory_page[excess_count];
|
|
||||||
else
|
|
||||||
from = buffer_end;
|
|
||||||
|
|
||||||
ssize_t read_count = ::pread(fd2, from, region_right_padding, region_end);
|
ssize_t read_count = ::pread(fd2, from, region_right_padding, region_end);
|
||||||
if (read_count < 0)
|
if (read_count < 0)
|
||||||
|
Loading…
Reference in New Issue
Block a user