Switched paths in S3 metadata to relative.

This commit is contained in:
Vladimir Chebotarev 2020-06-23 19:00:15 +03:00
parent 174b687a44
commit 8128ca10f4

View File

@ -20,6 +20,9 @@
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -60,10 +63,14 @@ namespace
struct Metadata
{
/// Metadata file version.
static constexpr UInt32 VERSION = 1;
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
using PathAndSize = std::pair<String, size_t>;
/// S3 root path.
const String & s3_root_path;
/// Disk path.
const String & disk_path;
/// Relative path to metadata file on local FS.
@ -76,8 +83,8 @@ namespace
UInt32 ref_count;
/// Load metadata by path or create empty if `create` flag is set.
explicit Metadata(const String & disk_path_, const String & metadata_file_path_, bool create = false)
: disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0)
explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false)
: s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0)
{
if (create)
return;
@ -87,10 +94,10 @@ namespace
UInt32 version;
readIntText(version, buf);
if (version != VERSION)
if (version != VERSION_RELATIVE_PATHS && version != VERSION_ABSOLUTE_PATHS)
throw Exception(
"Unknown metadata file version. Path: " + disk_path + metadata_file_path
+ " Version: " + std::to_string(version) + ", Expected version: " + std::to_string(VERSION),
+ " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_RELATIVE_PATHS),
ErrorCodes::UNKNOWN_FORMAT);
assertChar('\n', buf);
@ -108,6 +115,15 @@ namespace
readIntText(s3_object_size, buf);
assertChar('\t', buf);
readEscapedString(s3_object_path, buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!boost::algorithm::starts_with(s3_object_path, s3_root_path))
throw Exception(
"Path in metadata does not correspond S3 root path. Path: " + s3_object_path
+ ", root path: " + s3_root_path + ", disk path: " + disk_path_,
ErrorCodes::UNKNOWN_FORMAT);
s3_object_path = s3_object_path.substr(s3_root_path.size());
}
assertChar('\n', buf);
s3_objects[i] = {s3_object_path, s3_object_size};
}
@ -127,7 +143,7 @@ namespace
{
WriteBufferFromFile buf(disk_path + metadata_file_path, 1024);
writeIntText(VERSION, buf);
writeIntText(VERSION_RELATIVE_PATHS, buf);
writeChar('\n', buf);
writeIntText(s3_objects.size(), buf);
@ -213,7 +229,7 @@ namespace
const auto & [path, size] = metadata.s3_objects[i];
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
buf->seek(offset, SEEK_SET);
return buf;
}
@ -242,7 +258,7 @@ namespace
++current_buf_idx;
const auto & path = metadata.s3_objects[current_buf_idx].first;
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, path, buf_size);
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
@ -272,7 +288,7 @@ namespace
size_t min_upload_part_size,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_))
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, buf_size_))
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
@ -440,7 +456,7 @@ bool DiskS3::isDirectory(const String & path) const
size_t DiskS3::getFileSize(const String & path) const
{
Metadata metadata(metadata_path, path);
Metadata metadata(s3_root_path, metadata_path, path);
return metadata.total_size;
}
@ -493,16 +509,16 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
if (exists(to_path))
remove(to_path);
Metadata from(metadata_path, from_path);
Metadata to(metadata_path, to_path, true);
Metadata from(s3_root_path, metadata_path, from_path);
Metadata to(s3_root_path, metadata_path, to_path, true);
for (const auto & [path, size] : from.s3_objects)
{
auto new_path = s3_root_path + getRandomName();
auto new_path = getRandomName();
Aws::S3::Model::CopyObjectRequest req;
req.SetCopySource(bucket + "/" + path);
req.SetCopySource(bucket + "/" + s3_root_path + path);
req.SetBucket(bucket);
req.SetKey(new_path);
req.SetKey(s3_root_path + new_path);
throwIfError(client->CopyObject(req));
to.addObject(new_path, size);
@ -513,7 +529,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const
{
Metadata metadata(metadata_path, path);
Metadata metadata(s3_root_path, metadata_path, path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
@ -525,27 +541,27 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
{
bool exist = exists(path);
/// Path to store new S3 object.
auto s3_path = s3_root_path + getRandomName();
auto s3_path = getRandomName();
if (!exist || mode == WriteMode::Rewrite)
{
/// If metadata file exists - remove and create new.
if (exist)
remove(path);
Metadata metadata(metadata_path, path, true);
Metadata metadata(s3_root_path, metadata_path, path, true);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);
}
else
{
Metadata metadata(metadata_path, path);
Metadata metadata(s3_root_path, metadata_path, path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_path, metadata.s3_objects.size());
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, min_upload_part_size, buf_size);
}
@ -558,7 +574,7 @@ void DiskS3::remove(const String & path)
Poco::File file(metadata_path + path);
if (file.isFile())
{
Metadata metadata(metadata_path, path);
Metadata metadata(s3_root_path, metadata_path, path);
/// If there is no references - delete content from S3.
if (metadata.ref_count == 0)
@ -569,7 +585,7 @@ void DiskS3::remove(const String & path)
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(s3_object_path);
request.SetKey(s3_root_path + s3_object_path);
throwIfError(client->DeleteObject(request));
}
}
@ -644,7 +660,7 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
/// Increment number of references.
Metadata src(metadata_path, src_path);
Metadata src(s3_root_path, metadata_path, src_path);
++src.ref_count;
src.save();
@ -655,7 +671,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path)
void DiskS3::createFile(const String & path)
{
/// Create empty metadata file.
Metadata metadata(metadata_path, path, true);
Metadata metadata(s3_root_path, metadata_path, path, true);
metadata.save();
}