Remove useless code

This commit is contained in:
Alexey Milovidov 2021-07-28 09:08:29 +03:00
parent e841996486
commit 00e20c7e3c
5 changed files with 14 additions and 229 deletions

View File

@ -33,10 +33,21 @@ int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events,
struct AIOContext : private boost::noncopyable
{
aio_context_t ctx;
aio_context_t ctx = 0;
AIOContext() {}
AIOContext(unsigned int nr_events = 128);
~AIOContext();
AIOContext(AIOContext && rhs)
{
*this = std::move(rhs);
}
AIOContext & operator=(AIOContext && rhs)
{
std::swap(ctx, rhs.ctx);
}
};
#elif defined(OS_FREEBSD)

View File

@ -1,172 +0,0 @@
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Common/MemorySanitizer.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <errno.h>
#include <IO/AIOContextPool.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
}
AIOContextPool::~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void AIOContextPool::doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void AIOContextPool::waitForCompletion()
{
/// array to hold completion events
std::vector<io_event> events(max_concurrent_events);
try
{
const auto num_events = getCompletionEvents(events.data(), max_concurrent_events);
fulfillPromises(events.data(), num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
/// Unpoison the memory returned from a non-instrumented system call.
__msan_unpoison(events, sizeof(*events) * num_events);
return num_events;
}
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
#if defined(__FreeBSD__)
const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
#else
const auto completed_id = event.data;
#endif
/// set value via promise and release it
const auto it = promises.find(completed_id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id);
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}
void AIOContextPool::notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void AIOContextPool::reportExceptionToAnyProducer()
{
const std::lock_guard lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
std::unique_lock lock{mutex};
/// get current id and increment it by one
const auto request_id = next_id;
++next_id;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
struct iocb * requests[] { &iocb };
/// submit a request
while (io_submit(aio_context.ctx, 1, requests) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
return promises[request_id].get_future();
}
AIOContextPool & AIOContextPool::instance()
{
static AIOContextPool instance;
return instance;
}
}
#endif

View File

@ -1,53 +0,0 @@
#pragma once
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
#include <IO/AIO.h>
#include <Common/ThreadPool.h>
namespace DB
{
class AIOContextPool : private boost::noncopyable
{
static const auto max_concurrent_events = 128;
static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events};
using ID = size_t;
using BytesRead = ssize_t;
/// Autoincremental id used to identify completed requests
ID next_id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool();
void doMonitor();
void waitForCompletion();
int getCompletionEvents(io_event events[], const int max_events) const;
void fulfillPromises(const io_event events[], const int num_events);
void notifyProducers(const int num_producers) const;
void reportExceptionToAnyProducer();
public:
static AIOContextPool & instance();
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<BytesRead> post(struct iocb & iocb);
};
}
#endif

View File

@ -52,14 +52,14 @@ public:
/// Less than requested amount of data can be returned.
/// Also error can be returned in 'exception'.
/// If no error, and the size is zero - the file has ended.
/// (for example, EINTR must be handled automatically)
/// (for example, EINTR must be handled by implementation automatically)
struct Result
{
size_t size = 0;
std::exception_ptr exception;
};
/// The methods 'submit' and 'wait' can be called concurrently from multiple threads
/// The methods 'submit' and 'wait' both can be called concurrently from multiple threads
/// but only for different requests.
/// Submit request and obtain a handle. This method don't perform any waits.

View File

@ -20,7 +20,6 @@ PEERDIR(
SRCS(
AIO.cpp
AIOContextPool.cpp
BrotliReadBuffer.cpp
BrotliWriteBuffer.cpp
CascadeWriteBuffer.cpp