Merge pull request #17909 from Jokser/disk-s3-send-metadata

Ability to set metadata when putting an S3 object
Kruglov Pavel 2020-12-11 17:01:14 +03:00 committed by GitHub
commit 6300cd285f
8 changed files with 57 additions and 30 deletions
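With this change, the S3 disk can attach user-defined metadata to the objects it uploads: when the new send_object_metadata disk setting is enabled, DiskS3 sends the logical file path under the metadata key "path" with every PutObject and multipart upload request. A minimal configuration sketch, using the MinIO endpoint and credentials from the integration test as placeholder values:

    <storage_configuration>
        <disks>
            <default>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <!-- New in this PR: attach object metadata (the logical path) to every uploaded S3 object. -->
                <send_object_metadata>true</send_object_metadata>
            </default>
        </disks>
    </storage_configuration>

The setting defaults to false in registerDiskS3, so existing S3 disks keep their current behaviour unless it is enabled explicitly.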

View File

@@ -3,6 +3,7 @@
#include "Disks/DiskFactory.h"
#include <random>
#include <optional>
#include <utility>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
@@ -326,11 +327,19 @@ namespace
const String & bucket_,
Metadata metadata_,
const String & s3_path_,
std::optional<DiskS3::ObjectMetadata> object_metadata_,
bool is_multipart,
size_t min_upload_part_size,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart, buf_size_))
, impl(WriteBufferFromS3(
client_ptr_,
bucket_,
metadata_.s3_root_path + s3_path_,
min_upload_part_size,
is_multipart,
std::move(object_metadata_),
buf_size_))
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
@@ -522,7 +531,8 @@ DiskS3::DiskS3(
String metadata_path_,
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t min_bytes_for_seek_)
size_t min_bytes_for_seek_,
bool send_metadata_)
: IDisk(std::make_unique<AsyncExecutor>())
, name(std::move(name_))
, client(std::move(client_))
@@ -533,6 +543,7 @@ DiskS3::DiskS3(
, min_upload_part_size(min_upload_part_size_)
, min_multi_part_upload_size(min_multi_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, send_metadata(send_metadata_)
{
}
@@ -653,6 +664,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
}
/// Path to store new S3 object.
auto s3_path = getRandomName();
auto object_metadata = createObjectMetadata(path);
bool is_multipart = estimated_size >= min_multi_part_upload_size;
if (!exist || mode == WriteMode::Rewrite)
{
@@ -664,9 +676,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, object_metadata, is_multipart, min_upload_part_size, buf_size);
}
else
{
@@ -675,7 +687,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, object_metadata, is_multipart, min_upload_part_size, buf_size);
}
}
@@ -847,4 +859,12 @@ void DiskS3::shutdown()
client->DisableRequestProcessing();
}
std::optional<DiskS3::ObjectMetadata> DiskS3::createObjectMetadata(const String & path) const
{
if (send_metadata)
return (DiskS3::ObjectMetadata){{"path", path}};
return {};
}
}

View File

@@ -19,6 +19,8 @@ namespace DB
class DiskS3 : public IDisk
{
public:
using ObjectMetadata = std::map<std::string, std::string>;
friend class DiskS3Reservation;
class AwsS3KeyKeeper;
@@ -32,7 +34,8 @@
String metadata_path_,
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t min_bytes_for_seek_);
size_t min_bytes_for_seek_,
bool send_metadata_);
const String & getName() const override { return name; }
@@ -116,6 +119,7 @@
void removeMeta(const String & path, AwsS3KeyKeeper & keys);
void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
void removeAws(const AwsS3KeyKeeper & keys);
std::optional<ObjectMetadata> createObjectMetadata(const String & path) const;
private:
const String name;
@@ -127,6 +131,7 @@
size_t min_upload_part_size;
size_t min_multi_part_upload_size;
size_t min_bytes_for_seek;
bool send_metadata;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;

View File

@@ -149,7 +149,8 @@ void registerDiskS3(DiskFactory & factory)
metadata_path,
context.getSettingsRef().s3_min_upload_part_size,
config.getUInt64(config_prefix + ".min_multi_part_upload_size", 10 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_object_metadata", false));
/// This code is used only to check access to the corresponding disk.
if (!config.getBool(config_prefix + ".skip_access_check", false))

View File

@@ -43,11 +43,13 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart_,
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, is_multipart(is_multipart_)
, bucket(bucket_)
, key(key_)
, object_metadata(std::move(object_metadata_))
, client_ptr(std::move(client_ptr_))
, minimum_upload_part_size{minimum_upload_part_size_}
, temporary_buffer{std::make_unique<WriteBufferFromOwnString>()}
@@ -116,6 +118,8 @@ void WriteBufferFromS3::initiate()
Aws::S3::Model::CreateMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
auto outcome = client_ptr->CreateMultipartUpload(req);
@@ -217,6 +221,8 @@ void WriteBufferFromS3::complete()
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (object_metadata.has_value())
req.SetMetadata(object_metadata.value());
/// This could be improved using an adapter to WriteBuffer.
const std::shared_ptr<Aws::IOStream> input_data = Aws::MakeShared<Aws::StringStream>("temporary buffer", temporary_buffer->str());

View File

@@ -28,6 +28,7 @@
String bucket;
String key;
std::optional<std::map<String, String>> object_metadata;
std::shared_ptr<Aws::S3::S3Client> client_ptr;
size_t minimum_upload_part_size;
std::unique_ptr<WriteBufferFromOwnString> temporary_buffer;
@@ -47,6 +48,7 @@
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
void nextImpl() override;

View File

@@ -8,18 +8,6 @@
<count>10</count>
</logger>
<storage_configuration>
<disks>
<default>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</default>
</disks>
</storage_configuration>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>

View File

@@ -7,6 +7,7 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_object_metadata>true</send_object_metadata>
</default>
</disks>
</storage_configuration>

View File

@@ -24,34 +24,38 @@ def cluster():
        cluster.shutdown()

def assert_objects_count(cluster, objects_count, path='data/'):
    minio = cluster.minio_client
    s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
    print(s3_objects, file=sys.stderr)
    if objects_count != len(s3_objects):
        for s3_object in s3_objects:
            object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
            logging.info("Existing S3 object: %s", str(object_meta))
    assert objects_count == len(s3_objects)

@pytest.mark.parametrize(
    "log_engine,files_overhead,files_overhead_per_insert",
    [("TinyLog", 1, 1), ("Log", 2, 1), ("StripeLog", 1, 2)])
def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_insert):
    node = cluster.instances["node"]
    minio = cluster.minio_client

    node.query("CREATE TABLE s3_test (id UInt64) Engine={}".format(log_engine))

    node.query("INSERT INTO s3_test SELECT number FROM numbers(5)")
    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n"
    print(list(minio.list_objects(cluster.minio_bucket, 'data/')), file=sys.stderr)
    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert + files_overhead
    assert_objects_count(cluster, files_overhead_per_insert + files_overhead)

    node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)")
    assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
    print(list(minio.list_objects(cluster.minio_bucket, 'data/')), file=sys.stderr)
    assert len(
        list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert * 2 + files_overhead
    assert_objects_count(cluster, files_overhead_per_insert * 2 + files_overhead)

    node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)")
    assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
    print(list(minio.list_objects(cluster.minio_bucket, 'data/')), file=sys.stderr)
    assert len(
        list(minio.list_objects(cluster.minio_bucket, 'data/'))) == files_overhead_per_insert * 3 + files_overhead
    assert_objects_count(cluster, files_overhead_per_insert * 3 + files_overhead)

    node.query("TRUNCATE TABLE s3_test")
    print(list(minio.list_objects(cluster.minio_bucket, 'data/')), file=sys.stderr)
    assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
    assert_objects_count(cluster, 0)

    node.query("DROP TABLE s3_test")