From 5243315227f956aa2fb175e573698c3b6941ab19 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 25 Aug 2021 03:13:05 +0300 Subject: [PATCH] Improve performance in case without prefetch by avoiding swapping of memory buffers --- ...ynchronousReadBufferFromFileDescriptor.cpp | 70 ++++++++++++------- ...AsynchronousReadBufferFromFileDescriptor.h | 5 +- src/IO/ThreadPoolReader.h | 4 +- 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 53a1056897e..4bffa342599 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -24,6 +24,19 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const } +std::future AsynchronousReadBufferFromFileDescriptor::readInto(char * data, size_t size) +{ + IAsynchronousReader::Request request; + request.descriptor = std::make_shared(fd); + request.buf = data; + request.size = size; + request.offset = file_offset_of_buffer_end; + request.priority = priority; + + return reader->submit(request); +} + + void AsynchronousReadBufferFromFileDescriptor::prefetch() { if (prefetch_future.valid()) @@ -31,37 +44,46 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch() /// Will request the same amount of data that is read in nextImpl. prefetch_buffer.resize(internal_buffer.size()); - - IAsynchronousReader::Request request; - request.descriptor = std::make_shared(fd); - request.buf = prefetch_buffer.data(); - request.size = prefetch_buffer.size(); - request.offset = file_offset_of_buffer_end; - request.priority = priority; - - prefetch_future = reader->submit(request); + prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); } bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { - if (!prefetch_future.valid()) - prefetch(); - - auto size = prefetch_future.get(); - prefetch_future = {}; - - file_offset_of_buffer_end += size; - - if (size) + if (prefetch_future.valid()) { - prefetch_buffer.swap(memory); - set(memory.data(), memory.size()); - working_buffer.resize(size); - return true; - } + /// Read request already in flight. Wait for its completion. - return false; + auto size = prefetch_future.get(); + prefetch_future = {}; + file_offset_of_buffer_end += size; + + if (size) + { + prefetch_buffer.swap(memory); + set(memory.data(), memory.size()); + working_buffer.resize(size); + return true; + } + + return false; + } + else + { + /// No pending request. Do synchronous read. + + auto size = readInto(memory.data(), memory.size()).get(); + file_offset_of_buffer_end += size; + + if (size) + { + set(memory.data(), memory.size()); + working_buffer.resize(size); + return true; + } + + return false; + } } diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.h b/src/IO/AsynchronousReadBufferFromFileDescriptor.h index 8081348baa4..c64341089d0 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.h +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.h @@ -38,7 +38,7 @@ public: AsynchronousReaderPtr reader_, Int32 priority_, int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0) : ReadBufferFromFileBase(buf_size, existing_memory, alignment), - reader(std::move(reader_)), priority(priority_), prefetch_buffer(buf_size, alignment), required_alignment(alignment), fd(fd_) + reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_) { } @@ -61,6 +61,9 @@ public: /// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read. void rewind(); + +private: + std::future readInto(char * data, size_t size); }; } diff --git a/src/IO/ThreadPoolReader.h b/src/IO/ThreadPoolReader.h index a0ce47d08af..eea72462837 100644 --- a/src/IO/ThreadPoolReader.h +++ b/src/IO/ThreadPoolReader.h @@ -110,7 +110,7 @@ public: else if (errno == EAGAIN) { /// Data is not available. - std::cerr << "miss\n"; + //std::cerr << "miss\n"; break; } else if (errno == EINTR) @@ -134,7 +134,7 @@ public: if (bytes_read) { - std::cerr << "hit\n"; + //std::cerr << "hit\n"; promise.set_value(bytes_read); return future; }