mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #38456 from sauliusvl/iouring
Re-introduce io_uring read method
This commit is contained in:
commit
bf9f62dcbd
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -330,3 +330,6 @@
|
||||
[submodule "contrib/crc32-vpmsum"]
|
||||
path = contrib/crc32-vpmsum
|
||||
url = https://github.com/antonblanchard/crc32-vpmsum.git
|
||||
[submodule "contrib/liburing"]
|
||||
path = contrib/liburing
|
||||
url = https://github.com/axboe/liburing
|
||||
|
1
contrib/CMakeLists.txt
vendored
1
contrib/CMakeLists.txt
vendored
@ -140,6 +140,7 @@ add_contrib (simdjson-cmake simdjson)
|
||||
add_contrib (rapidjson-cmake rapidjson)
|
||||
add_contrib (fastops-cmake fastops)
|
||||
add_contrib (libuv-cmake libuv)
|
||||
add_contrib (liburing-cmake liburing)
|
||||
add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
|
||||
add_contrib (cassandra-cmake cassandra) # requires: libuv
|
||||
|
||||
|
1
contrib/liburing
vendored
Submodule
1
contrib/liburing
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit f5a48392c4ea33f222cbebeb2e2fc31620162949
|
53
contrib/liburing-cmake/CMakeLists.txt
Normal file
53
contrib/liburing-cmake/CMakeLists.txt
Normal file
@ -0,0 +1,53 @@
|
||||
# Build rules for the bundled liburing sources (contrib/liburing).
# The library is enabled together with the other bundled libraries, but only on Linux.
set (ENABLE_LIBURING_DEFAULT ${ENABLE_LIBRARIES})

if (NOT OS_LINUX)
    set (ENABLE_LIBURING_DEFAULT OFF)
endif ()

option (ENABLE_LIBURING "Enable liburing" ${ENABLE_LIBURING_DEFAULT})

if (NOT ENABLE_LIBURING)
    message (STATUS "Not using liburing")
    return ()
endif ()

set (LIBURING_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src/include")
set (LIBURING_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liburing/src")

set (SRCS
    "${LIBURING_SOURCE_DIR}/queue.c"
    "${LIBURING_SOURCE_DIR}/register.c"
    "${LIBURING_SOURCE_DIR}/setup.c"
    "${LIBURING_SOURCE_DIR}/syscall.c"
    "${LIBURING_SOURCE_DIR}/version.c"
)

# liburing/compat.h is generated into the binary tree from compat.h.in.
set (LIBURING_COMPAT_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/liburing/src/include-compat")
set (LIBURING_COMPAT_HEADER "${LIBURING_COMPAT_INCLUDE_DIR}/liburing/compat.h")

# All feature switches are off, so compat.h always provides its own fallback definitions.
set (LIBURING_CONFIG_HAS_KERNEL_RWF_T FALSE)
set (LIBURING_CONFIG_HAS_KERNEL_TIMESPEC FALSE)
set (LIBURING_CONFIG_HAS_OPEN_HOW FALSE)
set (LIBURING_CONFIG_HAS_STATX FALSE)
set (LIBURING_CONFIG_HAS_GLIBC_STATX FALSE)

configure_file ("${CMAKE_CURRENT_SOURCE_DIR}/compat.h.in" "${LIBURING_COMPAT_HEADER}")

# liburing/io_uring_version.h is generated from the version recorded in liburing.spec.
set (LIBURING_GENERATED_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/liburing/src/include")
set (LIBURING_VERSION_HEADER "${LIBURING_GENERATED_INCLUDE_DIR}/liburing/io_uring_version.h")

file (READ "${LIBURING_SOURCE_DIR}/../liburing.spec" LIBURING_SPEC)

# "\\." is needed for a literal dot: inside a quoted CMake argument "\." collapses to ".",
# which as a regex matches any character. The input is quoted so it is not list-split.
string (REGEX MATCH "Version: ([0-9]+)\\.([0-9]+)" _ "${LIBURING_SPEC}")
if (NOT CMAKE_MATCH_COUNT EQUAL 2)
    message (FATAL_ERROR "Cannot parse the liburing version from ${LIBURING_SOURCE_DIR}/../liburing.spec")
endif ()
set (LIBURING_VERSION_MAJOR ${CMAKE_MATCH_1})
set (LIBURING_VERSION_MINOR ${CMAKE_MATCH_2})

configure_file ("${CMAKE_CURRENT_SOURCE_DIR}/io_uring_version.h.in" "${LIBURING_VERSION_HEADER}")

add_library (_liburing ${SRCS})
add_library (ch_contrib::liburing ALIAS _liburing)

# Only needed to compile liburing itself, so keep the definitions on the target
# instead of at directory scope.
target_compile_definitions (_liburing PRIVATE _GNU_SOURCE LIBURING_INTERNAL)

target_include_directories (_liburing SYSTEM PUBLIC
    "${LIBURING_COMPAT_INCLUDE_DIR}"
    "${LIBURING_GENERATED_INCLUDE_DIR}"
    "${LIBURING_INCLUDE_DIR}")
|
50
contrib/liburing-cmake/compat.h.in
Normal file
50
contrib/liburing-cmake/compat.h.in
Normal file
@ -0,0 +1,50 @@
|
||||
/* SPDX-License-Identifier: MIT */
#ifndef LIBURING_COMPAT_H
#define LIBURING_COMPAT_H

/* Feature switches; each line is rewritten when the build system configures this template. */
# cmakedefine LIBURING_CONFIG_HAS_KERNEL_RWF_T
# cmakedefine LIBURING_CONFIG_HAS_KERNEL_TIMESPEC
# cmakedefine LIBURING_CONFIG_HAS_OPEN_HOW
# cmakedefine LIBURING_CONFIG_HAS_GLIBC_STATX
# cmakedefine LIBURING_CONFIG_HAS_STATX

/* Fallback for headers that do not declare __kernel_rwf_t. */
#ifndef LIBURING_CONFIG_HAS_KERNEL_RWF_T
typedef int __kernel_rwf_t;
#endif

/* Take __kernel_timespec from the kernel headers when available, otherwise declare it here. */
#ifdef LIBURING_CONFIG_HAS_KERNEL_TIMESPEC
#include <linux/time_types.h>
#else
#include <stdint.h>

struct __kernel_timespec {
    int64_t tv_sec;
    long long tv_nsec;
};
#endif

/* <linux/time_types.h> is either unavailable or already included above,
 * so it must not be included again in both cases. */
#define UAPI_LINUX_IO_URING_H_SKIP_LINUX_TIME_TYPES_H 1

/* Take struct open_how from the kernel headers when available, otherwise declare it here. */
#ifdef LIBURING_CONFIG_HAS_OPEN_HOW
#include <linux/openat2.h>
#else
#include <inttypes.h>

struct open_how {
    uint64_t flags;
    uint64_t mode;
    uint64_t resolve;
};
#endif

#if defined(LIBURING_CONFIG_HAS_STATX) && !defined(LIBURING_CONFIG_HAS_GLIBC_STATX)
#include <sys/stat.h>
#endif

#endif
|
8
contrib/liburing-cmake/io_uring_version.h.in
Normal file
8
contrib/liburing-cmake/io_uring_version.h.in
Normal file
@ -0,0 +1,8 @@
|
||||
/* SPDX-License-Identifier: MIT */
|
||||
#ifndef LIBURING_VERSION_H
|
||||
#define LIBURING_VERSION_H
|
||||
|
||||
#define IO_URING_VERSION_MAJOR ${LIBURING_VERSION_MAJOR}
|
||||
#define IO_URING_VERSION_MINOR ${LIBURING_VERSION_MINOR}
|
||||
|
||||
#endif
|
@ -139,6 +139,7 @@ function clone_submodules
|
||||
contrib/morton-nd
|
||||
contrib/xxHash
|
||||
contrib/simdjson
|
||||
contrib/liburing
|
||||
)
|
||||
|
||||
git submodule sync
|
||||
@ -161,6 +162,7 @@ function run_cmake
|
||||
"-DENABLE_NURAFT=1"
|
||||
"-DENABLE_SIMDJSON=1"
|
||||
"-DENABLE_JEMALLOC=1"
|
||||
"-DENABLE_LIBURING=1"
|
||||
)
|
||||
|
||||
export CCACHE_DIR="$FASTTEST_WORKSPACE/ccache"
|
||||
|
@ -513,6 +513,11 @@ if (TARGET ch_contrib::msgpack)
|
||||
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::msgpack)
|
||||
endif()
|
||||
|
||||
if (TARGET ch_contrib::liburing)
    # The liburing include directories are PUBLIC usage requirements of the target, so linking
    # is enough. LIBURING_COMPAT_INCLUDE_DIR / LIBURING_INCLUDE_DIR are set in the scope of the
    # contrib subdirectory and are not visible here, so an explicit target_include_directories
    # using them would expand to nothing.
    target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::liburing)
endif()
|
||||
|
||||
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::fast_float)
|
||||
|
||||
if (USE_ORC)
|
||||
|
@ -106,6 +106,8 @@
|
||||
M(KeeperAliveConnections, "Number of alive connections") \
|
||||
M(KeeperOutstandingRequets, "Number of outstanding requests") \
|
||||
M(ThreadsInOvercommitTracker, "Number of waiting threads inside of OvercommitTracker") \
|
||||
M(IOUringPendingEvents, "Number of io_uring SQEs waiting to be submitted") \
|
||||
M(IOUringInFlightEvents, "Number of io_uring SQEs in flight") \
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
|
@ -646,6 +646,8 @@
|
||||
M(675, CANNOT_PARSE_IPV4) \
|
||||
M(676, CANNOT_PARSE_IPV6) \
|
||||
M(677, THREAD_WAS_CANCELED) \
|
||||
M(678, IO_URING_INIT_FAILED) \
|
||||
M(679, IO_URING_SUBMIT_ERROR) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -472,6 +472,10 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
|
||||
\
|
||||
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
|
||||
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
|
||||
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
|
||||
M(IOUringCQEsCompleted, "Total number of successfully completed io_uring CQEs") \
|
||||
M(IOUringCQEsFailed, "Total number of completed io_uring CQEs with failures") \
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -603,7 +603,7 @@ class IColumn;
|
||||
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
|
||||
\
|
||||
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap.", 0) \
|
||||
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
|
||||
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool.", 0) \
|
||||
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
|
||||
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
|
||||
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
|
||||
|
341
src/Disks/IO/IOUringReader.cpp
Normal file
341
src/Disks/IO/IOUringReader.cpp
Normal file
@ -0,0 +1,341 @@
|
||||
#if defined(OS_LINUX)
|
||||
|
||||
#include "IOUringReader.h"
|
||||
#include <base/errnoToString.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/MemorySanitizer.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <future>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event ReadBufferFromFileDescriptorRead;
|
||||
extern const Event ReadBufferFromFileDescriptorReadFailed;
|
||||
extern const Event ReadBufferFromFileDescriptorReadBytes;
|
||||
|
||||
extern const Event IOUringSQEsSubmitted;
|
||||
extern const Event IOUringSQEsResubmits;
|
||||
extern const Event IOUringCQEsCompleted;
|
||||
extern const Event IOUringCQEsFailed;
|
||||
}
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric IOUringPendingEvents;
|
||||
extern const Metric IOUringInFlightEvents;
|
||||
extern const Metric Read;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
||||
extern const int IO_URING_INIT_FAILED;
|
||||
extern const int IO_URING_SUBMIT_ERROR;
|
||||
}
|
||||
|
||||
/// Checks whether io_uring reads (IORING_OP_READ) are available and, if so, initializes
/// the ring with `entries_` submission entries and starts the completion monitor thread.
/// When io_uring is not available the object is left with is_supported == false and no ring.
/// Throws IO_URING_INIT_FAILED if the ring cannot be initialized.
IOUringReader::IOUringReader(uint32_t entries_)
    : log(&Poco::Logger::get("IOUringReader"))
{
    struct io_uring_probe * probe = io_uring_get_probe();
    if (!probe)
    {
        is_supported = false;
        return;
    }

    is_supported = io_uring_opcode_supported(probe, IORING_OP_READ);
    io_uring_free_probe(probe);

    if (!is_supported)
        return;

    struct io_uring_params params =
    {
        .cq_entries = 0, // filled by the kernel, initializing to silence warning
        .flags = 0,
    };

    int ret = io_uring_queue_init_params(entries_, &ring, &params);
    if (ret < 0)
        throwFromErrno("Failed initializing io_uring", ErrorCodes::IO_URING_INIT_FAILED, -ret);

    // the completion queue size bounds how many requests may be in flight at once
    cq_entries = params.cq_entries;
    ring_completion_monitor = ThreadFromGlobalPool([this] { monitorRing(); });
}
|
||||
|
||||
/// Enqueues a read request and returns a future for its result.
///
/// The request is submitted to the ring immediately while fewer than cq_entries requests
/// are in flight; otherwise it is parked in pending_requests and submitted later from
/// finalizeRequest(). Failures are reported through the returned future, not by throwing.
std::future<IAsynchronousReader::Result> IOUringReader::submit(Request request)
{
    assert(request.size);

    // take lock here because we're modifying containers and submitting to the ring,
    // the monitor thread can also do the same
    std::unique_lock lock{mutex};

    // use the requested read destination address as the request id, the assumption
    // here is that we won't get asked to fill in the same address more than once in parallel
    auto request_id = reinterpret_cast<UInt64>(request.buf);

    std::promise<IAsynchronousReader::Result> promise;
    auto enqueued_request = EnqueuedRequest
    {
        .promise = std::move(promise),
        .request = request,
        .resubmitting = false,
        .bytes_read = 0
    };

    // no room in the completion queue: push the request to the back of the pending queue
    if (in_flight_requests.size() >= cq_entries)
    {
        CurrentMetrics::add(CurrentMetrics::IOUringPendingEvents);
        pending_requests.push_back(std::move(enqueued_request));
        return pending_requests.back().promise.get_future();
    }

    // Reject a duplicate id *before* handing anything to the kernel: once an SQE is submitted
    // its completion would be matched by id to the request that is already in flight.
    if (in_flight_requests.contains(request_id))
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
        return makeFailedResult(Exception(
            ErrorCodes::LOGICAL_ERROR, "Tried enqueuing read request for {} that is already submitted", request_id));
    }

    int ret = submitToRing(enqueued_request);
    if (ret <= 0)
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
        return makeFailedResult(Exception(
            ErrorCodes::IO_URING_SUBMIT_ERROR, "Failed submitting SQE: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
    }

    // the id was checked above while holding the lock, so the insertion always takes place
    auto inserted = in_flight_requests.emplace(request_id, std::move(enqueued_request));
    return inserted.first->second.promise.get_future();
}
|
||||
|
||||
/// Prepares a read SQE for the not-yet-read remainder of the request and submits it.
///
/// Returns the result of io_uring_submit (positive on success, negative errno on failure),
/// or 0 if no SQE could be obtained. Must be called with `mutex` held.
int IOUringReader::submitToRing(EnqueuedRequest & enqueued)
{
    struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
    if (!sqe)
        return 0;

    const auto & request = enqueued.request;
    auto request_id = reinterpret_cast<UInt64>(request.buf);
    int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;

    // On a resubmit after a short read the destination has to advance together with the file
    // offset, otherwise the remainder would overwrite the bytes that were already read.
    io_uring_prep_read(
        sqe,
        fd,
        request.buf + enqueued.bytes_read,
        static_cast<unsigned>(request.size - enqueued.bytes_read),
        request.offset + enqueued.bytes_read);

    // The id stays the original buffer address so completions can be matched to the request.
    // It is set after the prep call because older liburing prep helpers reset user_data.
    io_uring_sqe_set_data64(sqe, request_id);

    int ret = 0;

    do
    {
        ret = io_uring_submit(&ring);
    } while (ret == -EINTR || ret == -EAGAIN);

    // a resubmitted request is already accounted for in the metrics
    if (ret > 0 && !enqueued.resubmitting)
    {
        ProfileEvents::increment(ProfileEvents::IOUringSQEsSubmitted);
        CurrentMetrics::add(CurrentMetrics::IOUringInFlightEvents);
        CurrentMetrics::add(CurrentMetrics::Read);
    }

    return ret;
}
|
||||
|
||||
void IOUringReader::failRequest(const EnqueuedIterator & requestIt, const Exception & ex)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
|
||||
(requestIt->second).promise.set_exception(std::make_exception_ptr(ex));
|
||||
|
||||
finalizeRequest(requestIt);
|
||||
}
|
||||
|
||||
void IOUringReader::finalizeRequest(const EnqueuedIterator & requestIt)
|
||||
{
|
||||
in_flight_requests.erase(requestIt);
|
||||
|
||||
CurrentMetrics::sub(CurrentMetrics::IOUringInFlightEvents);
|
||||
CurrentMetrics::sub(CurrentMetrics::Read);
|
||||
|
||||
// since we just finalized a request there's now room in the completion queue,
|
||||
// see if there are any pending requests and submit one from the front of the queue
|
||||
if (!pending_requests.empty())
|
||||
{
|
||||
auto pending_request = std::move(pending_requests.front());
|
||||
pending_requests.pop_front();
|
||||
|
||||
int ret = submitToRing(pending_request);
|
||||
if (ret > 0)
|
||||
{
|
||||
auto request_id = reinterpret_cast<UInt64>(pending_request.request.buf);
|
||||
if (!in_flight_requests.contains(request_id))
|
||||
in_flight_requests.emplace(request_id, std::move(pending_request));
|
||||
else
|
||||
failPromise(pending_request.promise, Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Tried enqueuing pending read request for {} that is already submitted", request_id));
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
|
||||
failPromise(pending_request.promise, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
|
||||
"Failed submitting SQE for pending request: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
|
||||
}
|
||||
|
||||
CurrentMetrics::sub(CurrentMetrics::IOUringPendingEvents);
|
||||
}
|
||||
}
|
||||
|
||||
void IOUringReader::monitorRing()
|
||||
{
|
||||
setThreadName("IOUringMonitor");
|
||||
|
||||
while (!cancelled.load(std::memory_order_relaxed))
|
||||
{
|
||||
// we can't use wait_cqe_* variants with timeouts as they can
|
||||
// submit timeout events in older kernels that do not support IORING_FEAT_EXT_ARG
|
||||
// and it is not safe to mix submission and consumption event threads.
|
||||
struct io_uring_cqe * cqe = nullptr;
|
||||
int ret = io_uring_wait_cqe(&ring, &cqe);
|
||||
|
||||
if (ret == -EAGAIN || ret == -EINTR)
|
||||
{
|
||||
LOG_DEBUG(log, "Restarting waiting for CQEs due to: {}", errnoToString(-ret));
|
||||
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ret < 0)
|
||||
{
|
||||
LOG_ERROR(log, "Failed waiting for io_uring CQEs: {}", errnoToString(-ret));
|
||||
continue;
|
||||
}
|
||||
|
||||
// user_data zero means a noop event sent from the destructor meant to interrupt the thread
|
||||
if (cancelled.load(std::memory_order_relaxed) || (cqe && cqe->user_data == 0))
|
||||
{
|
||||
LOG_DEBUG(log, "Stopping IOUringMonitor thread");
|
||||
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!cqe)
|
||||
{
|
||||
LOG_ERROR(log, "Unexpectedly got a null CQE, continuing");
|
||||
continue;
|
||||
}
|
||||
|
||||
// it is safe to re-submit events once we take the lock here
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
auto request_id = cqe->user_data;
|
||||
const auto it = in_flight_requests.find(request_id);
|
||||
if (it == in_flight_requests.end())
|
||||
{
|
||||
LOG_ERROR(log, "Got a completion event for a request {} that was not submitted", request_id);
|
||||
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto & enqueued = it->second;
|
||||
|
||||
if (cqe->res == -EAGAIN || cqe->res == -EINTR)
|
||||
{
|
||||
enqueued.resubmitting = true;
|
||||
ProfileEvents::increment(ProfileEvents::IOUringSQEsResubmits);
|
||||
|
||||
ret = submitToRing(enqueued);
|
||||
if (ret <= 0)
|
||||
{
|
||||
failRequest(it, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
|
||||
"Failed re-submitting SQE: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
|
||||
}
|
||||
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (cqe->res < 0)
|
||||
{
|
||||
auto req = enqueued.request;
|
||||
int fd = assert_cast<const LocalFileDescriptor &>(*req.descriptor).fd;
|
||||
failRequest(it, Exception(
|
||||
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR,
|
||||
"Failed reading {} bytes at offset {} to address {} from fd {}: {}",
|
||||
req.size, req.offset, static_cast<void*>(req.buf), fd, errnoToString(-cqe->res)
|
||||
));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::IOUringCQEsFailed);
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t bytes_read = cqe->res;
|
||||
size_t total_bytes_read = enqueued.bytes_read + bytes_read;
|
||||
|
||||
if (bytes_read > 0)
|
||||
{
|
||||
__msan_unpoison(enqueued.request.buf + enqueued.bytes_read, bytes_read);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
|
||||
}
|
||||
|
||||
if (bytes_read > 0 && total_bytes_read < enqueued.request.size)
|
||||
{
|
||||
// potential short read, re-submit
|
||||
enqueued.resubmitting = true;
|
||||
enqueued.bytes_read += bytes_read;
|
||||
|
||||
ret = submitToRing(enqueued);
|
||||
if (ret <= 0)
|
||||
{
|
||||
failRequest(it, Exception(ErrorCodes::IO_URING_SUBMIT_ERROR,
|
||||
"Failed re-submitting SQE for short read: {}", ret < 0 ? errnoToString(-ret) : "no SQE submitted"));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
enqueued.promise.set_value(Result{ .size = total_bytes_read, .offset = enqueued.request.ignore });
|
||||
finalizeRequest(it);
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::IOUringCQEsCompleted);
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops the monitor thread and releases the ring.
IOUringReader::~IOUringReader()
{
    // When io_uring is not supported the constructor returned before the ring was initialized
    // and before the monitor thread was started, so there is nothing to stop or release.
    if (!is_supported)
        return;

    cancelled.store(true, std::memory_order_relaxed);

    // interrupt the monitor thread by sending a noop event
    {
        std::unique_lock lock{mutex};

        struct io_uring_sqe * sqe = io_uring_get_sqe(&ring);
        if (!sqe)
        {
            // the submission queue is full: flush it and try once more
            io_uring_submit(&ring);
            sqe = io_uring_get_sqe(&ring);
        }

        // Without an SQE no wake-up event can be sent; the monitor thread then stops on the
        // next completion it receives, because `cancelled` is already set.
        if (sqe)
        {
            io_uring_prep_nop(sqe);
            io_uring_sqe_set_data(sqe, nullptr);
            io_uring_submit(&ring);
        }
    }

    ring_completion_monitor.join();

    io_uring_queue_exit(&ring);
}
|
||||
|
||||
}
|
||||
#endif
|
78
src/Disks/IO/IOUringReader.h
Normal file
78
src/Disks/IO/IOUringReader.h
Normal file
@ -0,0 +1,78 @@
|
||||
#pragma once
|
||||
#if defined(OS_LINUX)
|
||||
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <IO/AsynchronousReader.h>
|
||||
#include <deque>
|
||||
#include <unordered_map>
|
||||
#include <liburing.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Perform reads using the io_uring Linux subsystem.
|
||||
*
|
||||
* The class sets up a single io_uring that clients submit read requests to, they are
|
||||
* placed in a map using the read buffer address as the key and the original request
|
||||
* with a promise as the value. A monitor thread continuously polls the completion queue,
|
||||
* looks up the completed request and completes the matching promise.
|
||||
*/
|
||||
class IOUringReader final : public IAsynchronousReader
|
||||
{
|
||||
private:
|
||||
bool is_supported;
|
||||
|
||||
std::mutex mutex;
|
||||
struct io_uring ring;
|
||||
uint32_t cq_entries;
|
||||
|
||||
std::atomic<bool> cancelled{false};
|
||||
ThreadFromGlobalPool ring_completion_monitor;
|
||||
|
||||
struct EnqueuedRequest
|
||||
{
|
||||
std::promise<IAsynchronousReader::Result> promise;
|
||||
Request request;
|
||||
bool resubmitting; // resubmits can happen due to short reads or when io_uring returns -EAGAIN
|
||||
size_t bytes_read; // keep track of bytes already read in case short reads happen
|
||||
};
|
||||
|
||||
std::deque<EnqueuedRequest> pending_requests;
|
||||
std::unordered_map<UInt64, EnqueuedRequest> in_flight_requests;
|
||||
|
||||
int submitToRing(EnqueuedRequest & enqueued);
|
||||
|
||||
using EnqueuedIterator = std::unordered_map<UInt64, EnqueuedRequest>::iterator;
|
||||
|
||||
void failRequest(const EnqueuedIterator & requestIt, const Exception & ex);
|
||||
void finalizeRequest(const EnqueuedIterator & requestIt);
|
||||
|
||||
void monitorRing();
|
||||
|
||||
template<typename T> inline void failPromise(std::promise<T> & promise, const Exception & ex)
|
||||
{
|
||||
promise.set_exception(std::make_exception_ptr(ex));
|
||||
}
|
||||
|
||||
inline std::future<Result> makeFailedResult(const Exception & ex)
|
||||
{
|
||||
auto promise = std::promise<Result>{};
|
||||
failPromise(promise, ex);
|
||||
return promise.get_future();
|
||||
}
|
||||
|
||||
const Poco::Logger * log;
|
||||
|
||||
public:
|
||||
IOUringReader(uint32_t entries_);
|
||||
|
||||
inline bool isSupported() { return is_supported; }
|
||||
std::future<Result> submit(Request request) override;
|
||||
|
||||
void wait() override {}
|
||||
|
||||
virtual ~IOUringReader() override;
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
@ -3,6 +3,7 @@
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/MMapReadBufferFromFileWithCache.h>
|
||||
#include <IO/AsynchronousReadBufferFromFile.h>
|
||||
#include <Disks/IO/IOUringReader.h>
|
||||
#include <Disks/IO/ThreadPoolReader.h>
|
||||
#include <IO/SynchronousReader.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
@ -23,6 +24,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
|
||||
@ -80,6 +82,19 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
||||
{
|
||||
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size);
|
||||
}
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
|
||||
{
|
||||
#if defined(OS_LINUX)
|
||||
static std::shared_ptr<IOUringReader> reader = std::make_shared<IOUringReader>(512);
|
||||
if (!reader->isSupported())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
|
||||
|
||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||
*reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, buffer_alignment, file_size);
|
||||
#else
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Read method io_uring is only supported in Linux");
|
||||
#endif
|
||||
}
|
||||
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
|
||||
{
|
||||
auto context = Context::getGlobalContextInstance();
|
||||
|
@ -31,6 +31,13 @@ enum class LocalFSReadMethod
|
||||
*/
|
||||
mmap,
|
||||
|
||||
/**
|
||||
* Use the io_uring Linux subsystem for asynchronous reads.
|
||||
* Can use direct IO after specified size.
|
||||
* Can do prefetch with double buffering.
|
||||
*/
|
||||
io_uring,
|
||||
|
||||
/**
|
||||
* Checks if data is in page cache with 'preadv2' on modern Linux kernels.
|
||||
* If data is in page cache, read from the same thread.
|
||||
|
@ -491,7 +491,7 @@ class SettingsRandomizer:
|
||||
if random.random() < 0.2
|
||||
else random.randint(1, 1024 * 1024 * 1024),
|
||||
"local_filesystem_read_method": lambda: random.choice(
|
||||
["read", "pread", "mmap", "pread_threadpool"]
|
||||
["read", "pread", "mmap", "pread_threadpool", "io_uring"]
|
||||
),
|
||||
"remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
|
||||
"local_filesystem_read_prefetch": lambda: random.randint(0, 1),
|
||||
|
@ -1,5 +1,5 @@
|
||||
{% for index_granularity_bytes in [0, 10 * 1024 * 1024] -%}
|
||||
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] -%}
|
||||
{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] -%}
|
||||
{% for direct_io in [0, 1] -%}
|
||||
{% for prefetch in [0, 1] -%}
|
||||
{% for priority in [0, 1] -%}
|
||||
|
@ -19,7 +19,7 @@ settings
|
||||
as select number, repeat(toString(number), 5) from numbers(1e6);
|
||||
|
||||
{# check each local_filesystem_read_method #}
|
||||
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] %}
|
||||
{% for read_method in ['read', 'mmap', 'io_uring', 'pread_threadpool', 'pread_fake_async'] %}
|
||||
{# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #}
|
||||
{% for direct_io in [0, 1] %}
|
||||
{# check local_filesystem_read_prefetch (just a smoke test) #}
|
||||
|
Loading…
Reference in New Issue
Block a user