2021-10-04 13:46:03 +00:00
|
|
|
#if !defined(ARCADIA_BUILD)
|
|
|
|
#include <Common/config.h>
|
|
|
|
#endif
|
|
|
|
|
|
|
|
#if USE_AZURE_BLOB_STORAGE
|
|
|
|
|
|
|
|
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

#include <IO/ReadBufferFromBlobStorage.h>
// #include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromString.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by this translation unit. They are only declared here;
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
    /// Thrown by seek() after reading has started or for an unsupported `whence`.
    extern const int CANNOT_SEEK_THROUGH_FILE;
    /// Declared for this file; not referenced by the code visible in this section.
    extern const int S3_ERROR;
    /// Thrown by seek() for a negative target position.
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Creates a buffer that reads a blob through the given container client.
/// Nothing is requested from the storage here: the base buffer is created over no memory
/// (nullptr, size 0) and the data is fetched lazily on the first read.
///
/// blob_container_client_ - client of the container that holds the blob; taken by value and moved into the member.
/// main_path_             - stored as is. NOTE(review): its meaning is not visible in this file - confirm against the header.
/// path_                  - name under which the blob is looked up in the container.
/// buf_size_              - stored as is. NOTE(review): presumably a preferred read size - confirm against the header.
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
    Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
    const String & main_path_,
    const String & path_,
    size_t buf_size_)
    : SeekableReadBuffer(nullptr, 0)
    /// The parameter is already our own copy, so move it instead of copying it once more.
    , blob_container_client(std::move(blob_container_client_))
    , main_path(main_path_)
    , path(path_)
    , buf_size(buf_size_)
{
}
|
|
|
|
|
|
|
|
|
|
|
|
bool ReadBufferFromBlobStorage::nextImpl()
|
|
|
|
{
|
|
|
|
bool next_result = false;
|
|
|
|
|
|
|
|
if (impl)
|
|
|
|
{
|
|
|
|
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
|
|
|
|
impl->position() = position();
|
|
|
|
assert(!impl->hasPendingData());
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// `impl` is not initialized and we're about to read the first portion of data.
|
|
|
|
impl = initialize();
|
|
|
|
next_result = impl->hasPendingData();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!next_result)
|
|
|
|
{
|
|
|
|
/// Try to read a next portion of data.
|
|
|
|
next_result = impl->next();
|
|
|
|
}
|
|
|
|
|
|
|
|
// std::cout << "\nReadBufferFromBlobStorage::nextImpl next_result: " << next_result << "\n";
|
|
|
|
|
|
|
|
if (!next_result)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
|
|
|
|
|
|
|
|
offset += working_buffer.size();
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
|
|
|
|
{
|
|
|
|
if (impl)
|
|
|
|
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
|
|
|
|
|
|
|
if (whence != SEEK_SET)
|
|
|
|
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
|
|
|
|
|
|
|
if (offset_ < 0)
|
|
|
|
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
|
|
|
|
|
|
|
offset = offset_;
|
|
|
|
|
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns the position of the next byte to be read.
/// `offset` is advanced by the size of every portion loaded in nextImpl(), so the bytes of the
/// working buffer that have not been consumed yet are subtracted from it.
off_t ReadBufferFromBlobStorage::getPosition()
{
    const auto unread_in_buffer = available();
    return offset - unread_in_buffer;
}
|
|
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<ReadBuffer> ReadBufferFromBlobStorage::initialize()
|
|
|
|
{
|
|
|
|
std::cout << "path: " << path << "\n";
|
|
|
|
std::cout << "main_path: " << main_path << "\n";
|
|
|
|
std::cout << "buf_size: " << buf_size << "\n";
|
|
|
|
|
|
|
|
std::cout << "blob_container_client.GetUrl(): " << blob_container_client.GetUrl() << "\n";
|
|
|
|
|
|
|
|
auto blob_client = blob_container_client.GetBlobClient(path);
|
|
|
|
// auto blob_client = blob_container_client.GetBlobClient("cwtwuujiyxiqpylavpoziotxlygiaias");
|
|
|
|
|
2021-10-05 12:46:38 +00:00
|
|
|
auto prop = blob_client.GetProperties();
|
2021-10-04 13:46:03 +00:00
|
|
|
|
2021-10-05 12:46:38 +00:00
|
|
|
auto proper_buf_size = prop.Value.BlobSize;
|
|
|
|
|
|
|
|
std::cout << "proper_buf_size: " << proper_buf_size << "\n";
|
|
|
|
|
|
|
|
std::vector<uint8_t> v(proper_buf_size);
|
|
|
|
|
|
|
|
blob_client.DownloadTo(v.data(), proper_buf_size);
|
2021-10-04 13:46:03 +00:00
|
|
|
|
|
|
|
// std::cout << "v contents (" << v.size() << "): ";
|
|
|
|
|
|
|
|
// for (auto i : v)
|
|
|
|
// {
|
|
|
|
// std::cout << static_cast<int>(i) << " ";
|
|
|
|
// }
|
|
|
|
|
|
|
|
// std::cout << "\n";
|
|
|
|
|
|
|
|
// auto downloaded_data = blob_client.Download();
|
|
|
|
|
|
|
|
return std::make_unique<ReadBufferFromString>(v);
|
|
|
|
|
|
|
|
// return std::make_unique<ReadBufferFromString>(downloaded_data.RawResponse->GetBody());
|
|
|
|
// return std::make_unique<ReadBufferFromIStream>(downloaded_data.RawResponse->ExtractBodyStream(), buf_size);
|
|
|
|
// return std::make_unique<ReadBufferFromIStream>(downloaded_data.Value.BodyStream, buf_size);
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|