2021-10-04 13:46:03 +00:00
|
|
|
#if !defined(ARCADIA_BUILD)
|
|
|
|
#include <Common/config.h>
|
|
|
|
#endif
|
|
|
|
|
|
|
|
#if USE_AZURE_BLOB_STORAGE
|
|
|
|
|
|
|
|
#include <IO/ReadBufferFromBlobStorage.h>
|
|
|
|
#include <IO/ReadBufferFromString.h>
|
2021-11-17 19:47:34 +00:00
|
|
|
#include <base/logger_useful.h>
|
2021-10-08 14:34:40 +00:00
|
|
|
|
2021-10-04 13:46:03 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes thrown from this translation unit. They are only declared here
/// (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_SEEK_THROUGH_FILE;   // seek() called after reading started, or with an unsupported whence
    extern const int SEEK_POSITION_OUT_OF_BOUND; // seek() called with a negative offset
    extern const int RECEIVED_EMPTY_DATA;        // download returned a null body stream
    extern const int LOGICAL_ERROR;              // read position went past read_until_position
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Creates a read buffer for the blob `path_` of the given container client.
///
/// @param blob_container_client_   client used later to obtain the blob client for `path_`.
/// @param path_                    blob path passed to GetBlobClient() on initialization.
/// @param max_single_read_retries_ upper bound on read attempts per nextImpl() call.
/// @param tmp_buffer_size_         size of the internally owned buffer (unused when an external buffer is used).
/// @param use_external_buffer_     if true, no internal buffer is allocated here; nextImpl() reads into `internal_buffer`.
/// @param read_until_position_     exclusive right bound of the read; 0 means "no bound".
///
/// The base buffer starts out empty (nullptr, 0); nothing is downloaded until the first read.
// TODO: shall this notation be used in all constructors?
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
    const String & path_,
    size_t max_single_read_retries_,
    size_t tmp_buffer_size_,
    bool use_external_buffer_,
    size_t read_until_position_)
    : SeekableReadBuffer(nullptr, 0)
    , blob_container_client(blob_container_client_)
    , path(path_)
    , max_single_read_retries(max_single_read_retries_)
    , tmp_buffer_size(tmp_buffer_size_) // NOTE: field used only here in the constructor
    , use_external_buffer(use_external_buffer_)
    , read_until_position(read_until_position_)
{
    // With an external buffer the destination is picked up on every nextImpl() call,
    // so there is nothing to allocate.
    if (use_external_buffer)
        return;

    // Own the destination memory: allocate it once and remember where it lives.
    tmp_buffer.resize(tmp_buffer_size);
    data_capacity = tmp_buffer_size;
    data_ptr = tmp_buffer.data();
}
|
2021-10-04 13:46:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
bool ReadBufferFromBlobStorage::nextImpl()
|
|
|
|
{
|
2021-10-11 14:54:22 +00:00
|
|
|
// TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ?
|
2021-10-04 13:46:03 +00:00
|
|
|
|
2021-11-17 19:47:34 +00:00
|
|
|
if (read_until_position)
|
|
|
|
{
|
|
|
|
if (read_until_position == offset)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
if (read_until_position < offset)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
|
|
|
|
}
|
|
|
|
|
2021-10-11 14:54:22 +00:00
|
|
|
if (!initialized)
|
|
|
|
initialize();
|
2021-10-08 14:34:40 +00:00
|
|
|
|
2021-11-17 19:47:34 +00:00
|
|
|
if (use_external_buffer)
|
|
|
|
{
|
|
|
|
data_ptr = internal_buffer.begin();
|
|
|
|
data_capacity = internal_buffer.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t to_read_bytes = std::min(total_size - offset, data_capacity);
|
2021-11-09 11:13:29 +00:00
|
|
|
size_t bytes_read = 0;
|
|
|
|
|
|
|
|
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
|
2021-11-18 09:03:57 +00:00
|
|
|
for (size_t i = 0; i < max_single_read_retries; ++i) // TODO: Is this try part necessary with Azure reliable streams?
|
2021-11-09 11:13:29 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2021-11-17 19:47:34 +00:00
|
|
|
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
|
2021-11-09 11:13:29 +00:00
|
|
|
break;
|
|
|
|
}
|
2021-11-15 13:43:02 +00:00
|
|
|
catch (const Azure::Storage::StorageException & e)
|
2021-11-09 11:13:29 +00:00
|
|
|
{
|
2021-11-15 13:43:02 +00:00
|
|
|
LOG_INFO(log, "Exception caught during Azure Read for file {} : {}", path, e.Message);
|
2021-11-09 11:13:29 +00:00
|
|
|
|
|
|
|
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
|
|
|
|
sleep_time_with_backoff_milliseconds *= 2;
|
|
|
|
initialized = false;
|
|
|
|
initialize();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (bytes_read == 0)
|
|
|
|
return false;
|
2021-10-04 13:46:03 +00:00
|
|
|
|
2021-11-17 19:47:34 +00:00
|
|
|
BufferBase::set(data_ptr, bytes_read, 0);
|
2021-10-11 14:54:22 +00:00
|
|
|
offset += bytes_read;
|
2021-10-04 13:46:03 +00:00
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
|
|
|
|
{
|
2021-10-11 14:54:22 +00:00
|
|
|
if (initialized)
|
2021-10-04 13:46:03 +00:00
|
|
|
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
|
|
|
|
|
|
|
if (whence != SEEK_SET)
|
|
|
|
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
|
|
|
|
|
|
|
if (offset_ < 0)
|
|
|
|
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
|
|
|
|
|
|
|
offset = offset_;
|
|
|
|
|
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
off_t ReadBufferFromBlobStorage::getPosition()
|
|
|
|
{
|
2021-11-16 13:35:21 +00:00
|
|
|
// TODO: why is it `return offset - available()` in S3?
|
2021-10-11 14:54:22 +00:00
|
|
|
|
|
|
|
return offset;
|
2021-10-04 13:46:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-10-11 14:54:22 +00:00
|
|
|
void ReadBufferFromBlobStorage::initialize()
|
2021-10-04 13:46:03 +00:00
|
|
|
{
|
2021-10-11 14:54:22 +00:00
|
|
|
if (initialized)
|
|
|
|
return;
|
|
|
|
|
2021-11-09 11:13:29 +00:00
|
|
|
Azure::Storage::Blobs::DownloadBlobOptions download_options;
|
2021-11-17 19:47:34 +00:00
|
|
|
|
|
|
|
Azure::Nullable<int64_t> length {};
|
|
|
|
if (read_until_position != 0)
|
|
|
|
length = {static_cast<int64_t>(read_until_position - offset)};
|
|
|
|
|
|
|
|
download_options.Range = {static_cast<int64_t>(offset), length};
|
2021-11-09 11:13:29 +00:00
|
|
|
|
|
|
|
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
|
|
|
|
|
2021-11-15 13:43:02 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
auto download_response = blob_client->Download(download_options);
|
|
|
|
data_stream = std::move(download_response.Value.BodyStream);
|
|
|
|
}
|
|
|
|
catch (const Azure::Storage::StorageException & e)
|
|
|
|
{
|
2021-11-16 13:35:21 +00:00
|
|
|
LOG_INFO(log, "Exception caught during Azure Download for file {} at offset {} : {}", path, offset, e.Message);
|
2021-11-15 13:43:02 +00:00
|
|
|
throw e;
|
|
|
|
}
|
|
|
|
|
2021-10-11 14:54:22 +00:00
|
|
|
if (data_stream == nullptr)
|
2021-11-16 13:35:21 +00:00
|
|
|
throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path);
|
2021-10-11 14:54:22 +00:00
|
|
|
|
2021-11-10 10:35:44 +00:00
|
|
|
total_size = data_stream->Length() + offset;
|
2021-10-11 14:54:22 +00:00
|
|
|
|
|
|
|
initialized = true;
|
2021-10-04 13:46:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|