mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 17:20:50 +00:00
dbms: Server: feature development. [#METR-15090]
This commit is contained in:
parent
ee75917084
commit
5312a1b17d
@ -43,6 +43,10 @@ private:
|
||||
void waitForAIOCompletion();
|
||||
/// Менять местами основной и дублирующий буферы.
|
||||
void swapBuffers() noexcept;
|
||||
///
|
||||
void prepare();
|
||||
///
|
||||
void finalize();
|
||||
|
||||
private:
|
||||
/// Буфер для асинхронных операций записи данных.
|
||||
@ -75,6 +79,13 @@ private:
|
||||
/// Максимальная достигнутая позиция в файле.
|
||||
off_t max_pos_in_file = 0;
|
||||
|
||||
Position buffer_begin = nullptr;
|
||||
size_t excess_count = 0;
|
||||
size_t buffer_capacity = 0;
|
||||
size_t region_aligned_size = 0;
|
||||
off_t region_aligned_begin = 0;
|
||||
off_t bytes_written = 0;
|
||||
|
||||
/// Файловый дескриптор для записи.
|
||||
int fd = -1;
|
||||
/// Файловый дескриптор для чтения. Употребляется для невыровненных записей.
|
||||
|
@ -147,196 +147,7 @@ void WriteBufferAIO::nextImpl()
|
||||
waitForAIOCompletion();
|
||||
swapBuffers();
|
||||
|
||||
truncation_count = 0;
|
||||
|
||||
/*
|
||||
Страница на диске или в памяти
|
||||
|
||||
начальный адрес (начальная позиция в случае диска) кратен DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
:
|
||||
:
|
||||
+---------------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
+---------------+
|
||||
<--------------->
|
||||
:
|
||||
:
|
||||
DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
|
||||
*/
|
||||
|
||||
/*
|
||||
Представление данных на диске
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
ZZZ : данные, которые уже на диске или нули, если отсутствуют данные
|
||||
|
||||
region_aligned_begin region_aligned_end
|
||||
: region_begin region_end :
|
||||
: : : :
|
||||
: : : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| : | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
/// Регион диска, в который хотим записать данные.
|
||||
const off_t region_begin = pos_in_file;
|
||||
const off_t region_end = pos_in_file + flush_buffer.offset();
|
||||
const size_t region_size = region_end - region_begin;
|
||||
|
||||
/// Выровненный регион диска, в который хотим записать данные.
|
||||
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
const off_t region_aligned_begin = region_begin - region_left_padding;
|
||||
const off_t region_aligned_end = region_end + region_right_padding;
|
||||
const size_t region_aligned_size = region_aligned_end - region_aligned_begin;
|
||||
|
||||
/*
|
||||
Представление данных в буфере до обработки
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
|
||||
buffer_begin buffer_end
|
||||
: :
|
||||
: :
|
||||
+---------------+---------------+---------------+-------------:-+
|
||||
| | | | : |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
| | | | |
|
||||
+---------------+---------------+---------------+---------------+
|
||||
|
||||
<------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_size
|
||||
|
||||
<--------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_capacity
|
||||
*/
|
||||
|
||||
/// Буфер данных, которые хотим записать на диск.
|
||||
const Position buffer_begin = flush_buffer.buffer().begin();
|
||||
Position buffer_end = buffer_begin + region_size;
|
||||
size_t buffer_size = buffer_end - buffer_begin;
|
||||
const size_t buffer_capacity = flush_buffer.buffer().size();
|
||||
|
||||
/// Обработать буфер, чтобы он отражал структуру региона диска.
|
||||
|
||||
/*
|
||||
Представление данных в буфере после обработки
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
ZZZ : данные из диска или нули, если отсутствуют данные
|
||||
|
||||
buffer_begin buffer_end memory_page
|
||||
: : :
|
||||
: : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_capacity
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
size_t excess_count = 0;
|
||||
|
||||
if (region_left_padding > 0)
|
||||
{
|
||||
/// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска.
|
||||
/// Копировать данные, которые не влезают в буфер, в дополнительный буфер
|
||||
/// размером со страницу.
|
||||
|
||||
if ((region_left_padding + buffer_size) > buffer_capacity)
|
||||
{
|
||||
excess_count = region_left_padding + buffer_size - buffer_capacity;
|
||||
::memcpy(&memory_page[0], buffer_end - excess_count, excess_count);
|
||||
::memset(&memory_page[excess_count], 0, memory_page.size() - excess_count);
|
||||
buffer_size = buffer_capacity;
|
||||
}
|
||||
else
|
||||
buffer_size += region_left_padding;
|
||||
|
||||
buffer_end = buffer_begin + buffer_size;
|
||||
|
||||
::memmove(buffer_begin + region_left_padding, buffer_begin, buffer_size - region_left_padding);
|
||||
|
||||
ssize_t read_count = ::pread(fd2, buffer_begin, region_left_padding, region_aligned_begin);
|
||||
if (read_count < 0)
|
||||
{
|
||||
got_exception = true;
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
}
|
||||
|
||||
::memset(buffer_begin + read_count, 0, region_left_padding - read_count);
|
||||
}
|
||||
|
||||
if (region_right_padding > 0)
|
||||
{
|
||||
/// При необходимости дополнить конец буфера данными из диска.
|
||||
|
||||
Position from;
|
||||
if (excess_count > 0)
|
||||
from = &memory_page[excess_count];
|
||||
else
|
||||
from = buffer_end;
|
||||
|
||||
ssize_t read_count = ::pread(fd2, from, region_right_padding, region_end);
|
||||
if (read_count < 0)
|
||||
{
|
||||
got_exception = true;
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
}
|
||||
|
||||
truncation_count = region_right_padding - read_count;
|
||||
|
||||
if (from == buffer_end)
|
||||
::memset(from + read_count, 0, truncation_count);
|
||||
}
|
||||
prepare();
|
||||
|
||||
/// Создать запрос на асинхронную запись.
|
||||
|
||||
@ -399,7 +210,7 @@ void WriteBufferAIO::waitForAIOCompletion()
|
||||
}
|
||||
|
||||
is_pending_write = false;
|
||||
off_t bytes_written = events[0].res;
|
||||
bytes_written = events[0].res;
|
||||
|
||||
if (bytes_written < bytes_to_write)
|
||||
{
|
||||
@ -407,6 +218,211 @@ void WriteBufferAIO::waitForAIOCompletion()
|
||||
throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR);
|
||||
}
|
||||
|
||||
finalize();
|
||||
}
|
||||
|
||||
void WriteBufferAIO::swapBuffers() noexcept
|
||||
{
|
||||
buffer().swap(flush_buffer.buffer());
|
||||
std::swap(position(), flush_buffer.position());
|
||||
}
|
||||
|
||||
void WriteBufferAIO::prepare()
|
||||
{
|
||||
truncation_count = 0;
|
||||
|
||||
/*
|
||||
Страница на диске или в памяти
|
||||
|
||||
начальный адрес (начальная позиция в случае диска) кратен DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
:
|
||||
:
|
||||
+---------------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
+---------------+
|
||||
<--------------->
|
||||
:
|
||||
:
|
||||
DEFAULT_AIO_FILE_BLOCK_SIZE
|
||||
|
||||
*/
|
||||
|
||||
/*
|
||||
Представление данных на диске
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
ZZZ : данные, которые уже на диске или нули, если отсутствуют данные
|
||||
|
||||
region_aligned_begin region_aligned_end
|
||||
: region_begin region_end :
|
||||
: : : :
|
||||
: : : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| : | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
/// Регион диска, в который хотим записать данные.
|
||||
const off_t region_begin = pos_in_file;
|
||||
const off_t region_end = pos_in_file + flush_buffer.offset();
|
||||
const size_t region_size = region_end - region_begin;
|
||||
|
||||
/// Выровненный регион диска, в который хотим записать данные.
|
||||
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||
|
||||
region_aligned_begin = region_begin - region_left_padding;
|
||||
const off_t region_aligned_end = region_end + region_right_padding;
|
||||
region_aligned_size = region_aligned_end - region_aligned_begin;
|
||||
|
||||
/*
|
||||
Представление данных в буфере до обработки
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
|
||||
buffer_begin buffer_end
|
||||
: :
|
||||
: :
|
||||
+---------------+---------------+---------------+-------------:-+
|
||||
| | | | : |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
|
||||
+---------------+---------------+---------------+-------------+ |
|
||||
| | | | |
|
||||
+---------------+---------------+---------------+---------------+
|
||||
|
||||
<------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_size
|
||||
|
||||
<--------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_capacity
|
||||
*/
|
||||
|
||||
/// Буфер данных, которые хотим записать на диск.
|
||||
buffer_begin = flush_buffer.buffer().begin();
|
||||
Position buffer_end = buffer_begin + region_size;
|
||||
size_t buffer_size = buffer_end - buffer_begin;
|
||||
buffer_capacity = flush_buffer.buffer().size();
|
||||
|
||||
/// Обработать буфер, чтобы он отражал структуру региона диска.
|
||||
|
||||
/*
|
||||
Представление данных в буфере после обработки
|
||||
|
||||
XXX : данные, которые хотим записать
|
||||
ZZZ : данные из диска или нули, если отсутствуют данные
|
||||
|
||||
buffer_begin buffer_end memory_page
|
||||
: : :
|
||||
: : :
|
||||
+---:-----------+---------------+---------------+---------------+--:------------+
|
||||
| | | | | : |
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
|ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
|
||||
| +-----------+---------------+---------------+---------------+--+ |
|
||||
| | | | | |
|
||||
+---------------+---------------+---------------+---------------+---------------+
|
||||
|
||||
<--------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
buffer_capacity
|
||||
|
||||
<--><--------------------------------------------------------------><----------->
|
||||
: : :
|
||||
: : :
|
||||
region_left_padding region_size region_right_padding
|
||||
|
||||
<------------------------------------------------------------------------------->
|
||||
:
|
||||
:
|
||||
region_aligned_size
|
||||
*/
|
||||
|
||||
excess_count = 0;
|
||||
|
||||
if (region_left_padding > 0)
|
||||
{
|
||||
/// Сдвинуть данные буфера вправо. Дополнить начало буфера данными из диска.
|
||||
/// Копировать данные, которые не влезают в буфер, в дополнительный буфер
|
||||
/// размером со страницу.
|
||||
|
||||
if ((region_left_padding + buffer_size) > buffer_capacity)
|
||||
{
|
||||
excess_count = region_left_padding + buffer_size - buffer_capacity;
|
||||
::memcpy(&memory_page[0], buffer_end - excess_count, excess_count);
|
||||
::memset(&memory_page[excess_count], 0, memory_page.size() - excess_count);
|
||||
buffer_size = buffer_capacity;
|
||||
}
|
||||
else
|
||||
buffer_size += region_left_padding;
|
||||
|
||||
buffer_end = buffer_begin + buffer_size;
|
||||
|
||||
::memmove(buffer_begin + region_left_padding, buffer_begin, buffer_size - region_left_padding);
|
||||
|
||||
ssize_t read_count = ::pread(fd2, buffer_begin, region_left_padding, region_aligned_begin);
|
||||
if (read_count < 0)
|
||||
{
|
||||
got_exception = true;
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
}
|
||||
|
||||
::memset(buffer_begin + read_count, 0, region_left_padding - read_count);
|
||||
}
|
||||
|
||||
if (region_right_padding > 0)
|
||||
{
|
||||
/// При необходимости дополнить конец буфера данными из диска.
|
||||
|
||||
Position from;
|
||||
if (excess_count > 0)
|
||||
from = &memory_page[excess_count];
|
||||
else
|
||||
from = buffer_end;
|
||||
|
||||
ssize_t read_count = ::pread(fd2, from, region_right_padding, region_end);
|
||||
if (read_count < 0)
|
||||
{
|
||||
got_exception = true;
|
||||
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
|
||||
}
|
||||
|
||||
truncation_count = region_right_padding - read_count;
|
||||
|
||||
if (from == buffer_end)
|
||||
::memset(from + read_count, 0, truncation_count);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferAIO::finalize()
|
||||
{
|
||||
bytes_written -= truncation_count;
|
||||
|
||||
off_t pos_offset = bytes_written - (pos_in_file - request.aio_offset);
|
||||
@ -432,10 +448,4 @@ void WriteBufferAIO::waitForAIOCompletion()
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferAIO::swapBuffers() noexcept
|
||||
{
|
||||
buffer().swap(flush_buffer.buffer());
|
||||
std::swap(position(), flush_buffer.position());
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user