From 9f6d66a28533ade41a7cd44cad7b878e1b4e3a48 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 25 Mar 2015 17:14:06 +0300 Subject: [PATCH] dbms: Server: feature development. [#METR-15090] --- dbms/src/IO/WriteBufferAIO.cpp | 40 +++-- dbms/src/IO/tests/write_buffer_aio.cpp | 223 ++++++++++++++++++++++++- 2 files changed, 244 insertions(+), 19 deletions(-) diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index d2bd7f43aee..7315166fd3f 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -93,6 +93,9 @@ off_t WriteBufferAIO::seek(off_t off, int whence) throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } + if (pos_in_file > max_pos) + max_pos = pos_in_file; + return pos_in_file; } @@ -142,12 +145,12 @@ void WriteBufferAIO::nextImpl() truncate_count = 0; - /// Регион диска, на который хотим записать данные. + /// Регион диска, в который хотим записать данные. off_t region_begin = pos_in_file; off_t region_end = pos_in_file + flush_buffer.offset(); size_t region_size = region_end - region_begin; - /// Регион диска, на который действительно записываем данные. + /// Регион диска, в который действительно записываем данные. size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE; size_t region_right_padding = 0; if (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE != 0) @@ -165,15 +168,14 @@ void WriteBufferAIO::nextImpl() /// Обработать буфер, чтобы он оторажал структуру региона диска. - // Process the left side. - bool has_excess_buffer = false; + size_t excess = 0; if (region_left_padding > 0) { if ((region_left_padding + buffer_size) > buffer_capacity) { - has_excess_buffer = true; + excess = region_left_padding + buffer_size - buffer_capacity; ::memset(&memory_page[0], 0, memory_page.size()); - ::memcpy(&memory_page[0], buffer_end - region_left_padding, region_left_padding); + ::memcpy(&memory_page[0], buffer_end - excess, excess); buffer_end = buffer_begin + buffer_capacity; buffer_size = buffer_capacity; } @@ -194,20 +196,22 @@ void WriteBufferAIO::nextImpl() } } - Position end_ptr; - if (has_excess_buffer) - end_ptr = &memory_page[region_left_padding]; - else - end_ptr = buffer_end; - - // Process the right side. if (region_right_padding > 0) { + Position end_ptr; + if (excess > 0) + end_ptr = &memory_page[excess]; + else + end_ptr = buffer_end; + ::memset(end_ptr, 0, region_right_padding); ssize_t read_count = ::pread(fd2, end_ptr, region_right_padding, region_end); if (read_count < 0) - read_count = 0; - truncate_count = DEFAULT_AIO_FILE_BLOCK_SIZE - (region_left_padding + read_count); + { + got_exception = true; + throw Exception("Read error", ErrorCodes::AIO_READ_ERROR); + } + truncate_count = region_right_padding - read_count; } /// Создать запрос на асинхронную запись. @@ -215,13 +219,13 @@ void WriteBufferAIO::nextImpl() size_t i = 0; iov[i].iov_base = buffer_begin; - iov[i].iov_len = (has_excess_buffer ? buffer_capacity : region_aligned_size); + iov[i].iov_len = ((excess > 0) ? buffer_capacity : region_aligned_size); ++i; - if (has_excess_buffer) + if (excess > 0) { iov[i].iov_base = &memory_page[0]; - iov[i].iov_len = DEFAULT_AIO_FILE_BLOCK_SIZE; + iov[i].iov_len = memory_page.size(); ++i; } diff --git a/dbms/src/IO/tests/write_buffer_aio.cpp b/dbms/src/IO/tests/write_buffer_aio.cpp index f5f8ee60ccd..799c29dd8c9 100644 --- a/dbms/src/IO/tests/write_buffer_aio.cpp +++ b/dbms/src/IO/tests/write_buffer_aio.cpp @@ -21,6 +21,10 @@ bool test3(); bool test4(); bool test5(); bool test6(); +bool test7(); +bool test8(); +bool test9(); +bool test10(); void run() { @@ -31,7 +35,11 @@ void run() test3, test4, test5, - test6 + test6, + test7, + test8, + test9, + test10 }; unsigned int num = 0; @@ -391,6 +399,219 @@ bool test6() return true; } +bool test7() +{ + namespace fs = boost::filesystem; + + static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + + char pattern[] = "/tmp/fileXXXXXX"; + char * dir = ::mkdtemp(pattern); + if (dir == nullptr) + die("Could not create directory"); + + const std::string directory = std::string(dir); + const std::string filename = directory + "/foo"; + + size_t n = DEFAULT_AIO_FILE_BLOCK_SIZE; + + std::string buf; + buf.reserve(n); + + for (size_t i = 0; i < n; ++i) + buf += symbols[i % symbols.length()]; + + std::string buf2 = "1111111111"; + + { + DB::WriteBufferAIO out(filename, DEFAULT_AIO_FILE_BLOCK_SIZE); + + if (out.getFileName() != filename) + return false; + if (out.getFD() == -1) + return false; + + out.seek(3, SEEK_SET); + out.write(&buf[0], buf.length()); + out.seek(3, SEEK_CUR); + out.write(&buf2[0], buf2.length()); + } + + std::ifstream in(filename.c_str()); + if (!in.is_open()) + die("Could not open file"); + + std::string received{ std::istreambuf_iterator(in), std::istreambuf_iterator() }; + + if (received.length() != (6 + buf.length() + buf2.length())) + return false; + if (received.substr(0, 3) != std::string(3, '\0')) + return false; + if (received.substr(3, buf.length()) != buf) + return false; + if (received.substr(3 + buf.length(), 3) != std::string(3, '\0')) + return false; + if (received.substr(6 + buf.length()) != buf2) + return false; + + in.close(); + fs::remove_all(directory); + + return true; +} + +bool test8() +{ + namespace fs = boost::filesystem; + + static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + + char pattern[] = "/tmp/fileXXXXXX"; + char * dir = ::mkdtemp(pattern); + if (dir == nullptr) + die("Could not create directory"); + + const std::string directory = std::string(dir); + const std::string filename = directory + "/foo"; + + std::string buf2 = "11111111112222222222"; + + { + // Minimal buffer size = 2 pages. + DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE); + + if (out.getFileName() != filename) + return false; + if (out.getFD() == -1) + return false; + + out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET); + out.write(&buf2[0], buf2.length()); + } + + std::ifstream in(filename.c_str()); + if (!in.is_open()) + die("Could not open file"); + + std::string received{ std::istreambuf_iterator(in), std::istreambuf_iterator() }; + + if (received.length() != 4106) + return false; + if (received.substr(0, 4086) != std::string(4086, '\0')) + return false; + if (received.substr(4086, 20) != buf2) + return false; + + in.close(); + fs::remove_all(directory); + + return true; +} + +bool test9() +{ + namespace fs = boost::filesystem; + + static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + + char pattern[] = "/tmp/fileXXXXXX"; + char * dir = ::mkdtemp(pattern); + if (dir == nullptr) + die("Could not create directory"); + + const std::string directory = std::string(dir); + const std::string filename = directory + "/foo"; + + std::string buf2 = "11111111112222222222"; + + { + // Minimal buffer size = 2 pages. + DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE); + + if (out.getFileName() != filename) + return false; + if (out.getFD() == -1) + return false; + + out.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET); + out.write(&buf2[0], buf2.length()); + } + + std::ifstream in(filename.c_str()); + if (!in.is_open()) + die("Could not open file"); + + std::string received{ std::istreambuf_iterator(in), std::istreambuf_iterator() }; + + if (received.length() != 8202) + return false; + if (received.substr(0, 8182) != std::string(8182, '\0')) + return false; + if (received.substr(8182, 20) != buf2) + return false; + + in.close(); + fs::remove_all(directory); + + return true; +} + +bool test10() +{ + namespace fs = boost::filesystem; + + static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + + char pattern[] = "/tmp/fileXXXXXX"; + char * dir = ::mkdtemp(pattern); + if (dir == nullptr) + die("Could not create directory"); + + const std::string directory = std::string(dir); + const std::string filename = directory + "/foo"; + + size_t n = 3 * DEFAULT_AIO_FILE_BLOCK_SIZE; + + std::string buf; + buf.reserve(n); + + for (size_t i = 0; i < n; ++i) + buf += symbols[i % symbols.length()]; + + std::string buf2(DEFAULT_AIO_FILE_BLOCK_SIZE + 10, '1'); + + { + DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE); + + if (out.getFileName() != filename) + return false; + if (out.getFD() == -1) + return false; + + out.seek(3, SEEK_SET); + out.write(&buf[0], buf.length()); + out.seek(-DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR); + out.write(&buf2[0], buf2.length()); + } + + std::ifstream in(filename.c_str()); + if (!in.is_open()) + die("Could not open file"); + + std::string received{ std::istreambuf_iterator(in), std::istreambuf_iterator() }; + + in.close(); + fs::remove_all(directory); + + if (received.substr(3, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(0, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE)) + return false; + + if (received.substr(3 + 2 * DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE + 10) != buf2) + return false; + + return true; +} + } int main()