From 55972db8668b1be2a65ef857590b5c9376961b42 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 1 Oct 2021 11:38:58 +0300 Subject: [PATCH] Small fix and more comments --- ...ynchronousReadIndirectBufferFromRemoteFS.h | 2 ++ src/Disks/ReadIndirectBufferFromWebServer.cpp | 27 +++++++++++++------ src/IO/ReadBufferFromS3.cpp | 13 +++++++++ src/IO/ReadWriteBufferFromHTTP.h | 12 +++++++++ 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h index 1095a22091f..afe58ae1c6a 100644 --- a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -24,6 +24,8 @@ namespace DB * Buffers chain for diskWeb: * AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS -> * -> ReadIndirectBufferFromWebServer -> ReadBufferFromHttp -> ReadBufferFromIStream. +* +* We pass either `memory` or `prefetch_buffer` through all this chain and return it back. */ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase { diff --git a/src/Disks/ReadIndirectBufferFromWebServer.cpp b/src/Disks/ReadIndirectBufferFromWebServer.cpp index f5ef1efae02..02a38c1d546 100644 --- a/src/Disks/ReadIndirectBufferFromWebServer.cpp +++ b/src/Disks/ReadIndirectBufferFromWebServer.cpp @@ -71,20 +71,31 @@ bool ReadIndirectBufferFromWebServer::nextImpl() if (impl) { - if (!use_external_buffer) + if (use_external_buffer) { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data, each nextImpl() call we can fill + * a different buffer. + */ + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } + else + { + /** + * impl was initialized before, pass position() to it to make + * sure there is no pending data which was not read, becuase + * this branch means we read sequentially. + */ impl->position() = position(); assert(!impl->hasPendingData()); } } - if (use_external_buffer) - { - impl->set(internal_buffer.begin(), internal_buffer.size()); - assert(working_buffer.begin() != nullptr); - assert(!internal_buffer.empty()); - } - WriteBufferFromOwnString error_msg; for (size_t i = 0; (i < max_tries) && !successful_read && !next_result; ++i) { diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 8b815021186..9dbe8cfd578 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -52,6 +52,13 @@ bool ReadBufferFromS3::nextImpl() { if (!use_external_buffer) { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data, each nextImpl() call we can fill + * a different buffer. + */ impl->position() = position(); assert(!impl->hasPendingData()); } @@ -65,6 +72,12 @@ bool ReadBufferFromS3::nextImpl() if (use_external_buffer) { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data. + */ impl->set(internal_buffer.begin(), internal_buffer.size()); assert(working_buffer.begin() != nullptr); assert(!internal_buffer.empty()); diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 2cf2c587922..33edcdb2e42 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -213,12 +213,24 @@ namespace detail if (use_external_buffer) { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data, each nextImpl() call we can fill + * a different buffer. + */ impl->set(internal_buffer.begin(), internal_buffer.size()); assert(working_buffer.begin() != nullptr); assert(!internal_buffer.empty()); } else { + /** + * impl was initialized before, pass position() to it to make + * sure there is no pending data which was not read, becuase + * this branch means we read sequentially. + */ if (!working_buffer.empty()) impl->position() = position(); }