#if defined(OS_LINUX) || defined(__FreeBSD__)

/// NOTE(review): the include targets below were not legible in the reviewed copy of this file
/// and are restored from what the code uses - confirm them against the build before merging.
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Common/MemorySanitizer.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <errno.h>

#include <IO/AIOContextPool.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_IO_SUBMIT;
    extern const int CANNOT_IO_GETEVENTS;
}


/// Raises the cancellation flag and joins the completion monitor thread.
/// doMonitor() keeps draining until no promises remain, so the join returns only after that.
AIOContextPool::~AIOContextPool()
{
    cancelled.store(true, std::memory_order_relaxed);
    io_completion_monitor.join();
}


/// Monitor loop: polls for completions until cancelled, then keeps polling while promises remain.
void AIOContextPool::doMonitor()
{
    /// continue checking for events unless cancelled
    while (!cancelled.load(std::memory_order_relaxed))
        waitForCompletion();

    /// wait until all requests have been completed
    while (!promises.empty())
        waitForCompletion();
}


/// One polling round: fetch completion events, fulfil the matching promises and wake producers.
/// Never throws: any error is handed to one pending producer (if there is one) and logged.
void AIOContextPool::waitForCompletion()
{
    /// array to hold completion events
    std::vector<io_event> events(max_concurrent_events);

    try
    {
        const auto num_events = getCompletionEvents(events.data(), max_concurrent_events);
        fulfillPromises(events.data(), num_events);
        notifyProducers(num_events);
    }
    catch (...)
    {
        /// there was an error, log it, return to any producer and continue
        reportExceptionToAnyProducer();
        tryLogCurrentException("AIOContextPool::waitForCompletion()");
    }
}


/// Waits (for at most `timeout_sec` seconds per attempt) for 1 to `max_events` completion events.
/// Retries when interrupted by a signal (EINTR); any other failure is thrown via throwFromErrno
/// with ErrorCodes::CANNOT_IO_GETEVENTS.
/// Returns the number of events written to `events`.
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const
{
    timespec timeout{timeout_sec, 0};

    auto num_events = 0;

    /// request 1 to `max_events` events
    while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
        if (errno != EINTR)
            throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);

    /// Unpoison the memory returned from a non-instrumented system call.
    __msan_unpoison(events, sizeof(*events) * num_events);

    return num_events;
}


/// Under the mutex, matches every returned event to its promise by request id,
/// stores the result in the promise and removes it from the map.
/// Events whose id is not registered are logged and skipped.
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
    if (num_events == 0)
        return;

    const std::lock_guard lock{mutex};

    /// look at returned events and find corresponding promise, set result and erase promise from map
    for (const auto & event : boost::make_iterator_range(events, events + num_events))
    {
        /// get id from event
        /// NOTE(review): the cast targets in both __FreeBSD__ branches were not legible in the reviewed copy
        /// and are restored as `struct iocb *` / `struct aiocb *` - confirm against the FreeBSD AIO wrapper.
#if defined(__FreeBSD__)
        const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
#else
        const auto completed_id = event.data;
#endif

        /// set value via promise and release it
        const auto it = promises.find(completed_id);
        if (it == std::end(promises))
        {
            LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id);
            continue;
        }

#if defined(__FreeBSD__)
        it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
        it->second.set_value(event.res);
#endif
        promises.erase(it);
    }
}


/// Wakes producers blocked in post(): one of them for a single completion, all of them for several.
void AIOContextPool::notifyProducers(const int num_producers) const
{
    if (num_producers == 0)
        return;

    if (num_producers > 1)
        have_resources.notify_all();
    else
        have_resources.notify_one();
}


/// Hands the exception currently being handled to one pending request, if there is any.
/// Must be called from inside a catch block (it relies on std::current_exception()).
void AIOContextPool::reportExceptionToAnyProducer()
{
    const std::lock_guard lock{mutex};

    /// The monitor polls even when nothing is in flight, so there may be nobody to notify.
    if (promises.empty())
        return;

    const auto any_promise_it = std::begin(promises);
    any_promise_it->second.set_exception(std::current_exception());

    /// The promise is satisfied now; satisfying it a second time would throw std::future_error,
    /// so drop it. A completion that still arrives for this id is reported by fulfillPromises()
    /// as an unknown id.
    promises.erase(any_promise_it);
}


/// Submits one asynchronous request and returns a future for its result.
/// The request id is stored in `iocb.aio_data` so the completion can be matched later.
/// Blocks while the kernel reports EAGAIN, retries on EINTR; any other io_submit failure
/// is thrown via throwFromErrno with ErrorCodes::CANNOT_IO_SUBMIT.
/// NOTE(review): the template argument of the return type was not legible in the reviewed copy;
/// `BytesRead` is assumed to be the result alias declared in the class - confirm against the header.
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
    std::unique_lock lock{mutex};

    /// get current id and increment it by one
    const auto request_id = next_id;
    ++next_id;

    /// create a promise, put request in "queue" and take the future right away,
    /// so no second lookup is needed after the submission
    auto future = promises.try_emplace(request_id).first->second.get_future();
    /// store id in AIO request for further identification
    iocb.aio_data = request_id;

    struct iocb * requests[] { &iocb };

    /// submit a request
    while (io_submit(aio_context.ctx, 1, requests) < 0)
    {
        /// capture errno before anything else can overwrite it
        const int submit_errno = errno;

        if (submit_errno == EAGAIN)
        {
            /// wait until at least one event has been completed (or a spurious wakeup) and try again
            have_resources.wait(lock);
        }
        else if (submit_errno != EINTR)
        {
            /// The request never reached the kernel, so no completion will ever arrive for it.
            /// Remove the promise, otherwise the drain loop in doMonitor() would wait for it forever.
            promises.erase(request_id);
            throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT, submit_errno);
        }
    }

    return future;
}


/// Returns the process-wide pool, created on first use.
AIOContextPool & AIOContextPool::instance()
{
    static AIOContextPool instance;
    return instance;
}

}

#endif