ClickHouse/src/IO/ReadBufferFromS3.cpp

276 lines
8.1 KiB
C++
Raw Normal View History

2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2019-05-31 10:58:43 +00:00
2021-10-15 10:13:38 +00:00
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <Common/Stopwatch.h>
2021-10-15 10:13:38 +00:00
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
2021-10-31 19:53:24 +00:00
#include <aws/s3/model/HeadObjectRequest.h>
2019-05-31 10:58:43 +00:00
2021-10-15 10:13:38 +00:00
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <utility>
2021-04-13 19:11:58 +00:00
namespace ProfileEvents
{
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
2021-04-12 22:25:19 +00:00
extern const Event S3ReadRequestsErrors;
2021-10-31 19:53:24 +00:00
extern const Event ReadBufferSeekCancelConnection;
}
2019-05-31 10:58:43 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
namespace ErrorCodes
{
2019-12-03 16:23:24 +00:00
extern const int S3_ERROR;
2020-01-28 12:48:01 +00:00
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
2021-10-15 10:13:38 +00:00
extern const int LOGICAL_ERROR;
2019-12-03 16:23:24 +00:00
}
ReadBufferFromS3::ReadBufferFromS3(
2021-10-31 19:53:24 +00:00
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
2022-01-13 11:57:56 +00:00
size_t read_until_position_,
bool restricted_seek_)
2021-10-31 19:53:24 +00:00
: SeekableReadBufferWithSize(nullptr, 0)
2021-04-12 22:25:19 +00:00
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
2021-05-19 21:42:25 +00:00
, max_single_read_retries(max_single_read_retries_)
2021-10-10 22:53:51 +00:00
, read_settings(settings_)
2021-09-24 12:12:11 +00:00
, use_external_buffer(use_external_buffer_)
2021-10-20 22:57:43 +00:00
, read_until_position(read_until_position_)
2022-01-13 11:57:56 +00:00
, restricted_seek(restricted_seek_)
2019-12-03 16:23:24 +00:00
{
}
bool ReadBufferFromS3::nextImpl()
{
2021-10-20 22:57:43 +00:00
if (read_until_position)
{
2021-10-20 22:57:43 +00:00
if (read_until_position == offset)
return false;
2021-10-15 10:13:38 +00:00
2021-10-20 22:57:43 +00:00
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
}
2021-04-12 22:25:19 +00:00
bool next_result = false;
2021-07-09 12:37:55 +00:00
if (impl)
{
2021-10-15 10:13:38 +00:00
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
2021-10-28 22:04:52 +00:00
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
2021-10-15 10:13:38 +00:00
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
2021-09-24 12:12:11 +00:00
{
2021-10-01 08:38:58 +00:00
/**
2021-10-28 22:04:52 +00:00
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read.
2021-10-01 08:38:58 +00:00
*/
2021-09-24 12:12:11 +00:00
impl->position() = position();
2021-09-30 12:35:59 +00:00
assert(!impl->hasPendingData());
2021-09-24 12:12:11 +00:00
}
2021-07-09 12:37:55 +00:00
}
2021-06-12 12:57:14 +00:00
2021-10-15 10:13:38 +00:00
size_t sleep_time_with_backoff_milliseconds = 100;
2021-07-09 12:37:55 +00:00
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
2021-04-12 22:25:19 +00:00
{
2021-08-09 16:19:45 +00:00
Stopwatch watch;
2021-04-12 22:25:19 +00:00
try
{
2021-10-15 10:13:38 +00:00
if (!impl)
{
impl = initialize();
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
}
2021-07-09 12:37:55 +00:00
/// Try to read a next portion of data.
2021-04-12 22:25:19 +00:00
next_result = impl->next();
2021-08-09 16:19:45 +00:00
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
2021-04-12 22:25:19 +00:00
break;
}
catch (const Exception & e)
{
2021-08-09 16:19:45 +00:00
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
2021-04-12 22:25:19 +00:00
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_DEBUG(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
2021-04-13 09:09:03 +00:00
bucket, key, getPosition(), attempt, e.message());
2021-08-09 16:19:45 +00:00
if (attempt + 1 == max_single_read_retries)
throw;
2021-07-09 12:37:55 +00:00
/// Pause before next attempt.
2021-10-15 10:13:38 +00:00
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
2021-07-09 12:37:55 +00:00
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
2021-04-12 22:25:19 +00:00
impl.reset();
}
}
if (!next_result)
return false;
2021-07-09 12:37:55 +00:00
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
2021-07-09 12:37:55 +00:00
ProfileEvents::increment(ProfileEvents::S3ReadBytes, working_buffer.size());
offset += working_buffer.size();
return true;
}
2021-10-31 19:53:24 +00:00
2020-01-28 12:48:01 +00:00
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
2020-01-27 19:17:22 +00:00
{
if (offset_ == offset && whence == SEEK_SET)
return offset;
2021-10-31 19:53:24 +00:00
if (impl && restricted_seek)
2022-02-01 19:10:56 +00:00
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
2022-02-02 14:25:25 +00:00
"Seek is allowed only before first read attempt from the buffer (current offset: {}, new offset: {}, reading until position: {})",
offset, offset_, read_until_position);
2021-06-07 10:49:34 +00:00
2020-01-28 13:05:37 +00:00
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
2020-01-28 12:48:01 +00:00
2020-01-28 13:05:37 +00:00
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
2020-01-28 12:48:01 +00:00
2021-10-31 19:53:24 +00:00
if (!restricted_seek)
{
if (!working_buffer.empty()
&& size_t(offset_) >= offset - working_buffer.size()
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
resetWorkingBuffer();
2021-10-31 19:53:24 +00:00
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
2021-10-31 19:53:24 +00:00
offset = offset_;
return offset;
}
2021-10-31 19:53:24 +00:00
std::optional<size_t> ReadBufferFromS3::getTotalSize()
{
if (file_size)
return file_size;
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
auto outcome = client_ptr->HeadObject(request);
auto head_result = outcome.GetResultWithOwnership();
file_size = head_result.GetContentLength();
return file_size;
}
2020-02-14 14:28:33 +00:00
off_t ReadBufferFromS3::getPosition()
{
return offset - available();
2020-02-14 14:28:33 +00:00
}
2022-01-13 11:57:56 +00:00
void ReadBufferFromS3::setReadUntilPosition(size_t position)
{
2022-01-26 09:35:46 +00:00
if (position != static_cast<size_t>(read_until_position))
{
read_until_position = position;
impl.reset();
}
2022-01-13 11:57:56 +00:00
}
2020-01-27 19:17:22 +00:00
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
2021-10-10 22:53:51 +00:00
2021-10-28 22:04:52 +00:00
/**
2021-11-10 07:46:18 +00:00
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
2021-10-28 22:04:52 +00:00
* exact byte ranges to read are always passed here.
*/
2021-10-20 22:57:43 +00:00
if (read_until_position)
2021-10-10 22:53:51 +00:00
{
2021-10-20 22:57:43 +00:00
if (offset >= read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
2021-10-15 10:13:38 +00:00
2021-10-20 22:57:43 +00:00
req.SetRange(fmt::format("bytes={}-{}", offset, read_until_position - 1));
LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, read_until_position - 1);
2021-10-10 22:53:51 +00:00
}
else
{
2021-12-22 08:42:23 +00:00
if (offset)
req.SetRange(fmt::format("bytes={}-", offset));
LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
2021-10-10 22:53:51 +00:00
}
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
2019-12-06 14:48:56 +00:00
if (outcome.IsSuccess())
{
2019-12-03 16:23:24 +00:00
read_result = outcome.GetResultWithOwnership();
2021-10-10 22:53:51 +00:00
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), read_settings.remote_fs_buffer_size);
2019-12-03 16:23:24 +00:00
}
2019-12-06 14:48:56 +00:00
else
2019-12-03 16:23:24 +00:00
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
2019-05-31 10:58:43 +00:00
}
2019-12-06 14:37:21 +00:00
#endif