diff --git a/src/Disks/ReadBufferFromRemoteFSGather.cpp b/src/Disks/ReadBufferFromRemoteFSGather.cpp index 67ba4448d20..500d6b651b1 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/ReadBufferFromRemoteFSGather.cpp @@ -163,7 +163,6 @@ void ReadBufferFromRemoteFSGather::seek(off_t offset) { current_buf.reset(); absolute_position = offset; - // initialize(); } diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 9dd3aeb4625..907b75f0417 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -36,13 +36,13 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe Int32 priority_, std::shared_ptr impl_, size_t buf_size_, - size_t /* min_bytes_for_seek_ */) + size_t min_bytes_for_seek_) : ReadBufferFromFileBase(buf_size_, nullptr, 0) , reader(reader_) , priority(priority_) , impl(impl_) , prefetch_buffer(buf_size_) - // , min_bytes_for_seek(min_bytes_for_seek_) + , min_bytes_for_seek(min_bytes_for_seek_) { ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBuffers); buffer_events += impl->getFileName() + " : "; @@ -69,15 +69,23 @@ std::future AsynchronousReadIndirectBufferFromRemot void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() { - if (hasPendingData()) - return; + if (prefetch_future.valid()) + return; - if (prefetch_future.valid()) - return; + /// Everything is already read. + if (absolute_position == last_offset) + return; - prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); - ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); - buffer_events += "-- Prefetch (" + toString(absolute_position) + ") --"; + if (absolute_position > last_offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", + absolute_position, last_offset); + + /// Prefetch even in case hasPendingData() == true. + prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); + + buffer_events += fmt::format("-- PREFETCH from offset: {}, upper bound: {} --", + toString(absolute_position), toString(last_offset)); } @@ -86,10 +94,15 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset) buffer_events += "-- Set last offset " + toString(offset) + "--"; if (prefetch_future.valid()) { - buffer_events += "-- Cancelling because of offset update --"; - ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches); - prefetch_future.wait(); - prefetch_future = {}; + std::cerr << buffer_events << std::endl; + /// TODO: Planning to put logical error here after more testing, + // because seems like future is never supposed to be valid at this point. + std::terminate(); + + // buffer_events += "-- Cancelling because of offset update --"; + // ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches); + // prefetch_future.wait(); + // prefetch_future = {}; } last_offset = offset; @@ -99,6 +112,14 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset) bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() { + /// Everything is already read. + if (absolute_position == last_offset) + return false; + + if (absolute_position > last_offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", + absolute_position, last_offset); + ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBufferReads); size_t size = 0; @@ -138,6 +159,13 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } prefetch_future = {}; + + /// TODO: it does not really seem to improve anything to call prefecth() here, + /// but it does not make any worse at the same time. + /// Need to test, it might be useful because in fact sometimes (minority of cases though) + /// we can read without prefetching several times in a row. + prefetch(); + return size; } @@ -192,16 +220,17 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence pos = working_buffer.end(); - // if (static_cast(absolute_position) >= getPosition() - // && static_cast(absolute_position) < getPosition() + static_cast(min_bytes_for_seek)) - // { - // /** - // * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. - // */ - // // bytes_to_ignore = absolute_position - getPosition(); - // impl->seek(absolute_position); /// SEEK_SET. - // } - // else + /// Note: we read in range [absolute_position, last_offset). + if (absolute_position < last_offset + && static_cast(absolute_position) >= getPosition() + && static_cast(absolute_position) < getPosition() + static_cast(min_bytes_for_seek)) + { + /** + * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. + */ + bytes_to_ignore = absolute_position - getPosition(); + } + else { buffer_events += "-- Impl seek --"; impl->seek(absolute_position); /// SEEK_SET. diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index af67efe1218..2dcd89016ed 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -70,10 +70,11 @@ private: String buffer_events; - // size_t min_bytes_for_seek; + size_t min_bytes_for_seek; size_t bytes_to_ignore = 0; - Int64 last_offset = 0; + + size_t last_offset = 0; }; } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 40bda7d6bfe..f01fd9a87a1 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -2,15 +2,17 @@ #if USE_AWS_S3 -# include -# include -# include +#include +#include +#include -# include -# include -# include +#include +#include -# include +#include +#include + +#include namespace ProfileEvents @@ -27,6 +29,7 @@ namespace ErrorCodes extern const int S3_ERROR; extern const int CANNOT_SEEK_THROUGH_FILE; extern const int SEEK_POSITION_OUT_OF_BOUND; + extern const int LOGICAL_ERROR; } @@ -49,19 +52,29 @@ bool ReadBufferFromS3::nextImpl() if (last_offset) { if (static_cast(last_offset) == offset) - { - impl.reset(); - working_buffer.resize(0); return false; - } + + if (static_cast(last_offset) < offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); } bool next_result = false; - /// `impl` has been initialized earlier and now we're at the end of the current portion of data. if (impl) { - if (!use_external_buffer) + if (use_external_buffer) + { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data. + */ + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } + else { /** * use_external_buffer -- means we read into the buffer which @@ -74,32 +87,25 @@ bool ReadBufferFromS3::nextImpl() assert(!impl->hasPendingData()); } } - else - { - /// `impl` is not initialized and we're about to read the first portion of data. - impl = initialize(); - next_result = impl->hasPendingData(); - } - if (use_external_buffer) - { - /** - * use_external_buffer -- means we read into the buffer which - * was passed to us from somewhere else. We do not check whether - * previously returned buffer was read or not, because this branch - * means we are prefetching data. - */ - impl->set(internal_buffer.begin(), internal_buffer.size()); - assert(working_buffer.begin() != nullptr); - assert(!internal_buffer.empty()); - } - - auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); + size_t sleep_time_with_backoff_milliseconds = 100; for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt) { Stopwatch watch; try { + if (!impl) + { + impl = initialize(); + + if (use_external_buffer) + { + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } + } + /// Try to read a next portion of data. next_result = impl->next(); watch.stop(); @@ -119,19 +125,11 @@ bool ReadBufferFromS3::nextImpl() throw; /// Pause before next attempt. - std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds); + sleepForMilliseconds(sleep_time_with_backoff_milliseconds); sleep_time_with_backoff_milliseconds *= 2; /// Try to reinitialize `impl`. impl.reset(); - impl = initialize(); - if (use_external_buffer) - { - impl->set(internal_buffer.begin(), internal_buffer.size()); - assert(working_buffer.begin() != nullptr); - assert(!internal_buffer.empty()); - } - next_result = impl->hasPendingData(); } } @@ -173,10 +171,11 @@ std::unique_ptr ReadBufferFromS3::initialize() req.SetBucket(bucket); req.SetKey(key); - // auto right_offset = read_settings.remote_read_right_offset; - if (last_offset) { + if (offset >= static_cast(last_offset)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); + req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1)); LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1); }