diff --git a/src/IO/ReadBufferFromBlobStorage.cpp b/src/IO/ReadBufferFromBlobStorage.cpp index 174721f5957..db6b1972c20 100644 --- a/src/IO/ReadBufferFromBlobStorage.cpp +++ b/src/IO/ReadBufferFromBlobStorage.cpp @@ -25,65 +25,32 @@ ReadBufferFromBlobStorage::ReadBufferFromBlobStorage( Azure::Storage::Blobs::BlobContainerClient blob_container_client_, const String & path_, UInt64 max_single_read_retries_, - size_t /* buf_size_ */) : + size_t buf_size_) : SeekableReadBuffer(nullptr, 0), blob_container_client(blob_container_client_), + tmp_buffer(buf_size_), max_single_read_retries(max_single_read_retries_), - path(path_) {} + path(path_), + buf_size(buf_size_) {} bool ReadBufferFromBlobStorage::nextImpl() { - bool next_result = false; + // TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ? - if (impl) - { - /// `impl` has been initialized earlier and now we're at the end of the current portion of data. - impl->position() = position(); - assert(!impl->hasPendingData()); - } - else - { - /// `impl` is not initialized and we're about to read the first portion of data. - impl = initialize(); - next_result = impl->hasPendingData(); - } + if (!initialized) + initialize(); - auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); - for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt) - { - try - { - /// Try to read a next portion of data. - next_result = impl->next(); - break; - } - catch (const Exception & e) - { - // TODO: can't get this to compile, getting "error: reference to overloaded function could not be resolved; did you mean to call it?" - // LOG_DEBUG(log, "Caught exception while reading Blob Storage object. Object: {}, Offset: {}, Attempt: {}, Message: {}", - // path, getPosition(), attempt, e.message()); - - std::cout << "Caught exception while reading Blob Storage object. Object: " << path << ", Offset: " - << getPosition() << ", Attempt: " << attempt << ", Message: " << e.message() << "\n"; - - /// Pause before next attempt. - std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds); - sleep_time_with_backoff_milliseconds *= 2; - - /// Try to reinitialize `impl`. - impl.reset(); - impl = initialize(); - next_result = impl->hasPendingData(); - } - } - - if (!next_result) + if (static_cast(offset) >= total_size) return false; - BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl` + size_t to_read_bytes = std::min(total_size - offset, buf_size); - offset += working_buffer.size(); + size_t bytes_read = data_stream->Read(tmp_buffer.data(), to_read_bytes); + + BufferBase::set(reinterpret_cast(tmp_buffer.data()), bytes_read, 0); + + offset += bytes_read; return true; } @@ -91,7 +58,7 @@ bool ReadBufferFromBlobStorage::nextImpl() off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence) { - if (impl) + if (initialized) throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); if (whence != SEEK_SET) @@ -108,26 +75,50 @@ off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence) off_t ReadBufferFromBlobStorage::getPosition() { - return offset - available(); + // TODO: which one is the right one? + // return offset - available(); + + return offset; } -std::unique_ptr ReadBufferFromBlobStorage::initialize() +void ReadBufferFromBlobStorage::initialize() { + if (initialized) + return; + auto blob_client = blob_container_client.GetBlobClient(path); - auto prop = blob_client.GetProperties(); - auto blob_size = prop.Value.BlobSize; -#ifdef VERBOSE_DEBUG_MODE - std::cout << "path: " << path << "\n"; - std::cout << "blob_size: " << blob_size << "\n"; -#endif + auto download_response = blob_client.Download(); - tmp_buffer.resize(blob_size); + data_stream = std::move(download_response.Value.BodyStream); - blob_client.DownloadTo(tmp_buffer.data(), blob_size); + if (data_stream == nullptr) + { + // TODO: change error code + throw Exception("Null data stream obtained from blob Download", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + } - return std::make_unique(tmp_buffer); + total_size = data_stream->Length(); + + if (offset != 0) + { + // TODO: is it the right way? + /// try to rewind to offset in the buffer + size_t total_read_bytes = 0; + while (total_read_bytes < static_cast(offset)) + { + size_t to_read_bytes = std::min(offset - total_read_bytes, buf_size); + size_t bytes_read = data_stream->Read(tmp_buffer.data(), to_read_bytes); + total_read_bytes += bytes_read; + } + } + + initialized = true; + + // TODO: dummy if to avoid warning for max_single_read_retries + if (max_single_read_retries == 0) + return; } } diff --git a/src/IO/ReadBufferFromBlobStorage.h b/src/IO/ReadBufferFromBlobStorage.h index 7f6f18ba935..7c2bd96a96f 100644 --- a/src/IO/ReadBufferFromBlobStorage.h +++ b/src/IO/ReadBufferFromBlobStorage.h @@ -30,14 +30,17 @@ public: private: - std::unique_ptr initialize(); + void initialize(); + std::unique_ptr data_stream; Azure::Storage::Blobs::BlobContainerClient blob_container_client; - std::unique_ptr impl; std::vector tmp_buffer; - UInt64 max_single_read_retries; + UInt64 max_single_read_retries; // TODO: unused field const String & path; off_t offset = 0; + size_t buf_size; + size_t total_size; + bool initialized = false; };