Merge pull request #4305 from vitlibar/add-freebsd-aio-support

Add FreeBSD AIO support.
This commit is contained in:
alexey-milovidov 2019-02-09 01:06:41 +03:00 committed by GitHub
commit e7541f03dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 162 additions and 11 deletions

View File

@ -53,4 +53,90 @@ AIOContext::~AIOContext()
io_destroy(ctx);
}
#elif defined(__FreeBSD__)
# include <aio.h>
# include <boost/noncopyable.hpp>
# include <sys/event.h>
# include <sys/time.h>
# include <sys/types.h>
# include <Common/Exception.h>
# include <IO/AIO.h>
/** Small wrappers for asynchronous I/O.
*/
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IOSETUP;
}
}
int io_setup(void)
{
return kqueue();
}
int io_destroy(int ctx)
{
return close(ctx);
}
int io_submit(int ctx, long nr, struct iocb * iocbpp[])
{
for (long i = 0; i < nr; i++)
{
struct aiocb * iocb = &iocbpp[i]->aio;
struct sigevent * se = &iocb->aio_sigevent;
se->sigev_notify_kqueue = ctx;
se->sigev_notify_kevent_flags = 0;
se->sigev_notify = SIGEV_KEVENT;
se->sigev_value.sival_ptr = iocbpp[i];
switch (iocb->aio_lio_opcode)
{
case LIO_READ:
{
int r = aio_read(iocb);
if (r < 0)
return r;
break;
}
case LIO_WRITE:
{
int r = aio_write(iocb);
if (r < 0)
return r;
break;
}
}
}
return nr;
}
int io_getevents(int ctx, long, long max_nr, struct kevent * events, struct timespec * timeout)
{
return kevent(ctx, NULL, 0, events, max_nr, timeout);
}
AIOContext::AIOContext(unsigned int)
{
ctx = io_setup();
if (ctx < 0)
DB::throwFromErrno("io_setup failed", DB::ErrorCodes::CANNOT_IOSETUP);
}
AIOContext::~AIOContext()
{
io_destroy(ctx);
}
#endif

View File

@ -39,4 +39,39 @@ struct AIOContext : private boost::noncopyable
~AIOContext();
};
#elif defined(__FreeBSD__)
#include <boost/noncopyable.hpp>
#include <aio.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
typedef struct kevent io_event;
typedef int aio_context_t;
struct iocb
{
struct aiocb aio;
long aio_data;
};
int io_setup(void);
int io_destroy(void);
/// last argument is an array of pointers technically speaking
int io_submit(int ctx, long nr, struct iocb * iocbpp[]);
int io_getevents(int ctx, long min_nr, long max_nr, struct kevent * events, struct timespec * timeout);
struct AIOContext : private boost::noncopyable
{
int ctx;
AIOContext(unsigned int nr_events = 128);
~AIOContext();
};
#endif

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
@ -94,7 +94,11 @@ void AIOContextPool::fulfillPromises(const io_event events[], const int num_even
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <ext/singleton.h>
#include <condition_variable>

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>
@ -120,11 +120,19 @@ bool ReadBufferAIO::nextImpl()
/// Create an asynchronous request.
prepare();
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
request.aio.aio_fildes = fd;
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
request.aio.aio_nbytes = region_aligned_size;
request.aio.aio_offset = region_aligned_begin;
#else
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
request.aio_nbytes = region_aligned_size;
request.aio_offset = region_aligned_begin;
#endif
/// Send the request.
try

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>

View File

@ -1,4 +1,4 @@
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferAIO.h>
#include <Common/ProfileEvents.h>
@ -110,11 +110,19 @@ void WriteBufferAIO::nextImpl()
/// Create a request for asynchronous write.
prepare();
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_WRITE;
request.aio.aio_fildes = fd;
request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
request.aio.aio_nbytes = region_aligned_size;
request.aio.aio_offset = region_aligned_begin;
#else
request.aio_lio_opcode = IOCB_CMD_PWRITE;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
request.aio_nbytes = region_aligned_size;
request.aio_offset = region_aligned_begin;
#endif
/// Send the request.
while (io_submit(aio_context.ctx, 1, &request_ptr) < 0)
@ -193,7 +201,11 @@ bool WriteBufferAIO::waitForAIOCompletion()
}
is_pending_write = false;
#if defined(__FreeBSD__)
bytes_written = aio_return(reinterpret_cast<struct aiocb *>(event.udata));
#else
bytes_written = event.res;
#endif
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
@ -396,7 +408,13 @@ void WriteBufferAIO::finalize()
bytes_written -= truncation_count;
off_t pos_offset = bytes_written - (pos_in_file - request.aio_offset);
#if defined(__FreeBSD__)
off_t aio_offset = request.aio.aio_offset;
#else
off_t aio_offset = request.aio_offset;
#endif
off_t pos_offset = bytes_written - (pos_in_file - aio_offset);
if (pos_in_file > (std::numeric_limits<off_t>::max() - pos_offset))
throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
pos_in_file += pos_offset;

View File

@ -1,6 +1,6 @@
#pragma once
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>

View File

@ -1,6 +1,6 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/ReadBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -31,7 +31,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
}
else
{
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else

View File

@ -1,6 +1,6 @@
#include <IO/createWriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <IO/WriteBufferAIO.h>
#endif
#include <Common/ProfileEvents.h>
@ -33,7 +33,7 @@ std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std
}
else
{
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
#else