mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
dbms: Server: feature development. [#METR-15090]
This commit is contained in:
parent
9460f1b4d4
commit
7cdc460f96
@ -44,7 +44,6 @@ public:
|
|||||||
{
|
{
|
||||||
bytes += offset();
|
bytes += offset();
|
||||||
bool res = nextImpl();
|
bool res = nextImpl();
|
||||||
sync();
|
|
||||||
if (!res)
|
if (!res)
|
||||||
working_buffer.resize(0);
|
working_buffer.resize(0);
|
||||||
|
|
||||||
@ -150,7 +149,6 @@ private:
|
|||||||
* Кинуть исключение, если что-то не так.
|
* Кинуть исключение, если что-то не так.
|
||||||
*/
|
*/
|
||||||
virtual bool nextImpl() { return false; };
|
virtual bool nextImpl() { return false; };
|
||||||
virtual void sync() {}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,11 +36,15 @@ private:
|
|||||||
off_t getPositionInFileRelaxed() const noexcept;
|
off_t getPositionInFileRelaxed() const noexcept;
|
||||||
off_t doSeek(off_t off, int whence) override;
|
off_t doSeek(off_t off, int whence) override;
|
||||||
bool nextImpl() override;
|
bool nextImpl() override;
|
||||||
void sync() override;
|
void synchronousRead();
|
||||||
|
void initRequest();
|
||||||
|
void publishReceivedData();
|
||||||
|
void sync();
|
||||||
/// Ждать окончания текущей асинхронной задачи.
|
/// Ждать окончания текущей асинхронной задачи.
|
||||||
void waitForAIOCompletion();
|
void waitForAIOCompletion();
|
||||||
/// Менять местами основной и дублирующий буферы.
|
/// Менять местами основной и дублирующий буферы.
|
||||||
void swapBuffers() noexcept;
|
void swapBuffers() noexcept;
|
||||||
|
void skipLastRequest();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Буфер для асинхронных операций чтения данных.
|
/// Буфер для асинхронных операций чтения данных.
|
||||||
@ -60,11 +64,14 @@ private:
|
|||||||
|
|
||||||
const std::string filename;
|
const std::string filename;
|
||||||
|
|
||||||
|
ssize_t bytes_read = 0;
|
||||||
size_t max_bytes_read = std::numeric_limits<size_t>::max();
|
size_t max_bytes_read = std::numeric_limits<size_t>::max();
|
||||||
size_t total_bytes_read = 0;
|
size_t total_bytes_read = 0;
|
||||||
size_t requested_byte_count = 0;
|
size_t requested_byte_count = 0;
|
||||||
|
off_t region_aligned_begin = 0;
|
||||||
off_t pos_in_file = 0;
|
off_t pos_in_file = 0;
|
||||||
int fd = -1;
|
int fd = -1;
|
||||||
|
size_t iov_size = 0;
|
||||||
|
|
||||||
size_t buffer_capacity = 0;
|
size_t buffer_capacity = 0;
|
||||||
|
|
||||||
|
@ -61,8 +61,6 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
|
|||||||
|
|
||||||
off_t ReadBufferAIO::doSeek(off_t off, int whence)
|
off_t ReadBufferAIO::doSeek(off_t off, int whence)
|
||||||
{
|
{
|
||||||
sync();
|
|
||||||
|
|
||||||
off_t new_pos;
|
off_t new_pos;
|
||||||
|
|
||||||
if (whence == SEEK_SET)
|
if (whence == SEEK_SET)
|
||||||
@ -110,6 +108,9 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
|
|||||||
pos = working_buffer.end();
|
pos = working_buffer.end();
|
||||||
pos_in_file = new_pos;
|
pos_in_file = new_pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Сдвинулись, значит не можем использовать результат последнего асинхронного запроса.
|
||||||
|
skipLastRequest();
|
||||||
}
|
}
|
||||||
|
|
||||||
return new_pos;
|
return new_pos;
|
||||||
@ -132,60 +133,24 @@ bool ReadBufferAIO::nextImpl()
|
|||||||
if (is_eof)
|
if (is_eof)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
sync();
|
if (!is_started)
|
||||||
is_started = true;
|
{
|
||||||
|
synchronousRead();
|
||||||
|
is_started = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
sync();
|
||||||
|
|
||||||
/// Если конец файла только что достигнут, больше ничего не делаем.
|
/// Если конец файла только что достигнут, больше ничего не делаем.
|
||||||
if (is_eof)
|
if (is_eof)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
/// Количество запрашиваемых байтов.
|
initRequest();
|
||||||
requested_byte_count = std::min(fill_buffer.internalBuffer().size(), max_bytes_read);
|
|
||||||
|
|
||||||
/// Регион диска, из которого хотим читать данные.
|
|
||||||
const off_t region_begin = pos_in_file;
|
|
||||||
const off_t region_end = pos_in_file + requested_byte_count;
|
|
||||||
const size_t region_size = region_end - region_begin;
|
|
||||||
|
|
||||||
/// Выровненный регион диска, из которого хотим читать данные.
|
|
||||||
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
|
||||||
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
|
||||||
|
|
||||||
const off_t region_aligned_begin = region_begin - region_left_padding;
|
|
||||||
const off_t region_aligned_end = region_end + region_right_padding;
|
|
||||||
const off_t region_aligned_size = region_aligned_end - region_aligned_begin;
|
|
||||||
|
|
||||||
/// Буфер, в который запишем данные из диска.
|
|
||||||
const Position buffer_begin = fill_buffer.internalBuffer().begin();
|
|
||||||
buffer_capacity = this->memory.size();
|
|
||||||
|
|
||||||
size_t excess_count = 0;
|
|
||||||
|
|
||||||
if ((region_left_padding + region_size) > buffer_capacity)
|
|
||||||
{
|
|
||||||
excess_count = region_left_padding + region_size - buffer_capacity;
|
|
||||||
::memset(&memory_page[0], 0, memory_page.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Создать запрос на асинхронное чтение.
|
|
||||||
|
|
||||||
size_t i = 0;
|
|
||||||
|
|
||||||
iov[i].iov_base = buffer_begin;
|
|
||||||
iov[i].iov_len = (excess_count > 0) ? buffer_capacity : region_aligned_size;
|
|
||||||
++i;
|
|
||||||
|
|
||||||
if (excess_count > 0)
|
|
||||||
{
|
|
||||||
iov[i].iov_base = &memory_page[0];
|
|
||||||
iov[i].iov_len = memory_page.size();
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
|
|
||||||
request.aio_lio_opcode = IOCB_CMD_PREADV;
|
request.aio_lio_opcode = IOCB_CMD_PREADV;
|
||||||
request.aio_fildes = fd;
|
request.aio_fildes = fd;
|
||||||
request.aio_buf = reinterpret_cast<UInt64>(iov);
|
request.aio_buf = reinterpret_cast<UInt64>(iov);
|
||||||
request.aio_nbytes = i;
|
request.aio_nbytes = iov_size;
|
||||||
request.aio_offset = region_aligned_begin;
|
request.aio_offset = region_aligned_begin;
|
||||||
|
|
||||||
/// Отправить запрос.
|
/// Отправить запрос.
|
||||||
@ -202,32 +167,56 @@ bool ReadBufferAIO::nextImpl()
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReadBufferAIO::sync()
|
void ReadBufferAIO::initRequest()
|
||||||
{
|
{
|
||||||
if (is_eof || !is_started || !is_pending_read)
|
/// Количество запрашиваемых байтов.
|
||||||
return;
|
requested_byte_count = std::min(fill_buffer.internalBuffer().size(), max_bytes_read);
|
||||||
|
|
||||||
waitForAIOCompletion();
|
/// Регион диска, из которого хотим читать данные.
|
||||||
swapBuffers();
|
const off_t region_begin = pos_in_file;
|
||||||
}
|
const off_t region_end = pos_in_file + requested_byte_count;
|
||||||
|
const size_t region_size = region_end - region_begin;
|
||||||
|
|
||||||
void ReadBufferAIO::waitForAIOCompletion()
|
/// Выровненный регион диска, из которого хотим читать данные.
|
||||||
{
|
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
if (!is_pending_read)
|
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
return;
|
|
||||||
|
|
||||||
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
|
region_aligned_begin = region_begin - region_left_padding;
|
||||||
|
const off_t region_aligned_end = region_end + region_right_padding;
|
||||||
|
const off_t region_aligned_size = region_aligned_end - region_aligned_begin;
|
||||||
|
|
||||||
|
/// Буфер, в который запишем данные из диска.
|
||||||
|
const Position buffer_begin = fill_buffer.internalBuffer().begin();
|
||||||
|
buffer_capacity = this->memory.size();
|
||||||
|
|
||||||
|
size_t excess_count = 0;
|
||||||
|
|
||||||
|
if ((region_left_padding + region_size) > buffer_capacity)
|
||||||
{
|
{
|
||||||
if (errno != EINTR)
|
excess_count = region_left_padding + region_size - buffer_capacity;
|
||||||
{
|
::memset(&memory_page[0], 0, memory_page.size());
|
||||||
got_exception = true;
|
|
||||||
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
is_pending_read = false;
|
/// Создать запрос на синхронное чтение.
|
||||||
off_t bytes_read = events[0].res;
|
|
||||||
|
|
||||||
|
size_t i = 0;
|
||||||
|
|
||||||
|
iov[i].iov_base = buffer_begin;
|
||||||
|
iov[i].iov_len = (excess_count > 0) ? buffer_capacity : region_aligned_size;
|
||||||
|
++i;
|
||||||
|
|
||||||
|
if (excess_count > 0)
|
||||||
|
{
|
||||||
|
iov[i].iov_base = &memory_page[0];
|
||||||
|
iov[i].iov_len = memory_page.size();
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
iov_size = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReadBufferAIO::publishReceivedData()
|
||||||
|
{
|
||||||
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
|
|
||||||
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
|
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
|
||||||
@ -271,6 +260,74 @@ void ReadBufferAIO::waitForAIOCompletion()
|
|||||||
is_eof = true;
|
is_eof = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ReadBufferAIO::synchronousRead()
|
||||||
|
{
|
||||||
|
initRequest();
|
||||||
|
bytes_read = ::preadv(fd, iov, iov_size, region_aligned_begin);
|
||||||
|
publishReceivedData();
|
||||||
|
swapBuffers();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReadBufferAIO::sync()
|
||||||
|
{
|
||||||
|
if (is_eof || !is_pending_read)
|
||||||
|
return;
|
||||||
|
|
||||||
|
waitForAIOCompletion();
|
||||||
|
swapBuffers();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReadBufferAIO::skipLastRequest()
|
||||||
|
{
|
||||||
|
if (is_eof || !is_pending_read)
|
||||||
|
return;
|
||||||
|
|
||||||
|
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
|
||||||
|
{
|
||||||
|
if (errno != EINTR)
|
||||||
|
{
|
||||||
|
got_exception = true;
|
||||||
|
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
is_pending_read = false;
|
||||||
|
is_started = false;
|
||||||
|
|
||||||
|
/// Несмотря на то, что не станем использовать результат последнего запроса,
|
||||||
|
/// убедимся, что запрос правильно выполнен.
|
||||||
|
|
||||||
|
bytes_read = events[0].res;
|
||||||
|
|
||||||
|
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
|
||||||
|
|
||||||
|
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
|
||||||
|
{
|
||||||
|
got_exception = true;
|
||||||
|
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReadBufferAIO::waitForAIOCompletion()
|
||||||
|
{
|
||||||
|
if (!is_pending_read)
|
||||||
|
return;
|
||||||
|
|
||||||
|
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
|
||||||
|
{
|
||||||
|
if (errno != EINTR)
|
||||||
|
{
|
||||||
|
got_exception = true;
|
||||||
|
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
is_pending_read = false;
|
||||||
|
bytes_read = events[0].res;
|
||||||
|
|
||||||
|
publishReceivedData();
|
||||||
|
}
|
||||||
|
|
||||||
void ReadBufferAIO::swapBuffers() noexcept
|
void ReadBufferAIO::swapBuffers() noexcept
|
||||||
{
|
{
|
||||||
internalBuffer().swap(fill_buffer.internalBuffer());
|
internalBuffer().swap(fill_buffer.internalBuffer());
|
||||||
|
@ -39,6 +39,8 @@ bool test15(const std::string & filename, const std::string & buf);
|
|||||||
bool test16(const std::string & filename, const std::string & buf);
|
bool test16(const std::string & filename, const std::string & buf);
|
||||||
bool test17(const std::string & filename, const std::string & buf);
|
bool test17(const std::string & filename, const std::string & buf);
|
||||||
bool test18(const std::string & filename, const std::string & buf);
|
bool test18(const std::string & filename, const std::string & buf);
|
||||||
|
bool test19(const std::string & filename, const std::string & buf);
|
||||||
|
bool test20(const std::string & filename, const std::string & buf);
|
||||||
|
|
||||||
void run()
|
void run()
|
||||||
{
|
{
|
||||||
@ -88,7 +90,9 @@ void run()
|
|||||||
std::bind(test15, std::ref(filename3), std::ref(buf3)),
|
std::bind(test15, std::ref(filename3), std::ref(buf3)),
|
||||||
std::bind(test16, std::ref(filename3), std::ref(buf3)),
|
std::bind(test16, std::ref(filename3), std::ref(buf3)),
|
||||||
std::bind(test17, std::ref(filename4), std::ref(buf4)),
|
std::bind(test17, std::ref(filename4), std::ref(buf4)),
|
||||||
std::bind(test18, std::ref(filename5), std::ref(buf5))
|
std::bind(test18, std::ref(filename5), std::ref(buf5)),
|
||||||
|
std::bind(test19, std::ref(filename), std::ref(buf)),
|
||||||
|
std::bind(test20, std::ref(filename), std::ref(buf))
|
||||||
};
|
};
|
||||||
|
|
||||||
unsigned int num = 0;
|
unsigned int num = 0;
|
||||||
@ -368,25 +372,33 @@ bool test9(const std::string & filename, const std::string & buf)
|
|||||||
|
|
||||||
bool test10(const std::string & filename, const std::string & buf)
|
bool test10(const std::string & filename, const std::string & buf)
|
||||||
{
|
{
|
||||||
std::string newbuf;
|
|
||||||
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
|
||||||
|
|
||||||
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
size_t count1 = in.read(&newbuf[0], newbuf.length());
|
|
||||||
if (count1 != newbuf.length())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
if (newbuf != buf.substr(0, 4 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
{
|
||||||
return false;
|
std::string newbuf;
|
||||||
|
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
size_t count1 = in.read(&newbuf[0], newbuf.length());
|
||||||
|
if (count1 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(0, 4 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
(void) in.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
|
(void) in.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
|
||||||
|
|
||||||
size_t count2 = in.read(&newbuf[0], newbuf.length());
|
{
|
||||||
if (count2 != newbuf.length())
|
std::string newbuf;
|
||||||
return false;
|
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
if (newbuf != buf.substr(6 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
size_t count2 = in.read(&newbuf[0], newbuf.length());
|
||||||
return false;
|
if (count2 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(6 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -606,6 +618,70 @@ bool test18(const std::string & filename, const std::string & buf)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool test19(const std::string & filename, const std::string & buf)
|
||||||
|
{
|
||||||
|
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::string newbuf;
|
||||||
|
newbuf.resize(5 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
size_t count1 = in.read(&newbuf[0], newbuf.length());
|
||||||
|
if (count1 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(0, 5 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::string newbuf;
|
||||||
|
newbuf.resize(5 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
size_t count2 = in.read(&newbuf[0], newbuf.length());
|
||||||
|
if (count2 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(5 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool test20(const std::string & filename, const std::string & buf)
|
||||||
|
{
|
||||||
|
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::string newbuf;
|
||||||
|
newbuf.resize(5 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
size_t count1 = in.read(&newbuf[0], newbuf.length());
|
||||||
|
if (count1 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(0, 5 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void) in.getPositionInFile();
|
||||||
|
|
||||||
|
{
|
||||||
|
std::string newbuf;
|
||||||
|
newbuf.resize(5 * DEFAULT_AIO_FILE_BLOCK_SIZE);
|
||||||
|
|
||||||
|
size_t count2 = in.read(&newbuf[0], newbuf.length());
|
||||||
|
if (count2 != newbuf.length())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (newbuf != buf.substr(5 * DEFAULT_AIO_FILE_BLOCK_SIZE))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int main()
|
int main()
|
||||||
|
Loading…
Reference in New Issue
Block a user