ClickHouse/utils/iotest/iotest_aio.cpp
2012-10-04 08:12:13 +00:00

316 lines
7.4 KiB
C++

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <malloc.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

#include <exception>
#include <iostream>
#include <iomanip>
#include <vector>

#include <Poco/NumberParser.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>

#include <DB/Core/Exception.h>
#include <statdaemons/threadpool.hpp>
#include <statdaemons/Stopwatch.h>
using DB::throwFromErrno;
/** Invokes the io_setup system call directly through syscall().
  *
  * nr   - number of events passed to the kernel for the new context
  * ctxp - receives the context handle
  *
  * Returns the result of syscall() narrowed to int; callers in this file
  * treat a negative value as failure.
  */
inline int io_setup(unsigned nr, aio_context_t *ctxp)
{
    const long rc = syscall(__NR_io_setup, nr, ctxp);
    return static_cast<int>(rc);
}
/** Invokes the io_destroy system call directly through syscall().
  *
  * ctx - context handle to destroy
  *
  * Returns the result of syscall() narrowed to int.
  */
inline int io_destroy(aio_context_t ctx)
{
    const long rc = syscall(__NR_io_destroy, ctx);
    return static_cast<int>(rc);
}
/** Invokes the io_submit system call directly through syscall().
  *
  * ctx    - context handle the requests are submitted to
  * nr     - number of entries in iocbpp
  * iocbpp - array of pointers to request control blocks
  *
  * Returns the result of syscall() narrowed to int; callers in this file
  * treat a negative value as failure.
  */
inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
    const long rc = syscall(__NR_io_submit, ctx, nr, iocbpp);
    return static_cast<int>(rc);
}
/** Invokes the io_getevents system call directly through syscall().
  *
  * ctx     - context handle to read completion events from
  * min_nr  - minimum number of events requested
  * max_nr  - capacity of the events array
  * events  - output array for completion events
  * timeout - passed through to the kernel unchanged (callers in this file pass NULL)
  *
  * Returns the result of syscall() narrowed to int; callers in this file
  * treat a negative value as failure and a non-negative value as the number
  * of events stored.
  */
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
    struct io_event *events, struct timespec *timeout)
{
    const long rc = syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
    return static_cast<int>(rc);
}
/// Direction of the I/O requests issued by the benchmark.
enum Mode
{
    /// Requests are reads.
    MODE_READ  = 1,
    /// Requests are writes.
    MODE_WRITE = 2,
};
/** Owns a heap buffer whose address is aligned to the memory page size and whose
  * allocated length is `size` rounded up to a whole number of pages.
  *
  * The buffer is released in the destructor. Copying performs a deep copy, so two
  * objects never share (and never double-free) the same allocation.
  */
struct AlignedBuffer
{
    int size;       /// Requested size in bytes; 0 when the buffer is empty.
    char * data;    /// Page-aligned storage; NULL when the buffer is empty.

    /// Creates an empty buffer.
    AlignedBuffer() : size(0), data(NULL) {}

    /// Creates a buffer of size_ bytes; see init() for failure modes.
    AlignedBuffer(int size_) : size(0), data(NULL)
    {
        init(size_);
    }

    /// Deep copy: allocates separate storage and copies the contents.
    AlignedBuffer(const AlignedBuffer & other) : size(0), data(NULL)
    {
        copyFrom(other);
    }

    /// Deep copy assignment; self-assignment is a no-op.
    AlignedBuffer & operator=(const AlignedBuffer & other)
    {
        if (this != &other)
            copyFrom(other);
        return *this;
    }

    ~AlignedBuffer()
    {
        uninit();
    }

    /** Releases any previous storage and allocates size_ bytes (rounded up to whole pages).
      * Reports failure through throwFromErrno: for a negative size_ (errno is set to EINVAL)
      * and when memalign returns NULL. On failure the buffer is left empty.
      */
    void init(int size_)
    {
        uninit();

        /// A negative size would wrap around in the unsigned rounding below and
        /// produce a tiny allocation labelled with a bogus size.
        if (size_ < 0)
        {
            errno = EINVAL;
            throwFromErrno("AlignedBuffer: negative size");
        }

        const size_t page = sysconf(_SC_PAGESIZE);
        const size_t bytes = (static_cast<size_t>(size_) + page - 1) / page * page;

        data = static_cast<char*>(memalign(page, bytes));
        if (!data)
            throwFromErrno("memalign failed");

        /// Publish the size only once the allocation has succeeded.
        size = size_;
    }

    /// Frees the storage (if any) and resets the buffer to the empty state.
    void uninit()
    {
        if (data)
            free(data);
        data = NULL;
        size = 0;
    }

private:
    /// Makes *this an independent copy of other (empty if other is empty).
    void copyFrom(const AlignedBuffer & other)
    {
        if (!other.data)
        {
            uninit();
            return;
        }

        init(other.size);
        memcpy(data, other.data, static_cast<size_t>(other.size));
    }
};
/** Owns a kernel AIO context: created with io_setup in the constructor and
  * released with io_destroy in the destructor.
  *
  * The object is not copyable, because a copy would call io_destroy a second
  * time on the same context handle.
  */
struct AioContext
{
    aio_context_t ctx;  /// Handle filled in by io_setup.

    /** max_events - number of events passed to io_setup; defaults to 128.
      * Reports failure of io_setup through throwFromErrno.
      */
    explicit AioContext(unsigned max_events = 128) : ctx(0)
    {
        if (io_setup(max_events, &ctx) < 0)
            throwFromErrno("io_setup failed");
    }

    /// Destroys the context; the result of io_destroy is ignored.
    ~AioContext()
    {
        io_destroy(ctx);
    }

private:
    /// Declared but not defined: copying is forbidden.
    AioContext(const AioContext &);
    AioContext & operator=(const AioContext &);
};
/// Holder used to carry an exception out of a worker (see thread()); it stays null when the worker finished without an error.
typedef Poco::SharedPtr<Poco::Exception> ExceptionPtr;
/** Worker body: issues `count` block-sized requests at random offsets of `fd` through
  * the kernel AIO interface, keeping at most `buffers_count` requests in flight.
  *
  * fd            - descriptor of the file to read or write
  * mode          - MODE_READ issues IOCB_CMD_PREAD, any other value issues IOCB_CMD_PWRITE
  * min_offset,
  * max_offset    - byte range; offsets are min_offset + k * block_size for a random k
  *                 in [0, (max_offset - min_offset) / block_size)
  * block_size    - number of bytes per request (must be in [1, INT_MAX])
  * buffers_count - number of buffers, i.e. the maximum number of in-flight requests (must be > 0)
  * count         - total number of requests to complete
  * exception     - receives the error if one occurs; left untouched on success
  *
  * Errors are reported through `exception` instead of being thrown to the caller.
  */
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count, ExceptionPtr & exception)
{
    try
    {
        /// Reject parameters that would cause a division by zero below, a buffer size that
        /// does not fit AlignedBuffer::init(int), or a loop that can never make progress.
        if (block_size == 0 || buffers_count == 0)
            throw Poco::Exception("block size and buffers count must be positive");
        if (block_size > static_cast<size_t>(INT_MAX))
            throw Poco::Exception("block size is too large");
        if (max_offset < min_offset || (max_offset - min_offset) / block_size == 0)
            throw Poco::Exception("offset range must contain at least one block");

        const size_t blocks_in_range = (max_offset - min_offset) / block_size;

        /// The buffers are declared before the context on purpose: locals are destroyed in
        /// reverse order, so on an exception the context is destroyed first (io_destroy(2)
        /// cancels or waits for outstanding requests) and only then are the buffers freed.
        std::vector<AlignedBuffer> buffers(buffers_count);
        for (size_t i = 0; i < buffers_count; ++i)
            buffers[i].init(block_size);

        AioContext ctx;

        /// Per-thread random generator seeded from the thread CPU clock.
        drand48_data rand_data;
        timespec times;
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
        srand48_r(times.tv_nsec, &rand_data);

        size_t in_progress = 0;     /// requests accepted by the kernel and not yet completed
        size_t blocks_sent = 0;     /// requests accepted by the kernel so far

        std::vector<bool> buffer_used(buffers_count, false);
        std::vector<iocb> iocbs(buffers_count);
        std::vector<iocb*> query_cbs;
        std::vector<io_event> events(buffers_count);
        query_cbs.reserve(buffers_count);

        while (blocks_sent < count || in_progress > 0)
        {
            /// Prepare requests for every free buffer, up to the number still to be sent.
            /// Counters and buffer_used are updated only after the kernel accepts a request.
            query_cbs.clear();
            for (size_t i = 0; i < buffers_count && blocks_sent + query_cbs.size() < count; ++i)
            {
                if (buffer_used[i])
                    continue;

                /// Fill the buffer with random bytes, three per generated number.
                char * buf = buffers[i].data;
                for (size_t j = 0; j + 3 < block_size; j += 3)
                {
                    long r;
                    lrand48_r(&rand_data, &r);
                    buf[j    ] = static_cast<char>((r >>  0) & 255);
                    buf[j + 1] = static_cast<char>((r >>  8) & 255);
                    buf[j + 2] = static_cast<char>((r >> 16) & 255);
                }

                long rand_result1 = 0;
                long rand_result2 = 0;
                long rand_result3 = 0;
                lrand48_r(&rand_data, &rand_result1);
                lrand48_r(&rand_data, &rand_result2);
                lrand48_r(&rand_data, &rand_result3);

                /// Combine three generator outputs into one wide number. The shifts are done
                /// on an unsigned 64-bit type: shifting the signed values would overflow.
                const uint64_t rand_result = static_cast<uint64_t>(rand_result1)
                    ^ (static_cast<uint64_t>(rand_result2) << 22)
                    ^ (static_cast<uint64_t>(rand_result3) << 43);

                const size_t offset = min_offset + static_cast<size_t>(rand_result % blocks_in_range) * block_size;

                iocb & cb = iocbs[i];
                memset(&cb, 0, sizeof(cb));
                cb.aio_buf = reinterpret_cast<uint64_t>(buf);
                cb.aio_fildes = fd;
                cb.aio_nbytes = block_size;
                cb.aio_offset = offset;
                cb.aio_data = static_cast<uint64_t>(i);     /// buffer index, returned in the completion event
                cb.aio_lio_opcode = (mode == MODE_READ) ? IOCB_CMD_PREAD : IOCB_CMD_PWRITE;

                query_cbs.push_back(&cb);
            }

            /// Submit the requests. io_submit may accept only a prefix of the array, so account
            /// for exactly the number it reports; the rest is prepared again on the next pass.
            size_t submitted = 0;
            if (!query_cbs.empty())
            {
                const int res = io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]);
                if (res > 0)
                    submitted = static_cast<size_t>(res);
                else if (res < 0 && (errno != EAGAIN || in_progress == 0))
                    throwFromErrno("io_submit failed");
                /// Otherwise (EAGAIN with requests in flight): the queue is full, wait for
                /// completions below and try again.
            }

            for (size_t k = 0; k < submitted; ++k)
                buffer_used[static_cast<size_t>(query_cbs[k]->aio_data)] = true;
            blocks_sent += submitted;
            in_progress += submitted;

            /// Nothing in flight means waiting for events would block forever.
            if (in_progress == 0)
                throw Poco::Exception("io_submit accepted no requests");

            /// Collect completions. If there is still something to send, wait for at least one
            /// completion (then go and send more); otherwise wait for all outstanding requests.
            memset(&events[0], 0, buffers_count * sizeof(events[0]));
            const int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], NULL);
            if (evs < 0)
            {
                /// Interrupted by a signal: nothing was lost, just retry.
                if (errno == EINTR)
                    continue;
                throwFromErrno("io_getevents failed");
            }

            for (int i = 0; i < evs; ++i)
            {
                /// Anything other than a full-block transfer (including a negative error code) is a failure.
                if (static_cast<long long>(events[i].res) != static_cast<long long>(block_size))
                    throw Poco::Exception("read/write error");

                --in_progress;
                buffer_used[static_cast<size_t>(events[i].data)] = false;
            }
        }

        /// Disabled: flushing the file through an AIO fsync request.
        // iocb cb;
        // memset(&cb, 0, sizeof(cb));
        // cb.aio_lio_opcode = IOCB_CMD_FSYNC;
        // cb.aio_fildes = fd;
        // iocb *cbs = &cb;
        // if (io_submit(ctx.ctx, 1, &cbs) < 0)
        //     throwFromErrno("io_submit of fdatasync failed");
        // io_event e;
        // if (io_getevents(ctx.ctx, 1, 1, &e, NULL) < 0)
        //     throwFromErrno("io_getevents failed");
        // if (e.res < 0)
        //     throw Poco::Exception("sync failed");
    }
    catch (const Poco::Exception & e)
    {
        exception = e.clone();
    }
    catch (const std::exception & e)
    {
        /// Preserve the message of standard exceptions instead of reporting them as unknown.
        exception = new Poco::Exception(e.what());
    }
    catch (...)
    {
        exception = new Poco::Exception("Unknown exception");
    }
}
/** Parses the command line, runs the benchmark on `threads` workers and prints the throughput.
  *
  * Expected arguments: file_name r|w min_offset max_offset block_size threads buffers count.
  * The mode is write when the second argument starts with 'w', read otherwise.
  *
  * Returns 1 after printing the usage line when the argument count is wrong, 0 on success.
  * Failures to open/close the file are reported through throwFromErrno; the first error
  * recorded by a worker is rethrown after all workers have finished.
  */
int mainImpl(int argc, char ** argv)
{
    if (argc != 9)
    {
        std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
        return 1;
    }

    const char * file_name = argv[1];
    const int mode = (argv[2][0] == 'w') ? MODE_WRITE : MODE_READ;

    const size_t min_offset = Poco::NumberParser::parseUnsigned64(argv[3]);
    const size_t max_offset = Poco::NumberParser::parseUnsigned64(argv[4]);
    const size_t block_size = Poco::NumberParser::parseUnsigned64(argv[5]);
    const size_t threads_count = Poco::NumberParser::parseUnsigned(argv[6]);
    const size_t buffers_count = Poco::NumberParser::parseUnsigned(argv[7]);
    const size_t count = Poco::NumberParser::parseUnsigned(argv[8]);

    int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
    if (-1 == fd)
        throwFromErrno("Cannot open file");

    typedef std::vector<ExceptionPtr> Exceptions;

    /// Declared before the pool so that, during stack unwinding, the slots referenced by the
    /// scheduled tasks are destroyed after the pool object rather than before it.
    Exceptions exceptions(threads_count);
    boost::threadpool::pool pool(threads_count);

    Stopwatch watch;

    /// Every worker gets its own exception slot; the vector is never resized, so the references stay valid.
    for (size_t i = 0; i < threads_count; ++i)
        pool.schedule(boost::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, boost::ref(exceptions[i])));
    pool.wait();

    watch.stop();

    for (size_t i = 0; i < threads_count; ++i)
    {
        if (exceptions[i])
        {
            /// Do not leak the descriptor on the error path. The result of close() is ignored
            /// here because the worker's error is the one worth reporting.
            close(fd);
            exceptions[i]->rethrow();
        }
    }

    if (0 != close(fd))
        throwFromErrno("Cannot close file");

    std::cout << std::fixed << std::setprecision(2)
        << "Done " << count << " * " << threads_count << " ops";
    std::cout << " in " << watch.elapsedSeconds() << " sec."
        << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
        << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
        << std::endl;

    return 0;
}
int main(int argc, char ** argv)
{
try
{
return mainImpl(argc, argv);
}
catch (const Poco::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
}