Merge pull request #34571 from surahman/CH-31221-AWS-S3-object-version-id

[CH-31221] AWS S3 Object Version Specification
This commit is contained in:
Sergei Trifonov 2022-04-28 15:45:33 +02:00 committed by GitHub
commit 11f40376ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 169 additions and 15 deletions

View File

@ -46,7 +46,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
auto remote_file_reader_creator = [=, this]()
{
return std::make_unique<ReadBufferFromS3>(
client_ptr, bucket, remote_path, max_single_read_retries,
client_ptr, bucket, remote_path, version_id, max_single_read_retries,
settings, /* use_external_buffer */true, /* offset */ 0, read_until_position, /* restricted_seek */true);
};

View File

@ -103,6 +103,7 @@ public:
ReadBufferFromS3Gather(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & version_id_,
const std::string & common_path_prefix_,
const BlobsPathToSize & blobs_to_read_,
size_t max_single_read_retries_,
@ -110,6 +111,7 @@ public:
: ReadBufferFromRemoteFSGather(common_path_prefix_, blobs_to_read_, settings_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, version_id(version_id_)
, max_single_read_retries(max_single_read_retries_)
{
}
@ -119,6 +121,7 @@ public:
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String version_id;
UInt64 max_single_read_retries;
};
#endif

View File

@ -109,6 +109,7 @@ DiskS3::DiskS3(
String name_,
String bucket_,
String s3_root_path_,
String version_id_,
DiskPtr metadata_disk_,
FileCachePtr cache_,
ContextPtr context_,
@ -116,6 +117,7 @@ DiskS3::DiskS3(
GetDiskSettings settings_getter_)
: IDiskRemote(name_, s3_root_path_, metadata_disk_, std::move(cache_), "DiskS3", settings_->thread_pool_size)
, bucket(std::move(bucket_))
, version_id(std::move(version_id_))
, current_settings(std::move(settings_))
, settings_getter(settings_getter_)
, context(context_)
@ -196,7 +198,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
}
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
settings->client, bucket, metadata.remote_fs_root_path, metadata.remote_fs_objects,
settings->client, bucket, version_id, metadata.remote_fs_root_path, metadata.remote_fs_objects,
settings->s3_settings.max_single_read_retries, disk_read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
@ -354,6 +356,7 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
version_id,
settings->s3_settings.max_single_read_retries,
context->getReadSettings());

View File

@ -66,6 +66,7 @@ public:
String name_,
String bucket_,
String s3_root_path_,
String version_id_,
DiskPtr metadata_disk_,
FileCachePtr cache_,
ContextPtr context_,
@ -157,6 +158,8 @@ private:
const String bucket;
const String version_id;
MultiVersion<DiskS3Settings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;

View File

@ -195,6 +195,7 @@ void registerDiskS3(DiskFactory & factory)
name,
uri.bucket,
uri.key,
uri.version_id,
metadata_disk,
std::move(cache),
context,

View File

@ -40,6 +40,7 @@ ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer_,
@ -50,6 +51,7 @@ ReadBufferFromS3::ReadBufferFromS3(
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
, version_id(version_id_)
, max_single_read_retries(max_single_read_retries_)
, offset(offset_)
, read_until_position(read_until_position_)
@ -128,8 +130,15 @@ bool ReadBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_DEBUG(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());
LOG_DEBUG(
log,
"Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, Attempt: {}, Message: {}",
bucket,
key,
version_id.empty() ? "Latest" : version_id,
getPosition(),
attempt,
e.message());
if (attempt + 1 == max_single_read_retries)
throw;
@ -213,7 +222,7 @@ std::optional<size_t> ReadBufferFromS3::getFileSize()
if (file_size)
return file_size;
auto object_size = S3::getObjectSize(client_ptr, bucket, key, false);
auto object_size = S3::getObjectSize(client_ptr, bucket, key, version_id, false);
if (!object_size)
{
@ -248,6 +257,10 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
{
req.SetVersionId(version_id);
}
/**
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
@ -259,13 +272,26 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
req.SetRange(fmt::format("bytes={}-{}", offset, read_until_position - 1));
LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, read_until_position - 1);
LOG_TEST(
log,
"Read S3 object. Bucket: {}, Key: {}, Version: {}, Range: {}-{}",
bucket,
key,
version_id.empty() ? "Latest" : version_id,
offset,
read_until_position - 1);
}
else
{
if (offset)
req.SetRange(fmt::format("bytes={}-", offset));
LOG_TEST(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
LOG_TEST(
log,
"Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket,
key,
version_id.empty() ? "Latest" : version_id,
offset);
}
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
@ -293,6 +319,7 @@ SeekableReadBufferPtr ReadBufferS3Factory::getReader()
client_ptr,
bucket,
key,
version_id,
s3_max_single_read_retries,
read_settings,
false /*use_external_buffer*/,

View File

@ -32,6 +32,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
String version_id;
UInt64 max_single_read_retries;
/// These variables are atomic because they can be used for `logging only`
@ -50,6 +51,7 @@ public:
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
UInt64 max_single_read_retries_,
const ReadSettings & settings_,
bool use_external_buffer = false,
@ -95,6 +97,7 @@ public:
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
size_t range_step_,
size_t object_size_,
UInt64 s3_max_single_read_retries_,
@ -102,6 +105,7 @@ public:
: client_ptr(client_ptr_)
, bucket(bucket_)
, key(key_)
, version_id(version_id_)
, read_settings(read_settings_)
, range_generator(object_size_, range_step_)
, range_step(range_step_)
@ -124,6 +128,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String bucket;
const String key;
const String version_id;
ReadSettings read_settings;
RangeGenerator range_generator;

View File

@ -779,13 +779,27 @@ namespace S3
static constexpr auto OBS = "OBS";
static constexpr auto OSS = "OSS";
uri = uri_;
storage_name = S3;
if (uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Host is empty in S3 URI.");
/// Extract the object version ID from the query string.
/// The last `versionId=<value>` parameter wins; `version_id` stays empty when the parameter is
/// absent or carries no value.
{
    version_id = "";
    const String version_key = "versionId=";
    const auto query_string = uri.getQuery();

    /// Search from the back, but only accept a match that begins a parameter, i.e. sits at the
    /// start of the query or directly after '&'. Without this check a different key that merely
    /// ends in "versionId" (e.g. `?myversionId=x`) would be mistaken for the version.
    auto start = query_string.rfind(version_key);
    while (start != std::string::npos && start != 0 && query_string[start - 1] != '&')
        start = query_string.rfind(version_key, start - 1);

    if (start != std::string::npos)
    {
        start += version_key.length();
        /// The value runs up to the next '&' or to the end of the query string.
        const auto end = query_string.find('&', start);
        version_id = query_string.substr(start, end == std::string::npos ? std::string::npos : end - start);
    }
}
String name;
String endpoint_authority_from_uri;
@ -842,12 +856,15 @@ namespace S3
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, bool throw_on_error)
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
{
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
Aws::S3::Model::HeadObjectOutcome outcome = client_ptr->HeadObject(req);
if (outcome.IsSuccess())

View File

@ -66,6 +66,7 @@ struct URI
String endpoint;
String bucket;
String key;
String version_id;
String storage_name;
bool is_virtual_hosted_style;
@ -75,7 +76,7 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, bool throw_on_error = true);
size_t getObjectSize(std::shared_ptr<Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true);
}

View File

@ -3,12 +3,79 @@
#if USE_AWS_S3
# include <IO/S3Common.h>
#include <IO/S3Common.h>
namespace
{
using namespace DB;
/// One expectation record for the versionId checks: a constructed S3::URI together with the
/// values its fields are expected to hold.
/// Field order matters: the TestCases table initializes these members positionally.
struct TestCase
{
/// The S3::URI object under test.
S3::URI uri;
/// Expected value of uri.endpoint.
String endpoint;
/// Expected value of uri.bucket.
String bucket;
/// Expected value of uri.key.
String key;
/// Expected value of uri.version_id; empty when no version is expected.
String version_id;
/// Expected value of uri.is_virtual_hosted_style.
bool is_virtual_hosted_style;
};
/// Table of URIs and the fields expected after construction, focused on the `versionId` query parameter.
/// All entries share the same host and path; only the query string varies.
/// NOTE(review): the S3::URI objects are constructed while this namespace-scope array is initialized,
/// i.e. outside of any test body. If a constructor threw here it would presumably not be reported
/// as a regular test failure — confirm whether building the URI inside the test is preferable.
const TestCase TestCases[] = {
/// No query string at all: version ID is empty.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"",
true},
/// Query string without a versionId parameter: version ID is empty.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?firstKey=someKey&secondKey=anotherKey")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"",
true},
/// versionId is the first of several parameters.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?versionId=testVersionId&anotherKey=someOtherKey")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"testVersionId",
true},
/// versionId sits between other parameters.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?firstKey=someKey&versionId=testVersionId&anotherKey=someOtherKey")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"testVersionId",
true},
/// versionId is the last parameter.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?anotherKey=someOtherKey&versionId=testVersionId")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"testVersionId",
true},
/// versionId is the only parameter.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?versionId=testVersionId")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"testVersionId",
true},
/// versionId present with an empty value: version ID is empty.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?versionId=")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"",
true},
/// versionId without '=' followed by a trailing '&': version ID is empty.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?versionId&")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"",
true},
/// Bare versionId without '=' or value: version ID is empty.
{S3::URI(Poco::URI("https://bucketname.s3.us-east-2.amazonaws.com/data?versionId")),
"https://s3.us-east-2.amazonaws.com",
"bucketname",
"data",
"",
true},
};
/// Parameterized test fixture with a std::string parameter. The invalidPatterns test reads the
/// parameter via GetParam() and uses it as a URI string that S3::URI is expected to reject.
class S3UriTest : public testing::TestWithParam<std::string>
{
};
@ -20,6 +87,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3.amazonaws.com", uri.endpoint);
ASSERT_EQ("jokserfn", uri.bucket);
ASSERT_EQ("", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
@ -27,6 +95,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3.amazonaws.com", uri.endpoint);
ASSERT_EQ("jokserfn", uri.bucket);
ASSERT_EQ("", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(false, uri.is_virtual_hosted_style);
}
{
@ -34,6 +103,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://amazonaws.com", uri.endpoint);
ASSERT_EQ("bucket", uri.bucket);
ASSERT_EQ("", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(false, uri.is_virtual_hosted_style);
}
{
@ -41,6 +111,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3.amazonaws.com", uri.endpoint);
ASSERT_EQ("jokserfn", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
@ -48,6 +119,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://storage.amazonaws.com", uri.endpoint);
ASSERT_EQ("jokserfn", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(false, uri.is_virtual_hosted_style);
}
{
@ -55,6 +127,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://cos.ap-beijing.myqcloud.com", uri.endpoint);
ASSERT_EQ("bucketname", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
@ -62,6 +135,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3.us-east-2.amazonaws.com", uri.endpoint);
ASSERT_EQ("bucketname", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
@ -69,6 +143,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3.us-east-2.amazonaws.com", uri.endpoint);
ASSERT_EQ("bucketname", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(false, uri.is_virtual_hosted_style);
}
{
@ -76,6 +151,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3-us-east-2.amazonaws.com", uri.endpoint);
ASSERT_EQ("bucketname", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
@ -83,6 +159,7 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("https://s3-us-east-2.amazonaws.com", uri.endpoint);
ASSERT_EQ("bucketname", uri.bucket);
ASSERT_EQ("data", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(false, uri.is_virtual_hosted_style);
}
}
@ -92,6 +169,18 @@ TEST_P(S3UriTest, invalidPatterns)
ASSERT_ANY_THROW(S3::URI(Poco::URI(GetParam())));
}
/// Walks the TestCases table and checks that each constructed URI exposes the expected endpoint,
/// bucket, key, version ID and addressing style. Stops at the first mismatch.
TEST(S3UriTest, versionIdChecks)
{
    for (const TestCase & expected : TestCases)
    {
        const S3::URI & parsed = expected.uri;

        ASSERT_EQ(expected.endpoint, parsed.endpoint);
        ASSERT_EQ(expected.bucket, parsed.bucket);
        ASSERT_EQ(expected.key, parsed.key);
        ASSERT_EQ(expected.version_id, parsed.version_id);
        ASSERT_EQ(expected.is_virtual_hosted_style, parsed.is_virtual_hosted_style);
    }
}
INSTANTIATE_TEST_SUITE_P(
S3,
S3UriTest,

View File

@ -232,12 +232,14 @@ StorageS3Source::StorageS3Source(
String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
const String & version_id_,
std::shared_ptr<IteratorWrapper> file_iterator_,
const size_t download_thread_num_)
: SourceWithProgress(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
@ -291,7 +293,7 @@ bool StorageS3Source::initialize()
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
{
const size_t object_size = DB::S3::getObjectSize(client, bucket, key, false);
const size_t object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false);
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
@ -299,7 +301,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
if (!use_parallel_download || object_too_small)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, max_single_read_retries, getContext()->getReadSettings());
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, max_single_read_retries, getContext()->getReadSettings());
}
assert(object_size > 0);
@ -311,7 +313,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
}
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
client, bucket, key, version_id, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
@ -693,6 +695,7 @@ Pipe StorageS3::read(
compression_method,
s3_configuration.client,
s3_configuration.uri.bucket,
s3_configuration.uri.version_id,
iterator_wrapper,
max_download_threads));
}
@ -966,7 +969,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
first = false;
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method));
};

View File

@ -73,6 +73,7 @@ public:
String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket,
const String & version_id,
std::shared_ptr<IteratorWrapper> file_iterator_,
size_t download_thread_num);
@ -85,6 +86,7 @@ public:
private:
String name;
String bucket;
String version_id;
String file_path;
String format;
ColumnsDescription columns_desc;