Add FreeBSD AIO support.

This commit is contained in:
urgordeadbeef 2019-02-07 18:08:45 +03:00 committed by Vitaly Baranov
parent 2556a96e9e
commit f236ec93be
10 changed files with 156 additions and 17 deletions

View File

@ -53,4 +53,91 @@ AIOContext::~AIOContext()
io_destroy(ctx);
}
#elif defined(__FreeBSD__)
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <aio.h>
#include <IO/AIO.h>
/** Small wrappers for asynchronous I/O.
*/
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IOSETUP;
}
}
int io_setup(void)
{
return kqueue();
}
int io_destroy(int ctx)
{
return close(ctx);
}
int io_submit(int ctx, long nr, struct iocb * iocbpp[])
{
long i;
int r;
struct sigevent *se;
struct aiocb *iocb;
for (i = 0; i < nr; i ++) {
iocb = &iocbpp[i]->aio;
se = &iocb->aio_sigevent;
se->sigev_notify_kqueue = ctx;
se->sigev_notify_kevent_flags = 0;
se->sigev_notify = SIGEV_KEVENT;
se->sigev_value.sival_ptr = iocbpp[i];
switch(iocb->aio_lio_opcode) {
case LIO_READ:
r = aio_read(iocb);
break;
case LIO_WRITE:
r = aio_write(iocb);
break;
default: break;
}
if (r < 0) {
return r;
}
}
return i;
}
int io_getevents(int ctx, long min_nr, long max_nr, struct kevent * events, struct timespec * timeout)
{
min_nr = 0;
return kevent(ctx, NULL, 0, events, max_nr, timeout);
}
AIOContext::AIOContext(unsigned int nr_events)
{
nr_events = 0;
ctx = io_setup();
if (ctx < 0)
DB::throwFromErrno("io_setup failed", DB::ErrorCodes::CANNOT_IOSETUP);
}
AIOContext::~AIOContext()
{
io_destroy(ctx);
}
#endif

View File

@ -39,4 +39,38 @@ struct AIOContext : private boost::noncopyable
~AIOContext();
};
#elif defined(__FreeBSD__)
#include <boost/noncopyable.hpp>
#include <aio.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
typedef struct kevent io_event;
typedef int aio_context_t;
struct iocb {
struct aiocb aio;
long aio_data;
};
int io_setup(void);
int io_destroy(void);
/// last argument is an array of pointers technically speaking
int io_submit(int ctx, long nr, struct iocb * iocbpp[]);
int io_getevents(int ctx, long min_nr, long max_nr, struct kevent * events, struct timespec * timeout);
struct AIOContext : private boost::noncopyable
{
int ctx;
AIOContext(unsigned int nr_events = 128);
~AIOContext();
};
#endif

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
@ -94,7 +94,11 @@ void AIOContextPool::fulfillPromises(const io_event events[], const int num_even
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return((struct aiocb *)event.udata));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <ext/singleton.h>
#include <condition_variable>

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>
@ -120,11 +120,16 @@ bool ReadBufferAIO::nextImpl()
/// Create an asynchronous request.
prepare();
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
#else
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
request.aio_nbytes = region_aligned_size;
request.aio_offset = region_aligned_begin;
#endif
request.aio.aio_fildes = fd;
request.aio.aio_nbytes = region_aligned_size;
request.aio.aio_offset = region_aligned_begin;
/// Send the request.
try

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferAIO.h>
#include <Common/ProfileEvents.h>
@ -110,11 +110,16 @@ void WriteBufferAIO::nextImpl()
/// Create a request for asynchronous write.
prepare();
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_WRITE;
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
#else
request.aio_lio_opcode = IOCB_CMD_PWRITE;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
request.aio_nbytes = region_aligned_size;
request.aio_offset = region_aligned_begin;
#endif
request.aio.aio_fildes = fd;
request.aio.aio_nbytes = region_aligned_size;
request.aio.aio_offset = region_aligned_begin;
/// Send the request.
while (io_submit(aio_context.ctx, 1, &request_ptr) < 0)
@ -193,7 +198,11 @@ bool WriteBufferAIO::waitForAIOCompletion()
}
is_pending_write = false;
#if defined(__FreeBSD__)
bytes_written = aio_return((struct aiocb *)event.udata);
#else
bytes_written = event.res;
#endif
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
@ -396,7 +405,7 @@ void WriteBufferAIO::finalize()
bytes_written -= truncation_count;
off_t pos_offset = bytes_written - (pos_in_file - request.aio_offset);
off_t pos_offset = bytes_written - (pos_in_file - request.aio.aio_offset);
if (pos_in_file > (std::numeric_limits<off_t>::max() - pos_offset))
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
pos_in_file += pos_offset;

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>

View File

@ -1,6 +1,6 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
}
else
{
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else

View File

@ -1,6 +1,6 @@
#include <IO/createWriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -33,7 +33,7 @@ std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std
}
else
{
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
#else