dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-04-03 14:51:41 +03:00
parent d377b188a7
commit fa89f1643e
6 changed files with 370 additions and 65 deletions

View File

@ -105,7 +105,11 @@ private:
return;
}
int res = posix_memalign(reinterpret_cast<void **>(&new_m_data), alignment, (m_capacity + alignment - 1) / alignment * alignment);
size_t aligned_capacity = (m_capacity + alignment - 1) / alignment * alignment;
m_capacity = aligned_capacity;
m_size = m_capacity;
int res = posix_memalign(reinterpret_cast<void **>(&new_m_data), alignment, m_capacity);
if (0 != res)
DB::throwFromErrno("Cannot allocate memory (posix_memalign)", ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);

View File

@ -44,11 +44,11 @@ public:
{
bytes += offset();
bool res = nextImpl();
(void) sync();
if (!res)
working_buffer.resize(0);
pos = working_buffer.begin() + working_buffer_offset;
working_buffer_offset = 0;
pos = working_buffer.begin();
return res;
}
@ -118,7 +118,7 @@ public:
while (bytes_copied < n && !eof())
{
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to + bytes_copied, pos, bytes_to_copy);
::memcpy(to + bytes_copied, pos, bytes_to_copy);
pos += bytes_to_copy;
bytes_copied += bytes_to_copy;
}
@ -150,6 +150,7 @@ private:
* Кинуть исключение, если что-то не так.
*/
/// Default implementation: produces no data and returns false.
virtual bool nextImpl()
{
    return false;
}
/// Synchronisation hook that derived buffers may override.
/// The default implementation does nothing and returns false.
virtual bool sync()
{
    return false;
}
};

View File

@ -3,12 +3,14 @@
#include <DB/IO/ReadBufferFromFileBase.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Core/Defines.h>
#include <statdaemons/AIO.h>
#include <string>
#include <limits>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
namespace DB
{
@ -34,6 +36,7 @@ public:
private:
off_t getPositionInFileRelaxed() const noexcept;
bool nextImpl() override;
bool sync() override;
/// Ждать окончания текущей асинхронной задачи.
void waitForAIOCompletion();
/// Менять местами основной и дублирующий буферы.
@ -49,6 +52,12 @@ private:
AIOContext aio_context{1};
iovec iov[2];
/// An additional page-sized buffer. It holds the data that does not
/// fit into the main buffer.
Memory memory_page{DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE};
const std::string filename;
size_t max_bytes_read = std::numeric_limits<size_t>::max();
@ -57,6 +66,8 @@ private:
off_t pos_in_file = 0;
int fd = -1;
size_t buffer_capacity = 0;
/// Асинхронная операция чтения ещё не завершилась.
bool is_pending_read = false;
/// Было получено исключение.

View File

@ -12,7 +12,7 @@ namespace DB
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_,
char * existing_memory_)
: ReadBufferFromFileBase(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
fill_buffer(BufferWithOwnMemory<ReadBuffer>(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
@ -37,7 +37,7 @@ ReadBufferAIO::~ReadBufferAIO()
{
try
{
waitForAIOCompletion();
(void) sync();
}
catch (...)
{
@ -61,7 +61,7 @@ void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
off_t ReadBufferAIO::seek(off_t off, int whence)
{
waitForAIOCompletion();
(void) sync();
off_t new_pos;
@ -132,13 +132,8 @@ bool ReadBufferAIO::nextImpl()
if (is_eof)
return false;
waitForAIOCompletion();
/// При первом вызове не надо обменять местами основной и дублирующий буферы.
if (is_started)
swapBuffers();
else
is_started = true;
(void) sync();
is_started = true;
/// Если конец файла только что достигнут, больше ничего не делаем.
if (is_eof)
@ -147,20 +142,51 @@ bool ReadBufferAIO::nextImpl()
/// Количество запрашиваемых байтов.
requested_byte_count = std::min(fill_buffer.internalBuffer().size(), max_bytes_read);
/// Для запроса выравниваем количество запрашиваемых байтов на границе следующего блока.
size_t effective_byte_count = requested_byte_count;
if ((effective_byte_count % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
effective_byte_count += DEFAULT_AIO_FILE_BLOCK_SIZE - (effective_byte_count % DEFAULT_AIO_FILE_BLOCK_SIZE);
/// Регион диска, из которого хотим читать данные.
const off_t region_begin = pos_in_file;
const off_t region_end = pos_in_file + requested_byte_count;
const size_t region_size = region_end - region_begin;
/// Также выравниваем позицию в файле на границе предыдущего блока.
off_t effective_pos_in_file = pos_in_file - (pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE);
/// Выровненный регион диска, из которого хотим читать данные.
const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
/// Создать запрос.
request.aio_lio_opcode = IOCB_CMD_PREAD;
const off_t region_aligned_begin = region_begin - region_left_padding;
const off_t region_aligned_end = region_end + region_right_padding;
const off_t region_aligned_size = region_aligned_end - region_aligned_begin;
/// Буфер, в который запишем данные из диска.
const Position buffer_begin = fill_buffer.internalBuffer().begin();
buffer_capacity = this->memory.size();
size_t excess_count = 0;
if ((region_left_padding + region_size) > buffer_capacity)
{
excess_count = region_left_padding + region_size - buffer_capacity;
::memset(&memory_page[0], 0, memory_page.size());
}
/// Создать запрос на асинхронное чтение.
size_t i = 0;
iov[i].iov_base = buffer_begin;
iov[i].iov_len = (excess_count > 0) ? buffer_capacity : region_aligned_size;
++i;
if (excess_count > 0)
{
iov[i].iov_base = &memory_page[0];
iov[i].iov_len = memory_page.size();
++i;
}
request.aio_lio_opcode = IOCB_CMD_PREADV;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(fill_buffer.internalBuffer().begin());
request.aio_nbytes = effective_byte_count;
request.aio_offset = effective_pos_in_file;
request.aio_buf = reinterpret_cast<UInt64>(iov);
request.aio_nbytes = i;
request.aio_offset = region_aligned_begin;
/// Отправить запрос.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
@ -176,51 +202,79 @@ bool ReadBufferAIO::nextImpl()
return true;
}
/// Complete the outstanding asynchronous read, if there is one, and make its
/// data current by swapping the buffers.
///
/// Returns true when a pending read was waited for and the buffers were
/// swapped. Returns false, without doing anything, when the end of file has
/// been reached, when reading has not been started yet, or when no read is
/// pending.
bool ReadBufferAIO::sync()
{
    const bool has_pending_work = !is_eof && is_started && is_pending_read;
    if (!has_pending_work)
        return false;

    waitForAIOCompletion();
    swapBuffers();

    return true;
}
void ReadBufferAIO::waitForAIOCompletion()
{
if (is_pending_read)
if (!is_pending_read)
return;
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
{
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
}
is_pending_read = false;
off_t bytes_read = events[0].res;
if (bytes_read < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
/// Игнорируем излишние байты справа.
bytes_read = std::min(bytes_read, static_cast<off_t>(requested_byte_count));
if (bytes_read > 0)
fill_buffer.buffer().resize(bytes_read);
if (static_cast<size_t>(bytes_read) < fill_buffer.internalBuffer().size())
is_eof = true;
/// Игнорируем излишние байты слева.
working_buffer_offset = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
bytes_read -= working_buffer_offset;
pos_in_file += bytes_read;
total_bytes_read += bytes_read;
if (total_bytes_read == max_bytes_read)
is_eof = true;
}
is_pending_read = false;
off_t bytes_read = events[0].res;
size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE;
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
/// Игнорируем излишние байты слева.
bytes_read -= region_left_padding;
/// Игнорируем излишние байты справа.
bytes_read = std::min(bytes_read, static_cast<off_t>(requested_byte_count));
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
Position buffer_begin = fill_buffer.internalBuffer().begin();
if ((region_left_padding + bytes_read) > buffer_capacity)
{
size_t excess_count = region_left_padding + bytes_read - buffer_capacity;
::memmove(buffer_begin, buffer_begin + region_left_padding, buffer_capacity);
::memcpy(buffer_begin + buffer_capacity - region_left_padding, &memory_page[0], excess_count);
}
else
::memmove(buffer_begin, buffer_begin + region_left_padding, bytes_read);
if (bytes_read > 0)
fill_buffer.buffer().resize(bytes_read);
if (static_cast<size_t>(bytes_read) < requested_byte_count)
is_eof = true;
pos_in_file += bytes_read;
total_bytes_read += bytes_read;
if (total_bytes_read == max_bytes_read)
is_eof = true;
}
void ReadBufferAIO::swapBuffers() noexcept

View File

@ -12,7 +12,7 @@ namespace DB
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: WriteBufferFromFileBase(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
flush_buffer(BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);

View File

@ -15,6 +15,9 @@ namespace
void run();
void prepare(size_t s, std::string & directory, std::string & filename, std::string & buf);
void prepare2(std::string & directory, std::string & filename, std::string & buf);
void prepare3(std::string & directory, std::string & filename, std::string & buf);
void prepare4(std::string & directory, std::string & filename, std::string & buf);
void die(const std::string & msg);
void run_test(unsigned int num, const std::function<bool()> func);
@ -32,6 +35,10 @@ bool test11(const std::string & filename);
bool test12(const std::string & filename, const std::string & buf);
bool test13(const std::string & filename, const std::string & buf);
bool test14(const std::string & filename, const std::string & buf);
bool test15(const std::string & filename, const std::string & buf);
bool test16(const std::string & filename, const std::string & buf);
bool test17(const std::string & filename, const std::string & buf);
bool test18(const std::string & filename, const std::string & buf);
void run()
{
@ -47,6 +54,21 @@ void run()
std::string buf2;
prepare(2 * DEFAULT_AIO_FILE_BLOCK_SIZE - 3, directory2, filename2, buf2);
std::string directory3;
std::string filename3;
std::string buf3;
prepare2(directory3, filename3, buf3);
std::string directory4;
std::string filename4;
std::string buf4;
prepare3(directory4, filename4, buf4);
std::string directory5;
std::string filename5;
std::string buf5;
prepare4(directory5, filename5, buf5);
const std::vector<std::function<bool()> > tests =
{
std::bind(test1, std::ref(filename)),
@ -62,7 +84,11 @@ void run()
std::bind(test11, std::ref(filename)),
std::bind(test12, std::ref(filename), std::ref(buf)),
std::bind(test13, std::ref(filename2), std::ref(buf2)),
std::bind(test14, std::ref(filename), std::ref(buf))
std::bind(test14, std::ref(filename), std::ref(buf)),
std::bind(test15, std::ref(filename3), std::ref(buf3)),
std::bind(test16, std::ref(filename3), std::ref(buf3)),
std::bind(test17, std::ref(filename4), std::ref(buf4)),
std::bind(test18, std::ref(filename5), std::ref(buf5))
};
unsigned int num = 0;
@ -74,6 +100,9 @@ void run()
fs::remove_all(directory);
fs::remove_all(directory2);
fs::remove_all(directory3);
fs::remove_all(directory4);
fs::remove_all(directory5);
}
void prepare(size_t s, std::string & directory, std::string & filename, std::string & buf)
@ -101,6 +130,68 @@ void prepare(size_t s, std::string & directory, std::string & filename, std::st
out << buf;
}
/// Create a fresh temporary directory holding a file "foo" with a short
/// digit pattern, for use by the read tests.
///
/// Output parameters:
///   directory - path of the created temporary directory;
///   filename  - path of the created file (directory + "/foo");
///   buf       - the exact content written to the file.
///
/// Calls die() if the directory or the file cannot be created, or if the
/// content cannot be written.
/// NOTE(review): die() is assumed not to return - confirm against its definition.
void prepare2(std::string & directory, std::string & filename, std::string & buf)
{
    /// mkdtemp() rewrites the trailing XXXXXX in place, hence a writable array.
    char pattern[] = "/tmp/fileXXXXXX";
    char * dir = ::mkdtemp(pattern);
    if (dir == nullptr)
        die("Could not create directory");

    directory = std::string(dir);
    filename = directory + "/foo";
    buf = "122333444455555666666777777788888888999999999";

    std::ofstream out(filename.c_str());
    if (!out.is_open())
        die("Could not open file");

    out << buf;

    /// Flush explicitly so that a failed write is detected here rather than
    /// showing up later as an unexplained test failure on a truncated file.
    out.flush();
    if (!out)
        die("Could not write to file");
}
/// Create a fresh temporary directory holding a file "foo" in which the
/// digit pattern starts at byte offset 7 instead of at the beginning.
/// test17 expects the 7 bytes in front of the pattern to read back as zeros.
///
/// Output parameters:
///   directory - path of the created temporary directory;
///   filename  - path of the created file (directory + "/foo");
///   buf       - the pattern written at offset 7 (the leading bytes are not included).
///
/// Calls die() if the directory or the file cannot be created, or if the
/// content cannot be written.
/// NOTE(review): die() is assumed not to return - confirm against its definition.
void prepare3(std::string & directory, std::string & filename, std::string & buf)
{
    /// mkdtemp() rewrites the trailing XXXXXX in place, hence a writable array.
    char pattern[] = "/tmp/fileXXXXXX";
    char * dir = ::mkdtemp(pattern);
    if (dir == nullptr)
        die("Could not create directory");

    directory = std::string(dir);
    filename = directory + "/foo";
    buf = "122333444455555666666777777788888888999999999";

    std::ofstream out(filename.c_str());
    if (!out.is_open())
        die("Could not open file");

    /// Skip the first 7 bytes of the new file before writing the pattern.
    out.seekp(7, std::ios_base::beg);
    out << buf;

    /// Flush explicitly so that a failed seek or write is detected here rather
    /// than showing up later as an unexplained test failure.
    out.flush();
    if (!out)
        die("Could not write to file");
}
/// Create a fresh temporary directory holding a file "foo" in which 1340
/// characters, cycling through A-Z and 0-9, are written at byte offset 2984.
/// test18 seeks to the same offset and reads the same number of bytes back.
///
/// Output parameters:
///   directory - path of the created temporary directory;
///   filename  - path of the created file (directory + "/foo");
///   buf       - receives the generated characters (appended to its current content).
///
/// Calls die() if the directory or the file cannot be created, or if the
/// content cannot be written.
/// NOTE(review): die() is assumed not to return - confirm against its definition.
void prepare4(std::string & directory, std::string & filename, std::string & buf)
{
    static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    /// mkdtemp() rewrites the trailing XXXXXX in place, hence a writable array.
    char pattern[] = "/tmp/fileXXXXXX";
    char * dir = ::mkdtemp(pattern);
    if (dir == nullptr)
        die("Could not create directory");

    directory = std::string(dir);
    filename = directory + "/foo";

    std::ofstream out(filename.c_str());
    if (!out.is_open())
        die("Could not open file");

    for (size_t i = 0; i < 1340; ++i)
        buf += symbols[i % symbols.length()];

    /// Position the data at offset 2984 of the new file.
    out.seekp(2984, std::ios_base::beg);
    out << buf;

    /// Flush explicitly so that a failed seek or write is detected here rather
    /// than showing up later as an unexplained test failure.
    out.flush();
    if (!out)
        die("Could not write to file");
}
void die(const std::string & msg)
{
std::cout << msg << "\n";
@ -372,6 +463,149 @@ bool test14(const std::string & filename, const std::string & buf)
return true;
}
/// Read a single byte from the start of the file and check that it is '1'.
/// The `buf` parameter is not used; it is kept for a uniform test signature.
bool test15(const std::string & filename, const std::string & buf)
{
    std::string out_bytes;
    out_bytes.resize(1000);

    DB::ReadBufferAIO in(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);
    const size_t bytes_read = in.read(&out_bytes[0], 1);

    return (bytes_read == 1) && (out_bytes[0] == '1');
}
bool test16(const std::string & filename, const std::string & buf)
{
DB::ReadBufferAIO in(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count;
{
std::string newbuf;
newbuf.resize(1);
count = in.read(&newbuf[0], 1);
if (count != 1)
return false;
if (newbuf[0] != '1')
return false;
}
in.seek(2, SEEK_CUR);
{
std::string newbuf;
newbuf.resize(3);
count = in.read(&newbuf[0], 3);
if (count != 3)
return false;
if (newbuf != "333")
return false;
}
in.seek(4, SEEK_CUR);
{
std::string newbuf;
newbuf.resize(5);
count = in.read(&newbuf[0], 5);
if (count != 5)
return false;
if (newbuf != "55555")
return false;
}
in.seek(6, SEEK_CUR);
{
std::string newbuf;
newbuf.resize(7);
count = in.read(&newbuf[0], 7);
if (count != 7)
return false;
if (newbuf != "7777777")
return false;
}
in.seek(8, SEEK_CUR);
{
std::string newbuf;
newbuf.resize(9);
count = in.read(&newbuf[0], 9);
if (count != 9)
return false;
if (newbuf != "999999999")
return false;
}
return true;
}
/// Check reads on a file whose payload `buf` starts at byte offset 7:
///  1. the first 10 bytes are 7 zero bytes followed by "122";
///  2. a 160-byte read starting 2 bytes before the end of the payload
///     returns only those 2 bytes ("99");
///  3. a read starting one block past the end of the payload returns nothing.
bool test17(const std::string & filename, const std::string & buf)
{
    /// Offset at which the payload starts in the file.
    const size_t payload_offset = 7;

    DB::ReadBufferAIO in(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);

    {
        std::string head(10, '\0');
        const size_t bytes_read = in.read(&head[0], 10);
        if (bytes_read != 10)
            return false;
        if (head.compare(0, payload_offset, std::string(payload_offset, '\0')) != 0)
            return false;
        if (head.compare(payload_offset, std::string::npos, "122") != 0)
            return false;
    }

    in.seek(payload_offset + buf.length() - 2, SEEK_SET);

    {
        std::string tail(160, '\0');
        const size_t bytes_read = in.read(&tail[0], 160);
        if (bytes_read != 2)
            return false;
        if (tail.compare(0, 2, "99") != 0)
            return false;
    }

    in.seek(payload_offset + buf.length() + DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_SET);

    std::string beyond(50, '\0');
    const size_t bytes_read = in.read(&beyond[0], 50);
    return bytes_read == 0;
}
/// Seek to offset 2984, read 1340 bytes and check that they equal `buf`.
bool test18(const std::string & filename, const std::string & buf)
{
    const off_t data_offset = 2984;
    const size_t data_size = 1340;

    DB::ReadBufferAIO in(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);

    std::string received;
    received.resize(data_size);

    in.seek(data_offset, SEEK_SET);
    const size_t bytes_read = in.read(&received[0], data_size);

    return (bytes_read == data_size) && (received == buf);
}
}
int main()
@ -379,3 +613,4 @@ int main()
run();
return 0;
}