From f236ec93be40b15307f99e8782ce5d4bc97d402c Mon Sep 17 00:00:00 2001 From: urgordeadbeef Date: Thu, 7 Feb 2019 18:08:45 +0300 Subject: [PATCH] Add FreeBSD AIO support. --- dbms/src/IO/AIO.cpp | 87 +++++++++++++++++++ dbms/src/IO/AIO.h | 34 ++++++++ dbms/src/IO/AIOContextPool.cpp | 6 +- dbms/src/IO/AIOContextPool.h | 2 +- dbms/src/IO/ReadBufferAIO.cpp | 13 ++- dbms/src/IO/ReadBufferAIO.h | 2 +- dbms/src/IO/WriteBufferAIO.cpp | 19 ++-- dbms/src/IO/WriteBufferAIO.h | 2 +- dbms/src/IO/createReadBufferFromFileBase.cpp | 4 +- dbms/src/IO/createWriteBufferFromFileBase.cpp | 4 +- 10 files changed, 156 insertions(+), 17 deletions(-) diff --git a/dbms/src/IO/AIO.cpp b/dbms/src/IO/AIO.cpp index e73319319b1..5b684e8ef60 100644 --- a/dbms/src/IO/AIO.cpp +++ b/dbms/src/IO/AIO.cpp @@ -53,4 +53,91 @@ AIOContext::~AIOContext() io_destroy(ctx); } +#elif defined(__FreeBSD__) + +#include +#include +#include +#include +#include +#include + +#include + + +/** Small wrappers for asynchronous I/O. + */ + +namespace DB +{ + namespace ErrorCodes + { + extern const int CANNOT_IOSETUP; + } +} + + +int io_setup(void) +{ + return kqueue(); +} + +int io_destroy(int ctx) +{ + return close(ctx); +} + +int io_submit(int ctx, long nr, struct iocb * iocbpp[]) +{ + long i; + int r; + struct sigevent *se; + struct aiocb *iocb; + + for (i = 0; i < nr; i ++) { + iocb = &iocbpp[i]->aio; + + se = &iocb->aio_sigevent; + se->sigev_notify_kqueue = ctx; + se->sigev_notify_kevent_flags = 0; + se->sigev_notify = SIGEV_KEVENT; + se->sigev_value.sival_ptr = iocbpp[i]; + + switch(iocb->aio_lio_opcode) { + case LIO_READ: + r = aio_read(iocb); + break; + case LIO_WRITE: + r = aio_write(iocb); + break; + default: break; + } + if (r < 0) { + return r; + } + } + + return i; +} + +int io_getevents(int ctx, long min_nr, long max_nr, struct kevent * events, struct timespec * timeout) +{ + min_nr = 0; + return kevent(ctx, NULL, 0, events, max_nr, timeout); +} + + +AIOContext::AIOContext(unsigned int nr_events) +{ + nr_events = 0; + ctx = io_setup(); + if (ctx < 0) + DB::throwFromErrno("io_setup failed", DB::ErrorCodes::CANNOT_IOSETUP); +} + +AIOContext::~AIOContext() +{ + io_destroy(ctx); +} + #endif diff --git a/dbms/src/IO/AIO.h b/dbms/src/IO/AIO.h index d99505fb017..09363838aac 100644 --- a/dbms/src/IO/AIO.h +++ b/dbms/src/IO/AIO.h @@ -39,4 +39,38 @@ struct AIOContext : private boost::noncopyable ~AIOContext(); }; +#elif defined(__FreeBSD__) + +#include +#include +#include +#include +#include + +typedef struct kevent io_event; +typedef int aio_context_t; + +struct iocb { + struct aiocb aio; + long aio_data; +}; + +int io_setup(void); + +int io_destroy(void); + +/// last argument is an array of pointers technically speaking +int io_submit(int ctx, long nr, struct iocb * iocbpp[]); + +int io_getevents(int ctx, long min_nr, long max_nr, struct kevent * events, struct timespec * timeout); + + +struct AIOContext : private boost::noncopyable +{ + int ctx; + + AIOContext(unsigned int nr_events = 128); + ~AIOContext(); +}; + #endif diff --git a/dbms/src/IO/AIOContextPool.cpp b/dbms/src/IO/AIOContextPool.cpp index 65161fcc600..db73e29fee6 100644 --- a/dbms/src/IO/AIOContextPool.cpp +++ b/dbms/src/IO/AIOContextPool.cpp @@ -1,4 +1,4 @@ -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include @@ -94,7 +94,11 @@ void AIOContextPool::fulfillPromises(const io_event events[], const int num_even continue; } +#if defined(__FreeBSD__) + it->second.set_value(aio_return((struct aiocb *)event.udata)); +#else it->second.set_value(event.res); +#endif promises.erase(it); } } diff --git a/dbms/src/IO/AIOContextPool.h b/dbms/src/IO/AIOContextPool.h index ca92e14b6ed..d90a79ba4cb 100644 --- a/dbms/src/IO/AIOContextPool.h +++ b/dbms/src/IO/AIOContextPool.h @@ -1,6 +1,6 @@ #pragma once -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index 0e9fc400328..78e11ed367d 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -1,4 +1,4 @@ -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include @@ -120,11 +120,16 @@ bool ReadBufferAIO::nextImpl() /// Create an asynchronous request. prepare(); +#if defined(__FreeBSD__) + request.aio.aio_lio_opcode = LIO_READ; + request.aio.aio_buf = reinterpret_cast(buffer_begin); +#else request.aio_lio_opcode = IOCB_CMD_PREAD; - request.aio_fildes = fd; request.aio_buf = reinterpret_cast(buffer_begin); - request.aio_nbytes = region_aligned_size; - request.aio_offset = region_aligned_begin; +#endif + request.aio.aio_fildes = fd; + request.aio.aio_nbytes = region_aligned_size; + request.aio.aio_offset = region_aligned_begin; /// Send the request. try diff --git a/dbms/src/IO/ReadBufferAIO.h b/dbms/src/IO/ReadBufferAIO.h index a30057565c0..e8d7265f69f 100644 --- a/dbms/src/IO/ReadBufferAIO.h +++ b/dbms/src/IO/ReadBufferAIO.h @@ -1,6 +1,6 @@ #pragma once -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index fdd6a61fb7b..d22e1cf5412 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -1,4 +1,4 @@ -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include @@ -110,11 +110,16 @@ void WriteBufferAIO::nextImpl() /// Create a request for asynchronous write. prepare(); +#if defined(__FreeBSD__) + request.aio.aio_lio_opcode = LIO_WRITE; + request.aio.aio_buf = reinterpret_cast(buffer_begin); +#else request.aio_lio_opcode = IOCB_CMD_PWRITE; - request.aio_fildes = fd; request.aio_buf = reinterpret_cast(buffer_begin); - request.aio_nbytes = region_aligned_size; - request.aio_offset = region_aligned_begin; +#endif + request.aio.aio_fildes = fd; + request.aio.aio_nbytes = region_aligned_size; + request.aio.aio_offset = region_aligned_begin; /// Send the request. while (io_submit(aio_context.ctx, 1, &request_ptr) < 0) @@ -193,7 +198,11 @@ bool WriteBufferAIO::waitForAIOCompletion() } is_pending_write = false; +#if defined(__FreeBSD__) + bytes_written = aio_return((struct aiocb *)event.udata); +#else bytes_written = event.res; +#endif ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite); ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written); @@ -396,7 +405,7 @@ void WriteBufferAIO::finalize() bytes_written -= truncation_count; - off_t pos_offset = bytes_written - (pos_in_file - request.aio_offset); + off_t pos_offset = bytes_written - (pos_in_file - request.aio.aio_offset); if (pos_in_file > (std::numeric_limits::max() - pos_offset)) throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); pos_in_file += pos_offset; diff --git a/dbms/src/IO/WriteBufferAIO.h b/dbms/src/IO/WriteBufferAIO.h index 7b8d275dfcd..fba0c274098 100644 --- a/dbms/src/IO/WriteBufferAIO.h +++ b/dbms/src/IO/WriteBufferAIO.h @@ -1,6 +1,6 @@ #pragma once -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #include diff --git a/dbms/src/IO/createReadBufferFromFileBase.cpp b/dbms/src/IO/createReadBufferFromFileBase.cpp index 7db36924201..7deef749369 100644 --- a/dbms/src/IO/createReadBufferFromFileBase.cpp +++ b/dbms/src/IO/createReadBufferFromFileBase.cpp @@ -1,6 +1,6 @@ #include #include -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #endif #include @@ -31,7 +31,7 @@ std::unique_ptr createReadBufferFromFileBase(const std:: } else { -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO); return std::make_unique(filename_, buffer_size_, flags_, existing_memory_); #else diff --git a/dbms/src/IO/createWriteBufferFromFileBase.cpp b/dbms/src/IO/createWriteBufferFromFileBase.cpp index 1fa26d21c6a..26c92e249b4 100644 --- a/dbms/src/IO/createWriteBufferFromFileBase.cpp +++ b/dbms/src/IO/createWriteBufferFromFileBase.cpp @@ -1,6 +1,6 @@ #include #include -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) #include #endif #include @@ -33,7 +33,7 @@ std::unique_ptr createWriteBufferFromFileBase(const std } else { -#if defined(__linux__) +#if defined(__linux__) || defined(__FreeBSD__) ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO); return std::make_unique(filename_, buffer_size_, flags_, mode, existing_memory_); #else