#if !defined(ARCADIA_BUILD) #include #endif #if USE_AZURE_BLOB_STORAGE #include #include #include #include namespace DB { namespace ErrorCodes { extern const int CANNOT_SEEK_THROUGH_FILE; extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int RECEIVED_EMPTY_DATA; } ReadBufferFromBlobStorage::ReadBufferFromBlobStorage( std::shared_ptr blob_container_client_, const String & path_, size_t buf_size_) : SeekableReadBuffer(nullptr, 0), blob_container_client(blob_container_client_), tmp_buffer(buf_size_), path(path_), buf_size(buf_size_) {} bool ReadBufferFromBlobStorage::nextImpl() { // TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ? if (!initialized) initialize(); if (offset >= total_size) return false; size_t to_read_bytes = std::min(total_size - offset, buf_size); size_t bytes_read = 0; auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); for (int i = 0; i < 3; i++) { try { bytes_read = data_stream->ReadToCount(tmp_buffer.data(), to_read_bytes); break; } catch (const Azure::Storage::StorageException & e) { LOG_INFO(log, "Exception caught during Azure Read for file {} : {}", path, e.Message); std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds); sleep_time_with_backoff_milliseconds *= 2; initialized = false; initialize(); } } if (bytes_read == 0) return false; BufferBase::set(reinterpret_cast(tmp_buffer.data()), bytes_read, 0); offset += bytes_read; return true; } off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence) { if (initialized) throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); if (whence != SEEK_SET) throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); if (offset_ < 0) throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); offset = offset_; return offset; } off_t ReadBufferFromBlobStorage::getPosition() { // TODO: why is it `return offset - available()` in S3? return offset; } void ReadBufferFromBlobStorage::initialize() { if (initialized) return; Azure::Storage::Blobs::DownloadBlobOptions download_options; if (offset != 0) download_options.Range = {static_cast(offset), {}}; blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); try { auto download_response = blob_client->Download(download_options); data_stream = std::move(download_response.Value.BodyStream); } catch (const Azure::Storage::StorageException & e) { LOG_INFO(log, "Exception caught during Azure Download for file {} at offset {} : {}", path, offset, e.Message); throw e; } if (data_stream == nullptr) throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path); total_size = data_stream->Length() + offset; initialized = true; } } #endif