From 249a5922fe4df3a447c32e98f44f1135c0c97cce Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Thu, 5 Nov 2015 16:52:06 +0300
Subject: [PATCH] AIOContextPool: refactor, extract member functions, employ a
 condition_variable to minimize CPU waste [#METR-18618]

---
 dbms/include/DB/Common/AIO.h | 139 ++++++++++++++++++++---------------
 1 file changed, 80 insertions(+), 59 deletions(-)

diff --git a/dbms/include/DB/Common/AIO.h b/dbms/include/DB/Common/AIO.h
index b8496af0915..65d3b73e263 100644
--- a/dbms/include/DB/Common/AIO.h
+++ b/dbms/include/DB/Common/AIO.h
@@ -1,9 +1,11 @@
 #pragma once
 
+#include <condition_variable>
 #include 
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -64,96 +66,103 @@ class AIOContextPool : public Singleton<AIOContextPool>
 	friend class Singleton<AIOContextPool>;
 
 	static const auto max_concurrent_events = 128;
-	static const auto max_timeout_nsec = 1000;
+	static const auto max_timeout_sec = 1;
+	static const auto max_timeout_nsec = 0;
 
 	AIOContext aio_context{max_concurrent_events};
 
 	std::size_t id{};
 	mutable std::mutex mutex;
+	mutable std::condition_variable have_resources;
 	std::map<std::size_t, std::promise<ssize_t>> promises;
 
-	std::vector<iocb *> queued_requests;
 	std::atomic<bool> cancelled{false};
-	std::thread man_of_his_word{&AIOContextPool::fulfill_promises, this};
+	std::thread io_completion_monitor{&AIOContextPool::monitorForCompletion, this};
 
 	~AIOContextPool()
 	{
 		cancelled.store(true, std::memory_order_relaxed);
-		man_of_his_word.join();
+		io_completion_monitor.join();
 	}
 
-	void fulfill_promises()
+	void monitorForCompletion()
 	{
 		/// array to hold completion events
-		io_event events[max_concurrent_events] {};
-		const auto p_events = &events[0];
-		timespec timeout{0, max_timeout_nsec};
+		io_event events[max_concurrent_events];
 
 		/// continue checking for events unless cancelled
 		while (!cancelled.load(std::memory_order_relaxed))
 		{
 			try
 			{
-				/// number of events signaling on
-				auto num_events = 0;
-
-				/// request 1 to `max_concurrent_events` events
-				while ((num_events = io_getevents(aio_context.ctx, 1, max_concurrent_events, p_events, &timeout)) < 0)
-					if (errno != EINTR)
-						throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
-							ErrorCodes::AIO_COMPLETION_ERROR, errno);
-
-				/// look at returned events and
-				for (const auto & event : boost::make_iterator_range(p_events, p_events + num_events))
-				{
-					/// get id from event
-					const auto id = event.data;
-
-					/// find corresponding promise, set result and erase promise from map
-					const std::lock_guard<std::mutex> lock{mutex};
-
-					const auto it = promises.find(id);
-					it->second.set_value(event.res);
-					promises.erase(it);
-				}
-
-				if (queued_requests.empty())
-					continue;
-
-				const std::lock_guard<std::mutex> lock{mutex};
-				auto num_requests = 0;
-
-				/// submit a batch of requests
-				while ((num_requests = io_submit(aio_context.ctx, queued_requests.size(), queued_requests.data())) < 0)
-					if (!(errno == EINTR || errno == EAGAIN))
-						throwFromErrno("io_submit: Failed to submit batch of " +
-							std::to_string(queued_requests.size()) + " requests for asynchronous IO",
-							ErrorCodes::AIO_SUBMIT_ERROR, errno);
-
-				if (num_requests <= 0)
-					continue;
-
-				/// erase submitted requests
-				queued_requests.erase(std::begin(queued_requests),
-					std::next(std::begin(queued_requests), num_requests));
+				const auto num_events = getCompletionEvents(events, max_concurrent_events);
+				fulfillPromises(events, num_events);
+				notifyProducers(num_events);
 			}
 			catch (...)
 			{
-				/// there was an error, log it, return to any client and continue
-				const std::lock_guard<std::mutex> lock{mutex};
-
-				const auto any_promise_it = std::begin(promises);
-				any_promise_it->second.set_exception(std::current_exception());
-
+				/// there was an error, log it, return to any producer and continue
+				reportExceptionToAnyProducer();
 				tryLogCurrentException("AIOContextPool::fulfill_promises()");
 			}
 		}
 	}
 
+	int getCompletionEvents(io_event events[], const int max_events)
+	{
+		timespec timeout{max_timeout_sec, max_timeout_nsec};
+
+		auto num_events = 0;
+
+		/// request 1 to `max_concurrent_events` events
+		while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
+			if (errno != EINTR)
+				throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
+					ErrorCodes::AIO_COMPLETION_ERROR, errno);
+
+		return num_events;
+	}
+
+	void fulfillPromises(const io_event events[], const int num_events)
+	{
+		const std::lock_guard<std::mutex> lock{mutex};
+
+		/// look at returned events and find corresponding promise, set result and erase promise from map
+		for (const auto & event : boost::make_iterator_range(events, events + num_events))
+		{
+			/// get id from event
+			const auto id = event.data;
+
+			/// set value via promise and release it
+			const auto it = promises.find(id);
+			it->second.set_value(event.res);
+			promises.erase(it);
+		}
+	}
+
+	void notifyProducers(const int num_producers) const
+	{
+		if (num_producers == 0)
+			return;
+
+		if (num_producers > 1)
+			have_resources.notify_all();
+		else
+			have_resources.notify_one();
+	}
+
+	void reportExceptionToAnyProducer()
+	{
+		const std::lock_guard<std::mutex> lock{mutex};
+
+		const auto any_promise_it = std::begin(promises);
+		any_promise_it->second.set_exception(std::current_exception());
+	}
+
 public:
 	std::future<ssize_t> post(struct iocb & iocb)
 	{
-		const std::lock_guard<std::mutex> lock{mutex};
+		std::unique_lock<std::mutex> lock{mutex};
 
 		/// get current id and increment it by one
 		const auto request_id = id++;
@@ -162,10 +171,22 @@ public:
 		promises.emplace(request_id, std::promise<ssize_t>{});
 		/// store id in AIO request for further identification
 		iocb.aio_data = request_id;
-		queued_requests.push_back(&iocb);
+
+		auto num_requests = 0;
+		struct iocb * requests[] { &iocb };
+
+		/// submit a request
+		while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
+		{
+			if (errno == EAGAIN)
+				/// wait until at least one event has been completed (or a spurious wakeup) and try again
+				have_resources.wait(lock);
+			else if (errno != EINTR)
+				throwFromErrno("io_submit: Failed to submit a request for asynchronous IO",
+					ErrorCodes::AIO_SUBMIT_ERROR, errno);
+		}
 
 		return promises[request_id].get_future();
 	}
 };
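
Note for reviewers: below is a minimal sketch of how a producer thread would drive the pool after this change. It is not part of the patch; the function name read_chunk, the descriptor fd and the buffer are illustrative, it assumes the pool is reached through its Singleton base (e.g. AIOContextPool::instance(), namespace qualification omitted), and that fd was opened with O_DIRECT with a suitably aligned buffer.

	#include <DB/Common/AIO.h>

	#include <linux/aio_abi.h>
	#include <sys/types.h>
	#include <cstdint>
	#include <cstring>
	#include <future>

	/// Submit one read through the shared pool and block until the monitor thread fulfils the promise.
	ssize_t read_chunk(int fd, void * buf, size_t size, off_t offset)
	{
		struct iocb request;
		memset(&request, 0, sizeof(request));

		request.aio_lio_opcode = IOCB_CMD_PREAD;
		request.aio_fildes = fd;
		request.aio_buf = reinterpret_cast<std::uint64_t>(buf);
		request.aio_nbytes = size;
		request.aio_offset = offset;
		/// request.aio_data is overwritten by post() with the request id.

		/// post() may sleep on `have_resources` while the kernel queue is full (EAGAIN)
		/// instead of failing; the monitor thread sets the value once the read completes.
		std::future<ssize_t> bytes_read = AIOContextPool::instance().post(request);

		return bytes_read.get();
	}

Compared with the previous design (a shared queued_requests vector drained by the monitor thread, which polled io_getevents with a 1 microsecond timeout), producers now call io_submit themselves and sleep on the condition variable only when the context is saturated, while the monitor waits up to a full second for completions.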