mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-05 23:31:24 +00:00
307 lines
7.1 KiB
C++
307 lines
7.1 KiB
C++
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <stdlib.h>
|
|
#include <time.h>
|
|
|
|
#include <iostream>
|
|
#include <iomanip>
|
|
#include <vector>
|
|
|
|
#include <Poco/NumberParser.h>
|
|
#include <Poco/NumberFormatter.h>
|
|
#include <Poco/Exception.h>
|
|
#include <Poco/SharedPtr.h>
|
|
|
|
#include <DB/Core/Exception.h>
|
|
|
|
#include <statdaemons/threadpool.hpp>
|
|
#include <statdaemons/Stopwatch.h>
|
|
|
|
#include <stdlib.h>
|
|
#include <malloc.h>
|
|
|
|
#include <fcntl.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
#include <linux/aio_abi.h>
|
|
#include <sys/syscall.h>
|
|
|
|
using DB::throwFromErrno;
|
|
|
|
inline int io_setup(unsigned nr, aio_context_t *ctxp)
|
|
{
|
|
return syscall(__NR_io_setup, nr, ctxp);
|
|
}
|
|
|
|
inline int io_destroy(aio_context_t ctx)
|
|
{
|
|
return syscall(__NR_io_destroy, ctx);
|
|
}
|
|
|
|
inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
|
|
{
|
|
return syscall(__NR_io_submit, ctx, nr, iocbpp);
|
|
}
|
|
|
|
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
|
|
struct io_event *events, struct timespec *timeout)
|
|
{
|
|
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
|
|
}
|
|
|
|
enum Mode
|
|
{
|
|
MODE_READ = 1,
|
|
MODE_WRITE = 2,
|
|
};
|
|
|
|
|
|
struct AlignedBuffer
|
|
{
|
|
int size = 0;
|
|
char * data = nullptr;
|
|
|
|
AlignedBuffer() {}
|
|
|
|
void init(int size_)
|
|
{
|
|
uninit();
|
|
size_t page = sysconf(_SC_PAGESIZE);
|
|
size = size_;
|
|
data = static_cast<char*>(memalign(page, (size + page - 1) / page * page));
|
|
if (!data)
|
|
throwFromErrno("memalign failed");
|
|
}
|
|
|
|
void uninit()
|
|
{
|
|
if (data)
|
|
free(data);
|
|
data = nullptr;
|
|
size = 0;
|
|
}
|
|
|
|
AlignedBuffer(int size_) : size(0), data(NULL)
|
|
{
|
|
init(size_);
|
|
}
|
|
|
|
~AlignedBuffer()
|
|
{
|
|
uninit();
|
|
}
|
|
};
|
|
|
|
struct AioContext
|
|
{
|
|
aio_context_t ctx;
|
|
|
|
AioContext()
|
|
{
|
|
ctx = 0;
|
|
if (io_setup(128, &ctx) < 0)
|
|
throwFromErrno("io_setup failed");
|
|
}
|
|
|
|
~AioContext()
|
|
{
|
|
io_destroy(ctx);
|
|
}
|
|
};
|
|
|
|
typedef Poco::SharedPtr<Poco::Exception> ExceptionPtr;
|
|
|
|
|
|
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count, ExceptionPtr & exception)
|
|
{
|
|
try
|
|
{
|
|
AioContext ctx;
|
|
|
|
std::vector<AlignedBuffer> buffers(buffers_count);
|
|
for (size_t i = 0; i < buffers_count; ++i)
|
|
{
|
|
buffers[i].init(block_size);
|
|
}
|
|
|
|
drand48_data rand_data;
|
|
timespec times;
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×);
|
|
srand48_r(times.tv_nsec, &rand_data);
|
|
|
|
size_t in_progress = 0;
|
|
size_t blocks_sent = 0;
|
|
std::vector<bool> buffer_used(buffers_count, false);
|
|
std::vector<iocb> iocbs(buffers_count);
|
|
std::vector<iocb*> query_cbs;
|
|
std::vector<io_event> events(buffers_count);
|
|
|
|
while (blocks_sent < count || in_progress > 0)
|
|
{
|
|
/// Составим запросы.
|
|
query_cbs.clear();
|
|
for (size_t i = 0; i < buffers_count; ++i)
|
|
{
|
|
if (blocks_sent >= count || in_progress >= buffers_count)
|
|
break;
|
|
|
|
if (buffer_used[i])
|
|
continue;
|
|
|
|
buffer_used[i] = true;
|
|
++blocks_sent;
|
|
++in_progress;
|
|
|
|
char * buf = buffers[i].data;
|
|
|
|
long rand_result1 = 0;
|
|
long rand_result2 = 0;
|
|
long rand_result3 = 0;
|
|
lrand48_r(&rand_data, &rand_result1);
|
|
lrand48_r(&rand_data, &rand_result2);
|
|
lrand48_r(&rand_data, &rand_result3);
|
|
|
|
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
|
|
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
|
|
|
|
iocb & cb = iocbs[i];
|
|
memset(&cb, 0, sizeof(cb));
|
|
cb.aio_buf = reinterpret_cast<uint64_t>(buf);
|
|
cb.aio_fildes = fd;
|
|
cb.aio_nbytes = block_size;
|
|
cb.aio_offset = offset;
|
|
cb.aio_data = static_cast<uint64_t>(i);
|
|
|
|
if (mode == MODE_READ)
|
|
{
|
|
cb.aio_lio_opcode = IOCB_CMD_PREAD;
|
|
}
|
|
else
|
|
{
|
|
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
|
|
}
|
|
|
|
query_cbs.push_back(&cb);
|
|
}
|
|
|
|
/// Отправим запросы.
|
|
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
|
|
throwFromErrno("io_submit failed");
|
|
|
|
/// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов.
|
|
memset(&events[0], 0, buffers_count * sizeof(events[0]));
|
|
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
|
|
if (evs < 0)
|
|
throwFromErrno("io_getevents failed");
|
|
|
|
for (int i = 0; i < evs; ++i)
|
|
{
|
|
int b = static_cast<int>(events[i].data);
|
|
if (events[i].res != static_cast<int>(block_size))
|
|
throw Poco::Exception("read/write error");
|
|
--in_progress;
|
|
buffer_used[b] = false;
|
|
}
|
|
}
|
|
|
|
// iocb cb;
|
|
// memset(&cb, 0, sizeof(cb));
|
|
// cb.aio_lio_opcode = IOCB_CMD_FSYNC;
|
|
// cb.aio_fildes = fd;
|
|
// iocb *cbs = &cb;
|
|
// if (io_submit(ctx.ctx, 1, &cbs) < 0)
|
|
// throwFromErrno("io_submit of fdatasync failed");
|
|
// io_event e;
|
|
// if (io_getevents(ctx.ctx, 1, 1, &e, nullptr) < 0)
|
|
// throwFromErrno("io_getevents failed");
|
|
// if (e.res < 0)
|
|
// throw Poco::Exception("sync failed");
|
|
}
|
|
catch (const Poco::Exception & e)
|
|
{
|
|
exception = e.clone();
|
|
}
|
|
catch (...)
|
|
{
|
|
exception = new Poco::Exception("Unknown exception");
|
|
}
|
|
}
|
|
|
|
|
|
int mainImpl(int argc, char ** argv)
|
|
{
|
|
const char * file_name = 0;
|
|
int mode = MODE_READ;
|
|
size_t min_offset = 0;
|
|
size_t max_offset = 0;
|
|
size_t block_size = 0;
|
|
size_t buffers_count = 0;
|
|
size_t threads_count = 0;
|
|
size_t count = 0;
|
|
|
|
if (argc != 9)
|
|
{
|
|
std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
|
|
return 1;
|
|
}
|
|
|
|
file_name = argv[1];
|
|
if (argv[2][0] == 'w')
|
|
mode = MODE_WRITE;
|
|
min_offset = Poco::NumberParser::parseUnsigned64(argv[3]);
|
|
max_offset = Poco::NumberParser::parseUnsigned64(argv[4]);
|
|
block_size = Poco::NumberParser::parseUnsigned64(argv[5]);
|
|
threads_count = Poco::NumberParser::parseUnsigned(argv[6]);
|
|
buffers_count = Poco::NumberParser::parseUnsigned(argv[7]);
|
|
count = Poco::NumberParser::parseUnsigned(argv[8]);
|
|
|
|
int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
|
|
if (-1 == fd)
|
|
throwFromErrno("Cannot open file");
|
|
|
|
typedef std::vector<ExceptionPtr> Exceptions;
|
|
|
|
boost::threadpool::pool pool(threads_count);
|
|
Exceptions exceptions(threads_count);
|
|
|
|
Stopwatch watch;
|
|
|
|
for (size_t i = 0; i < threads_count; ++i)
|
|
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, std::ref(exceptions[i])));
|
|
pool.wait();
|
|
|
|
watch.stop();
|
|
|
|
for (size_t i = 0; i < threads_count; ++i)
|
|
if (exceptions[i])
|
|
exceptions[i]->rethrow();
|
|
|
|
if (0 != close(fd))
|
|
throwFromErrno("Cannot close file");
|
|
|
|
std::cout << std::fixed << std::setprecision(2)
|
|
<< "Done " << count << " * " << threads_count << " ops";
|
|
std::cout << " in " << watch.elapsedSeconds() << " sec."
|
|
<< ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
|
|
<< ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
|
|
<< std::endl;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
int main(int argc, char ** argv)
|
|
{
|
|
try
|
|
{
|
|
return mainImpl(argc, argv);
|
|
}
|
|
catch (const Poco::Exception & e)
|
|
{
|
|
std::cerr << e.what() << ", " << e.message() << std::endl;
|
|
return 1;
|
|
}
|
|
}
|