Ability to backup-restore metadata files for DiskS3 (WIP)

This commit is contained in:
Pavel Kovalenko 2020-12-22 21:47:47 +03:00
parent 410d0a51b5
commit 18fe1c796b
9 changed files with 331 additions and 73 deletions

View File

@ -239,19 +239,6 @@ void DiskCacheWrapper::replaceFile(const String & from_path, const String & to_p
DiskDecorator::replaceFile(from_path, to_path);
}
void DiskCacheWrapper::copyFile(const String & from_path, const String & to_path)
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
cache_disk->copyFile(from_path, to_path);
}
DiskDecorator::copyFile(from_path, to_path);
}
void DiskCacheWrapper::remove(const String & path)
{
if (cache_disk->exists(path))

View File

@ -32,7 +32,6 @@ public:
void moveDirectory(const String & from_path, const String & to_path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBufferFromFileBase>
readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override;
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -220,11 +220,6 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
from_file.renameTo(to_file.path());
}
void DiskLocal::copyFile(const String & from_path, const String & to_path)
{
Poco::File(disk_path + from_path).copyTo(disk_path + to_path);
}
std::unique_ptr<ReadBufferFromFileBase>
DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const
{

View File

@ -67,8 +67,6 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;

View File

@ -314,11 +314,6 @@ void DiskMemory::replaceFileImpl(const String & from_path, const String & to_pat
files.insert(std::move(node));
}
void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path*/)
{
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t) const
{
std::lock_guard lock(mutex);

View File

@ -60,8 +60,6 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(

View File

@ -127,9 +127,6 @@ public:
/// If a file with `to_path` path already exists, it will be replaced.
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
/// Copy the file from `from_path` to `to_path`.
virtual void copyFile(const String & from_path, const String & to_path) = 0;
/// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`.
virtual void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path);

View File

@ -23,6 +23,8 @@
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <boost/algorithm/string.hpp>
@ -32,6 +34,7 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int FILE_ALREADY_EXISTS;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
@ -76,12 +79,12 @@ String getRandomName()
}
template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> && response)
void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
{
if (!response.IsSuccess())
{
const auto & err = response.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
throw Exception(std::to_string(static_cast<int>(err.GetErrorType())) + ": " + err.GetMessage(), ErrorCodes::S3_ERROR);
}
}
@ -613,45 +616,31 @@ void DiskS3::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
if (send_metadata)
{
auto revision = ++revision_counter;
const DiskS3::ObjectMetadata object_metadata {
{"from_path", from_path},
{"to_path", to_path}
};
createFileOperationObject("rename", revision, object_metadata);
}
Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
}
void DiskS3::replaceFile(const String & from_path, const String & to_path)
{
Poco::File from_file(metadata_path + from_path);
Poco::File to_file(metadata_path + to_path);
if (to_file.exists())
if (exists(to_path))
{
Poco::File tmp_file(metadata_path + to_path + ".old");
to_file.renameTo(tmp_file.path());
from_file.renameTo(metadata_path + to_path);
remove(to_path + ".old");
const String tmp_path = to_path + ".old";
moveFile(to_path, tmp_path);
moveFile(from_path, to_path);
remove(tmp_path);
}
else
from_file.renameTo(to_file.path());
}
void DiskS3::copyFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
remove(to_path);
auto from = readMeta(from_path);
auto to = createMeta(to_path);
for (const auto & [path, size] : from.s3_objects)
{
auto new_path = getRandomName();
Aws::S3::Model::CopyObjectRequest req;
req.SetCopySource(bucket + "/" + s3_root_path + path);
req.SetBucket(bucket);
req.SetKey(s3_root_path + new_path);
throwIfError(client->CopyObject(req));
to.addObject(new_path, size);
}
to.save();
moveFile(from_path, to_path);
}
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const
@ -673,7 +662,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
/// Path to store new S3 object.
auto s3_path = getRandomName();
auto object_metadata = createObjectMetadata(path);
std::optional<ObjectMetadata> object_metadata;
if (send_metadata)
{
auto revision = ++revision_counter;
object_metadata = {
{"path", path}
};
s3_path = "r" + revisionToString(revision) + "-file-" + s3_path;
}
if (!exist || mode == WriteMode::Rewrite)
{
/// If metadata file exists - remove and create new.
@ -727,6 +726,15 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
}
else /// In other case decrement number of references, save metadata and delete file.
{
if (send_metadata)
{
auto revision = ++revision_counter;
const ObjectMetadata object_metadata {
{"path", path}
};
createFileOperationObject("remove", revision, object_metadata);
}
--metadata.ref_count;
metadata.save();
file.remove();
@ -780,7 +788,8 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
throwIfError(client->DeleteObjects(request));
auto outcome = client->DeleteObjects(request);
throwIfError(outcome);
}
}
}
@ -840,6 +849,16 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
if (send_metadata)
{
auto revision = ++revision_counter;
const ObjectMetadata object_metadata {
{"src_path", src_path},
{"dst_path", dst_path}
};
createFileOperationObject("hardlink", revision, object_metadata);
}
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
@ -889,12 +908,257 @@ void DiskS3::shutdown()
client->DisableRequestProcessing();
}
std::optional<DiskS3::ObjectMetadata> DiskS3::createObjectMetadata(const String & path) const
void DiskS3::createFileOperationObject(const String & operation_name, UInt64 revision, const DiskS3::ObjectMetadata & metadata)
{
if (send_metadata)
return (DiskS3::ObjectMetadata){{"path", path}};
const String key = "meta/r" + revisionToString(revision) + "-" + operation_name;
WriteBufferFromS3 buffer(client, bucket, s3_root_path + key, min_upload_part_size, max_single_part_upload_size, metadata);
buffer.write('0');
buffer.finalize();
}
return {};
void DiskS3::startup()
{
if (!send_metadata)
return;
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
/// Find last revision.
UInt64 l = 0, r = (static_cast<UInt64>(1)) << 63;
while (r - l > 1)
{
auto revision = (r - l) >> 1;
auto revision_str = revisionToString(revision);
/// Check that object or metaobject with such revision exists.
if (checkObjectExists(s3_root_path + "r" + revision_str)
|| checkObjectExists(s3_root_path + "meta/r" + revision_str))
l = revision;
else
r = revision;
}
revision_counter = l;
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {}", revision_counter);
}
bool DiskS3::checkObjectExists(const String & prefix)
{
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(prefix);
request.SetMaxKeys(1);
auto outcome = client->ListObjectsV2(request);
throwIfError(outcome);
return !outcome.GetResult().GetContents().empty();
}
struct DiskS3::RestoreInformation
{
UInt64 revision = (static_cast<UInt64>(1)) << 63;
String bucket;
String path;
};
void DiskS3::restore()
{
if (!exists(restore_file))
return;
RestoreInformation information;
///TODO: read restore information from restore_file.
restoreFiles(information.bucket, information.path, information.revision);
restoreFileOperations(information.bucket, information.path, information.revision);
}
Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key)
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(source_bucket);
request.SetKey(key);
auto outcome = client->HeadObject(request);
throwIfError(outcome);
return outcome.GetResultWithOwnership();
}
void DiskS3::listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback)
{
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(source_path);
request.SetMaxKeys(1000);
Aws::S3::Model::ListObjectsV2Outcome outcome;
do
{
outcome = client->ListObjectsV2(request);
throwIfError(outcome);
bool should_continue = callback(outcome.GetResult());
if (!should_continue)
break;
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
} while (outcome.GetResult().GetIsTruncated());
}
void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore files for disk {}", name);
std::vector<std::future<void>> results;
listObjects(source_bucket, source_path, [this, &source_bucket, &revision, &results](auto list_result) {
std::vector<String> keys;
for (const auto & row : list_result.GetContents())
{
const String & key = row.GetKey();
/// Skip meta objects. They will be processed separately.
if (key.find("/meta/") != String::npos)
continue;
/// Filter early if it's possible to get revision from key.
if (extractRevisionFromKey(key) > revision)
continue;
keys.push_back(key);
}
if (!keys.empty())
{
auto result = getExecutor().execute([this, &source_bucket, keys]() { processRestoreFiles(source_bucket, keys);
});
results.push_back(std::move(result));
}
return true;
});
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Files are restored for disk {}", name);
}
inline String getDirectoryPath(const String & path)
{
return Poco::Path{path}.setFileName("").toString();
}
void DiskS3::processRestoreFiles(const String & source_bucket, Strings keys)
{
for (const auto & key : keys)
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(source_bucket);
request.SetKey(key);
auto outcome = client->HeadObject(request);
throwIfError(outcome);
auto object_metadata = outcome.GetResult().GetMetadata();
/// If object has 'path' in metadata then restore it.
auto path = object_metadata.find("path");
if (path == object_metadata.end())
continue;
createDirectories(getDirectoryPath(path->second));
auto metadata = createMeta(path->second);
/// TODO: shrink common prefix of s3_root_path and key.
auto relative_key = key;
metadata.addObject(relative_key, outcome.GetResult().GetContentLength());
/// TODO: Copy object to configured bucket if source_bucket is different.
metadata.save();
}
}
void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name);
/// Temporarily disable sending metadata.
send_metadata = false;
listObjects(source_bucket, source_path + "meta/", [this, &source_bucket, &revision](auto list_result) {
const String rename = "rename";
const String remove = "remove";
const String hardlink = "hardlink";
for (const auto & row : list_result.GetContents())
{
const String & key = row.GetKey();
/// Stop processing when get revision more than required.
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order.
if (extractRevisionFromKey(key) > revision)
return false;
auto operation = extractOperationFromKey(key);
auto object_metadata = headObject(source_bucket, key).GetMetadata();
if (operation == rename)
{
auto from_path = object_metadata["from_path"];
auto to_path = object_metadata["to_path"];
if (exists(from_path))
moveFile(from_path, to_path);
}
else if (operation == remove)
{
removeIfExists(object_metadata["path"]);
}
else if (operation == hardlink)
{
auto src_path = object_metadata["src_path"];
auto dst_path = object_metadata["dst_path"];
/// Skip hardlinks to shadow (backup) directory.
if (exists(src_path) && dst_path.find("/shadow/") != String::npos)
createHardLink(src_path, dst_path);
}
}
return true;
});
send_metadata = true;
LOG_INFO(&Poco::Logger::get("DiskS3"), "File operations restored for disk {}", name);
}
UInt64 DiskS3::extractRevisionFromKey(const String & key)
{
/// TODO: Implement.
return 0;
}
String DiskS3::extractOperationFromKey(const String & key)
{
/// TODO: Implement.
return "";
}
String DiskS3::revisionToString(UInt64 revision)
{
static constexpr size_t max_digits = 19;
/// Align revision number with leading zeroes to have strict lexicographical order of them.
auto revision_str = std::to_string(revision);
auto digits_to_align = max_digits - revision_str.length();
for (size_t i = 0; i < digits_to_align; ++i)
revision_str = "0" + revision_str;
return revision_str;
}
}

View File

@ -1,10 +1,14 @@
#pragma once
#include <atomic>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
#include "ProxyConfiguration.h"
#include <aws/s3/S3Client.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <Poco/DirectoryIterator.h>
@ -19,12 +23,16 @@ namespace DB
class DiskS3 : public IDisk
{
public:
/// File contains restore information
const String restore_file = "restore";
using ObjectMetadata = std::map<std::string, std::string>;
friend class DiskS3Reservation;
class AwsS3KeyKeeper;
struct Metadata;
struct RestoreInformation;
DiskS3(
String name_,
@ -74,8 +82,6 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
@ -114,17 +120,34 @@ public:
void shutdown() override;
/// Actions performed after disk creation.
void startup();
/// Restore S3 metadata files on file system.
void restore();
private:
bool tryReserve(UInt64 bytes);
void removeMeta(const String & path, AwsS3KeyKeeper & keys);
void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
void removeAws(const AwsS3KeyKeeper & keys);
std::optional<ObjectMetadata> createObjectMetadata(const String & path) const;
Metadata readMeta(const String & path) const;
Metadata createMeta(const String & path) const;
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
String revisionToString(UInt64 revision);
bool checkObjectExists(const String & prefix);
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision);
void processRestoreFiles(const String & source_bucket, std::vector<String> keys);
void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision);
UInt64 extractRevisionFromKey(const String & key);
String extractOperationFromKey(const String & key);
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
@ -140,6 +163,8 @@ private:
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter;
};
}