ReadBufferFromBlobStorage operating like a proper stream, rather than a single download-read buffer

This commit is contained in:
Jakub Kuklis 2021-10-11 14:54:22 +00:00
parent 95816875f4
commit 956348073b
2 changed files with 56 additions and 62 deletions

View File

@ -25,65 +25,32 @@ ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & path_,
UInt64 max_single_read_retries_,
size_t /* buf_size_ */) :
size_t buf_size_) :
SeekableReadBuffer(nullptr, 0),
blob_container_client(blob_container_client_),
tmp_buffer(buf_size_),
max_single_read_retries(max_single_read_retries_),
path(path_) {}
path(path_),
buf_size(buf_size_) {}
bool ReadBufferFromBlobStorage::nextImpl()
{
bool next_result = false;
// TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ?
if (impl)
{
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
impl->position() = position();
assert(!impl->hasPendingData());
}
else
{
/// `impl` is not initialized and we're about to read the first portion of data.
impl = initialize();
next_result = impl->hasPendingData();
}
if (!initialized)
initialize();
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{
try
{
/// Try to read a next portion of data.
next_result = impl->next();
break;
}
catch (const Exception & e)
{
// TODO: can't get this to compile, getting "error: reference to overloaded function could not be resolved; did you mean to call it?"
// LOG_DEBUG(log, "Caught exception while reading Blob Storage object. Object: {}, Offset: {}, Attempt: {}, Message: {}",
// path, getPosition(), attempt, e.message());
std::cout << "Caught exception while reading Blob Storage object. Object: " << path << ", Offset: "
<< getPosition() << ", Attempt: " << attempt << ", Message: " << e.message() << "\n";
/// Pause before next attempt.
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
impl.reset();
impl = initialize();
next_result = impl->hasPendingData();
}
}
if (!next_result)
if (static_cast<size_t>(offset) >= total_size)
return false;
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
size_t to_read_bytes = std::min(total_size - offset, buf_size);
offset += working_buffer.size();
size_t bytes_read = data_stream->Read(tmp_buffer.data(), to_read_bytes);
BufferBase::set(reinterpret_cast<char *>(tmp_buffer.data()), bytes_read, 0);
offset += bytes_read;
return true;
}
@ -91,7 +58,7 @@ bool ReadBufferFromBlobStorage::nextImpl()
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
{
if (impl)
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
@ -108,26 +75,50 @@ off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
off_t ReadBufferFromBlobStorage::getPosition()
{
return offset - available();
// TODO: which one is the right one?
// return offset - available();
return offset;
}
std::unique_ptr<ReadBuffer> ReadBufferFromBlobStorage::initialize()
void ReadBufferFromBlobStorage::initialize()
{
if (initialized)
return;
auto blob_client = blob_container_client.GetBlobClient(path);
auto prop = blob_client.GetProperties();
auto blob_size = prop.Value.BlobSize;
#ifdef VERBOSE_DEBUG_MODE
std::cout << "path: " << path << "\n";
std::cout << "blob_size: " << blob_size << "\n";
#endif
auto download_response = blob_client.Download();
tmp_buffer.resize(blob_size);
data_stream = std::move(download_response.Value.BodyStream);
blob_client.DownloadTo(tmp_buffer.data(), blob_size);
if (data_stream == nullptr)
{
// TODO: change error code
throw Exception("Null data stream obtained from blob Download", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
}
return std::make_unique<ReadBufferFromString>(tmp_buffer);
total_size = data_stream->Length();
if (offset != 0)
{
// TODO: is it the right way?
/// try to rewind to offset in the buffer
size_t total_read_bytes = 0;
while (total_read_bytes < static_cast<size_t>(offset))
{
size_t to_read_bytes = std::min(offset - total_read_bytes, buf_size);
size_t bytes_read = data_stream->Read(tmp_buffer.data(), to_read_bytes);
total_read_bytes += bytes_read;
}
}
initialized = true;
// TODO: dummy if to avoid warning for max_single_read_retries
if (max_single_read_retries == 0)
return;
}
}

View File

@ -30,14 +30,17 @@ public:
private:
std::unique_ptr<ReadBuffer> initialize();
void initialize();
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream;
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
std::unique_ptr<ReadBuffer> impl;
std::vector<uint8_t> tmp_buffer;
UInt64 max_single_read_retries;
UInt64 max_single_read_retries; // TODO: unused field
const String & path;
off_t offset = 0;
size_t buf_size;
size_t total_size;
bool initialized = false;
};