diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 62a8d816782..fe433c76d9b 100644 --- a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -27,7 +27,8 @@ namespace ErrorCodes AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS( AsynchronousReaderPtr reader_, Int32 priority_, std::shared_ptr impl_) - : reader(reader_), priority(priority_), impl(impl_) + : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) + , reader(reader_), priority(priority_), impl(impl_), prefetch_buffer(DBMS_DEFAULT_BUFFER_SIZE) { } @@ -48,6 +49,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() return; std::lock_guard lock(mutex); + assert(prefetch_buffer.data() != nullptr); + prefetch_buffer.resize(DBMS_DEFAULT_BUFFER_SIZE); + impl->set(prefetch_buffer.data(), prefetch_buffer.size()); prefetch_future = readNext(); } @@ -65,9 +69,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() size = prefetch_future.get(); if (size) { - set(impl->buffer().begin(), impl->buffer().size()); + memory.swap(prefetch_buffer); + set(memory.data(), memory.size()); working_buffer.resize(size); - impl->reset(); absolute_position += size; } } @@ -77,12 +81,15 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() else { std::lock_guard lock(mutex); + impl->check = true; + impl->set(memory.data(), memory.size()); + assert(memory.data() != nullptr); + assert(impl->buffer().begin() != nullptr); size = readNext().get(); if (size) { - set(impl->buffer().begin(), impl->buffer().size()); + set(memory.data(), memory.size()); working_buffer.resize(size); - impl->reset(); absolute_position += size; } } diff --git a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h index d108aa387b4..0dd59d98864 100644 --- 
a/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -14,7 +14,17 @@ namespace DB { -/// Reads data from S3/HDFS/Web using stored paths in metadata. +/** +* Reads data from S3/HDFS/Web using stored paths in metadata. +* +* Buffer chain for diskS3: +* AsynchronousReadIndirectBufferFromRemoteFS -> ReadBufferFromRemoteFS -> +* -> ReadBufferFromS3 -> ReadBufferFromIStream. +* +* Buffer chain for diskWeb: +* AsynchronousReadIndirectBufferFromRemoteFS -> ReadBufferFromRemoteFS -> +* -> ReadIndirectBufferFromWebServer -> ReadBufferFromHttp -> ReadBufferFromIStream. +*/ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase { public: @@ -31,6 +41,8 @@ public: void prefetch() override; + bool check = false; + private: bool nextImpl() override; @@ -45,6 +57,7 @@ private: size_t absolute_position = 0; std::mutex mutex; + Memory<> prefetch_buffer; }; } diff --git a/src/IO/ReadBufferFromRemoteFS.cpp b/src/IO/ReadBufferFromRemoteFS.cpp index 5af34447bc5..63403ba0599 100644 --- a/src/IO/ReadBufferFromRemoteFS.cpp +++ b/src/IO/ReadBufferFromRemoteFS.cpp @@ -72,9 +72,16 @@ bool ReadBufferFromRemoteFS::nextImpl() bool ReadBufferFromRemoteFS::read() { + if (check) + { + assert(!internal_buffer.empty()); + assert(working_buffer.begin() != nullptr); + } /// Transfer current position and working_buffer to actual ReadBuffer swap(*current_buf); /// Position and working_buffer will be updated in next() call + if (check) + assert(current_buf->buffer().begin() != nullptr); auto result = current_buf->next(); /// Assign result to current buffer. 
swap(*current_buf); @@ -111,7 +118,7 @@ void ReadBufferFromRemoteFS::reset(bool reset_inner_buf) { if (reset_inner_buf) current_buf.reset(); - BufferBase::set(nullptr, 0, 0); + // BufferBase::set(nullptr, 0, 0); } } diff --git a/src/IO/ReadBufferFromRemoteFS.h b/src/IO/ReadBufferFromRemoteFS.h index 559a6550cfc..da522e36020 100644 --- a/src/IO/ReadBufferFromRemoteFS.h +++ b/src/IO/ReadBufferFromRemoteFS.h @@ -22,6 +22,7 @@ public: void reset(bool reset_inner_buf = false); + bool check = false; protected: size_t fetch(size_t offset); diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index a482b124fee..d99f565419d 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -50,16 +50,12 @@ bool ReadBufferFromS3::nextImpl() /// `impl` has been initialized earlier and now we're at the end of the current portion of data. if (impl) { - if (use_external_buffer) - { - impl->set(working_buffer.begin(), working_buffer.size()); - } - else + if (!use_external_buffer) { impl->position() = position(); + assert(!impl->hasPendingData()); } - assert(!impl->hasPendingData()); } else { @@ -67,6 +63,14 @@ bool ReadBufferFromS3::nextImpl() impl = initialize(); next_result = impl->hasPendingData(); } + if (use_external_buffer) + { + // assert(!impl->buffer().begin() || impl->buffer().begin() != working_buffer.begin()); + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + // impl->BufferBase::set(nullptr, 0, 0); + } auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt) @@ -99,6 +103,14 @@ bool ReadBufferFromS3::nextImpl() /// Try to reinitialize `impl`. 
impl.reset(); impl = initialize(); + if (use_external_buffer) + { + // assert(!impl->buffer().begin() || impl->buffer().begin() != working_buffer.begin()); + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + // impl->BufferBase::set(nullptr, 0, 0); + } next_result = impl->hasPendingData(); } }