From 91173b8934b7315aef4352d4786e615028fc1ffb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 4 Aug 2021 03:07:04 +0300 Subject: [PATCH] Improve performance --- ...ynchronousReadBufferFromFileDescriptor.cpp | 44 +++--- ...AsynchronousReadBufferFromFileDescriptor.h | 2 +- src/IO/AsynchronousReader.h | 26 +--- src/IO/ReadBufferFromFileDescriptor.cpp | 7 +- src/IO/SynchronousReader.h | 65 +++------ src/IO/ThreadPoolReader.h | 136 +++++------------- src/IO/createReadBufferFromFileBase.cpp | 1 - 7 files changed, 81 insertions(+), 200 deletions(-) diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 27c0e9b5c8a..5702d21a9bb 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -26,7 +26,7 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const void AsynchronousReadBufferFromFileDescriptor::prefetch() { - if (prefetch_request_id) + if (prefetch_future.valid()) return; /// Will request the same amount of data that is read in nextImpl. @@ -38,28 +38,25 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch() request.size = prefetch_buffer.size(); request.offset = file_offset_of_buffer_end; - prefetch_request_id = reader->submit(request); + prefetch_future = reader->submit(request); } bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { - if (!prefetch_request_id) + if (!prefetch_future.valid()) prefetch(); - auto response = reader->wait(*prefetch_request_id, {}); - prefetch_request_id.reset(); + auto size = prefetch_future.get(); + prefetch_future = {}; - if (response->exception) - std::rethrow_exception(response->exception); + file_offset_of_buffer_end += size; - file_offset_of_buffer_end += response->size; - - if (response->size) + if (size) { prefetch_buffer.swap(memory); set(memory.data(), memory.size()); - working_buffer.resize(response->size); + working_buffer.resize(size); return true; } @@ -69,10 +66,10 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() void AsynchronousReadBufferFromFileDescriptor::finalize() { - if (prefetch_request_id) + if (prefetch_future.valid()) { - reader->wait(*prefetch_request_id, {}); - prefetch_request_id.reset(); + prefetch_future.wait(); + prefetch_future = {}; } } @@ -105,26 +102,25 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence) if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) return new_pos; - /// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos, - /// so the second inequality is strict. if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) - && new_pos < file_offset_of_buffer_end) + && new_pos <= file_offset_of_buffer_end) { /// Position is still inside the buffer. + /// Probably it is at the end of the buffer - then we will load data on the following 'next' call. pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; assert(pos >= working_buffer.begin()); - assert(pos < working_buffer.end()); + assert(pos <= working_buffer.end()); return new_pos; } else { - if (prefetch_request_id) + if (prefetch_future.valid()) { std::cerr << "Ignoring prefetched data" << "\n"; - reader->wait(*prefetch_request_id, {}); - prefetch_request_id.reset(); + prefetch_future.wait(); + prefetch_future = {}; } /// Position is out of the buffer, we need to do real seek. @@ -151,10 +147,10 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence) void AsynchronousReadBufferFromFileDescriptor::rewind() { - if (prefetch_request_id) + if (prefetch_future.valid()) { - reader->wait(*prefetch_request_id, {}); - prefetch_request_id.reset(); + prefetch_future.wait(); + prefetch_future = {}; } /// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'. diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.h b/src/IO/AsynchronousReadBufferFromFileDescriptor.h index 2a0d531c824..30a9022855d 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.h +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.h @@ -19,7 +19,7 @@ protected: AsynchronousReaderPtr reader; Memory<> prefetch_buffer; - std::optional prefetch_request_id; + std::future prefetch_future; const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned. size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index ad7286ca1c9..b362daa108f 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -20,8 +21,6 @@ namespace DB class IAsynchronousReader { public: - using RequestID = UInt64; - /// For local filesystems, the file descriptor is simply integer /// but it can be arbitrary opaque object for remote filesystems. struct IFileDescriptor @@ -50,30 +49,15 @@ public: }; /// Less than requested amount of data can be returned. - /// Also error can be returned in 'exception'. - /// If no error, and the size is zero - the file has ended. + /// If size is zero - the file has ended. /// (for example, EINTR must be handled by implementation automatically) - struct Result - { - size_t size = 0; - std::exception_ptr exception; - }; - - /// The methods 'submit' and 'wait' both can be called concurrently from multiple threads - /// but only for different requests. + using Result = size_t; /// Submit request and obtain a handle. This method don't perform any waits. /// If this method did not throw, the caller must wait for the result with 'wait' method /// or destroy the whole reader before destroying the buffer for request. - virtual RequestID submit(Request request) = 0; - - /// Wait for request completion or timeout. - /// Optional timeout can be specified, otherwise waits until completion. - /// In case of timeout, nullopt is returned. - /// In case of completion, Result object is returned. Result may contain exception. - /// In case of timeout, the caller must call wait again until completion - /// or destroy the whole reader before destroying the buffer for request. - virtual std::optional wait(RequestID id, std::optional microseconds) = 0; + /// The method can be called concurrently from multiple threads. + virtual std::future submit(Request request) = 0; /// Destructor must wait for all not completed request and ignore the results. /// It may also cancel the requests. diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index 78eca3de884..2685ea9ff78 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -143,16 +143,15 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) return new_pos; - /// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos, - /// so the second inequality is strict. if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) - && new_pos < file_offset_of_buffer_end) + && new_pos <= file_offset_of_buffer_end) { /// Position is still inside the buffer. + /// Probably it is at the end of the buffer - then we will load data on the following 'next' call. pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; assert(pos >= working_buffer.begin()); - assert(pos < working_buffer.end()); + assert(pos <= working_buffer.end()); return new_pos; } diff --git a/src/IO/SynchronousReader.h b/src/IO/SynchronousReader.h index b4d2c6458b2..769ffc7fd54 100644 --- a/src/IO/SynchronousReader.h +++ b/src/IO/SynchronousReader.h @@ -25,13 +25,8 @@ namespace ErrorCodes */ class SynchronousReader final : public IAsynchronousReader { -private: - UInt64 counter = 0; - std::unordered_map requests; - std::mutex mutex; - public: - RequestID submit(Request request) override + std::future submit(Request request) override { #if defined(POSIX_FADV_WILLNEED) int fd = assert_cast(*request.descriptor).fd; @@ -39,54 +34,26 @@ public: throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE); #endif - std::lock_guard lock(mutex); - ++counter; - requests.emplace(counter, request); - return counter; - } - - /// Timeout is not implemented. - std::optional wait(RequestID id, std::optional) override - { - Request request; - Result result; - + return std::async(std::launch::deferred, [fd, request] { - std::lock_guard lock(mutex); - auto it = requests.find(id); - if (it == requests.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id); + /// TODO Instrumentation. - request = it->second; - requests.erase(it); - } - - int fd = assert_cast(*request.descriptor).fd; - - /// TODO Instrumentation. - - size_t bytes_read = 0; - while (!bytes_read) - { - ssize_t res = ::pread(fd, request.buf, request.size, request.offset); - if (!res) - break; - - if (-1 == res && errno != EINTR) + size_t bytes_read = 0; + while (!bytes_read) { - result.exception = std::make_exception_ptr(ErrnoException( - fmt::format("Cannot read from file {}, {}", fd, - errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)), - ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)); - return result; + ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + if (!res) + break; + + if (-1 == res && errno != EINTR) + throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + + if (res > 0) + bytes_read += res; } - if (res > 0) - bytes_read += res; - } - - result.size = bytes_read; - return result; + return bytes_read; + }); } ~SynchronousReader() override diff --git a/src/IO/ThreadPoolReader.h b/src/IO/ThreadPoolReader.h index 4858ee25ecf..71df96501f4 100644 --- a/src/IO/ThreadPoolReader.h +++ b/src/IO/ThreadPoolReader.h @@ -7,8 +7,7 @@ #include #include #include -#include -#include +#include #include #include @@ -60,46 +59,18 @@ namespace ErrorCodes class ThreadPoolReader final : public IAsynchronousReader { private: - UInt64 counter = 0; - - struct RequestInfo - { - bool already_read = false; - Poco::Event event; - Result result; - }; - - using Requests = std::unordered_map; - Requests requests; - std::mutex mutex; - ThreadPool pool; - size_t queue_size; public: ThreadPoolReader(size_t pool_size, size_t queue_size_) - : pool(pool_size, pool_size, queue_size_), queue_size(queue_size_) + : pool(pool_size, pool_size, queue_size_) { } - RequestID submit(Request request) override + std::future submit(Request request) override { - Requests::iterator it; - - { - std::lock_guard lock(mutex); - - if (requests.size() >= queue_size) - throw Exception("Too many read requests in flight", ErrorCodes::CANNOT_SCHEDULE_TASK); - - ++counter; - it = requests.emplace(std::piecewise_construct, std::forward_as_tuple(counter), std::forward_as_tuple()).first; - } - int fd = assert_cast(*request.descriptor).fd; - RequestInfo & info = it->second; - #if defined(__linux__) /// Check if data is already in page cache with preadv2 syscall. @@ -110,6 +81,9 @@ public: if (has_pread_nowait_support.load(std::memory_order_relaxed)) { + std::promise promise; + std::future future = promise.get_future(); + size_t bytes_read = 0; while (!bytes_read) { @@ -120,10 +94,12 @@ public: static_cast(request.offset), static_cast(request.offset >> 32), RWF_NOWAIT); + //ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + if (!res) { - info.already_read = true; - break; + promise.set_value(0); + return future; } if (-1 == res) @@ -144,92 +120,52 @@ public: } else { - info.already_read = true; - info.result.exception = std::make_exception_ptr(ErrnoException( + promise.set_exception(std::make_exception_ptr(ErrnoException( fmt::format("Cannot read from file {}, {}", fd, errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)), - ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)); - break; + ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno))); + return future; } } else { bytes_read += res; - info.already_read = true; } } - info.result.size = bytes_read; + if (bytes_read) + { + promise.set_value(bytes_read); + return future; + } } #endif - if (!info.already_read) + auto task = std::make_shared>([request, fd] { - pool.scheduleOrThrow([request, fd, &info] - { - setThreadName("ThreadPoolRead"); + setThreadName("ThreadPoolRead"); - /// TODO Instrumentation. + /// TODO Instrumentation. - size_t bytes_read = 0; - while (!bytes_read) - { - ssize_t res = ::pread(fd, request.buf, request.size, request.offset); - if (!res) - break; - - if (-1 == res && errno != EINTR) - { - info.result.exception = std::make_exception_ptr(ErrnoException( - fmt::format("Cannot read from file {}, {}", fd, - errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)), - ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)); - break; - } - - bytes_read += res; - } - - info.result.size = bytes_read; - info.event.set(); - }, - request.priority); - } - - return it->first; - } - - std::optional wait(RequestID id, std::optional microseconds) override - { - Result result; - Requests::iterator it; - - { - std::lock_guard lock(mutex); - it = requests.find(id); - if (it == requests.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find request by id {}", id); - } - - if (!it->second.already_read) - { - if (microseconds) + size_t bytes_read = 0; + while (!bytes_read) { - if (!it->second.event.tryWait(*microseconds / 1000)) - return {}; + ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + if (!res) + break; + + if (-1 == res && errno != EINTR) + throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + + bytes_read += res; } - else - it->second.event.wait(); - } - Result res = it->second.result; + return bytes_read; + }); - { - std::lock_guard lock(mutex); - requests.erase(it); - } - - return res; + auto future = task->get_future(); + pool.scheduleOrThrow([task]{ (*task)(); }, request.priority); + return future; } ~ThreadPoolReader() override diff --git a/src/IO/createReadBufferFromFileBase.cpp b/src/IO/createReadBufferFromFileBase.cpp index b793bcd0b1a..a2b7c1f4d84 100644 --- a/src/IO/createReadBufferFromFileBase.cpp +++ b/src/IO/createReadBufferFromFileBase.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include