#if !defined(ARCADIA_BUILD) #include #endif #if USE_AZURE_BLOB_STORAGE #include #include #include namespace DB { namespace ErrorCodes { extern const int CANNOT_SEEK_THROUGH_FILE; extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int RECEIVED_EMPTY_DATA; extern const int LOGICAL_ERROR; } ReadBufferFromBlobStorage::ReadBufferFromBlobStorage( std::shared_ptr blob_container_client_, const String & path_, size_t tmp_buffer_size_, bool use_external_buffer_, size_t read_until_position_) : SeekableReadBuffer(nullptr, 0) , blob_container_client(blob_container_client_) , path(path_) , use_external_buffer(use_external_buffer_) , read_until_position(read_until_position_) , tmp_buffer_size(tmp_buffer_size_) // NOTE: field used only here in the constructor { if (!use_external_buffer) { tmp_buffer.resize(tmp_buffer_size); data_ptr = tmp_buffer.data(); data_capacity = tmp_buffer_size; } } bool ReadBufferFromBlobStorage::nextImpl() { // TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ? if (read_until_position) { if (read_until_position == offset) return false; if (read_until_position < offset) throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1); } if (!initialized) initialize(); if (use_external_buffer) { data_ptr = internal_buffer.begin(); data_capacity = internal_buffer.size(); } size_t to_read_bytes = std::min(total_size - offset, data_capacity); size_t bytes_read = 0; auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); for (int i = 0; i < 3; i++) // TODO: setting for max retries? { try { bytes_read = data_stream->ReadToCount(reinterpret_cast(data_ptr), to_read_bytes); break; } catch (const Azure::Storage::StorageException & e) { LOG_INFO(log, "Exception caught during Azure Read for file {} : {}", path, e.Message); std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds); sleep_time_with_backoff_milliseconds *= 2; initialized = false; initialize(); } } if (bytes_read == 0) return false; BufferBase::set(data_ptr, bytes_read, 0); offset += bytes_read; return true; } off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence) { if (initialized) throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); if (whence != SEEK_SET) throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); if (offset_ < 0) throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); offset = offset_; return offset; } off_t ReadBufferFromBlobStorage::getPosition() { // TODO: why is it `return offset - available()` in S3? return offset; } void ReadBufferFromBlobStorage::initialize() { if (initialized) return; Azure::Storage::Blobs::DownloadBlobOptions download_options; Azure::Nullable length {}; if (read_until_position != 0) length = {static_cast(read_until_position - offset)}; download_options.Range = {static_cast(offset), length}; blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); try { auto download_response = blob_client->Download(download_options); data_stream = std::move(download_response.Value.BodyStream); } catch (const Azure::Storage::StorageException & e) { LOG_INFO(log, "Exception caught during Azure Download for file {} at offset {} : {}", path, offset, e.Message); throw e; } if (data_stream == nullptr) throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path); total_size = data_stream->Length() + offset; initialized = true; } } #endif