From 00e20c7e3c28a7dfcd7d49857cf02a0d2b311b9a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 28 Jul 2021 09:08:29 +0300 Subject: [PATCH] Remove useless code --- src/IO/AIO.h | 13 ++- src/IO/AIOContextPool.cpp | 172 ------------------------------------ src/IO/AIOContextPool.h | 53 ----------- src/IO/AsynchronousReader.h | 4 +- src/IO/ya.make | 1 - 5 files changed, 14 insertions(+), 229 deletions(-) delete mode 100644 src/IO/AIOContextPool.cpp delete mode 100644 src/IO/AIOContextPool.h diff --git a/src/IO/AIO.h b/src/IO/AIO.h index 499d1f3bf60..40c0f97c416 100644 --- a/src/IO/AIO.h +++ b/src/IO/AIO.h @@ -33,10 +33,21 @@ int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events, struct AIOContext : private boost::noncopyable { - aio_context_t ctx; + aio_context_t ctx = 0; + AIOContext() {} AIOContext(unsigned int nr_events = 128); ~AIOContext(); + + AIOContext(AIOContext && rhs) + { + *this = std::move(rhs); + } + + AIOContext & operator=(AIOContext && rhs) + { + std::swap(ctx, rhs.ctx); + } }; #elif defined(OS_FREEBSD) diff --git a/src/IO/AIOContextPool.cpp b/src/IO/AIOContextPool.cpp deleted file mode 100644 index 99fdfc67d97..00000000000 --- a/src/IO/AIOContextPool.cpp +++ /dev/null @@ -1,172 +0,0 @@ -#if defined(OS_LINUX) || defined(__FreeBSD__) - -#include -#include -#include -#include -#include -#include - -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int CANNOT_IO_SUBMIT; - extern const int CANNOT_IO_GETEVENTS; -} - - -AIOContextPool::~AIOContextPool() -{ - cancelled.store(true, std::memory_order_relaxed); - io_completion_monitor.join(); -} - - -void AIOContextPool::doMonitor() -{ - /// continue checking for events unless cancelled - while (!cancelled.load(std::memory_order_relaxed)) - waitForCompletion(); - - /// wait until all requests have been completed - while (!promises.empty()) - waitForCompletion(); -} - - -void AIOContextPool::waitForCompletion() -{ - /// array to hold completion events - std::vector events(max_concurrent_events); - - try - { - const auto num_events = getCompletionEvents(events.data(), max_concurrent_events); - fulfillPromises(events.data(), num_events); - notifyProducers(num_events); - } - catch (...) - { - /// there was an error, log it, return to any producer and continue - reportExceptionToAnyProducer(); - tryLogCurrentException("AIOContextPool::waitForCompletion()"); - } -} - - -int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const -{ - timespec timeout{timeout_sec, 0}; - - auto num_events = 0; - - /// request 1 to `max_events` events - while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) - if (errno != EINTR) - throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno); - - /// Unpoison the memory returned from a non-instrumented system call. - __msan_unpoison(events, sizeof(*events) * num_events); - - return num_events; -} - - -void AIOContextPool::fulfillPromises(const io_event events[], const int num_events) -{ - if (num_events == 0) - return; - - const std::lock_guard lock{mutex}; - - /// look at returned events and find corresponding promise, set result and erase promise from map - for (const auto & event : boost::make_iterator_range(events, events + num_events)) - { - /// get id from event -#if defined(__FreeBSD__) - const auto completed_id = (reinterpret_cast(event.udata))->aio_data; -#else - const auto completed_id = event.data; -#endif - - /// set value via promise and release it - const auto it = promises.find(completed_id); - if (it == std::end(promises)) - { - LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id); - continue; - } - -#if defined(__FreeBSD__) - it->second.set_value(aio_return(reinterpret_cast(event.udata))); -#else - it->second.set_value(event.res); -#endif - promises.erase(it); - } -} - - -void AIOContextPool::notifyProducers(const int num_producers) const -{ - if (num_producers == 0) - return; - - if (num_producers > 1) - have_resources.notify_all(); - else - have_resources.notify_one(); -} - - -void AIOContextPool::reportExceptionToAnyProducer() -{ - const std::lock_guard lock{mutex}; - - const auto any_promise_it = std::begin(promises); - any_promise_it->second.set_exception(std::current_exception()); -} - - -std::future AIOContextPool::post(struct iocb & iocb) -{ - std::unique_lock lock{mutex}; - - /// get current id and increment it by one - const auto request_id = next_id; - ++next_id; - - /// create a promise and put request in "queue" - promises.emplace(request_id, std::promise{}); - /// store id in AIO request for further identification - iocb.aio_data = request_id; - - struct iocb * requests[] { &iocb }; - - /// submit a request - while (io_submit(aio_context.ctx, 1, requests) < 0) - { - if (errno == EAGAIN) - /// wait until at least one event has been completed (or a spurious wakeup) and try again - have_resources.wait(lock); - else if (errno != EINTR) - throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT); - } - - return promises[request_id].get_future(); -} - -AIOContextPool & AIOContextPool::instance() -{ - static AIOContextPool instance; - return instance; -} - -} - -#endif diff --git a/src/IO/AIOContextPool.h b/src/IO/AIOContextPool.h deleted file mode 100644 index 9f4047939f4..00000000000 --- a/src/IO/AIOContextPool.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#if defined(OS_LINUX) || defined(__FreeBSD__) - -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - -class AIOContextPool : private boost::noncopyable -{ - static const auto max_concurrent_events = 128; - static const auto timeout_sec = 1; - - AIOContext aio_context{max_concurrent_events}; - - using ID = size_t; - using BytesRead = ssize_t; - - /// Autoincremental id used to identify completed requests - ID next_id{}; - mutable std::mutex mutex; - mutable std::condition_variable have_resources; - std::map> promises; - - std::atomic cancelled{false}; - ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this}; - - ~AIOContextPool(); - - void doMonitor(); - void waitForCompletion(); - int getCompletionEvents(io_event events[], const int max_events) const; - void fulfillPromises(const io_event events[], const int num_events); - void notifyProducers(const int num_producers) const; - void reportExceptionToAnyProducer(); - -public: - static AIOContextPool & instance(); - - /// Request AIO read operation for iocb, returns a future with number of bytes read - std::future post(struct iocb & iocb); -}; - -} - -#endif diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index a78ad46de24..ad7286ca1c9 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -52,14 +52,14 @@ public: /// Less than requested amount of data can be returned. /// Also error can be returned in 'exception'. /// If no error, and the size is zero - the file has ended. - /// (for example, EINTR must be handled automatically) + /// (for example, EINTR must be handled by implementation automatically) struct Result { size_t size = 0; std::exception_ptr exception; }; - /// The methods 'submit' and 'wait' can be called concurrently from multiple threads + /// The methods 'submit' and 'wait' both can be called concurrently from multiple threads /// but only for different requests. /// Submit request and obtain a handle. This method don't perform any waits. diff --git a/src/IO/ya.make b/src/IO/ya.make index 0023c6d7dfe..057ab69ac0c 100644 --- a/src/IO/ya.make +++ b/src/IO/ya.make @@ -20,7 +20,6 @@ PEERDIR( SRCS( AIO.cpp - AIOContextPool.cpp BrotliReadBuffer.cpp BrotliWriteBuffer.cpp CascadeWriteBuffer.cpp