add initial io_uring support

This commit is contained in:
Saulius Valatka 2022-06-27 13:40:52 +03:00
parent fb45ed038f
commit ac2c921bdf
11 changed files with 344 additions and 4 deletions

View File

@ -106,6 +106,7 @@
M(KeeperAliveConnections, "Number of alive connections") \ M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequets, "Number of outstanding requests") \ M(KeeperOutstandingRequets, "Number of outstanding requests") \
M(ThreadsInOvercommitTracker, "Number of waiting threads inside of OvercommitTracker") \ M(ThreadsInOvercommitTracker, "Number of waiting threads inside of OvercommitTracker") \
M(IOUringEnqueuedEvents, "Number of io_uring SQEs in flight.") \
namespace CurrentMetrics namespace CurrentMetrics
{ {

View File

@ -646,6 +646,9 @@
M(675, CANNOT_PARSE_IPV4) \ M(675, CANNOT_PARSE_IPV4) \
M(676, CANNOT_PARSE_IPV6) \ M(676, CANNOT_PARSE_IPV6) \
M(677, THREAD_WAS_CANCELED) \ M(677, THREAD_WAS_CANCELED) \
M(678, IO_URING_INIT_FAILED) \
M(679, IO_URING_QUEUE_FULL) \
M(680, IO_URING_WAIT_ERROR) \
\ \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \

View File

@ -474,6 +474,10 @@ The server successfully detected this situation and will download merged part fr
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \ M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
\ \
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\ M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEQueueFullRetries, "Total number of retries to get an SQE due to the io_uring queue being full") \
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
M(IOUringShortReads, "Total number of short (partial) reads returned by io_uring") \
namespace ProfileEvents namespace ProfileEvents
{ {

View File

@ -594,7 +594,7 @@ class IColumn;
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \ M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\ \
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap.", 0) \ M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap.", 0) \
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \ M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool.", 0) \
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \ M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \ M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \ M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \

View File

@ -0,0 +1,242 @@
#if defined(OS_LINUX)
#include "IOUringReader.h"
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <base/errnoToString.h>
#include <future>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event IOUringSQEQueueFullRetries;
extern const Event IOUringSQEsSubmitted;
extern const Event IOUringSQEsResubmits;
extern const Event IOUringShortReads;
}
namespace CurrentMetrics
{
extern const Metric IOUringEnqueuedEvents;
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int IO_URING_INIT_FAILED;
extern const int IO_URING_QUEUE_FULL;
extern const int IO_URING_WAIT_ERROR;
}
/// Probes the kernel for io_uring support of IORING_OP_READ and, when it is available,
/// initializes a ring with `queue_size_` entries and starts the completion monitor thread.
///
/// When the probe cannot be obtained or the read opcode is not supported, the object is left
/// in the "unsupported" state (is_supported == false): no ring is created and no thread is started.
///
/// Throws IO_URING_INIT_FAILED if the ring cannot be initialized.
IOUringReader::IOUringReader(size_t queue_size_)
{
    struct io_uring_probe * probe = io_uring_get_probe();
    if (!probe)
    {
        is_supported = false;
        return;
    }

    is_supported = io_uring_opcode_supported(probe, IORING_OP_READ);
    io_uring_free_probe(probe);

    if (!is_supported)
        return;

    // liburing reports failures as a negated errno in the return value instead of setting
    // the global errno, so the error number has to be passed to throwFromErrno explicitly
    int ret = io_uring_queue_init(static_cast<unsigned>(queue_size_), &ring, 0);
    if (ret < 0)
        throwFromErrno(fmt::format("Failed initializing io_uring of size {}", queue_size_), ErrorCodes::IO_URING_INIT_FAILED, -ret);

    try
    {
        ring_completion_monitor = ThreadFromGlobalPool([this] { monitorRing(); });
    }
    catch (...)
    {
        // the destructor does not run for a partially constructed object,
        // so the ring has to be released here
        io_uring_queue_exit(&ring);
        throw;
    }
}
/// Registers a read request and submits it to the ring.
///
/// Returns a future that is completed by the monitor thread once the read finishes.
/// The future holds an exception when:
///  - a request for the same destination buffer is already in flight (LOGICAL_ERROR);
///  - the request could not be submitted to the ring (set by trySubmitRequest).
std::future<IAsynchronousReader::Result> IOUringReader::submit(Request request)
{
    assert(request.size);

    // take lock here because we're modifying an std::map and submitting to the ring,
    // the monitor thread can also modify the map and re-submit events
    std::unique_lock lock{mutex};

    // use the requested read destination address as the request id, the assumption
    // here is that we won't get asked to fill in the same address more than once in parallel
    auto request_id = reinterpret_cast<UInt64>(request.buf);

    const auto [kv, is_newly_inserted] = enqueued_requests.emplace(request_id, EnqueuedRequest{
        .promise = std::promise<IAsynchronousReader::Result>{},
        .request = request,
        .bytes_read = 0
    });

    if (!is_newly_inserted)
        return makeFailedResult(ErrorCodes::LOGICAL_ERROR, "Tried enqueuing read request for {} that is already submitted", request_id);

    EnqueuedRequest & enqueued = kv->second;

    // obtain the future before submitting: on failure the entry (and its promise) is erased below
    auto future = enqueued.promise.get_future();

    // a failed submission has already stored an exception in the promise; the entry must not stay
    // in the map, otherwise every later read into the same buffer address would be rejected
    // as "already submitted"
    if (!trySubmitRequest(request_id, enqueued, false))
        enqueued_requests.erase(kv);

    return future;
}
/// Prepares a read SQE for the not yet read part of `enqueued` and submits it to the ring.
/// Must be called with `mutex` held.
///
/// request_id   - value stored in the SQE user data, used by the monitor thread to find the request.
/// enqueued     - the request; `bytes_read` tells how much of it was already completed by short reads.
/// resubmitting - true when the request was already counted in the metrics (retry or short read continuation).
///
/// Returns true if the SQE was submitted. On failure the promise is completed with an exception,
/// the counters are adjusted and false is returned; the entry is NOT removed from
/// `enqueued_requests` here, that is left to the caller.
bool IOUringReader::trySubmitRequest(UInt64 request_id, EnqueuedRequest & enqueued, bool resubmitting)
{
    // bind by reference: copying Request would copy the descriptor it holds for no benefit
    const auto & request = enqueued.request;
    int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;

    struct io_uring_sqe * sqe = nullptr;
    for (int i = 0; i < 1000; ++i) // try a few times in case the queue is full.
    {
        sqe = io_uring_get_sqe(&ring);
        if (sqe)
            break;

        ProfileEvents::increment(ProfileEvents::IOUringSQEQueueFullRetries);
    }

    int submitted = 0;
    if (sqe)
    {
        // continue after the bytes that were already read: both the file offset and the
        // destination pointer have to advance, otherwise the data obtained by the
        // previous short read would be overwritten
        io_uring_prep_read(
            sqe,
            fd,
            request.buf + enqueued.bytes_read,
            static_cast<unsigned>(request.size - enqueued.bytes_read),
            request.offset + enqueued.bytes_read);

        // user data is set after the prep call because older liburing versions reset it inside
        // io_uring_prep_*; a zero user_data is interpreted by the monitor thread as the shutdown signal
        io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(request_id));

        // io_uring_submit returns a negated errno on failure; an interrupted call is simply repeated,
        // the prepared SQE is still pending in the submission queue
        do
        {
            submitted = io_uring_submit(&ring);
        } while (submitted == -EINTR);

        // NOTE(review): other negative results are still not reported as failures here. The prepared SQE
        // cannot be taken back and may be flushed by a later submit, so failing the promise (and letting
        // the caller release the buffer) does not look safe without further changes - confirm.
    }

    if (submitted == 0)
    {
        failPromise(enqueued.promise, ErrorCodes::IO_URING_QUEUE_FULL, "Submission queue is full, failed to get an SQE");

        if (resubmitting) // an existing request failed, decrement counters
        {
            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
            CurrentMetrics::sub(CurrentMetrics::IOUringEnqueuedEvents);
            CurrentMetrics::sub(CurrentMetrics::Read);
        }

        return false;
    }

    if (!resubmitting) // new request, increment counters
    {
        ProfileEvents::increment(ProfileEvents::IOUringSQEsSubmitted);
        CurrentMetrics::add(CurrentMetrics::IOUringEnqueuedEvents);
        CurrentMetrics::add(CurrentMetrics::Read);
    }

    return true;
}
void IOUringReader::monitorRing()
{
setThreadName("IOUringMonitor");
while (!cancelled.load(std::memory_order_relaxed))
{
// we can't use wait_cqe_* variants with timeouts as they can
// submit timeout events in older kernels that do not support IORING_FEAT_EXT_ARG
// and it is not safe to mix submission and consumption event threads.
struct io_uring_cqe * cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0)
throwFromErrno(fmt::format("Failed waiting for io_uring CQEs: {}", ret), ErrorCodes::IO_URING_WAIT_ERROR);
// user_data zero means a noop event sent from the destructor meant to interrupt the thread
if (cancelled.load(std::memory_order_relaxed) || cqe->user_data == 0)
break;
// it is safe to re-submit events once we take the lock here
std::unique_lock lock{mutex};
auto request_id = cqe->user_data;
const auto it = enqueued_requests.find(request_id);
if (it == enqueued_requests.end())
throwFromErrno(
fmt::format("Got a completion event for a request {} that was not submitted", request_id),
ErrorCodes::LOGICAL_ERROR);
auto & enqueued = it->second;
if (cqe->res < 0)
{
if (cqe->res == -EAGAIN)
{
ProfileEvents::increment(ProfileEvents::IOUringSQEsResubmits);
trySubmitRequest(request_id, enqueued, true);
io_uring_cqe_seen(&ring, cqe);
continue;
}
int fd = assert_cast<const LocalFileDescriptor &>(*enqueued.request.descriptor).fd;
failPromise(enqueued.promise, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Cannot read from file {}", fd);
enqueued_requests.erase(it);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
CurrentMetrics::sub(CurrentMetrics::IOUringEnqueuedEvents);
CurrentMetrics::sub(CurrentMetrics::Read);
io_uring_cqe_seen(&ring, cqe);
continue;
}
size_t bytes_read = cqe->res;
size_t total_bytes_read = enqueued.bytes_read + bytes_read;
if (bytes_read > 0)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
if (enqueued.bytes_read > 0) ProfileEvents::increment(ProfileEvents::IOUringShortReads);
}
if (bytes_read > 0 && total_bytes_read < enqueued.request.size)
{
// potential short read, re-submit
enqueued.bytes_read += bytes_read;
trySubmitRequest(request_id, enqueued, true);
}
else
{
enqueued.promise.set_value(Result{ .size = total_bytes_read, .offset = enqueued.request.ignore });
enqueued_requests.erase(it);
CurrentMetrics::sub(CurrentMetrics::IOUringEnqueuedEvents);
CurrentMetrics::sub(CurrentMetrics::Read);
}
io_uring_cqe_seen(&ring, cqe);
}
}
/// Stops the monitor thread and releases the ring.
IOUringReader::~IOUringReader()
{
    // the constructor returns early when io_uring is not supported: in that case the ring was never
    // initialized and the monitor thread was never started, so there is nothing to stop or release
    if (!is_supported)
        return;

    cancelled.store(true, std::memory_order_relaxed);

    // interrupt the monitor thread by sending a noop event
    {
        std::unique_lock lock{mutex};

        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);

        // io_uring_get_sqe returns nullptr when the submission queue is full; in that case
        // the monitor thread notices `cancelled` after the next completion event it receives
        if (sqe)
        {
            io_uring_prep_nop(sqe);
            io_uring_sqe_set_data(sqe, nullptr);
            io_uring_submit(&ring);
        }
    }

    ring_completion_monitor.join();

    io_uring_queue_exit(&ring);
}
}
#endif

View File

@ -0,0 +1,68 @@
#pragma once
#if defined(OS_LINUX)
#include <Common/ThreadPool.h>
#include <IO/AsynchronousReader.h>
#include <unordered_map>
#include <liburing.h>
namespace DB
{
/** Perform reads using the io_uring Linux subsystem.
*
* The class sets up a single io_uring that clients submit read requests to, they are
* placed in a map using the read buffer address as the key and the original request
* with a promise as the value. A monitor thread continuously polls the completion queue,
* looks up the completed request and completes the matching promise.
*/
class IOUringReader final : public IAsynchronousReader
{
private:
bool is_supported;
std::mutex mutex;
struct io_uring ring;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool ring_completion_monitor;
struct EnqueuedRequest
{
std::promise<IAsynchronousReader::Result> promise;
Request request;
size_t bytes_read; // keep track of bytes already read in case short reads happen
};
std::unordered_map<UInt64, EnqueuedRequest> enqueued_requests;
bool trySubmitRequest(UInt64 request_id, EnqueuedRequest & enqueued, bool resubmitting);
void monitorRing();
template <typename T, typename... Args> inline void failPromise(
std::promise<T> & promise, int code, fmt::format_string<Args...> fmt, Args &&... args)
{
promise.set_exception(std::make_exception_ptr(Exception(fmt::format(fmt, std::forward<Args>(args)...), code)));
}
template <typename... Args> inline std::future<Result> makeFailedResult(
int code, fmt::format_string<Args...> fmt, Args &&... args)
{
auto promise = std::promise<Result>{};
failPromise(promise, code, fmt, std::forward<Args>(args)...);
return promise.get_future();
}
public:
IOUringReader(size_t queue_size_);
inline bool isSupported() { return is_supported; }
std::future<Result> submit(Request request) override;
void wait() override {}
virtual ~IOUringReader() override;
};
}
#endif

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/MMapReadBufferFromFileWithCache.h> #include <IO/MMapReadBufferFromFileWithCache.h>
#include <IO/AsynchronousReadBufferFromFile.h> #include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h> #include <Disks/IO/ThreadPoolReader.h>
#include <IO/SynchronousReader.h> #include <IO/SynchronousReader.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
@ -23,6 +24,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
} }
@ -80,6 +82,19 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
{ {
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size); res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size);
} }
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{
#if defined(OS_LINUX)
static std::shared_ptr<IOUringReader> reader = std::make_shared<IOUringReader>(4096);
if (!reader->isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
*reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
#else
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux");
#endif
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async) else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{ {
auto context = Context::getGlobalContextInstance(); auto context = Context::getGlobalContextInstance();

View File

@ -31,6 +31,13 @@ enum class LocalFSReadMethod
*/ */
mmap, mmap,
/**
* Use the io_uring Linux subsystem for asynchronous reads.
* Can use direct IO after specified size.
* Can do prefetch with double buffering.
*/
io_uring,
/** /**
* Checks if data is in page cache with 'preadv2' on modern Linux kernels. * Checks if data is in page cache with 'preadv2' on modern Linux kernels.
* If data is in page cache, read from the same thread. * If data is in page cache, read from the same thread.

View File

@ -491,7 +491,7 @@ class SettingsRandomizer:
if random.random() < 0.2 if random.random() < 0.2
else random.randint(1, 1024 * 1024 * 1024), else random.randint(1, 1024 * 1024 * 1024),
"local_filesystem_read_method": lambda: random.choice( "local_filesystem_read_method": lambda: random.choice(
["read", "pread", "mmap", "pread_threadpool"] ["read", "pread", "mmap", "pread_threadpool", "io_uring"]
), ),
"remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]), "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
"local_filesystem_read_prefetch": lambda: random.randint(0, 1), "local_filesystem_read_prefetch": lambda: random.randint(0, 1),

View File

@ -1,5 +1,5 @@
{% for index_granularity_bytes in [0, 10 * 1024 * 1024] -%} {% for index_granularity_bytes in [0, 10 * 1024 * 1024] -%}
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] -%} {% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] -%}
{% for direct_io in [0, 1] -%} {% for direct_io in [0, 1] -%}
{% for prefetch in [0, 1] -%} {% for prefetch in [0, 1] -%}
{% for priority in [0, 1] -%} {% for priority in [0, 1] -%}

View File

@ -19,7 +19,7 @@ settings
as select number, repeat(toString(number), 5) from numbers(1e6); as select number, repeat(toString(number), 5) from numbers(1e6);
{# check each local_filesystem_read_method #} {# check each local_filesystem_read_method #}
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] %} {% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] %}
{# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #} {# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #}
{% for direct_io in [0, 1] %} {% for direct_io in [0, 1] %}
{# check local_filesystem_read_prefetch (just a smoke test) #} {# check local_filesystem_read_prefetch (just a smoke test) #}