ClickHouse/src/IO/ReadBufferFromS3.cpp

140 lines
3.8 KiB
C++
Raw Normal View History

2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2019-05-31 10:58:43 +00:00
2020-01-28 13:05:37 +00:00
# include <IO/ReadBufferFromIStream.h>
# include <IO/ReadBufferFromS3.h>
# include <Common/Stopwatch.h>
2020-01-28 13:05:37 +00:00
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <common/logger_useful.h>
2019-05-31 10:58:43 +00:00
2020-01-28 13:05:37 +00:00
# include <utility>
2021-04-13 19:11:58 +00:00
/// Profile counters updated by ReadBufferFromS3 (defined elsewhere, only declared here).
namespace ProfileEvents
{
    /// Incremented by the elapsed microseconds of each `nextImpl()` read loop.
    extern const Event S3ReadMicroseconds;

    /// Incremented by the size of every chunk handed to the working buffer.
    extern const Event S3ReadBytes;

    /// Incremented by one for every exception caught while reading.
    extern const Event S3ReadRequestsErrors;
}
2019-05-31 10:58:43 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
/// Error codes used by this translation unit (defined elsewhere, only declared here).
namespace ErrorCodes
{
    /// Thrown when the S3 `GetObject` request does not succeed.
    extern const int S3_ERROR;

    /// Thrown by `seek()` when seeking after reading has started or with a mode other than SEEK_SET.
    extern const int CANNOT_SEEK_THROUGH_FILE;

    /// Thrown by `seek()` for a negative offset.
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
/// Remembers the S3 client, the object location (bucket + key) and the read settings.
/// The base buffer is constructed with a null pointer and zero size, and no request is issued here:
/// the underlying stream buffer (`impl`) is created by `nextImpl()` when it is first needed.
ReadBufferFromS3::ReadBufferFromS3(
    std::shared_ptr<Aws::S3::S3Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    UInt64 s3_max_single_read_retries_,
    size_t buffer_size_)
    : SeekableReadBuffer(nullptr, 0)
    , client_ptr(std::move(client_ptr_))
    , bucket(bucket_)
    , key(key_)
    , s3_max_single_read_retries(s3_max_single_read_retries_)
    , buffer_size(buffer_size_)
{
}
bool ReadBufferFromS3::nextImpl()
{
2021-04-13 19:11:58 +00:00
/// Restoring valid value of `count()` during `nextImpl()`. See `ReadBuffer::next()`.
pos = working_buffer.begin();
2021-04-12 22:25:19 +00:00
if (!impl)
impl = initialize();
2020-07-15 11:15:12 +00:00
Stopwatch watch;
2021-04-12 22:25:19 +00:00
bool next_result = false;
2021-04-19 08:02:36 +00:00
for (Int64 attempt = static_cast<Int64>(s3_max_single_read_retries); attempt >= 0; --attempt)
2021-04-12 22:25:19 +00:00
{
if (!impl)
impl = initialize();
try
{
next_result = impl->next();
2021-04-13 19:11:58 +00:00
/// FIXME. 1. Poco `istream` cannot read less than buffer_size or this state is being discarded during
/// istream <-> iostream conversion. `gcount` always contains 0,
/// that's why we always have error "Cannot read from istream at offset 0".
2021-04-12 22:25:19 +00:00
break;
}
catch (const Exception & e)
{
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
2021-04-13 09:09:03 +00:00
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Remaining attempts: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());
2021-04-12 22:25:19 +00:00
impl.reset();
if (!attempt)
throw;
}
}
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
2021-04-12 22:25:19 +00:00
if (!next_result)
return false;
internal_buffer = impl->buffer();
ProfileEvents::increment(ProfileEvents::S3ReadBytes, internal_buffer.size());
working_buffer = internal_buffer;
return true;
}
2020-01-28 12:48:01 +00:00
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
2020-01-27 19:17:22 +00:00
{
2021-04-12 22:25:19 +00:00
if (impl)
2020-01-28 13:05:37 +00:00
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
2020-01-28 12:48:01 +00:00
2020-01-28 13:05:37 +00:00
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
2020-01-28 12:48:01 +00:00
2020-01-28 13:05:37 +00:00
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
2020-01-28 12:48:01 +00:00
offset = offset_;
return offset;
}
2020-02-14 14:28:33 +00:00
/// Returns the current position: the offset stored by `seek()` plus the value of `count()`.
off_t ReadBufferFromS3::getPosition()
{
    return count() + offset;
}
2020-01-27 19:17:22 +00:00
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
2021-04-13 19:11:58 +00:00
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, getPosition());
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
2021-04-13 19:11:58 +00:00
if (getPosition())
req.SetRange("bytes=" + std::to_string(getPosition()) + "-");
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
2019-12-06 14:48:56 +00:00
if (outcome.IsSuccess())
{
2019-12-03 16:23:24 +00:00
read_result = outcome.GetResultWithOwnership();
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
2019-12-03 16:23:24 +00:00
}
2019-12-06 14:48:56 +00:00
else
2019-12-03 16:23:24 +00:00
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
2019-05-31 10:58:43 +00:00
}
2019-12-06 14:37:21 +00:00
#endif