#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using DB::throwFromErrno; inline int io_setup(unsigned nr, aio_context_t *ctxp) { return syscall(__NR_io_setup, nr, ctxp); } inline int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) { return syscall(__NR_io_submit, ctx, nr, iocbpp); } inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) { return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout); } enum Mode { MODE_READ = 1, MODE_WRITE = 2, }; struct AlignedBuffer { int size = 0; char * data = nullptr; AlignedBuffer() {} void init(int size_) { uninit(); size_t page = sysconf(_SC_PAGESIZE); size = size_; data = static_cast(memalign(page, (size + page - 1) / page * page)); if (!data) throwFromErrno("memalign failed"); } void uninit() { if (data) free(data); data = nullptr; size = 0; } AlignedBuffer(int size_) : size(0), data(NULL) { init(size_); } ~AlignedBuffer() { uninit(); } }; struct AioContext { aio_context_t ctx; AioContext() { ctx = 0; if (io_setup(128, &ctx) < 0) throwFromErrno("io_setup failed"); } ~AioContext() { io_destroy(ctx); } }; void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count, std::exception_ptr & exception) { try { AioContext ctx; std::vector buffers(buffers_count); for (size_t i = 0; i < buffers_count; ++i) { buffers[i].init(block_size); } drand48_data rand_data; timespec times; clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×); srand48_r(times.tv_nsec, &rand_data); size_t in_progress = 0; size_t blocks_sent = 0; std::vector buffer_used(buffers_count, false); std::vector iocbs(buffers_count); std::vector query_cbs; std::vector events(buffers_count); while (blocks_sent < count || in_progress > 0) { /// Составим запросы. query_cbs.clear(); for (size_t i = 0; i < buffers_count; ++i) { if (blocks_sent >= count || in_progress >= buffers_count) break; if (buffer_used[i]) continue; buffer_used[i] = true; ++blocks_sent; ++in_progress; char * buf = buffers[i].data; long rand_result1 = 0; long rand_result2 = 0; long rand_result3 = 0; lrand48_r(&rand_data, &rand_result1); lrand48_r(&rand_data, &rand_result2); lrand48_r(&rand_data, &rand_result3); size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43); size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size; iocb & cb = iocbs[i]; memset(&cb, 0, sizeof(cb)); cb.aio_buf = reinterpret_cast(buf); cb.aio_fildes = fd; cb.aio_nbytes = block_size; cb.aio_offset = offset; cb.aio_data = static_cast(i); if (mode == MODE_READ) { cb.aio_lio_opcode = IOCB_CMD_PREAD; } else { cb.aio_lio_opcode = IOCB_CMD_PWRITE; } query_cbs.push_back(&cb); } /// Отправим запросы. if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0) throwFromErrno("io_submit failed"); /// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов. memset(&events[0], 0, buffers_count * sizeof(events[0])); int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr); if (evs < 0) throwFromErrno("io_getevents failed"); for (int i = 0; i < evs; ++i) { int b = static_cast(events[i].data); if (events[i].res != static_cast(block_size)) throw Poco::Exception("read/write error"); --in_progress; buffer_used[b] = false; } } } catch (...) { exception = std::current_exception(); } } int mainImpl(int argc, char ** argv) { const char * file_name = 0; int mode = MODE_READ; size_t min_offset = 0; size_t max_offset = 0; size_t block_size = 0; size_t buffers_count = 0; size_t threads_count = 0; size_t count = 0; if (argc != 9) { std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl; return 1; } file_name = argv[1]; if (argv[2][0] == 'w') mode = MODE_WRITE; min_offset = Poco::NumberParser::parseUnsigned64(argv[3]); max_offset = Poco::NumberParser::parseUnsigned64(argv[4]); block_size = Poco::NumberParser::parseUnsigned64(argv[5]); threads_count = Poco::NumberParser::parseUnsigned(argv[6]); buffers_count = Poco::NumberParser::parseUnsigned(argv[7]); count = Poco::NumberParser::parseUnsigned(argv[8]); int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT); if (-1 == fd) throwFromErrno("Cannot open file"); typedef std::vector Exceptions; boost::threadpool::pool pool(threads_count); Exceptions exceptions(threads_count); Stopwatch watch; for (size_t i = 0; i < threads_count; ++i) pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, std::ref(exceptions[i]))); pool.wait(); watch.stop(); for (size_t i = 0; i < threads_count; ++i) if (exceptions[i]) std::rethrow_exception(exceptions[i]); if (0 != close(fd)) throwFromErrno("Cannot close file"); std::cout << std::fixed << std::setprecision(2) << "Done " << count << " * " << threads_count << " ops"; std::cout << " in " << watch.elapsedSeconds() << " sec." << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec." << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." << std::endl; return 0; } int main(int argc, char ** argv) { try { return mainImpl(argc, argv); } catch (const Poco::Exception & e) { std::cerr << e.what() << ", " << e.message() << std::endl; return 1; } }