dbms: Server: feature development. [#METR-15090]

This commit is contained in:
Alexey Arno 2015-03-25 17:14:06 +03:00
parent 33460bb5ea
commit 9f6d66a285
2 changed files with 244 additions and 19 deletions

View File

@ -93,6 +93,9 @@ off_t WriteBufferAIO::seek(off_t off, int whence)
throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (pos_in_file > max_pos)
max_pos = pos_in_file;
return pos_in_file;
}
@ -142,12 +145,12 @@ void WriteBufferAIO::nextImpl()
truncate_count = 0;
/// Регион диска, на который хотим записать данные.
/// Регион диска, в который хотим записать данные.
off_t region_begin = pos_in_file;
off_t region_end = pos_in_file + flush_buffer.offset();
size_t region_size = region_end - region_begin;
/// Регион диска, на который действительно записываем данные.
/// Регион диска, в который действительно записываем данные.
size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
size_t region_right_padding = 0;
if (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE != 0)
@ -165,15 +168,14 @@ void WriteBufferAIO::nextImpl()
/// Обработать буфер, чтобы он оторажал структуру региона диска.
// Process the left side.
bool has_excess_buffer = false;
size_t excess = 0;
if (region_left_padding > 0)
{
if ((region_left_padding + buffer_size) > buffer_capacity)
{
has_excess_buffer = true;
excess = region_left_padding + buffer_size - buffer_capacity;
::memset(&memory_page[0], 0, memory_page.size());
::memcpy(&memory_page[0], buffer_end - region_left_padding, region_left_padding);
::memcpy(&memory_page[0], buffer_end - excess, excess);
buffer_end = buffer_begin + buffer_capacity;
buffer_size = buffer_capacity;
}
@ -194,20 +196,22 @@ void WriteBufferAIO::nextImpl()
}
}
Position end_ptr;
if (has_excess_buffer)
end_ptr = &memory_page[region_left_padding];
else
end_ptr = buffer_end;
// Process the right side.
if (region_right_padding > 0)
{
Position end_ptr;
if (excess > 0)
end_ptr = &memory_page[excess];
else
end_ptr = buffer_end;
::memset(end_ptr, 0, region_right_padding);
ssize_t read_count = ::pread(fd2, end_ptr, region_right_padding, region_end);
if (read_count < 0)
read_count = 0;
truncate_count = DEFAULT_AIO_FILE_BLOCK_SIZE - (region_left_padding + read_count);
{
got_exception = true;
throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);
}
truncate_count = region_right_padding - read_count;
}
/// Создать запрос на асинхронную запись.
@ -215,13 +219,13 @@ void WriteBufferAIO::nextImpl()
size_t i = 0;
iov[i].iov_base = buffer_begin;
iov[i].iov_len = (has_excess_buffer ? buffer_capacity : region_aligned_size);
iov[i].iov_len = ((excess > 0) ? buffer_capacity : region_aligned_size);
++i;
if (has_excess_buffer)
if (excess > 0)
{
iov[i].iov_base = &memory_page[0];
iov[i].iov_len = DEFAULT_AIO_FILE_BLOCK_SIZE;
iov[i].iov_len = memory_page.size();
++i;
}

View File

@ -21,6 +21,10 @@ bool test3();
bool test4();
bool test5();
bool test6();
bool test7();
bool test8();
bool test9();
bool test10();
void run()
{
@ -31,7 +35,11 @@ void run()
test3,
test4,
test5,
test6
test6,
test7,
test8,
test9,
test10
};
unsigned int num = 0;
@ -391,6 +399,219 @@ bool test6()
return true;
}
bool test7()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::string buf2 = "1111111111";
{
DB::WriteBufferAIO out(filename, DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.seek(3, SEEK_SET);
out.write(&buf[0], buf.length());
out.seek(3, SEEK_CUR);
out.write(&buf2[0], buf2.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
if (received.length() != (6 + buf.length() + buf2.length()))
return false;
if (received.substr(0, 3) != std::string(3, '\0'))
return false;
if (received.substr(3, buf.length()) != buf)
return false;
if (received.substr(3 + buf.length(), 3) != std::string(3, '\0'))
return false;
if (received.substr(6 + buf.length()) != buf2)
return false;
in.close();
fs::remove_all(directory);
return true;
}
bool test8()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
std::string buf2 = "11111111112222222222";
{
// Minimal buffer size = 2 pages.
DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET);
out.write(&buf2[0], buf2.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
if (received.length() != 4106)
return false;
if (received.substr(0, 4086) != std::string(4086, '\0'))
return false;
if (received.substr(4086, 20) != buf2)
return false;
in.close();
fs::remove_all(directory);
return true;
}
bool test9()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
std::string buf2 = "11111111112222222222";
{
// Minimal buffer size = 2 pages.
DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE - (buf2.length() / 2), SEEK_SET);
out.write(&buf2[0], buf2.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
if (received.length() != 8202)
return false;
if (received.substr(0, 8182) != std::string(8182, '\0'))
return false;
if (received.substr(8182, 20) != buf2)
return false;
in.close();
fs::remove_all(directory);
return true;
}
bool test10()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 3 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::string buf2(DEFAULT_AIO_FILE_BLOCK_SIZE + 10, '1');
{
DB::WriteBufferAIO out(filename, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.seek(3, SEEK_SET);
out.write(&buf[0], buf.length());
out.seek(-DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
out.write(&buf2[0], buf2.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
if (received.substr(3, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(0, 2 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
if (received.substr(3 + 2 * DEFAULT_AIO_FILE_BLOCK_SIZE, DEFAULT_AIO_FILE_BLOCK_SIZE + 10) != buf2)
return false;
return true;
}
}
int main()