From c1ea44b9c2df004fd43b6b1434fa713f6595ab0e Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 24 Sep 2021 13:38:08 +0300 Subject: [PATCH] Fixes --- ...chronousReadIndirectBufferFromRemoteFS.cpp | 29 +++++++++++-------- src/IO/ReadBufferFromRemoteFS.cpp | 7 ++--- src/IO/ReadBufferFromS3.cpp | 3 +- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp index ada29155b3f..a42e15325d8 100644 --- a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -36,6 +36,13 @@ std::future AsynchronousReadIndirectBufferFromRemot { IAsynchronousReader::Request request; + impl->set(impl->buffer().begin(), impl->buffer().size()); + if (impl->initialized()) + { + impl->position() = impl->buffer().end(); + assert(!impl->hasPendingData()); + } + auto remote_fd = std::make_shared(); remote_fd->impl = impl; @@ -51,11 +58,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() if (prefetch_future.valid()) return; - if (impl->initialized()) - { - impl->position() = impl->buffer().end(); /// May be should try to do this differently. - assert(!impl->hasPendingData()); - } prefetch_future = readNext(); } @@ -74,17 +76,19 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } else { - if (impl->initialized()) - { - impl->position() = position(); - assert(!impl->hasPendingData()); - } + // if (impl->initialized()) + // { + // impl->position() = position(); + // assert(!impl->hasPendingData()); + // } size = readNext().get(); } if (size) { - BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); + //set(working_buffer.begin(), working_buffer.size()); + swap(*impl); + // BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); impl->absolute_position += working_buffer.size(); } @@ -138,8 +142,9 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence prefetch_future = {}; } - impl->seek(impl->absolute_position, SEEK_SET); + // impl->seek(impl->absolute_position, SEEK_SET); pos = working_buffer.end(); + impl->reset(); return impl->absolute_position; } diff --git a/src/IO/ReadBufferFromRemoteFS.cpp b/src/IO/ReadBufferFromRemoteFS.cpp index 88b3fffcf8b..b52104790af 100644 --- a/src/IO/ReadBufferFromRemoteFS.cpp +++ b/src/IO/ReadBufferFromRemoteFS.cpp @@ -85,17 +85,16 @@ off_t ReadBufferFromRemoteFS::seek([[maybe_unused]] off_t offset_, int whence) { if (whence != SEEK_SET) throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET is allowed"); - /// We already made a seek and adjusted position in ReadIndirectBufferFromRemoteFS. - assert(offset_ == static_cast(absolute_position)); - current_buf = initialize(); + // current_buf = initialize(); return absolute_position; } void ReadBufferFromRemoteFS::reset() { - set(nullptr, 0); + current_buf.reset(); + // set(nullptr, 0); } } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 9485bec5679..737090c4bce 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -48,7 +48,8 @@ bool ReadBufferFromS3::nextImpl() if (impl) { /// `impl` has been initialized earlier and now we're at the end of the current portion of data. - impl->position() = position(); + // impl->position() = position(); + impl->set(working_buffer.begin(), working_buffer.size()); assert(!impl->hasPendingData()); } else