AIOContextPool: refactor, extract member functions, employ a condition_variable to minimize CPU waste [#METR-18618]

This commit is contained in:
Andrey Mironov 2015-11-05 16:52:06 +03:00
parent 1f08dccab2
commit 249a5922fe

View File

@ -1,9 +1,11 @@
#pragma once
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/Exception.h>
#include <common/singleton.h>
#include <boost/range/iterator_range.hpp>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
@ -64,96 +66,103 @@ class AIOContextPool : public Singleton<AIOContextPool>
friend class Singleton<AIOContextPool>;
static const auto max_concurrent_events = 128;
static const auto max_timeout_nsec = 1000;
static const auto max_timeout_sec = 1;
static const auto max_timeout_nsec = 0;
AIOContext aio_context{max_concurrent_events};
std::size_t id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<std::size_t, std::promise<ssize_t>> promises;
std::vector<iocb *> queued_requests;
std::atomic<bool> cancelled{false};
std::thread man_of_his_word{&AIOContextPool::fulfill_promises, this};
std::thread io_completion_monitor{&AIOContextPool::monitorForCompletion, this};
~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
man_of_his_word.join();
io_completion_monitor.join();
}
void fulfill_promises()
void monitorForCompletion()
{
/// array to hold completion events
io_event events[max_concurrent_events] {};
const auto p_events = &events[0];
timespec timeout{0, max_timeout_nsec};
io_event events[max_concurrent_events];
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
{
try
{
/// number of events signaling on
auto num_events = 0;
/// request 1 to `max_concurrent_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_concurrent_events, p_events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
ErrorCodes::AIO_COMPLETION_ERROR, errno);
/// look at returned events and
for (const auto & event : boost::make_iterator_range(p_events, p_events + num_events))
{
/// get id from event
const auto id = event.data;
/// find corresponding promise, set result and erase promise from map
const std::lock_guard<std::mutex> lock{mutex};
const auto it = promises.find(id);
it->second.set_value(event.res);
promises.erase(it);
}
if (queued_requests.empty())
continue;
const std::lock_guard<std::mutex> lock{mutex};
auto num_requests = 0;
/// submit a batch of requests
while ((num_requests = io_submit(aio_context.ctx, queued_requests.size(), queued_requests.data())) < 0)
if (!(errno == EINTR || errno == EAGAIN))
throwFromErrno("io_submit: Failed to submit batch of " +
std::to_string(queued_requests.size()) + " requests for asynchronous IO",
ErrorCodes::AIO_SUBMIT_ERROR, errno);
if (num_requests <= 0)
continue;
/// erase submitted requests
queued_requests.erase(std::begin(queued_requests),
std::next(std::begin(queued_requests), num_requests));
const auto num_events = getCompletionEvents(events, max_concurrent_events);
fulfillPromises(events, num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any client and continue
const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::fulfill_promises()");
}
}
}
int getCompletionEvents(io_event events[], const int max_events)
{
timespec timeout{max_timeout_sec, max_timeout_nsec};
auto num_events = 0;
/// request 1 to `max_concurrent_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
ErrorCodes::AIO_COMPLETION_ERROR, errno);
return num_events;
}
void fulfillPromises(const io_event events[], const int num_events)
{
const std::lock_guard<std::mutex> lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
const auto id = event.data;
/// set value via promise and release it
const auto it = promises.find(id);
it->second.set_value(event.res);
promises.erase(it);
}
}
void notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void reportExceptionToAnyProducer()
{
const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
public:
std::future<ssize_t> post(struct iocb & iocb)
{
const std::lock_guard<std::mutex> lock{mutex};
std::unique_lock<std::mutex> lock{mutex};
/// get current id and increment it by one
const auto request_id = id++;
@ -162,10 +171,22 @@ public:
promises.emplace(request_id, std::promise<ssize_t>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
queued_requests.push_back(&iocb);
auto num_requests = 0;
struct iocb * requests[] { &iocb };
/// submit a request
while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO",
ErrorCodes::AIO_SUBMIT_ERROR, errno);
}
return promises[request_id].get_future();
}
};