diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index b2be45471c8..0d1e32bb2b5 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -26,6 +26,7 @@ namespace DB namespace ErrorCodes { extern const int ARGUMENT_OUT_OF_BOUND; + extern const int LOGICAL_ERROR; } @@ -53,14 +54,52 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch() if (prefetch_future.valid()) return; - /// Will request the same amount of data that is read in nextImpl. - prefetch_buffer.resize(internal_buffer.size()); + size_t read_size; + if (read_until_position) + { + /// Everything is already read. + if (file_offset_of_buffer_end == *read_until_position) + return; + + if (file_offset_of_buffer_end > *read_until_position) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", + file_offset_of_buffer_end, *read_until_position); + + /// Read range [file_offset_of_buffer_end, read_until_position). + read_size = *read_until_position - file_offset_of_buffer_end - 1; + } + else + { + read_size = internal_buffer.size(); + } + + prefetch_buffer.resize(read_size); prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); } +void AsynchronousReadBufferFromFileDescriptor::setReadUntilPosition(size_t position) +{ + if (prefetch_future.valid()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition"); + + read_until_position = position; +} + + bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { + if (read_until_position) + { + /// Everything is already read. + if (file_offset_of_buffer_end == *read_until_position) + return false; + + if (file_offset_of_buffer_end > *read_until_position) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", + file_offset_of_buffer_end, *read_until_position); + } + if (prefetch_future.valid()) { /// Read request already in flight. Wait for its completion. @@ -201,4 +240,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind() } } - diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.h b/src/IO/AsynchronousReadBufferFromFileDescriptor.h index c64341089d0..271518dd1eb 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.h +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.h @@ -24,6 +24,7 @@ protected: const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned. size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). + std::optional read_until_position; int fd; bool nextImpl() override; @@ -33,6 +34,8 @@ protected: void finalize(); + void setReadUntilPosition(size_t position) override; + public: AsynchronousReadBufferFromFileDescriptor( AsynchronousReaderPtr reader_, Int32 priority_, @@ -67,4 +70,3 @@ private: }; } - diff --git a/tests/config/install.sh b/tests/config/install.sh index ba6ba0cd07c..fd43a611f79 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -49,6 +49,7 @@ ln -sf $SRC_PATH/users.d/remote_queries.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/ +ln -sf $SRC_PATH/users.d/read_method.xml $DEST_SERVER_PATH/users.d/ # FIXME DataPartsExchange may hang for http_send_timeout seconds # when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"), diff --git a/tests/config/users.d/read_method.xml b/tests/config/users.d/read_method.xml new file mode 100644 index 00000000000..4247561e921 --- /dev/null +++ b/tests/config/users.d/read_method.xml @@ -0,0 +1,7 @@ + + + + pread_threadpool + + +