Merge pull request #36103 from sauliusvl/uring

Add support for io_uring read method
This commit is contained in:
Alexey Milovidov 2022-06-24 00:34:29 +03:00 committed by GitHub
commit 812ab9bd6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 509 additions and 4 deletions

3
.gitmodules vendored
View File

@ -268,3 +268,6 @@
[submodule "contrib/hashidsxx"]
path = contrib/hashidsxx
url = https://github.com/schoentoon/hashidsxx.git
[submodule "contrib/liburing"]
path = contrib/liburing
url = https://github.com/axboe/liburing.git

View File

@ -120,6 +120,7 @@ add_contrib (simdjson-cmake simdjson)
add_contrib (rapidjson-cmake rapidjson)
add_contrib (fastops-cmake fastops)
add_contrib (libuv-cmake libuv)
add_contrib (liburing-cmake liburing)
add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
add_contrib (cassandra-cmake cassandra) # requires: libuv

1
contrib/liburing vendored Submodule

@ -0,0 +1 @@
Subproject commit b1735ffa9c5dc5ec18c8fbc121b7b0bc417cdc89

View File

@ -0,0 +1,41 @@
# Builds the bundled liburing sources as the static library `_liburing`
# (alias `ch_contrib::liburing`). Enabled by default only on Linux.
set (ENABLE_LIBURING_DEFAULT ${ENABLE_LIBRARIES})

if (NOT OS_LINUX)
    set (ENABLE_LIBURING_DEFAULT OFF)
endif ()

option (ENABLE_LIBURING "Enable liburing" ${ENABLE_LIBURING_DEFAULT})

if (NOT ENABLE_LIBURING)
    message (STATUS "Not using liburing")
    return ()
endif ()

set (LIBURING_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src/include")
set (LIBURING_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src")

# compat.h is generated into the binary tree so the submodule stays untouched.
set (LIBURING_COMPAT_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/liburing/src/include-compat")
set (LIBURING_COMPAT_HEADER "${LIBURING_COMPAT_INCLUDE_DIR}/liburing/compat.h")

# Feature switches consumed by the `#cmakedefine` lines of compat.h.in.
# The variable name must not be followed by a comma: `set (NAME, "yes")` defines
# a variable literally called `NAME,`, which configure_file() never looks at.
# With that spelling every switch was effectively off, so compat.h always carried
# the fallback declarations. All switches are therefore kept OFF here to leave the
# generated header exactly as it has been so far; turn one ON only when the
# target kernel/libc headers are known to provide the corresponding declaration.
set (LIBURING_CONFIG_HAS_KERNEL_RWF_T OFF)
set (LIBURING_CONFIG_HAS_KERNEL_TIMESPEC OFF)
set (LIBURING_CONFIG_HAS_OPEN_HOW OFF)
set (LIBURING_CONFIG_HAS_STATX OFF)
set (LIBURING_CONFIG_HAS_GLIBC_STATX OFF)

configure_file ("${CMAKE_CURRENT_SOURCE_DIR}/compat.h.in" "${LIBURING_COMPAT_HEADER}")

add_library (_liburing
    "${LIBURING_SOURCE_DIR}/queue.c"
    "${LIBURING_SOURCE_DIR}/register.c"
    "${LIBURING_SOURCE_DIR}/setup.c"
    "${LIBURING_SOURCE_DIR}/syscall.c"
)
add_library (ch_contrib::liburing ALIAS _liburing)

# Only the library's own translation units need these definitions.
target_compile_definitions (_liburing PRIVATE _GNU_SOURCE LIBURING_INTERNAL)

# The generated compat header directory comes first, then the upstream public headers.
target_include_directories (_liburing SYSTEM PUBLIC "${LIBURING_COMPAT_INCLUDE_DIR}" "${LIBURING_INCLUDE_DIR}")

View File

@ -0,0 +1,40 @@
/* SPDX-License-Identifier: MIT */
/*
 * Template for liburing/compat.h, instantiated by CMake's configure_file().
 * Each cmakedefine line below becomes a #define when the CMake variable of the
 * same name is set to a true value, and a commented-out #undef otherwise.
 * For every feature that is not defined, a fallback declaration is provided here.
 */
#ifndef LIBURING_COMPAT_H
#define LIBURING_COMPAT_H
# cmakedefine LIBURING_CONFIG_HAS_KERNEL_RWF_T
# cmakedefine LIBURING_CONFIG_HAS_KERNEL_TIMESPEC
# cmakedefine LIBURING_CONFIG_HAS_OPEN_HOW
# cmakedefine LIBURING_CONFIG_HAS_GLIBC_STATX
# cmakedefine LIBURING_CONFIG_HAS_STATX
/* Fallback typedef for __kernel_rwf_t when the feature macro is not defined. */
#if !defined(LIBURING_CONFIG_HAS_KERNEL_RWF_T)
typedef int __kernel_rwf_t;
#endif
/* Either declare struct __kernel_timespec locally or take it from <linux/time_types.h>. */
#if !defined(LIBURING_CONFIG_HAS_KERNEL_TIMESPEC)
#include <stdint.h>
struct __kernel_timespec {
int64_t tv_sec;
long long tv_nsec;
};
#else
#include <linux/time_types.h>
#endif
/* Fallback declaration of struct open_how when the feature macro is not defined. */
#if !defined(LIBURING_CONFIG_HAS_OPEN_HOW)
#include <inttypes.h>
struct open_how {
uint64_t flags;
uint64_t mode;
uint64_t resolve;
};
#endif
/* <sys/stat.h> is included only when HAS_STATX is defined and HAS_GLIBC_STATX is not. */
#if !defined(LIBURING_CONFIG_HAS_GLIBC_STATX) && defined(LIBURING_CONFIG_HAS_STATX)
#include <sys/stat.h>
#endif
#endif

View File

@ -135,6 +135,7 @@ function clone_submodules
contrib/replxx
contrib/wyhash
contrib/hashidsxx
contrib/liburing
)
git submodule sync
@ -157,6 +158,7 @@ function run_cmake
"-DENABLE_NURAFT=1"
"-DENABLE_JEMALLOC=1"
"-DENABLE_REPLXX=1"
"-DENABLE_LIBURING=1"
)
# TODO remove this? we don't use ccache anyway. An option would be to download it

View File

@ -498,6 +498,11 @@ if (TARGET ch_contrib::msgpack)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::msgpack)
endif()
# Link liburing into clickhouse_common_io when the contrib target was configured.
# NOTE(review): LIBURING_COMPAT_INCLUDE_DIR and LIBURING_INCLUDE_DIR are set in
# contrib/liburing-cmake/CMakeLists.txt. If that directory is entered through
# add_subdirectory() (directly or via a function), those variables are not visible
# in this scope and the target_include_directories() call below receives no
# directories. The include paths are already PUBLIC on ch_contrib::liburing and
# propagate through target_link_libraries() -- confirm whether this line is needed.
if (TARGET ch_contrib::liburing)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::liburing)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${LIBURING_COMPAT_INCLUDE_DIR} ${LIBURING_INCLUDE_DIR})
endif()
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::fast_float)
if (USE_ORC)

View File

@ -93,6 +93,8 @@
M(CacheFileSegments, "Number of existing cache file segments") \
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
M(S3Requests, "S3 requests") \
M(IOUringPendingEvents, "Number of io_uring SQEs waiting to be submitted") \
M(IOUringInFlightEvents, "Number of io_uring SQEs in flight") \
namespace CurrentMetrics
{

View File

@ -633,6 +633,9 @@
M(662, FS_METADATA_ERROR) \
M(663, CANNOT_COLLECT_OBJECTS_FOR_BACKUP) \
M(664, ACCESS_STORAGE_DOESNT_ALLOW_BACKUP) \
M(665, IO_URING_INIT_FAILED) \
M(666, IO_URING_SUBMIT_ERROR) \
M(667, IO_URING_WAIT_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -344,7 +344,10 @@
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
namespace ProfileEvents
{

View File

@ -553,7 +553,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool.", 0) \
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \

View File

@ -0,0 +1,303 @@
#if defined(OS_LINUX)
#include "IOUringReader.h"
#include <base/errnoToString.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/MemorySanitizer.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <future>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event IOUringSQEsSubmitted;
extern const Event IOUringSQEsResubmits;
}
namespace CurrentMetrics
{
extern const Metric IOUringPendingEvents;
extern const Metric IOUringInFlightEvents;
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int IO_URING_INIT_FAILED;
extern const int IO_URING_SUBMIT_ERROR;
extern const int IO_URING_WAIT_ERROR;
}
/// Probes the kernel for IORING_OP_READ support. When it is available, sets up a ring
/// with `entries_` submission entries and starts the completion monitor thread.
/// When it is not, only `is_supported` is set (to false) and nothing else is initialized.
/// Throws IO_URING_INIT_FAILED if the ring cannot be created.
IOUringReader::IOUringReader(uint32_t entries_)
    : log(&Poco::Logger::get("IOUringReader"))
{
    is_supported = false;

    if (struct io_uring_probe * probe = io_uring_get_probe())
    {
        is_supported = io_uring_opcode_supported(probe, IORING_OP_READ);
        io_uring_free_probe(probe);
    }

    if (!is_supported)
        return;

    struct io_uring_params params =
    {
        .cq_entries = 0, // filled by the kernel, initializing to silence warning
        .flags = 0,
    };

    const int init_result = io_uring_queue_init_params(entries_, &ring, &params);
    if (init_result < 0)
        throwFromErrno("Failed initializing io_uring", ErrorCodes::IO_URING_INIT_FAILED, -init_result);

    // remember the completion queue size reported by the kernel,
    // it bounds the number of requests we keep in flight
    cq_entries = params.cq_entries;

    ring_completion_monitor = ThreadFromGlobalPool([this] { monitorRing(); });
}
/// Enqueue a read request and return a future for its result.
///
/// The request is submitted to the ring right away while the number of in-flight requests
/// is below the completion queue size; otherwise it is parked in `pending_requests` and
/// submitted later from finalizeRequest(). Failures are reported through the returned future.
std::future<IAsynchronousReader::Result> IOUringReader::submit(Request request)
{
    assert(request.size);

    // take lock here because we're modifying containers and submitting to the ring,
    // the monitor thread can also do the same
    std::unique_lock lock{mutex};

    // use the requested read destination address as the request id, the assumption
    // here is that we won't get asked to fill in the same address more than once in parallel
    auto request_id = reinterpret_cast<UInt64>(request.buf);

    auto enqueued_request = EnqueuedRequest
    {
        .promise = std::promise<IAsynchronousReader::Result>{},
        .request = request,
        .resubmitting = false,
        .bytes_read = 0
    };

    // no room in the completion queue: push the request to the back of the pending queue
    if (in_flight_requests.size() >= cq_entries)
    {
        CurrentMetrics::add(CurrentMetrics::IOUringPendingEvents);
        pending_requests.push_back(std::move(enqueued_request));
        return pending_requests.back().promise.get_future();
    }

    // Reject duplicates BEFORE touching the ring: once an SQE carrying this id is submitted,
    // its completion would be attributed to the request that already owns the id, and the
    // in-flight metrics incremented by submitToRing() would never be decremented.
    if (in_flight_requests.contains(request_id))
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
        return makeFailedResult(ErrorCodes::LOGICAL_ERROR, "Tried enqueuing read request for {} that is already submitted", request_id);
    }

    if (!submitToRing(enqueued_request))
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
        return makeFailedResult(ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed submitting SQE");
    }

    // take the future before the request (and its promise) is moved into the map
    auto future = enqueued_request.promise.get_future();
    in_flight_requests.emplace(request_id, std::move(enqueued_request));
    return future;
}
/// Prepare a read SQE for the not-yet-read remainder of `enqueued` and submit it to the ring.
/// Returns false if no SQE is available or the submission fails.
/// Must be called with `mutex` held. Metrics are only updated on the first submission
/// of a request, not on re-submissions.
bool IOUringReader::submitToRing(EnqueuedRequest & enqueued)
{
    struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
    if (!sqe)
        return false;

    const auto & request = enqueued.request;

    // the id is always derived from the start of the destination buffer,
    // it is the key used in in_flight_requests
    auto request_id = reinterpret_cast<UInt64>(request.buf);
    int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;

    // After a short read, continue right behind the bytes that were already read:
    // both the file offset and the destination pointer have to be advanced,
    // otherwise the re-submitted read overwrites the beginning of the buffer.
    io_uring_prep_read(
        sqe,
        fd,
        request.buf + enqueued.bytes_read,
        static_cast<unsigned>(request.size - enqueued.bytes_read),
        request.offset + enqueued.bytes_read);

    // set user data after the prep call: some liburing versions reset user_data while preparing the SQE
    io_uring_sqe_set_data64(sqe, request_id);

    // an interrupted submission leaves the SQE queued, so it is enough to call submit again
    int submitted;
    do
    {
        submitted = io_uring_submit(&ring);
    } while (submitted == -EINTR);

    if (submitted <= 0)
        return false;

    if (!enqueued.resubmitting)
    {
        ProfileEvents::increment(ProfileEvents::IOUringSQEsSubmitted);
        CurrentMetrics::add(CurrentMetrics::IOUringInFlightEvents);
        CurrentMetrics::add(CurrentMetrics::Read);
    }

    return true;
}
/// Complete the in-flight request pointed to by `requestIt` with an exception built from
/// `code` and `message`, count the failure and release the request's slot.
void IOUringReader::failRequest(const EnqueuedIterator & requestIt, int code, const std::string & message)
{
    ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);

    auto & failed_promise = requestIt->second.promise;
    failed_promise.set_exception(std::make_exception_ptr(Exception(code, message)));

    // erases the entry, so the iterator (and failed_promise) must not be used afterwards
    finalizeRequest(requestIt);
}
/// Remove a finished (or failed) request from `in_flight_requests`, update the metrics and
/// use the freed slot for a pending request, if there is one.
/// Must be called with `mutex` held; `requestIt` is invalid after the call.
void IOUringReader::finalizeRequest(const EnqueuedIterator & requestIt)
{
    in_flight_requests.erase(requestIt);

    CurrentMetrics::sub(CurrentMetrics::IOUringInFlightEvents);
    CurrentMetrics::sub(CurrentMetrics::Read);

    // Since we just finalized a request there's now room in the completion queue.
    // Take pending requests from the front of the queue until one of them is successfully
    // submitted. A pending request that cannot be submitted is failed right away, and the
    // next one is tried: otherwise the remaining pending requests could be left without any
    // in-flight request whose completion would dequeue them.
    while (!pending_requests.empty())
    {
        auto pending_request = std::move(pending_requests.front());
        pending_requests.pop_front();
        CurrentMetrics::sub(CurrentMetrics::IOUringPendingEvents);

        auto request_id = reinterpret_cast<UInt64>(pending_request.request.buf);

        // check for a duplicate id before submitting, so that no SQE
        // (and no in-flight metric) is created for a request we have to reject
        if (in_flight_requests.contains(request_id))
        {
            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
            failPromise(pending_request.promise,
                ErrorCodes::LOGICAL_ERROR,
                "Tried enqueuing pending read request for {} that is already submitted", request_id);
            continue;
        }

        if (!submitToRing(pending_request))
        {
            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
            failPromise(pending_request.promise, ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed submitting SQE for pending request");
            continue;
        }

        in_flight_requests.emplace(request_id, std::move(pending_request));
        break;
    }
}
/// Body of the completion monitor thread.
///
/// Waits for completion queue events, looks up the matching in-flight request by the
/// user data stored in the event and either completes its promise, fails it, or re-submits
/// it (on -EAGAIN and on short reads). The loop ends when `cancelled` is set or when an
/// event with zero user data (sent by the destructor) arrives.
void IOUringReader::monitorRing()
{
    setThreadName("IOUringMonitor");

    while (!cancelled.load(std::memory_order_relaxed))
    {
        // we can't use wait_cqe_* variants with timeouts as they can
        // submit timeout events in older kernels that do not support IORING_FEAT_EXT_ARG
        // and it is not safe to mix submission and consumption event threads.
        struct io_uring_cqe * cqe = nullptr;
        int ret = io_uring_wait_cqe(&ring, &cqe);

        if (ret == -EAGAIN || ret == -EINTR)
            continue;

        if (ret < 0)
        {
            LOG_ERROR(log, "Failed waiting for io_uring CQEs: {}", errnoToString(ErrorCodes::IO_URING_WAIT_ERROR, -ret));
            continue;
        }

        // user_data zero means a noop event sent from the destructor meant to interrupt the thread
        if (cancelled.load(std::memory_order_relaxed) || cqe->user_data == 0)
        {
            LOG_DEBUG(log, "Stopping IOUringMonitor thread");
            break;
        }

        // it is safe to re-submit events once we take the lock here
        std::unique_lock lock{mutex};

        auto request_id = cqe->user_data;
        const int res = cqe->res;

        const auto it = in_flight_requests.find(request_id);
        if (it == in_flight_requests.end())
        {
            // nothing to complete, but the event still has to be consumed below,
            // otherwise io_uring_wait_cqe() keeps returning the very same CQE
            LOG_ERROR(log, "Got a completion event for a request {} that was not submitted", request_id);
        }
        else if (res == -EAGAIN)
        {
            auto & enqueued = it->second;

            enqueued.resubmitting = true;
            ProfileEvents::increment(ProfileEvents::IOUringSQEsResubmits);

            if (!submitToRing(enqueued))
                failRequest(it, ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed re-submitting SQE");
        }
        else if (res < 0)
        {
            int fd = assert_cast<const LocalFileDescriptor &>(*it->second.request.descriptor).fd;
            failRequest(it, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, fmt::format("Cannot read from file {}", fd));
        }
        else
        {
            auto & enqueued = it->second;

            size_t bytes_read = res;
            size_t total_bytes_read = enqueued.bytes_read + bytes_read;

            if (bytes_read > 0)
            {
                __msan_unpoison(enqueued.request.buf + enqueued.bytes_read, bytes_read);

                ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
                ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
            }

            if (bytes_read > 0 && total_bytes_read < enqueued.request.size)
            {
                // potential short read, re-submit
                enqueued.resubmitting = true;
                enqueued.bytes_read += bytes_read;

                if (!submitToRing(enqueued))
                    failRequest(it, ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed re-submitting SQE for short read");
            }
            else
            {
                // either everything was read or zero bytes were returned (end of file)
                enqueued.promise.set_value(Result{ .size = total_bytes_read, .offset = enqueued.request.ignore });
                finalizeRequest(it);
            }
        }

        // every event that was handled is marked as seen exactly once
        io_uring_cqe_seen(&ring, cqe);
    }
}
/// Stops the completion monitor thread and tears down the ring.
IOUringReader::~IOUringReader()
{
    // When IORING_OP_READ is not supported the constructor returns before the ring is
    // initialized and before the monitor thread is started, so there is nothing to stop
    // or release (and touching the ring or joining the thread would be invalid).
    if (!is_supported)
        return;

    cancelled.store(true, std::memory_order_relaxed);

    // interrupt the monitor thread by sending a noop event
    {
        std::unique_lock lock{mutex};

        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
        if (sqe)
        {
            io_uring_prep_nop(sqe);
            io_uring_sqe_set_data(sqe, nullptr);
        }

        // Without a free SQE the noop cannot be queued. Submitting still flushes whatever
        // is queued; the monitor thread re-checks `cancelled` on every completion it
        // receives, so any resulting completion stops it as well.
        io_uring_submit(&ring);
    }

    ring_completion_monitor.join();

    io_uring_queue_exit(&ring);
}
}
#endif

View File

@ -0,0 +1,78 @@
#pragma once
#if defined(OS_LINUX)
#include <Common/ThreadPool.h>
#include <IO/AsynchronousReader.h>
#include <deque>
#include <unordered_map>
#include <liburing.h>
namespace DB
{
/** Perform reads using the io_uring Linux subsystem.
*
* The class sets up a single io_uring that clients submit read requests to, they are
* placed in a map using the read buffer address as the key and the original request
* with a promise as the value. A monitor thread continuously polls the completion queue,
* looks up the completed request and completes the matching promise.
*/
class IOUringReader final : public IAsynchronousReader
{
private:
bool is_supported;
std::mutex mutex;
struct io_uring ring;
uint32_t cq_entries;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool ring_completion_monitor;
struct EnqueuedRequest
{
std::promise<IAsynchronousReader::Result> promise;
Request request;
bool resubmitting; // resubmits can happen due to short reads or when io_uring returns -EAGAIN
size_t bytes_read; // keep track of bytes already read in case short reads happen
};
std::deque<EnqueuedRequest> pending_requests;
std::unordered_map<UInt64, EnqueuedRequest> in_flight_requests;
bool submitToRing(EnqueuedRequest & enqueued);
using EnqueuedIterator = std::unordered_map<UInt64, EnqueuedRequest>::iterator;
void failRequest(const EnqueuedIterator & requestIt, int code, const std::string & message);
void finalizeRequest(const EnqueuedIterator & requestIt);
void monitorRing();
template <typename T, typename... Args> inline void failPromise(
std::promise<T> & promise, int code, fmt::format_string<Args...> fmt, Args &&... args)
{
promise.set_exception(std::make_exception_ptr(Exception(code, fmt, std::forward<Args>(args)...)));
}
template <typename... Args> inline std::future<Result> makeFailedResult(
int code, fmt::format_string<Args...> fmt, Args &&... args)
{
auto promise = std::promise<Result>{};
failPromise(promise, code, fmt, std::forward<Args>(args)...);
return promise.get_future();
}
const Poco::Logger * log;
public:
IOUringReader(uint32_t entries_);
inline bool isSupported() { return is_supported; }
std::future<Result> submit(Request request) override;
virtual ~IOUringReader() override;
};
}
#endif

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/MMapReadBufferFromFileWithCache.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <Common/ProfileEvents.h>
@ -23,6 +24,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
}
@ -75,6 +77,19 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
{
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
}
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{
#if defined(OS_LINUX)
static std::shared_ptr<IOUringReader> reader = std::make_shared<IOUringReader>(512);
if (!reader->isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
#else
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux");
#endif
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();

View File

@ -30,6 +30,13 @@ enum class LocalFSReadMethod
*/
mmap,
/**
* Use the io_uring Linux subsystem for asynchronous reads.
* Can use direct IO after specified size.
* Can do prefetch with double buffering.
*/
io_uring,
/**
* Checks if data is in page cache with 'preadv2' on modern Linux kernels.
* If data is in page cache, read from the same thread.

View File

@ -425,6 +425,7 @@ class SettingsRandomizer:
"read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
"optimize_aggregation_in_order": lambda: random.randint(0, 1),
"aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
"local_filesystem_read_method": lambda: random.choice(['pread', 'pread_threadpool', 'io_uring']),
}
@staticmethod

View File

@ -1,5 +1,5 @@
{% for index_granularity_bytes in [0, 10 * 1024 * 1024] -%}
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] -%}
{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] -%}
{% for direct_io in [0, 1] -%}
{% for prefetch in [0, 1] -%}
{% for priority in [0, 1] -%}

View File

@ -19,7 +19,7 @@ settings
as select number, repeat(toString(number), 5) from numbers(1e6);
{# check each local_filesystem_read_method #}
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] %}
{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] %}
{# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #}
{% for direct_io in [0, 1] %}
{# check local_filesystem_read_prefetch (just a smoke test) #}