ClickHouse/dbms/src/IO/ReadBufferFromS3.cpp

83 lines
1.7 KiB
C++
Raw Normal View History

2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2019-05-31 10:58:43 +00:00
2019-12-06 14:37:21 +00:00
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadBufferFromIStream.h>
2019-05-31 10:58:43 +00:00
#include <common/logger_useful.h>
2019-12-03 16:23:24 +00:00
#include <aws/s3/model/GetObjectRequest.h>
2019-12-11 14:21:48 +00:00
#include <aws/s3/S3Client.h>
2019-05-31 10:58:43 +00:00
#include <utility>
2019-05-31 10:58:43 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
namespace ErrorCodes
{
    /// Error code attached to exceptions thrown in this file when an S3 request fails.
    /// Only declared here; the definition lives elsewhere.
    extern const int S3_ERROR;
}
/// Remembers where to read from; no S3 request is issued by the constructor itself.
///
/// @param client_ptr_   S3 client used later to fetch the object (moved into the member).
/// @param bucket_       Name of the bucket that holds the object.
/// @param key_          Key of the object inside the bucket.
/// @param buffer_size_  Size passed to the underlying stream buffer once it is created.
ReadBufferFromS3::ReadBufferFromS3(
    std::shared_ptr<Aws::S3::S3Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    size_t buffer_size_)
    /// The base class is given a null pointer and zero size here; nextImpl() assigns the
    /// buffer from the underlying stream buffer after each successful read.
    : SeekableReadBuffer(nullptr, 0)
    , client_ptr(std::move(client_ptr_))
    , bucket(bucket_)
    , key(key_)
    , buffer_size(buffer_size_)
{
}
/// Fetches the next portion of data from the underlying buffer.
///
/// On the first call the underlying buffer is created via initialize(); if that
/// throws, `initialized` stays false, so a later call attempts initialization again.
///
/// @return true if new data is available in the working buffer, false if the
///         underlying buffer reported that nothing more could be read.
bool ReadBufferFromS3::nextImpl()
{
    if (!initialized)
    {
        impl = initialize();
        initialized = true;
    }

    const bool has_more = impl->next();
    if (has_more)
    {
        /// Expose the data read by the underlying buffer through this buffer.
        internal_buffer = impl->buffer();
        working_buffer = internal_buffer;
    }

    return has_more;
}
/// Sets the byte offset from which the object will be read.
///
/// The offset is only recorded while the underlying buffer has not been created yet
/// (i.e. before the first nextImpl() call); initialize() then uses it to build the
/// request. After initialization the call changes nothing.
///
/// Unlike the previous version, a zero offset is stored as well, so that
/// `seek(10); seek(0);` before the first read really starts from the beginning
/// instead of silently keeping the earlier offset.
///
/// @param offset_  Offset to start reading from.
///                 NOTE(review): negative values are not rejected here — confirm callers never pass them.
/// @param (unnamed) The second argument is ignored by this implementation.
/// @return The offset currently stored in the buffer. It differs from `offset_`
///         when the call came after initialization and was therefore ignored.
off_t ReadBufferFromS3::seek(off_t offset_, int)
{
    if (!initialized)
        offset = offset_;

    return offset;
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize() {
LOG_DEBUG(log, "Read S3 object. "
"Bucket: " + bucket + ", Key: " + key + ", Offset: " + std::to_string(offset));
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (offset != 0)
req.SetRange("bytes=" + std::to_string(offset) + "-");
2019-12-03 16:23:24 +00:00
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
2019-12-06 14:48:56 +00:00
if (outcome.IsSuccess())
{
2019-12-03 16:23:24 +00:00
read_result = outcome.GetResultWithOwnership();
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
2019-12-03 16:23:24 +00:00
}
2019-12-06 14:48:56 +00:00
else
2019-12-03 16:23:24 +00:00
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
2019-05-31 10:58:43 +00:00
}
2019-12-06 14:37:21 +00:00
#endif