diff --git a/.gitmodules b/.gitmodules
index b4673f113b7..e395860d957 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -330,3 +330,6 @@
 [submodule "contrib/crc32-vpmsum"]
 	path = contrib/crc32-vpmsum
 	url = https://github.com/antonblanchard/crc32-vpmsum.git
+[submodule "contrib/liburing"]
+	path = contrib/liburing
+	url = https://github.com/axboe/liburing
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index f5d1315cc02..5fc8d960f56 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -140,6 +140,7 @@ add_contrib (simdjson-cmake simdjson)
 add_contrib (rapidjson-cmake rapidjson)
 add_contrib (fastops-cmake fastops)
 add_contrib (libuv-cmake libuv)
+add_contrib (liburing-cmake liburing)
 add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
 add_contrib (cassandra-cmake cassandra) # requires: libuv
 
diff --git a/contrib/liburing b/contrib/liburing
new file mode 160000
index 00000000000..f5a48392c4e
--- /dev/null
+++ b/contrib/liburing
@@ -0,0 +1 @@
+Subproject commit f5a48392c4ea33f222cbebeb2e2fc31620162949
diff --git a/contrib/liburing-cmake/CMakeLists.txt b/contrib/liburing-cmake/CMakeLists.txt
new file mode 100644
index 00000000000..02bc116c660
--- /dev/null
+++ b/contrib/liburing-cmake/CMakeLists.txt
@@ -0,0 +1,65 @@
+# Builds the bundled liburing as ch_contrib::liburing. The default follows ENABLE_LIBRARIES,
+# except outside Linux where the default is OFF.
+set (ENABLE_LIBURING_DEFAULT ${ENABLE_LIBRARIES})
+
+if (NOT OS_LINUX)
+    set (ENABLE_LIBURING_DEFAULT OFF)
+endif ()
+
+option (ENABLE_LIBURING "Enable liburing" ${ENABLE_LIBURING_DEFAULT})
+
+if (NOT ENABLE_LIBURING)
+    message (STATUS "Not using liburing")
+    return ()
+endif ()
+
+set (LIBURING_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src/include")
+set (LIBURING_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src")
+
+set (SRCS
+    "${LIBURING_SOURCE_DIR}/queue.c"
+    "${LIBURING_SOURCE_DIR}/register.c"
+    "${LIBURING_SOURCE_DIR}/setup.c"
+    "${LIBURING_SOURCE_DIR}/syscall.c"
+    "${LIBURING_SOURCE_DIR}/version.c"
+)
+
+# compat.h is generated from compat.h.in into the build tree.
+set (LIBURING_COMPAT_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/liburing/src/include-compat")
+set (LIBURING_COMPAT_HEADER "${LIBURING_COMPAT_INCLUDE_DIR}/liburing/compat.h")
+
+# All feature switches are FALSE, so every #cmakedefine in compat.h.in turns into an #undef
+# comment and the generated header uses its own fallback definitions.
+set (LIBURING_CONFIG_HAS_KERNEL_RWF_T FALSE)
+set (LIBURING_CONFIG_HAS_KERNEL_TIMESPEC FALSE)
+set (LIBURING_CONFIG_HAS_OPEN_HOW FALSE)
+set (LIBURING_CONFIG_HAS_STATX FALSE)
+set (LIBURING_CONFIG_HAS_GLIBC_STATX FALSE)
+
+configure_file (compat.h.in "${LIBURING_COMPAT_HEADER}")
+
+set (LIBURING_GENERATED_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/liburing/src/include")
+set (LIBURING_VERSION_HEADER "${LIBURING_GENERATED_INCLUDE_DIR}/liburing/io_uring_version.h")
+
+# The version numbers come from liburing.spec in the submodule root.
+file (READ "${LIBURING_SOURCE_DIR}/../liburing.spec" LIBURING_SPEC)
+
+# In a quoted CMake argument "\." is an identity escape that yields a bare ".", which the regex
+# engine would treat as "any character"; "\\." hands it a real "\." (a literal dot).
+string (REGEX MATCH "Version: ([0-9]+)\\.([0-9]+)" _ "${LIBURING_SPEC}")
+if (NOT CMAKE_MATCH_COUNT EQUAL 2)
+    message (FATAL_ERROR "Cannot parse the liburing version from ${LIBURING_SOURCE_DIR}/../liburing.spec")
+endif ()
+set (LIBURING_VERSION_MAJOR ${CMAKE_MATCH_1})
+set (LIBURING_VERSION_MINOR ${CMAKE_MATCH_2})
+
+configure_file (io_uring_version.h.in "${LIBURING_VERSION_HEADER}")
+
+add_library (_liburing ${SRCS})
+add_library (ch_contrib::liburing ALIAS _liburing)
+
+# These definitions are needed only while compiling liburing's own sources, so they are
+# attached to the target as PRIVATE instead of being set for the whole directory.
+target_compile_definitions (_liburing PRIVATE _GNU_SOURCE LIBURING_INTERNAL)
+
+target_include_directories (_liburing SYSTEM PUBLIC ${LIBURING_COMPAT_INCLUDE_DIR} ${LIBURING_GENERATED_INCLUDE_DIR} "${LIBURING_SOURCE_DIR}/include")
diff --git a/contrib/liburing-cmake/compat.h.in b/contrib/liburing-cmake/compat.h.in
new file mode 100644
index 00000000000..468e529cd33
--- /dev/null
+++ b/contrib/liburing-cmake/compat.h.in
@@ -0,0 +1,50 @@
+/* SPDX-License-Identifier: MIT */
+#ifndef LIBURING_COMPAT_H
+#define LIBURING_COMPAT_H
+
+# cmakedefine LIBURING_CONFIG_HAS_KERNEL_RWF_T
+# cmakedefine LIBURING_CONFIG_HAS_KERNEL_TIMESPEC
+# cmakedefine LIBURING_CONFIG_HAS_OPEN_HOW
+# cmakedefine LIBURING_CONFIG_HAS_GLIBC_STATX
+# cmakedefine LIBURING_CONFIG_HAS_STATX
+
+#if !defined(LIBURING_CONFIG_HAS_KERNEL_RWF_T)
+typedef int __kernel_rwf_t;
+#endif
+
+#if !defined(LIBURING_CONFIG_HAS_KERNEL_TIMESPEC)
+#include
+
+struct __kernel_timespec {
+    int64_t tv_sec;
+    long long tv_nsec;
+};
+
+/* is not available, so it can't be included */
+#define UAPI_LINUX_IO_URING_H_SKIP_LINUX_TIME_TYPES_H 1
+
+#else
+#include
+
+/* is included above and not needed again */
+#define UAPI_LINUX_IO_URING_H_SKIP_LINUX_TIME_TYPES_H 1
+
+#endif
+
+#if !defined(LIBURING_CONFIG_HAS_OPEN_HOW)
+#include
+
+struct open_how {
+    uint64_t flags;
+    uint64_t mode;
+    uint64_t resolve;
+};
+#else
+#include
+#endif
+
+#if !defined(LIBURING_CONFIG_HAS_GLIBC_STATX) && defined(LIBURING_CONFIG_HAS_STATX)
+#include
+#endif
+
+#endif
diff --git a/contrib/liburing-cmake/io_uring_version.h.in b/contrib/liburing-cmake/io_uring_version.h.in
new file mode 100644
index 00000000000..3fc6132b224
--- /dev/null
+++ b/contrib/liburing-cmake/io_uring_version.h.in
@@ -0,0 +1,8 @@
+/* SPDX-License-Identifier: MIT */
+#ifndef LIBURING_VERSION_H
+#define LIBURING_VERSION_H
+
+#define IO_URING_VERSION_MAJOR ${LIBURING_VERSION_MAJOR}
+#define IO_URING_VERSION_MINOR ${LIBURING_VERSION_MINOR}
+
+#endif
diff --git a/docker/test/fasttest/run.sh b/docker/test/fasttest/run.sh
index 3cd50b06d5a..cf33dffa646 100755
--- a/docker/test/fasttest/run.sh
+++ b/docker/test/fasttest/run.sh
@@ -139,6 +139,7 @@ function clone_submodules
         contrib/morton-nd
         contrib/xxHash
         contrib/simdjson
+        contrib/liburing
     )
 
     git submodule sync
@@ -161,6 +162,7 @@ function run_cmake
         "-DENABLE_NURAFT=1"
         "-DENABLE_SIMDJSON=1"
         "-DENABLE_JEMALLOC=1"
+        "-DENABLE_LIBURING=1"
     )
 
     export CCACHE_DIR="$FASTTEST_WORKSPACE/ccache"
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 70260ee31d9..14838560a88 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -513,6 +513,12 @@ if (TARGET ch_contrib::msgpack)
     target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::msgpack)
 endif()
 
+if (TARGET ch_contrib::liburing)
+    # Linking is sufficient: _liburing declares its include directories as SYSTEM PUBLIC, so they
+    # propagate to consumers. The LIBURING_*_INCLUDE_DIR variables are set in contrib/liburing-cmake/ only.
+    target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::liburing)
+endif()
+
 target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::fast_float)
 
 if (USE_ORC)
diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index 1573ea98c34..36ed5d3d51b 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -106,6 +106,8 @@
     M(KeeperAliveConnections, "Number of alive connections") \
     M(KeeperOutstandingRequets, "Number of outstanding requests") \
     M(ThreadsInOvercommitTracker, "Number of waiting threads inside of OvercommitTracker") \
+    M(IOUringPendingEvents, "Number of io_uring SQEs waiting to be submitted") \
+    M(IOUringInFlightEvents, "Number of io_uring SQEs in flight") \
 
 namespace CurrentMetrics
 {
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 0ad4cbb9e6f..028663a2176 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -646,6 +646,8 @@
     M(675, CANNOT_PARSE_IPV4) \
     M(676, CANNOT_PARSE_IPV6) \
     M(677, THREAD_WAS_CANCELED) \
+    M(678, IO_URING_INIT_FAILED) \
+    M(679, IO_URING_SUBMIT_ERROR) \
     \
     M(999, KEEPER_EXCEPTION) \
     M(1000, POCO_EXCEPTION) \
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index d03e8c8d002..71e453560af 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -472,6 +472,10 @@ The server successfully detected this situation and will download merged part fr
     M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
     \
     M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
+    M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
+    M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
+    M(IOUringCQEsCompleted, "Total number of successfully completed io_uring CQEs") \
+    M(IOUringCQEsFailed, "Total number of completed io_uring CQEs with failures") \
 
 namespace ProfileEvents
 {
diff --git
a/src/Core/Settings.h b/src/Core/Settings.h
index dbe905ee962..8ddd42dbecf 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -603,7 +603,7 @@ class IColumn;
     M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
     \
     M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap.", 0) \
-    M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
+    M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool.", 0) \
     M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
     M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
     M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
diff --git a/src/Disks/IO/IOUringReader.cpp b/src/Disks/IO/IOUringReader.cpp
new file mode 100644
index 00000000000..3c6e7d415c7
--- /dev/null
+++ b/src/Disks/IO/IOUringReader.cpp
@@ -0,0 +1,365 @@
+#if defined(OS_LINUX)
+
+#include "IOUringReader.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ProfileEvents
+{
+    extern const Event ReadBufferFromFileDescriptorRead;
+    extern const Event ReadBufferFromFileDescriptorReadFailed;
+    extern const Event ReadBufferFromFileDescriptorReadBytes;
+
+    extern const Event IOUringSQEsSubmitted;
+    extern const Event IOUringSQEsResubmits;
+    extern const Event IOUringCQEsCompleted;
+    extern const Event IOUringCQEsFailed;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric IOUringPendingEvents;
+    extern const Metric IOUringInFlightEvents;
+    extern const Metric Read;
+}
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
+    extern const int IO_URING_INIT_FAILED;
+    extern const int IO_URING_SUBMIT_ERROR;
+}
+
+/// Probes io_uring support and, when IORING_OP_READ is available, creates the ring and starts
+/// the completion monitor thread. Otherwise only is_supported is set and nothing else is created.
+IOUringReader::IOUringReader(uint32_t entries_)
+    : log(&Poco::Logger::get("IOUringReader"))
+{
+    struct io_uring_probe * probe = io_uring_get_probe();
+    if (!probe)
+    {
+        is_supported = false;
+        return;
+    }
+
+    is_supported = io_uring_opcode_supported(probe, IORING_OP_READ);
+    io_uring_free_probe(probe);
+
+    if (!is_supported)
+        return;
+
+    struct io_uring_params params =
+    {
+        .cq_entries = 0, // filled by the kernel, initializing to silence warning
+        .flags = 0,
+    };
+
+    int ret = io_uring_queue_init_params(entries_, &ring, &params);
+    if (ret < 0)
+        throwFromErrno("Failed initializing io_uring", ErrorCodes::IO_URING_INIT_FAILED, -ret);
+
+    cq_entries = params.cq_entries;
+    ring_completion_monitor = ThreadFromGlobalPool([this] { monitorRing(); });
+}
+
+/// Queues a read. The request goes to the ring right away while fewer than cq_entries requests
+/// are in flight, otherwise it is appended to pending_requests. Submission failures are
+/// delivered through the returned future.
+std::future IOUringReader::submit(Request request)
+{
+    assert(request.size);
+
+    // take lock here because we're modifying containers and submitting to the ring,
+    // the monitor thread can also do the same
+    std::unique_lock lock{mutex};
+
+    // use the requested read destination address as the request id, the assumption
+    // here is that we won't get asked to fill in the same address more than once in parallel
+    auto request_id = reinterpret_cast(request.buf);
+
+    std::promise promise;
+    auto enqueued_request = EnqueuedRequest
+    {
+        .promise = std::move(promise),
+        .request = request,
+        .resubmitting = false,
+        .bytes_read = 0
+    };
+
+    // if there's room in the completion queue submit the request to the ring immediately,
+    // otherwise push it to the back of the pending queue
+    if (in_flight_requests.size() < cq_entries)
+    {
+        // reject a duplicate id before touching the ring: a submitted SQE cannot be taken back,
+        // and its completion would be attributed to the request that is already in the map
+        if (in_flight_requests.contains(request_id))
+        {
+            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
+            return makeFailedResult(Exception(
+                ErrorCodes::LOGICAL_ERROR, "Tried enqueuing read request for {} that is already submitted", request_id));
+        }
+
+        int ret = submitToRing(enqueued_request);
+        if (ret > 0)
+        {
+            auto kv = in_flight_requests.emplace(request_id, std::move(enqueued_request)).first;
+            return (kv->second).promise.get_future();
+        }
+        else
+        {
+            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
+            return makeFailedResult(Exception(
+                ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed submitting SQE: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
+        }
+    }
+    else
+    {
+        CurrentMetrics::add(CurrentMetrics::IOUringPendingEvents);
+        pending_requests.push_back(std::move(enqueued_request));
+        return pending_requests.back().promise.get_future();
+    }
+}
+
+/// Prepares and submits one read SQE for the part of the request that is still missing.
+/// Returns the io_uring_submit() result, or 0 when no SQE could be obtained.
+int IOUringReader::submitToRing(EnqueuedRequest & enqueued)
+{
+    struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
+    if (!sqe)
+        return 0;
+
+    auto request = enqueued.request;
+    auto request_id = reinterpret_cast(request.buf);
+    int fd = assert_cast(*request.descriptor).fd;
+
+    // after a short read continue right behind the bytes that already arrived: the buffer
+    // position has to advance together with the file offset and the remaining length
+    io_uring_prep_read(sqe, fd, request.buf + enqueued.bytes_read, static_cast(request.size - enqueued.bytes_read), request.offset + enqueued.bytes_read);
+    // NOTE(review): user_data is set after the prep call on purpose - older liburing releases
+    // are believed to reset it inside the prep helpers; this order works either way
+    io_uring_sqe_set_data64(sqe, request_id);
+    int ret = 0;
+
+    do
+    {
+        ret = io_uring_submit(&ring);
+    } while (ret == -EINTR || ret == -EAGAIN);
+
+    if (ret > 0 && !enqueued.resubmitting)
+    {
+        ProfileEvents::increment(ProfileEvents::IOUringSQEsSubmitted);
+        CurrentMetrics::add(CurrentMetrics::IOUringInFlightEvents);
+        CurrentMetrics::add(CurrentMetrics::Read);
+    }
+
+    return ret;
+}
+
+/// Completes the request's promise with an exception and removes it from the in-flight map.
+void IOUringReader::failRequest(const EnqueuedIterator & requestIt, const Exception & ex)
+{
+    ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
+    (requestIt->second).promise.set_exception(std::make_exception_ptr(ex));
+
+    finalizeRequest(requestIt);
+}
+
+/// Removes a finished request from the in-flight map and, if any request is pending,
+/// moves the oldest pending one to the ring.
+void IOUringReader::finalizeRequest(const EnqueuedIterator & requestIt)
+{
+    in_flight_requests.erase(requestIt);
+
+    CurrentMetrics::sub(CurrentMetrics::IOUringInFlightEvents);
+    CurrentMetrics::sub(CurrentMetrics::Read);
+
+    // since we just finalized a request there's now room in the completion queue,
+    // see if there are any pending requests and submit one from the front of the queue
+    if (!pending_requests.empty())
+    {
+        auto pending_request = std::move(pending_requests.front());
+        pending_requests.pop_front();
+
+        // check for a duplicate id first so that no SQE is submitted for a request we cannot track
+        auto request_id = reinterpret_cast(pending_request.request.buf);
+        if (in_flight_requests.contains(request_id))
+        {
+            failPromise(pending_request.promise, Exception(ErrorCodes::LOGICAL_ERROR,
+                "Tried enqueuing pending read request for {} that is already submitted", request_id));
+        }
+        else if (int ret = submitToRing(pending_request); ret > 0)
+        {
+            in_flight_requests.emplace(request_id, std::move(pending_request));
+        }
+        else
+        {
+            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
+            failPromise(pending_request.promise, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
+                "Failed submitting SQE for pending request: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
+        }
+
+        CurrentMetrics::sub(CurrentMetrics::IOUringPendingEvents);
+    }
+}
+
+/// Body of the monitor thread: waits for CQEs and completes, fails or re-submits the matching request.
+void IOUringReader::monitorRing()
+{
+    setThreadName("IOUringMonitor");
+
+    while (!cancelled.load(std::memory_order_relaxed))
+    {
+        // we can't use wait_cqe_* variants with timeouts as they can
+        // submit timeout events in older kernels that do not support IORING_FEAT_EXT_ARG
+        // and it is not safe to mix submission and consumption event threads.
+        struct io_uring_cqe * cqe = nullptr;
+        int ret = io_uring_wait_cqe(&ring, &cqe);
+
+        if (ret == -EAGAIN || ret == -EINTR)
+        {
+            LOG_DEBUG(log, "Restarting waiting for CQEs due to: {}", errnoToString(-ret));
+
+            io_uring_cqe_seen(&ring, cqe);
+            continue;
+        }
+
+        if (ret < 0)
+        {
+            LOG_ERROR(log, "Failed waiting for io_uring CQEs: {}", errnoToString(-ret));
+            continue;
+        }
+
+        // user_data zero means a noop event sent from the destructor meant to interrupt the thread
+        if (cancelled.load(std::memory_order_relaxed) || (cqe && cqe->user_data == 0))
+        {
+            LOG_DEBUG(log, "Stopping IOUringMonitor thread");
+
+            io_uring_cqe_seen(&ring, cqe);
+            break;
+        }
+
+        if (!cqe)
+        {
+            LOG_ERROR(log, "Unexpectedly got a null CQE, continuing");
+            continue;
+        }
+
+        // it is safe to re-submit events once we take the lock here
+        std::unique_lock lock{mutex};
+
+        auto request_id = cqe->user_data;
+        const auto it = in_flight_requests.find(request_id);
+        if (it == in_flight_requests.end())
+        {
+            LOG_ERROR(log, "Got a completion event for a request {} that was not submitted", request_id);
+
+            io_uring_cqe_seen(&ring, cqe);
+            continue;
+        }
+
+        auto & enqueued = it->second;
+
+        if (cqe->res == -EAGAIN || cqe->res == -EINTR)
+        {
+            enqueued.resubmitting = true;
+            ProfileEvents::increment(ProfileEvents::IOUringSQEsResubmits);
+
+            ret = submitToRing(enqueued);
+            if (ret <= 0)
+            {
+                failRequest(it, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
+                    "Failed re-submitting SQE: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
+            }
+
+            io_uring_cqe_seen(&ring, cqe);
+            continue;
+        }
+
+        if (cqe->res < 0)
+        {
+            auto req = enqueued.request;
+            int fd = assert_cast(*req.descriptor).fd;
+            failRequest(it, Exception(
+                ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR,
+                "Failed reading {} bytes at offset {} to address {} from fd {}: {}",
+                req.size, req.offset, static_cast(req.buf), fd, errnoToString(-cqe->res)
+            ));
+
+            ProfileEvents::increment(ProfileEvents::IOUringCQEsFailed);
+            io_uring_cqe_seen(&ring, cqe);
+            continue;
+        }
+
+        size_t bytes_read = cqe->res;
+        size_t total_bytes_read = enqueued.bytes_read + bytes_read;
+
+        if (bytes_read > 0)
+        {
+            __msan_unpoison(enqueued.request.buf + enqueued.bytes_read, bytes_read);
+
+            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
+            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
+        }
+
+        if (bytes_read > 0 && total_bytes_read < enqueued.request.size)
+        {
+            // potential short read, re-submit
+            enqueued.resubmitting = true;
+            enqueued.bytes_read += bytes_read;
+
+            ret = submitToRing(enqueued);
+            if (ret <= 0)
+            {
+                failRequest(it, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
+                    "Failed re-submitting SQE for short read: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
+            }
+        }
+        else
+        {
+            enqueued.promise.set_value(Result{ .size = total_bytes_read, .offset = enqueued.request.ignore });
+            finalizeRequest(it);
+        }
+
+        ProfileEvents::increment(ProfileEvents::IOUringCQEsCompleted);
+        io_uring_cqe_seen(&ring, cqe);
+    }
+}
+
+/// Stops the monitor thread and tears the ring down; does nothing when io_uring was never set up.
+IOUringReader::~IOUringReader()
+{
+    // the constructor returns before creating the ring or the thread when io_uring is unsupported
+    if (!is_supported)
+        return;
+
+    cancelled.store(true, std::memory_order_relaxed);
+
+    // interrupt the monitor thread by sending a noop event
+    {
+        std::unique_lock lock{mutex};
+
+        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
+        io_uring_prep_nop(sqe);
+        io_uring_sqe_set_data(sqe, nullptr);
+        io_uring_submit(&ring);
+    }
+
+    ring_completion_monitor.join();
+
+    io_uring_queue_exit(&ring);
+}
+
+}
+#endif
diff --git a/src/Disks/IO/IOUringReader.h b/src/Disks/IO/IOUringReader.h
new file mode 100644
index 00000000000..beba3636ca7
--- /dev/null
+++ b/src/Disks/IO/IOUringReader.h
@@ -0,0 +1,78 @@
+#pragma once
+#if defined(OS_LINUX)
+
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+/** Perform reads using the io_uring Linux subsystem.
+ *
+ * The class sets up a single io_uring that clients submit read requests to, they are
+ * placed in a map using the read buffer address as the key and the original request
+ * with a promise as the value. A monitor thread continuously polls the completion queue,
+ * looks up the completed request and completes the matching promise.
+ */
+class IOUringReader final : public IAsynchronousReader
+{
+private:
+    bool is_supported = false; // stays false unless the constructor's probe reports IORING_OP_READ
+
+    std::mutex mutex;
+    struct io_uring ring;
+    uint32_t cq_entries = 0; // completion queue size reported by the kernel at ring setup
+
+    std::atomic cancelled{false};
+    ThreadFromGlobalPool ring_completion_monitor;
+
+    struct EnqueuedRequest
+    {
+        std::promise promise;
+        Request request;
+        bool resubmitting; // resubmits can happen due to short reads or when io_uring returns -EAGAIN
+        size_t bytes_read; // keep track of bytes already read in case short reads happen
+    };
+
+    std::deque pending_requests;
+    std::unordered_map in_flight_requests;
+
+    int submitToRing(EnqueuedRequest & enqueued);
+
+    using EnqueuedIterator = std::unordered_map::iterator;
+
+    void failRequest(const EnqueuedIterator & requestIt, const Exception & ex);
+    void finalizeRequest(const EnqueuedIterator & requestIt);
+
+    void monitorRing();
+
+    template inline void failPromise(std::promise & promise, const Exception & ex)
+    {
+        promise.set_exception(std::make_exception_ptr(ex));
+    }
+
+    inline std::future makeFailedResult(const Exception & ex)
+    {
+        auto promise = std::promise{};
+        failPromise(promise, ex);
+        return promise.get_future();
+    }
+
+    const Poco::Logger * log;
+
+public:
+    explicit IOUringReader(uint32_t entries_);
+
+    inline bool isSupported() const { return is_supported; }
+    std::future submit(Request request) override;
+
+    void wait() override {}
+
+    virtual ~IOUringReader() override;
+};
+
+}
+#endif
diff --git a/src/Disks/IO/createReadBufferFromFileBase.cpp b/src/Disks/IO/createReadBufferFromFileBase.cpp
index e2522da85c9..47625705a10 100644
--- a/src/Disks/IO/createReadBufferFromFileBase.cpp
+++ b/src/Disks/IO/createReadBufferFromFileBase.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -23,6 +24,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int UNSUPPORTED_METHOD;
 }
 
 
@@ -80,6 +82,19 @@ std::unique_ptr createReadBufferFromFileBase(
         {
             res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size);
         }
+        else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
+        {
+#if defined(OS_LINUX)
+            static std::shared_ptr reader = std::make_shared(512);
+            if (!reader->isSupported())
+                throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
+
+            res = std::make_unique(
+                *reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size);
+#else
+            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux");
+#endif
+        }
         else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
         {
             auto context = Context::getGlobalContextInstance();
diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h
index 3d8f9a05e53..4a0344bf11b 100644
--- a/src/IO/ReadSettings.h
+++ b/src/IO/ReadSettings.h
@@ -31,6 +31,13 @@ enum class LocalFSReadMethod
      */
     mmap,
 
+    /**
+     * Use the io_uring Linux subsystem for asynchronous reads.
+     * Can use direct IO after specified size.
+     * Can do prefetch with double buffering.
+     */
+    io_uring,
+
     /**
      * Checks if data is in page cache with 'preadv2' on modern Linux kernels.
      * If data is in page cache, read from the same thread.
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index 72e6615e720..50d940bc23c 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -491,7 +491,7 @@ class SettingsRandomizer:
         if random.random() < 0.2
         else random.randint(1, 1024 * 1024 * 1024),
         "local_filesystem_read_method": lambda: random.choice(
-            ["read", "pread", "mmap", "pread_threadpool"]
+            ["read", "pread", "mmap", "pread_threadpool", "io_uring"]
         ),
         "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
         "local_filesystem_read_prefetch": lambda: random.randint(0, 1),
diff --git a/tests/queries/0_stateless/02051_read_settings.reference.j2 b/tests/queries/0_stateless/02051_read_settings.reference.j2
index 391cf3adf35..ee6c4bdd918 100644
--- a/tests/queries/0_stateless/02051_read_settings.reference.j2
+++ b/tests/queries/0_stateless/02051_read_settings.reference.j2
@@ -1,5 +1,5 @@
 {% for index_granularity_bytes in [0, 10 * 1024 * 1024] -%}
-{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] -%}
+{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] -%}
 {% for direct_io in [0, 1] -%}
 {% for prefetch in [0, 1] -%}
 {% for priority in [0, 1] -%}
diff --git a/tests/queries/0_stateless/02051_read_settings.sql.j2 b/tests/queries/0_stateless/02051_read_settings.sql.j2
index 69dd3c264ba..1f121b0c268 100644
--- a/tests/queries/0_stateless/02051_read_settings.sql.j2
+++ b/tests/queries/0_stateless/02051_read_settings.sql.j2
@@ -19,7 +19,7 @@ settings
 as select number, repeat(toString(number), 5) from numbers(1e6);
 
 {# check each local_filesystem_read_method #}
-{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] %}
+{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] %}
 {# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #}
 {% for direct_io in [0, 1] %}
 {# check local_filesystem_read_prefetch (just a smoke test) #}