2022-09-28 08:45:15 +00:00
|
|
|
#include "config.h"
|
2023-01-27 18:47:22 +00:00
|
|
|
#include <IO/S3Common.h>
|
2019-12-06 14:37:21 +00:00
|
|
|
|
|
|
|
#if USE_AWS_S3
|
2019-05-31 10:58:43 +00:00
|
|
|
|
2021-10-15 10:13:38 +00:00
|
|
|
#include <IO/ReadBufferFromIStream.h>
|
|
|
|
#include <IO/ReadBufferFromS3.h>
|
2023-01-27 18:47:22 +00:00
|
|
|
#include <IO/ResourceGuard.h>
|
2023-01-27 14:09:38 +00:00
|
|
|
#include <IO/S3/getObjectInfo.h>
|
2023-02-03 13:30:52 +00:00
|
|
|
#include <IO/S3/Requests.h>
|
2019-05-31 10:58:43 +00:00
|
|
|
|
2022-07-11 12:59:39 +00:00
|
|
|
#include <Common/Stopwatch.h>
|
|
|
|
#include <Common/Throttler.h>
|
2022-04-27 15:05:45 +00:00
|
|
|
#include <Common/logger_useful.h>
|
2023-02-07 17:50:31 +00:00
|
|
|
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
2021-10-15 10:13:38 +00:00
|
|
|
#include <base/sleep.h>
|
|
|
|
|
|
|
|
#include <utility>
|
2020-01-22 16:17:25 +00:00
|
|
|
|
2021-04-13 19:11:58 +00:00
|
|
|
|
2020-07-10 09:32:34 +00:00
|
|
|
namespace ProfileEvents
|
|
|
|
{
|
2022-04-24 01:42:18 +00:00
|
|
|
extern const Event ReadBufferFromS3Microseconds;
|
2023-02-07 17:50:31 +00:00
|
|
|
extern const Event ReadBufferFromS3InitMicroseconds;
|
2022-04-24 01:42:18 +00:00
|
|
|
extern const Event ReadBufferFromS3Bytes;
|
|
|
|
extern const Event ReadBufferFromS3RequestsErrors;
|
2021-10-31 19:53:24 +00:00
|
|
|
extern const Event ReadBufferSeekCancelConnection;
|
2022-09-19 17:23:22 +00:00
|
|
|
extern const Event S3GetObject;
|
2022-09-19 18:10:47 +00:00
|
|
|
extern const Event DiskS3GetObject;
|
2023-01-17 18:29:24 +00:00
|
|
|
extern const Event RemoteReadThrottlerBytes;
|
|
|
|
extern const Event RemoteReadThrottlerSleepMicroseconds;
|
2020-07-10 09:32:34 +00:00
|
|
|
}
|
|
|
|
|
2019-05-31 10:58:43 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
2019-12-03 16:23:24 +00:00
|
|
|
namespace ErrorCodes
|
2019-05-31 18:14:39 +00:00
|
|
|
{
|
2019-12-03 16:23:24 +00:00
|
|
|
extern const int S3_ERROR;
|
2020-01-28 12:48:01 +00:00
|
|
|
extern const int CANNOT_SEEK_THROUGH_FILE;
|
|
|
|
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
2021-10-15 10:13:38 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2022-09-06 16:38:34 +00:00
|
|
|
extern const int CANNOT_ALLOCATE_MEMORY;
|
2019-12-03 16:23:24 +00:00
|
|
|
}
|
2019-05-31 18:14:39 +00:00
|
|
|
|
|
|
|
|
2020-01-22 16:17:25 +00:00
|
|
|
ReadBufferFromS3::ReadBufferFromS3(
|
2023-02-03 13:30:52 +00:00
|
|
|
std::shared_ptr<const S3::Client> client_ptr_,
|
2021-10-31 19:53:24 +00:00
|
|
|
const String & bucket_,
|
|
|
|
const String & key_,
|
2022-03-15 00:10:14 +00:00
|
|
|
const String & version_id_,
|
2022-11-17 16:35:04 +00:00
|
|
|
const S3Settings::RequestSettings & request_settings_,
|
2021-10-31 19:53:24 +00:00
|
|
|
const ReadSettings & settings_,
|
|
|
|
bool use_external_buffer_,
|
2022-03-23 08:15:18 +00:00
|
|
|
size_t offset_,
|
2022-01-13 11:57:56 +00:00
|
|
|
size_t read_until_position_,
|
|
|
|
bool restricted_seek_)
|
2022-09-01 14:26:05 +00:00
|
|
|
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
|
2021-04-12 22:25:19 +00:00
|
|
|
, client_ptr(std::move(client_ptr_))
|
|
|
|
, bucket(bucket_)
|
|
|
|
, key(key_)
|
2022-03-15 00:10:14 +00:00
|
|
|
, version_id(version_id_)
|
2022-11-17 16:35:04 +00:00
|
|
|
, request_settings(request_settings_)
|
2022-03-23 08:15:18 +00:00
|
|
|
, offset(offset_)
|
|
|
|
, read_until_position(read_until_position_)
|
2021-10-10 22:53:51 +00:00
|
|
|
, read_settings(settings_)
|
2021-09-24 12:12:11 +00:00
|
|
|
, use_external_buffer(use_external_buffer_)
|
2022-01-13 11:57:56 +00:00
|
|
|
, restricted_seek(restricted_seek_)
|
2019-12-03 16:23:24 +00:00
|
|
|
{
|
2020-01-22 16:17:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool ReadBufferFromS3::nextImpl()
|
|
|
|
{
|
2021-10-20 22:57:43 +00:00
|
|
|
if (read_until_position)
|
2021-10-15 08:36:26 +00:00
|
|
|
{
|
2021-10-20 22:57:43 +00:00
|
|
|
if (read_until_position == offset)
|
2021-10-15 08:36:26 +00:00
|
|
|
return false;
|
2021-10-15 10:13:38 +00:00
|
|
|
|
2021-10-20 22:57:43 +00:00
|
|
|
if (read_until_position < offset)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
|
2021-10-15 08:36:26 +00:00
|
|
|
}
|
|
|
|
|
2021-04-12 22:25:19 +00:00
|
|
|
bool next_result = false;
|
|
|
|
|
2021-07-09 12:37:55 +00:00
|
|
|
if (impl)
|
|
|
|
{
|
2021-10-15 10:13:38 +00:00
|
|
|
if (use_external_buffer)
|
|
|
|
{
|
|
|
|
/**
|
|
|
|
* use_external_buffer -- means we read into the buffer which
|
|
|
|
* was passed to us from somewhere else. We do not check whether
|
2021-10-28 22:04:52 +00:00
|
|
|
* previously returned buffer was read or not (no hasPendingData() check is needed),
|
|
|
|
* because this branch means we are prefetching data,
|
|
|
|
* each nextImpl() call we can fill a different buffer.
|
2021-10-15 10:13:38 +00:00
|
|
|
*/
|
|
|
|
impl->set(internal_buffer.begin(), internal_buffer.size());
|
|
|
|
assert(working_buffer.begin() != nullptr);
|
|
|
|
assert(!internal_buffer.empty());
|
|
|
|
}
|
|
|
|
else
|
2021-09-24 12:12:11 +00:00
|
|
|
{
|
2021-10-01 08:38:58 +00:00
|
|
|
/**
|
2021-10-28 22:04:52 +00:00
|
|
|
* impl was initialized before, pass position() to it to make
|
|
|
|
* sure there is no pending data which was not read.
|
2021-10-01 08:38:58 +00:00
|
|
|
*/
|
2021-09-24 12:12:11 +00:00
|
|
|
impl->position() = position();
|
2021-09-30 12:35:59 +00:00
|
|
|
assert(!impl->hasPendingData());
|
2021-09-24 12:12:11 +00:00
|
|
|
}
|
2021-07-09 12:37:55 +00:00
|
|
|
}
|
2021-06-12 12:57:14 +00:00
|
|
|
|
2021-10-15 10:13:38 +00:00
|
|
|
size_t sleep_time_with_backoff_milliseconds = 100;
|
2022-11-17 16:35:04 +00:00
|
|
|
for (size_t attempt = 0; attempt < request_settings.max_single_read_retries && !next_result; ++attempt)
|
2021-04-12 22:25:19 +00:00
|
|
|
{
|
2021-08-09 16:19:45 +00:00
|
|
|
Stopwatch watch;
|
2021-04-12 22:25:19 +00:00
|
|
|
try
|
|
|
|
{
|
2021-10-15 10:13:38 +00:00
|
|
|
if (!impl)
|
|
|
|
{
|
|
|
|
impl = initialize();
|
|
|
|
|
|
|
|
if (use_external_buffer)
|
|
|
|
{
|
|
|
|
impl->set(internal_buffer.begin(), internal_buffer.size());
|
|
|
|
assert(working_buffer.begin() != nullptr);
|
|
|
|
assert(!internal_buffer.empty());
|
|
|
|
}
|
2022-06-02 15:09:42 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
/// use the buffer returned by `impl`
|
|
|
|
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
|
|
|
}
|
2021-10-15 10:13:38 +00:00
|
|
|
}
|
|
|
|
|
2021-07-09 12:37:55 +00:00
|
|
|
/// Try to read a next portion of data.
|
2021-04-12 22:25:19 +00:00
|
|
|
next_result = impl->next();
|
2021-08-09 16:19:45 +00:00
|
|
|
watch.stop();
|
2022-04-24 01:42:18 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
2021-04-12 22:25:19 +00:00
|
|
|
break;
|
|
|
|
}
|
2022-09-14 15:25:58 +00:00
|
|
|
catch (Exception & e)
|
2021-04-12 22:25:19 +00:00
|
|
|
{
|
2021-08-09 16:19:45 +00:00
|
|
|
watch.stop();
|
2022-04-24 01:42:18 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
|
|
|
|
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);
|
2021-04-12 22:25:19 +00:00
|
|
|
|
2022-09-14 15:25:58 +00:00
|
|
|
if (auto * s3_exception = dynamic_cast<S3Exception *>(&e))
|
2022-09-06 16:22:44 +00:00
|
|
|
{
|
|
|
|
/// It doesn't make sense to retry Access Denied or No Such Key
|
|
|
|
if (!s3_exception->isRetryableError())
|
|
|
|
{
|
2022-09-14 15:25:58 +00:00
|
|
|
s3_exception->addMessage("while reading key: {}, from bucket: {}", key, bucket);
|
2022-09-06 16:22:44 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// It doesn't make sense to retry allocator errors
|
|
|
|
if (e.code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY)
|
2022-09-06 11:59:55 +00:00
|
|
|
{
|
|
|
|
tryLogCurrentException(log);
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2022-03-15 00:10:14 +00:00
|
|
|
LOG_DEBUG(
|
|
|
|
log,
|
|
|
|
"Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, Attempt: {}, Message: {}",
|
|
|
|
bucket,
|
|
|
|
key,
|
|
|
|
version_id.empty() ? "Latest" : version_id,
|
|
|
|
getPosition(),
|
|
|
|
attempt,
|
|
|
|
e.message());
|
2021-04-13 09:09:03 +00:00
|
|
|
|
2022-11-17 16:35:04 +00:00
|
|
|
if (attempt + 1 == request_settings.max_single_read_retries)
|
2021-08-09 16:19:45 +00:00
|
|
|
throw;
|
|
|
|
|
2021-07-09 12:37:55 +00:00
|
|
|
/// Pause before next attempt.
|
2021-10-15 10:13:38 +00:00
|
|
|
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
|
2021-07-09 12:37:55 +00:00
|
|
|
sleep_time_with_backoff_milliseconds *= 2;
|
|
|
|
|
|
|
|
/// Try to reinitialize `impl`.
|
2021-04-12 22:25:19 +00:00
|
|
|
impl.reset();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!next_result)
|
2020-01-22 16:17:25 +00:00
|
|
|
return false;
|
2021-06-02 15:03:25 +00:00
|
|
|
|
2022-06-02 15:09:42 +00:00
|
|
|
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
2020-07-10 09:32:34 +00:00
|
|
|
|
2022-04-24 01:42:18 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, working_buffer.size());
|
2021-06-02 15:03:25 +00:00
|
|
|
offset += working_buffer.size();
|
2022-07-14 15:33:22 +00:00
|
|
|
if (read_settings.remote_throttler)
|
2023-01-17 18:29:24 +00:00
|
|
|
read_settings.remote_throttler->add(working_buffer.size(), ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
|
2021-06-02 15:03:25 +00:00
|
|
|
|
2020-01-22 16:17:25 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-10-31 19:53:24 +00:00
|
|
|
|
2020-01-28 12:48:01 +00:00
|
|
|
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
2020-01-27 19:17:22 +00:00
|
|
|
{
|
2022-01-26 18:43:23 +00:00
|
|
|
if (offset_ == offset && whence == SEEK_SET)
|
|
|
|
return offset;
|
|
|
|
|
2021-10-31 19:53:24 +00:00
|
|
|
if (impl && restricted_seek)
|
2023-03-30 12:21:13 +00:00
|
|
|
{
|
2022-03-08 13:29:09 +00:00
|
|
|
throw Exception(
|
2023-03-30 12:21:13 +00:00
|
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Seek is allowed only before first read attempt from the buffer (current offset: "
|
|
|
|
"{}, new offset: {}, reading until position: {}, available: {})",
|
|
|
|
offset, offset_, read_until_position, available());
|
|
|
|
}
|
2021-06-07 10:49:34 +00:00
|
|
|
|
2020-01-28 13:05:37 +00:00
|
|
|
if (whence != SEEK_SET)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");
|
2020-01-28 12:48:01 +00:00
|
|
|
|
2020-01-28 13:05:37 +00:00
|
|
|
if (offset_ < 0)
|
2023-01-23 21:13:58 +00:00
|
|
|
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);
|
2020-01-28 12:48:01 +00:00
|
|
|
|
2021-10-31 19:53:24 +00:00
|
|
|
if (!restricted_seek)
|
|
|
|
{
|
|
|
|
if (!working_buffer.empty()
|
2022-04-18 08:18:31 +00:00
|
|
|
&& static_cast<size_t>(offset_) >= offset - working_buffer.size()
|
2021-10-31 19:53:24 +00:00
|
|
|
&& offset_ < offset)
|
|
|
|
{
|
|
|
|
pos = working_buffer.end() - (offset - offset_);
|
|
|
|
assert(pos >= working_buffer.begin());
|
|
|
|
assert(pos <= working_buffer.end());
|
|
|
|
|
|
|
|
return getPosition();
|
|
|
|
}
|
|
|
|
|
|
|
|
auto position = getPosition();
|
|
|
|
if (offset_ > position)
|
|
|
|
{
|
|
|
|
size_t diff = offset_ - position;
|
|
|
|
if (diff < read_settings.remote_read_min_bytes_for_seek)
|
|
|
|
{
|
|
|
|
ignore(diff);
|
|
|
|
return offset_;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-07 14:20:26 +00:00
|
|
|
resetWorkingBuffer();
|
2021-10-31 19:53:24 +00:00
|
|
|
if (impl)
|
|
|
|
{
|
|
|
|
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
|
|
|
|
impl.reset();
|
|
|
|
}
|
|
|
|
}
|
2020-01-22 16:17:25 +00:00
|
|
|
|
2021-10-31 19:53:24 +00:00
|
|
|
offset = offset_;
|
2020-01-22 16:17:25 +00:00
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
2022-05-25 14:49:40 +00:00
|
|
|
size_t ReadBufferFromS3::getFileSize()
|
2021-10-31 19:53:24 +00:00
|
|
|
{
|
|
|
|
if (file_size)
|
2022-05-25 14:49:40 +00:00
|
|
|
return *file_size;
|
2021-10-31 19:53:24 +00:00
|
|
|
|
2023-01-27 13:08:19 +00:00
|
|
|
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings, /* for_disk_s3= */ read_settings.for_object_storage);
|
2022-09-12 08:29:42 +00:00
|
|
|
|
2022-03-28 08:19:23 +00:00
|
|
|
file_size = object_size;
|
2022-05-25 14:49:40 +00:00
|
|
|
return *file_size;
|
2021-10-31 19:53:24 +00:00
|
|
|
}
|
|
|
|
|
2020-02-14 14:28:33 +00:00
|
|
|
off_t ReadBufferFromS3::getPosition()
|
|
|
|
{
|
2021-06-02 15:03:25 +00:00
|
|
|
return offset - available();
|
2020-02-14 14:28:33 +00:00
|
|
|
}
|
|
|
|
|
2022-01-13 11:57:56 +00:00
|
|
|
void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
|
|
|
{
|
2022-01-26 09:35:46 +00:00
|
|
|
if (position != static_cast<size_t>(read_until_position))
|
|
|
|
{
|
|
|
|
read_until_position = position;
|
|
|
|
impl.reset();
|
|
|
|
}
|
2022-01-13 11:57:56 +00:00
|
|
|
}
|
|
|
|
|
2022-03-28 08:19:23 +00:00
|
|
|
SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
|
|
|
|
{
|
2022-04-04 14:32:37 +00:00
|
|
|
return Range{ .left = static_cast<size_t>(offset), .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt };
|
2022-03-28 08:19:23 +00:00
|
|
|
}
|
|
|
|
|
2020-01-27 19:17:22 +00:00
|
|
|
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
|
|
|
{
|
2023-02-03 13:30:52 +00:00
|
|
|
S3::GetObjectRequest req;
|
2019-12-03 16:23:24 +00:00
|
|
|
req.SetBucket(bucket);
|
|
|
|
req.SetKey(key);
|
2022-03-15 00:10:14 +00:00
|
|
|
if (!version_id.empty())
|
|
|
|
{
|
|
|
|
req.SetVersionId(version_id);
|
|
|
|
}
|
2021-10-10 22:53:51 +00:00
|
|
|
|
2021-10-28 22:04:52 +00:00
|
|
|
/**
|
2021-11-10 07:46:18 +00:00
|
|
|
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
|
2021-10-28 22:04:52 +00:00
|
|
|
* exact byte ranges to read are always passed here.
|
|
|
|
*/
|
2021-10-20 22:57:43 +00:00
|
|
|
if (read_until_position)
|
2021-10-10 22:53:51 +00:00
|
|
|
{
|
2021-10-20 22:57:43 +00:00
|
|
|
if (offset >= read_until_position)
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
|
2021-10-15 10:13:38 +00:00
|
|
|
|
2021-10-20 22:57:43 +00:00
|
|
|
req.SetRange(fmt::format("bytes={}-{}", offset, read_until_position - 1));
|
2022-03-15 00:10:14 +00:00
|
|
|
LOG_TEST(
|
|
|
|
log,
|
|
|
|
"Read S3 object. Bucket: {}, Key: {}, Version: {}, Range: {}-{}",
|
|
|
|
bucket,
|
|
|
|
key,
|
|
|
|
version_id.empty() ? "Latest" : version_id,
|
|
|
|
offset,
|
|
|
|
read_until_position - 1);
|
2021-10-10 22:53:51 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-12-22 08:42:23 +00:00
|
|
|
if (offset)
|
|
|
|
req.SetRange(fmt::format("bytes={}-", offset));
|
2022-03-15 00:10:14 +00:00
|
|
|
LOG_TEST(
|
|
|
|
log,
|
|
|
|
"Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
|
|
|
|
bucket,
|
|
|
|
key,
|
|
|
|
version_id.empty() ? "Latest" : version_id,
|
|
|
|
offset);
|
2021-10-10 22:53:51 +00:00
|
|
|
}
|
2019-05-31 18:14:39 +00:00
|
|
|
|
2022-09-19 18:40:32 +00:00
|
|
|
ProfileEvents::increment(ProfileEvents::S3GetObject);
|
|
|
|
if (read_settings.for_object_storage)
|
|
|
|
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
|
|
|
|
|
2023-02-07 17:50:31 +00:00
|
|
|
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
|
2023-02-11 10:36:23 +00:00
|
|
|
|
2023-01-27 18:47:22 +00:00
|
|
|
// We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
|
|
|
|
constexpr ResourceCost estimated_cost = 1;
|
|
|
|
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
|
2019-12-03 16:23:24 +00:00
|
|
|
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
|
2023-01-27 18:47:22 +00:00
|
|
|
rlock.unlock();
|
2019-05-31 18:14:39 +00:00
|
|
|
|
2019-12-06 14:48:56 +00:00
|
|
|
if (outcome.IsSuccess())
|
|
|
|
{
|
2023-01-27 18:47:22 +00:00
|
|
|
ResourceCost bytes_read = outcome.GetResult().GetContentLength();
|
|
|
|
read_settings.resource_link.adjust(estimated_cost, bytes_read);
|
2022-02-21 18:50:27 +00:00
|
|
|
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
|
2023-01-27 18:47:22 +00:00
|
|
|
read_result = outcome.GetResultWithOwnership();
|
2022-02-21 18:50:27 +00:00
|
|
|
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
|
2019-12-03 16:23:24 +00:00
|
|
|
}
|
2019-12-06 14:48:56 +00:00
|
|
|
else
|
2022-09-06 11:59:55 +00:00
|
|
|
{
|
2023-01-27 18:47:22 +00:00
|
|
|
read_settings.resource_link.accumulate(estimated_cost);
|
2022-09-06 11:59:55 +00:00
|
|
|
const auto & error = outcome.GetError();
|
|
|
|
throw S3Exception(error.GetMessage(), error.GetErrorType());
|
|
|
|
}
|
2019-05-31 18:14:39 +00:00
|
|
|
}
|
|
|
|
|
2022-03-24 09:38:11 +00:00
|
|
|
SeekableReadBufferPtr ReadBufferS3Factory::getReader()
|
|
|
|
{
|
|
|
|
const auto next_range = range_generator.nextRange();
|
|
|
|
if (!next_range)
|
|
|
|
return nullptr;
|
|
|
|
|
|
|
|
auto reader = std::make_shared<ReadBufferFromS3>(
|
|
|
|
client_ptr,
|
|
|
|
bucket,
|
|
|
|
key,
|
2022-04-07 14:07:12 +00:00
|
|
|
version_id,
|
2022-11-17 16:35:04 +00:00
|
|
|
request_settings,
|
2022-11-26 03:24:11 +00:00
|
|
|
read_settings.adjustBufferSize(object_size),
|
2022-03-24 09:38:11 +00:00
|
|
|
false /*use_external_buffer*/,
|
|
|
|
next_range->first,
|
|
|
|
next_range->second);
|
2022-11-26 03:24:11 +00:00
|
|
|
|
2022-03-24 09:38:11 +00:00
|
|
|
return reader;
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t ReadBufferS3Factory::seek(off_t off, [[maybe_unused]] int whence)
|
|
|
|
{
|
|
|
|
range_generator = RangeGenerator{object_size, range_step, static_cast<size_t>(off)};
|
|
|
|
return off;
|
|
|
|
}
|
|
|
|
|
2022-05-25 14:49:40 +00:00
|
|
|
size_t ReadBufferS3Factory::getFileSize()
|
2022-03-24 09:38:11 +00:00
|
|
|
{
|
|
|
|
return object_size;
|
|
|
|
}
|
2019-05-31 10:58:43 +00:00
|
|
|
}
|
2019-12-06 14:37:21 +00:00
|
|
|
|
2019-12-09 12:36:06 +00:00
|
|
|
#endif
|