ClickHouse/utils/iotest/iotest_aio.cpp
Vladimir Smirnov d36f52502e Make it compilable on OS X
It's still hackish and dirty, but server and client compile.

Server starts, but throws a meaningless exception on any query.

Client seems to be working fine.

Linux compilation might (but shouldn't) be broken (not tested).
2016-11-01 17:59:21 +01:00

273 lines
6.1 KiB
C++

#include <fcntl.h>
#include <malloc.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <linux/aio_abi.h>

#include <cerrno>
#include <cstring>
#include <exception>
#include <functional>
#include <iomanip>
#include <iostream>
#include <vector>

#include <Poco/NumberParser.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Exception.h>

#include <DB/Common/Exception.h>
#include <DB/Common/ThreadPool.h>
#include <DB/Common/Stopwatch.h>
using DB::throwFromErrno;
/// Invokes the raw `io_setup` system call through syscall(2) and returns its result as int.
/// Callers in this file treat a negative result as failure and report it via errno.
inline int io_setup(unsigned nr, aio_context_t *ctxp)
{
    const long rc = syscall(__NR_io_setup, nr, ctxp);
    return static_cast<int>(rc);
}
/// Invokes the raw `io_destroy` system call through syscall(2) for the given
/// AIO context handle and returns its result as int.
inline int io_destroy(aio_context_t ctx)
{
    const long rc = syscall(__NR_io_destroy, ctx);
    return static_cast<int>(rc);
}
/// Invokes the raw `io_submit` system call through syscall(2), passing an array of
/// `nr` control-block pointers, and returns its result as int.
/// Callers in this file treat a negative result as failure.
inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
    const long rc = syscall(__NR_io_submit, ctx, nr, iocbpp);
    return static_cast<int>(rc);
}
/// Invokes the raw `io_getevents` system call through syscall(2) and returns its result as int.
/// The caller in this file uses a non-negative result as the number of entries of `events`
/// to process, and a negative one as failure.
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
    struct io_event *events, struct timespec *timeout)
{
    const long rc = syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
    return static_cast<int>(rc);
}
/// Direction of the benchmarked I/O.
/// Kept as an unscoped enum because the values are stored in plain `int` variables.
enum Mode
{
    MODE_READ  = 1,    /// Requests are reads.
    MODE_WRITE = 2,    /// Requests are writes.
};
/** Heap buffer whose start address is aligned to the system page size.
  * The allocated length is `size` rounded up to a whole number of pages.
  *
  * The buffer owns its memory, so copying is forbidden (a copy would free
  * the same pointer twice); ownership can be transferred by moving.
  */
struct AlignedBuffer
{
    int size = 0;            /// Requested size in bytes (not the rounded-up allocation size).
    char * data = nullptr;   /// Start of the page-aligned memory, or nullptr when empty.

    AlignedBuffer() {}

    AlignedBuffer(int size_) : size(0), data(NULL)
    {
        init(size_);
    }

    AlignedBuffer(const AlignedBuffer &) = delete;
    AlignedBuffer & operator=(const AlignedBuffer &) = delete;

    /// Takes over the memory of `other`, leaving it empty.
    AlignedBuffer(AlignedBuffer && other) noexcept : size(other.size), data(other.data)
    {
        other.size = 0;
        other.data = nullptr;
    }

    /// Releases the current memory (if any) and takes over the memory of `other`.
    AlignedBuffer & operator=(AlignedBuffer && other) noexcept
    {
        if (this != &other)
        {
            uninit();
            size = other.size;
            data = other.data;
            other.size = 0;
            other.data = nullptr;
        }
        return *this;
    }

    /// Releases any previous memory and allocates a new page-aligned region for `size_` bytes.
    /// Reports allocation failure through throwFromErrno; the buffer is then left empty.
    void init(int size_)
    {
        uninit();

        const size_t page = sysconf(_SC_PAGESIZE);
        const size_t rounded = (size_ + page - 1) / page * page;

        char * ptr = static_cast<char *>(memalign(page, rounded));
        if (!ptr)
            throwFromErrno("memalign failed");

        /// Publish the new state only after the allocation succeeded.
        data = ptr;
        size = size_;
    }

    /// Frees the memory (if any) and resets the buffer to the empty state.
    void uninit()
    {
        if (data)
            free(data);
        data = nullptr;
        size = 0;
    }

    ~AlignedBuffer()
    {
        uninit();
    }
};
/** Owner of a kernel AIO context: created with io_setup in the constructor,
  * released with io_destroy in the destructor.
  *
  * Copying is forbidden, because a copy would destroy the same context twice.
  */
struct AioContext
{
    aio_context_t ctx;

    /// `max_events` is the capacity requested from io_setup; the default keeps
    /// the previously hard-coded value of 128.
    /// Reports failure of io_setup through throwFromErrno.
    explicit AioContext(unsigned max_events = 128)
    {
        ctx = 0;    /// Zeroed before the call, as the original code did.
        if (io_setup(max_events, &ctx) < 0)
            throwFromErrno("io_setup failed");
    }

    AioContext(const AioContext &) = delete;
    AioContext & operator=(const AioContext &) = delete;

    ~AioContext()
    {
        io_destroy(ctx);
    }
};
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count)
{
AioContext ctx;
std::vector<AlignedBuffer> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i)
{
buffers[i].init(block_size);
}
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data);
size_t in_progress = 0;
size_t blocks_sent = 0;
std::vector<bool> buffer_used(buffers_count, false);
std::vector<iocb> iocbs(buffers_count);
std::vector<iocb*> query_cbs;
std::vector<io_event> events(buffers_count);
while (blocks_sent < count || in_progress > 0)
{
/// Составим запросы.
query_cbs.clear();
for (size_t i = 0; i < buffers_count; ++i)
{
if (blocks_sent >= count || in_progress >= buffers_count)
break;
if (buffer_used[i])
continue;
buffer_used[i] = true;
++blocks_sent;
++in_progress;
char * buf = buffers[i].data;
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
iocb & cb = iocbs[i];
memset(&cb, 0, sizeof(cb));
cb.aio_buf = reinterpret_cast<UInt64>(buf);
cb.aio_fildes = fd;
cb.aio_nbytes = block_size;
cb.aio_offset = offset;
cb.aio_data = static_cast<UInt64>(i);
if (mode == MODE_READ)
{
cb.aio_lio_opcode = IOCB_CMD_PREAD;
}
else
{
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
}
query_cbs.push_back(&cb);
}
/// Отправим запросы.
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
throwFromErrno("io_submit failed");
/// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов.
memset(&events[0], 0, buffers_count * sizeof(events[0]));
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
if (evs < 0)
throwFromErrno("io_getevents failed");
for (int i = 0; i < evs; ++i)
{
int b = static_cast<int>(events[i].data);
if (events[i].res != static_cast<int>(block_size))
throw Poco::Exception("read/write error");
--in_progress;
buffer_used[b] = false;
}
}
}
/** Parses the command line, runs the benchmark and prints the statistics.
  *
  * Expected arguments: file_name r|w min_offset max_offset block_size threads buffers count.
  * The file is opened with O_DIRECT, and `threads` workers are scheduled on a thread pool,
  * each of them performing `count` requests.
  *
  * Returns 0 on success, 1 when the arguments are missing or invalid.
  * Failures to open or close the file are reported through throwFromErrno.
  */
int mainImpl(int argc, char ** argv)
{
    if (argc != 9)
    {
        std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
        return 1;
    }

    const char * file_name = argv[1];

    /// Anything that does not start with 'w' means reading.
    const int mode = (argv[2][0] == 'w') ? MODE_WRITE : MODE_READ;

    const size_t min_offset = Poco::NumberParser::parseUnsigned64(argv[3]);
    const size_t max_offset = Poco::NumberParser::parseUnsigned64(argv[4]);
    const size_t block_size = Poco::NumberParser::parseUnsigned64(argv[5]);
    const size_t threads_count = Poco::NumberParser::parseUnsigned(argv[6]);
    const size_t buffers_count = Poco::NumberParser::parseUnsigned(argv[7]);
    const size_t count = Poco::NumberParser::parseUnsigned(argv[8]);

    /// The workers divide by the number of blocks in the range and need at least one
    /// buffer to make progress, so reject values that would crash or hang them.
    if (0 == block_size || max_offset < min_offset || (max_offset - min_offset) / block_size == 0)
    {
        std::cerr << "Invalid arguments: block_size must be positive and the range [min_offset, max_offset) must hold at least one block" << std::endl;
        return 1;
    }
    if (0 == buffers_count)
    {
        std::cerr << "Invalid arguments: buffers must be positive" << std::endl;
        return 1;
    }

    const int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
    if (-1 == fd)
        throwFromErrno("Cannot open file");

    ThreadPool pool(threads_count);

    Stopwatch watch;

    for (size_t i = 0; i < threads_count; ++i)
        pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
    pool.wait();

    watch.stop();

    if (0 != close(fd))
        throwFromErrno("Cannot close file");

    const double elapsed = watch.elapsedSeconds();

    std::cout << std::fixed << std::setprecision(2)
        << "Done " << count << " * " << threads_count << " ops";
    std::cout << " in " << elapsed << " sec."
        << ", " << count * threads_count / elapsed << " ops/sec."
        << ", " << count * threads_count * block_size / elapsed / 1000000 << " MB/sec."
        << std::endl;

    return 0;
}
int main(int argc, char ** argv)
{
try
{
return mainImpl(argc, argv);
}
catch (const Poco::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
}