ClickHouse/src/IO/ReadBufferFromS3.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

481 lines
15 KiB
C++
Raw Normal View History

2023-01-27 18:47:22 +00:00
#include <IO/S3Common.h>
2023-06-16 13:30:56 +00:00
#include "config.h"
2019-12-06 14:37:21 +00:00
#if USE_AWS_S3
2019-05-31 10:58:43 +00:00
2021-10-15 10:13:38 +00:00
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
2023-01-27 18:47:22 +00:00
#include <IO/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/Requests.h>
2019-05-31 10:58:43 +00:00
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2023-02-07 17:50:31 +00:00
#include <Common/ElapsedTimeProfileEventIncrement.h>
2021-10-15 10:13:38 +00:00
#include <base/sleep.h>
#include <utility>
2021-04-13 19:11:58 +00:00
/// Profile event counters referenced in this file. They are only declared here (`extern`).
namespace ProfileEvents
{
    /// Time spent in read attempts, S3 request initialization, bytes read and read errors.
    extern const Event ReadBufferFromS3Microseconds;
    extern const Event ReadBufferFromS3InitMicroseconds;
    extern const Event ReadBufferFromS3Bytes;
    extern const Event ReadBufferFromS3RequestsErrors;

    /// HTTP sessions that were reset / left intact when a GetObject response was abandoned or finished.
    extern const Event ReadBufferFromS3ResetSessions;
    extern const Event ReadBufferFromS3PreservedSessions;

    /// An in-flight stream was dropped before reaching the end of its requested range.
    extern const Event ReadBufferSeekCancelConnection;

    /// GetObject request counters.
    extern const Event S3GetObject;
    extern const Event DiskS3GetObject;

    /// Remote read throttling.
    extern const Event RemoteReadThrottlerBytes;
    extern const Event RemoteReadThrottlerSleepMicroseconds;
}
2023-06-12 14:19:05 +00:00
namespace
{
/// Resets the HTTP client session that backs the body stream of `read_result`.
/// Only streams holding a pooled session are reset; a stream holding a plain (non-pooled) session is left as is.
/// Throws LOGICAL_ERROR if the body stream is neither of these two session-aware stream types.
void resetSession(Aws::S3::Model::GetObjectResult & read_result)
{
    using PooledSessionStream = DB::S3::SessionAwareIOStream<DB::PooledHTTPSessionPtr>;
    using PlainSessionStream = DB::S3::SessionAwareIOStream<DB::HTTPSessionPtr>;

    auto & body = read_result.GetBody();

    if (auto * pooled_stream = dynamic_cast<PooledSessionStream *>(&body))
    {
        auto & pooled_session = static_cast<DB::PooledHTTPSessionPtr &>(pooled_stream->getSession());
        static_cast<Poco::Net::HTTPClientSession &>(*pooled_session).reset();
        return;
    }

    if (dynamic_cast<PlainSessionStream *>(&body) == nullptr)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session of unexpected type encountered");
}
2023-06-16 13:30:56 +00:00
/// Decides what to do with the HTTP session of the previous GetObject response (if any) and
/// records the decision in profile events.
///
/// - `read_all_range_successfully`: whether the previous response was consumed to the end.
/// - `read_result`: the previous response; empty if no request has been made yet.
///
/// If there is no previous response, nothing is done and no event is counted.
void resetSessionIfNeeded(bool read_all_range_successfully, std::optional<Aws::S3::Model::GetObjectResult> & read_result)
{
    /// Without a previous response there is no session that could be either reset or preserved.
    if (!read_result)
        return;

    if (!read_all_range_successfully)
    {
        /// When we abandon a session with an ongoing GetObject request and there is another one trying to delete the same object this delete
        /// operation will hang until GetObject's session idle timeouts. So we have to call `reset()` on GetObject's session immediately.
        resetSession(*read_result);
        ProfileEvents::increment(ProfileEvents::ReadBufferFromS3ResetSessions);
    }
    else
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromS3PreservedSessions);
    }
}
2023-06-12 14:19:05 +00:00
}
2019-05-31 10:58:43 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
/// Error code constants declared for use in this translation unit. They are only declared here (`extern`).
namespace ErrorCodes
{
    extern const int S3_ERROR;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int SEEK_POSITION_OUT_OF_BOUND;
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_ALLOCATE_MEMORY;
}
/// Stores everything needed to read object `key_` (optionally a specific `version_id_`) from `bucket_`.
///
/// - `use_external_buffer_`: when true, the base buffer is created with size 0 (memory is expected to be
///   supplied from outside); otherwise `settings_.remote_fs_buffer_size` bytes are requested.
/// - `offset_`: position in the object where reading starts.
/// - `read_until_position_`: exclusive right bound of the range to read; 0 means no bound.
/// - `restricted_seek_`: when true, seek() is rejected once the underlying stream has been created.
/// - `file_size_`: object size if already known.
///
/// The constructor body is empty: the first request is issued lazily.
ReadBufferFromS3::ReadBufferFromS3(
    std::shared_ptr<const S3::Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    const String & version_id_,
    const S3Settings::RequestSettings & request_settings_,
    const ReadSettings & settings_,
    bool use_external_buffer_,
    size_t offset_,
    size_t read_until_position_,
    bool restricted_seek_,
    std::optional<size_t> file_size_)
    : ReadBufferFromFileBase(
        use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size,
        nullptr,
        0,
        file_size_)
    , client_ptr(std::move(client_ptr_))
    , bucket(bucket_)
    , key(key_)
    , version_id(version_id_)
    , request_settings(request_settings_)
    , offset(offset_)
    , read_until_position(read_until_position_)
    , read_settings(settings_)
    , use_external_buffer(use_external_buffer_)
    , restricted_seek(restricted_seek_)
{
}
bool ReadBufferFromS3::nextImpl()
{
2021-10-20 22:57:43 +00:00
if (read_until_position)
{
2021-10-20 22:57:43 +00:00
if (read_until_position == offset)
return false;
2021-10-15 10:13:38 +00:00
2021-10-20 22:57:43 +00:00
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
}
2021-04-12 22:25:19 +00:00
bool next_result = false;
2021-07-09 12:37:55 +00:00
if (impl)
{
2021-10-15 10:13:38 +00:00
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
2021-10-28 22:04:52 +00:00
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
2021-10-15 10:13:38 +00:00
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
2021-09-24 12:12:11 +00:00
{
2021-10-01 08:38:58 +00:00
/**
2021-10-28 22:04:52 +00:00
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read.
2021-10-01 08:38:58 +00:00
*/
2021-09-24 12:12:11 +00:00
impl->position() = position();
2021-09-30 12:35:59 +00:00
assert(!impl->hasPendingData());
2021-09-24 12:12:11 +00:00
}
2021-07-09 12:37:55 +00:00
}
2021-06-12 12:57:14 +00:00
2021-10-15 10:13:38 +00:00
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; !next_result; ++attempt)
2021-04-12 22:25:19 +00:00
{
bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);
2021-04-12 22:25:19 +00:00
try
{
2021-10-15 10:13:38 +00:00
if (!impl)
{
impl = initialize();
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/// use the buffer returned by `impl`
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
}
2021-10-15 10:13:38 +00:00
}
2021-07-09 12:37:55 +00:00
/// Try to read a next portion of data.
2021-04-12 22:25:19 +00:00
next_result = impl->next();
break;
}
catch (Exception & e)
2021-04-12 22:25:19 +00:00
{
if (!processException(e, getPosition(), attempt) || last_attempt)
2021-08-09 16:19:45 +00:00
throw;
2021-07-09 12:37:55 +00:00
/// Pause before next attempt.
2021-10-15 10:13:38 +00:00
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
2021-07-09 12:37:55 +00:00
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
2023-03-28 20:28:28 +00:00
resetWorkingBuffer();
2021-04-12 22:25:19 +00:00
impl.reset();
}
}
if (!next_result)
2023-06-12 14:19:05 +00:00
{
read_all_range_successfully = true;
return false;
2023-06-12 14:19:05 +00:00
}
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, working_buffer.size());
offset += working_buffer.size();
2022-07-14 15:33:22 +00:00
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(working_buffer.size(), ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
return true;
}
2021-10-31 19:53:24 +00:00
/// Reads up to `n` bytes of the object starting at `range_begin` into `to` with a dedicated GetObject request.
/// `progress_callback` is forwarded to copyFromIStreamWithProgressCallback().
///
/// Returns the number of bytes copied (0 immediately if `n` is 0).
/// A failed attempt is retried up to `request_settings.max_single_read_retries` attempts, pausing 100 ms
/// before the first retry and twice as long before each following one; the error is rethrown if
/// processException() reports it as non-retryable or the attempts are exhausted.
size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback)
{
    if (n == 0)
        return 0;

    size_t pause_milliseconds = 100;
    size_t attempt = 0;

    while (true)
    {
        const bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries;

        ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3Microseconds);

        try
        {
            auto response = sendRequest(range_begin, range_begin + n - 1);
            std::istream & body = response.GetBody();

            const size_t bytes_copied = copyFromIStreamWithProgressCallback(body, to, n, progress_callback);

            ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, bytes_copied);

            if (read_settings.remote_throttler)
                read_settings.remote_throttler->add(bytes_copied, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);

            return bytes_copied;
        }
        catch (Poco::Exception & e)
        {
            /// processException() must run for every failure (it logs and counts the error),
            /// so it is evaluated before the last-attempt check.
            const bool may_retry = processException(e, range_begin, attempt);
            if (!may_retry || last_attempt)
                throw;

            sleepForMilliseconds(pause_milliseconds);
            pause_milliseconds *= 2;
        }

        ++attempt;
    }
}
/// Accounts and logs a read error, and tells the caller whether the read may be retried.
///
/// - `e`: the caught exception; a non-retryable S3Exception gets the key and bucket appended to its message.
/// - `read_offset`, `attempt`: used only for the log message.
///
/// Returns false for a non-retryable S3 error and for CANNOT_ALLOCATE_MEMORY, true otherwise.
bool ReadBufferFromS3::processException(Poco::Exception & e, size_t read_offset, size_t attempt) const
{
    ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);

    LOG_DEBUG(
        log,
        "Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, "
        "Attempt: {}, Message: {}",
        bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message());

    /// It doesn't make sense to retry Access Denied or No Such Key
    if (auto * s3_exception = dynamic_cast<S3Exception *>(&e); s3_exception && !s3_exception->isRetryableError())
    {
        s3_exception->addMessage("while reading key: {}, from bucket: {}", key, bucket);
        return false;
    }

    /// It doesn't make sense to retry allocator errors
    const bool is_allocator_error = e.code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY;
    if (is_allocator_error)
    {
        tryLogCurrentException(log);
        return false;
    }

    return true;
}
2020-01-28 12:48:01 +00:00
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
2020-01-27 19:17:22 +00:00
{
2023-03-28 20:28:28 +00:00
if (offset_ == getPosition() && whence == SEEK_SET)
return offset_;
2023-06-12 14:19:05 +00:00
read_all_range_successfully = false;
2021-10-31 19:53:24 +00:00
if (impl && restricted_seek)
2023-03-30 12:21:13 +00:00
{
2022-03-08 13:29:09 +00:00
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
2023-03-30 12:21:13 +00:00
"Seek is allowed only before first read attempt from the buffer (current offset: "
"{}, new offset: {}, reading until position: {}, available: {})",
getPosition(), offset_, read_until_position, available());
2023-03-30 12:21:13 +00:00
}
2021-06-07 10:49:34 +00:00
2020-01-28 13:05:37 +00:00
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");
2020-01-28 12:48:01 +00:00
2020-01-28 13:05:37 +00:00
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);
2020-01-28 12:48:01 +00:00
2021-10-31 19:53:24 +00:00
if (!restricted_seek)
{
if (!working_buffer.empty()
&& static_cast<size_t>(offset_) >= offset - working_buffer.size()
2021-10-31 19:53:24 +00:00
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
2023-03-28 20:28:28 +00:00
assert(pos < working_buffer.end());
2021-10-31 19:53:24 +00:00
return getPosition();
}
2023-03-13 19:29:59 +00:00
off_t position = getPosition();
if (impl && offset_ > position)
2021-10-31 19:53:24 +00:00
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
resetWorkingBuffer();
2021-10-31 19:53:24 +00:00
if (impl)
{
2023-03-13 19:29:59 +00:00
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
2021-10-31 19:53:24 +00:00
impl.reset();
}
}
2021-10-31 19:53:24 +00:00
offset = offset_;
return offset;
}
2022-05-25 14:49:40 +00:00
size_t ReadBufferFromS3::getFileSize()
2021-10-31 19:53:24 +00:00
{
if (file_size)
2022-05-25 14:49:40 +00:00
return *file_size;
2021-10-31 19:53:24 +00:00
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings, /* for_disk_s3= */ read_settings.for_object_storage);
2022-09-12 08:29:42 +00:00
2022-03-28 08:19:23 +00:00
file_size = object_size;
2022-05-25 14:49:40 +00:00
return *file_size;
2021-10-31 19:53:24 +00:00
}
2020-02-14 14:28:33 +00:00
/// Returns the current read position in the object: `offset` minus the number of bytes
/// that are still available (unread) in the buffer.
off_t ReadBufferFromS3::getPosition()
{
    const auto unread_bytes = available();
    return offset - unread_bytes;
}
2022-01-13 11:57:56 +00:00
void ReadBufferFromS3::setReadUntilPosition(size_t position)
{
2022-01-26 09:35:46 +00:00
if (position != static_cast<size_t>(read_until_position))
{
2023-06-12 14:19:05 +00:00
read_all_range_successfully = false;
if (impl)
{
2023-03-13 19:29:59 +00:00
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
2023-03-28 20:28:28 +00:00
offset = getPosition();
resetWorkingBuffer();
impl.reset();
}
2022-01-26 09:35:46 +00:00
read_until_position = position;
}
2022-01-13 11:57:56 +00:00
}
void ReadBufferFromS3::setReadUntilEnd()
2022-03-28 08:19:23 +00:00
{
if (read_until_position)
{
2023-06-12 14:19:05 +00:00
read_all_range_successfully = false;
read_until_position = 0;
if (impl)
{
2023-03-13 19:29:59 +00:00
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
2023-03-28 20:28:28 +00:00
offset = getPosition();
resetWorkingBuffer();
impl.reset();
}
2022-01-26 09:35:46 +00:00
}
2022-01-13 11:57:56 +00:00
}
2023-03-28 20:28:28 +00:00
bool ReadBufferFromS3::atEndOfRequestedRangeGuess()
{
2023-03-13 19:29:59 +00:00
if (!impl)
return true;
if (read_until_position)
return getPosition() >= read_until_position;
if (file_size)
return getPosition() >= static_cast<off_t>(*file_size);
2023-03-13 19:29:59 +00:00
return false;
2022-03-28 08:19:23 +00:00
}
2023-06-12 14:19:05 +00:00
/// Resets the session of the last response if it was not read to the end.
/// Any exception is logged and swallowed, so nothing is thrown out of the destructor.
ReadBufferFromS3::~ReadBufferFromS3()
{
    try
    {
        const bool fully_read = readAllRangeSuccessfully();
        resetSessionIfNeeded(fully_read, read_result);
    }
    catch (...)
    {
        tryLogCurrentException(log);
    }
}
2020-01-27 19:17:22 +00:00
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
2023-06-16 13:30:56 +00:00
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
2023-06-12 14:19:05 +00:00
read_all_range_successfully = false;
/**
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
* exact byte ranges to read are always passed here.
*/
if (read_until_position && offset >= read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
2023-06-12 14:19:05 +00:00
return std::make_unique<ReadBufferFromIStream>(read_result->GetBody(), buffer_size);
}
/// Issues a GetObject request for the object and returns its result.
///
/// - `range_begin`: first byte to read.
/// - `range_end_incl`: last byte to read (inclusive); if empty, reads to the end of the object.
///   No Range header is set when `range_end_incl` is empty and `range_begin` is 0.
///
/// The request is accounted in profile events and in `read_settings.resource_link`.
/// Throws S3Exception built from the SDK error if the request fails.
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const
{
    S3::GetObjectRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);
    if (!version_id.empty())
        req.SetVersionId(version_id);

    if (range_end_incl)
    {
        req.SetRange(fmt::format("bytes={}-{}", range_begin, *range_end_incl));
        LOG_TEST(
            log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Range: {}-{}",
            bucket, key, version_id.empty() ? "Latest" : version_id, range_begin, *range_end_incl);
    }
    else if (range_begin)
    {
        req.SetRange(fmt::format("bytes={}-", range_begin));
        LOG_TEST(
            log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
            bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
    }

    ProfileEvents::increment(ProfileEvents::S3GetObject);
    if (read_settings.for_object_storage)
        ProfileEvents::increment(ProfileEvents::DiskS3GetObject);

    ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);

    // We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
    constexpr ResourceCost estimated_cost = 1;
    ResourceGuard rlock(read_settings.resource_link, estimated_cost);
    Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
    rlock.unlock();

    if (!outcome.IsSuccess())
    {
        read_settings.resource_link.accumulate(estimated_cost);
        const auto & error = outcome.GetError();
        throw S3Exception(error.GetMessage(), error.GetErrorType());
    }

    /// Replace the estimate with the content length reported by the response.
    ResourceCost bytes_read = outcome.GetResult().GetContentLength();
    read_settings.resource_link.adjust(estimated_cost, bytes_read);
    return outcome.GetResultWithOwnership();
}
2023-06-16 13:30:56 +00:00
/// Tells whether the requested range has been read to the end:
/// with a right bound set, `offset` must be equal to it; without a bound, the flag set by nextImpl() is used.
bool ReadBufferFromS3::readAllRangeSuccessfully() const
{
    if (read_until_position)
        return offset == read_until_position;

    return read_all_range_successfully;
}
2019-05-31 10:58:43 +00:00
}
2019-12-06 14:37:21 +00:00
#endif