mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 12:22:12 +00:00
217 lines
5.2 KiB
C++
217 lines
5.2 KiB
C++
#pragma once
|
|
|
|
#include <DB/Core/ErrorCodes.h>
|
|
#include <DB/Common/Exception.h>
|
|
#include <common/logger_useful.h>
|
|
#include <common/singleton.h>
|
|
#include <Poco/Logger.h>
|
|
#include <boost/range/iterator_range.hpp>
|
|
#include <boost/noncopyable.hpp>
|
|
#include <condition_variable>
|
|
#include <future>
|
|
#include <mutex>
|
|
#include <map>
|
|
#include <linux/aio_abi.h>
|
|
#include <sys/syscall.h>
|
|
#include <unistd.h>
|
|
|
|
|
|
/** Small wrappers for asynchronous I/O.
  */
|
|
|
|
|
|
/// Invokes the raw `io_setup` system call with `nr` and `ctxp` passed through unchanged.
/// Returns the value produced by syscall(), narrowed to int.
inline int io_setup(unsigned nr, aio_context_t * ctxp)
{
	const long rc = syscall(__NR_io_setup, nr, ctxp);
	return static_cast<int>(rc);
}
|
|
|
|
/// Invokes the raw `io_destroy` system call for `ctx`.
/// Returns the value produced by syscall(), narrowed to int.
inline int io_destroy(aio_context_t ctx)
{
	const long rc = syscall(__NR_io_destroy, ctx);
	return static_cast<int>(rc);
}
|
|
|
|
/// Invokes the raw `io_submit` system call with `ctx`, `nr` and `iocbpp` passed through unchanged.
/// Technically speaking, the last argument is an array of pointers.
/// Returns the value produced by syscall(), narrowed to int.
inline int io_submit(aio_context_t ctx, long nr, struct iocb * iocbpp[])
{
	const long rc = syscall(__NR_io_submit, ctx, nr, iocbpp);
	return static_cast<int>(rc);
}
|
|
|
|
/// Invokes the raw `io_getevents` system call; every argument is passed through unchanged.
/// Returns the value produced by syscall(), narrowed to int.
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event *events, struct timespec * timeout)
{
	const long rc = syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
	return static_cast<int>(rc);
}
|
|
|
|
|
|
struct AIOContext : private boost::noncopyable
|
|
{
|
|
aio_context_t ctx;
|
|
|
|
AIOContext(unsigned int nr_events = 128)
|
|
{
|
|
ctx = 0;
|
|
if (io_setup(nr_events, &ctx) < 0)
|
|
DB::throwFromErrno("io_setup failed");
|
|
}
|
|
|
|
~AIOContext()
|
|
{
|
|
io_destroy(ctx);
|
|
}
|
|
};
|
|
|
|
namespace DB
|
|
{
|
|
|
|
|
|
/** Singleton that submits AIO requests through one shared AIOContext and gives each caller
  * a std::future for the result.
  *
  * post() registers a promise under a fresh id, stores that id in `iocb.aio_data` and submits the request.
  * A dedicated thread (see doMonitor) collects completion events, looks the promise up by `io_event::data`
  * and fulfils it with `io_event::res`.
  *
  * `id` and `promises` are only accessed while `mutex` is held.
  */
class AIOContextPool : public Singleton<AIOContextPool>
{
	friend class Singleton<AIOContextPool>;

	/// Capacity requested from io_setup() and the maximum number of events fetched per io_getevents() call
	static const auto max_concurrent_events = 128;
	/// Timeout passed to io_getevents(), so that the monitor thread re-checks `cancelled` periodically
	static const auto timeout_sec = 1;

	AIOContext aio_context{max_concurrent_events};

	using id_t = size_t;
	using bytes_read_t = ssize_t;

	/// Autoincremental id used to identify completed requests
	id_t id{};
	mutable std::mutex mutex;
	/// Signalled after completion events have been processed; post() waits on it when io_submit() reports EAGAIN
	mutable std::condition_variable have_resources;
	/// Requests that have been registered but not completed yet, keyed by request id
	std::map<id_t, std::promise<bytes_read_t>> promises;

	std::atomic<bool> cancelled{false};
	/// Keep this the last data member: the thread starts running doMonitor() as soon as it is initialized
	/// and uses every member declared above.
	std::thread io_completion_monitor{&AIOContextPool::doMonitor, this};

	/// Asks the monitor thread to stop and waits for it; the thread drains outstanding requests before exiting.
	~AIOContextPool()
	{
		cancelled.store(true, std::memory_order_relaxed);
		io_completion_monitor.join();
	}

	/// Whether any registered request is still waiting for completion.
	/// Takes `mutex`, because post() may modify `promises` concurrently.
	bool hasPendingRequests() const
	{
		const std::lock_guard<std::mutex> lock{mutex};
		return !promises.empty();
	}

	/// Body of the monitor thread.
	void doMonitor()
	{
		/// continue checking for events unless cancelled
		while (!cancelled.load(std::memory_order_relaxed))
			waitForCompletion();

		/// wait until all requests have been completed
		while (hasPendingRequests())
			waitForCompletion();
	}

	/// One monitoring step: fetch completion events, fulfil the matching promises, wake up waiting producers.
	/// Exceptions do not leave this function: they are handed to a producer and logged.
	void waitForCompletion()
	{
		/// array to hold completion events
		io_event events[max_concurrent_events];

		try
		{
			const auto num_events = getCompletionEvents(events, max_concurrent_events);
			fulfillPromises(events, num_events);
			notifyProducers(num_events);
		}
		catch (...)
		{
			/// there was an error, log it, return to any producer and continue
			reportExceptionToAnyProducer();
			tryLogCurrentException("AIOContextPool::waitForCompletion()");
		}
	}

	/// Calls io_getevents() asking for 1 to `max_events` events with a `timeout_sec` timeout.
	/// Retries on EINTR; any other failure is thrown via throwFromErrno.
	/// Returns the number of events written to `events` (the non-negative result of io_getevents()).
	int getCompletionEvents(io_event events[], const int max_events)
	{
		timespec timeout{timeout_sec, 0};

		auto num_events = 0;

		/// request 1 to `max_events` events
		while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
			if (errno != EINTR)
				throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
					ErrorCodes::AIO_COMPLETION_ERROR, errno);

		return num_events;
	}

	/// For each of the first `num_events` entries of `events`: find the promise registered under
	/// `event.data`, set `event.res` as its value and remove it from `promises`.
	/// Events with an unknown id are logged and skipped.
	void fulfillPromises(const io_event events[], const int num_events)
	{
		if (num_events == 0)
			return;

		const std::lock_guard<std::mutex> lock{mutex};

		for (const auto & event : boost::make_iterator_range(events, events + num_events))
		{
			/// get id from event
			const auto id = event.data;

			/// set value via promise and release it
			const auto it = promises.find(id);
			if (it == std::end(promises))
			{
				LOG_CRITICAL(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
				continue;
			}

			it->second.set_value(event.res);
			promises.erase(it);
		}
	}

	/// Wakes up producers waiting in post(): all of them if more than one event was processed, otherwise one.
	void notifyProducers(const int num_producers) const
	{
		if (num_producers == 0)
			return;

		if (num_producers > 1)
			have_resources.notify_all();
		else
			have_resources.notify_one();
	}

	/// Passes the exception currently being handled to one registered promise (the first in the map).
	/// Must be called from within a catch block.
	void reportExceptionToAnyProducer()
	{
		const std::lock_guard<std::mutex> lock{mutex};

		/// nobody to report to; dereferencing begin() of an empty map would be undefined behaviour
		if (promises.empty())
			return;

		const auto any_promise_it = std::begin(promises);
		any_promise_it->second.set_exception(std::current_exception());

		/// A promise can be satisfied only once. Drop it, so that the next failure is delivered to another
		/// producer instead of throwing std::future_error out of the monitor thread, and so that the drain
		/// loop in doMonitor() can finish.
		promises.erase(any_promise_it);
	}

public:
	/** Request AIO read operation for iocb, returns a future with number of bytes read.
	  *
	  * `iocb.aio_data` is overwritten with the internal request id.
	  * Blocks while io_submit() reports EAGAIN, retries on EINTR; any other submission failure
	  * is thrown via throwFromErrno and the request is not left registered.
	  */
	std::future<bytes_read_t> post(struct iocb & iocb)
	{
		std::unique_lock<std::mutex> lock{mutex};

		/// get current id and increment it by one
		const auto request_id = id++;

		/// create a promise and put request in "queue";
		/// the future is taken right away, so no second lookup is needed after submission
		auto result = promises.emplace(request_id, std::promise<bytes_read_t>{}).first->second.get_future();

		/// store id in AIO request for further identification
		iocb.aio_data = request_id;

		struct iocb * requests[] { &iocb };

		/// submit a request
		while (io_submit(aio_context.ctx, 1, requests) < 0)
		{
			/// remember errno before doing anything that might change it
			const int submit_errno = errno;

			if (submit_errno == EAGAIN)
			{
				/// wait until at least one event has been completed (or a spurious wakeup) and try again
				have_resources.wait(lock);

				/// while the lock was released the monitor thread may have failed this request
				/// (see reportExceptionToAnyProducer); the future already holds that exception, do not submit
				if (promises.find(request_id) == std::end(promises))
					return result;
			}
			else if (submit_errno != EINTR)
			{
				/// the request never reached the kernel: unregister it, otherwise nothing would ever complete it
				promises.erase(request_id);

				throwFromErrno("io_submit: Failed to submit a request for asynchronous IO",
					ErrorCodes::AIO_SUBMIT_ERROR, submit_errno);
			}
		}

		return result;
	}
};
|
|
|
|
|
|
}
|