Ability to backup-restore metadata files for DiskS3 (WIP)

This commit is contained in:
Pavel Kovalenko 2020-12-23 15:35:52 +03:00
parent 18fe1c796b
commit cc3b5958b0
8 changed files with 171 additions and 91 deletions

View File

@ -139,7 +139,7 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
{
try
{
auto dir_path = getDirectoryPath(path);
auto dir_path = directoryPath(path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
@ -182,7 +182,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write file {} to cache", backQuote(path));
auto dir_path = getDirectoryPath(path);
auto dir_path = directoryPath(path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
@ -217,7 +217,7 @@ void DiskCacheWrapper::moveFile(const String & from_path, const String & to_path
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
auto dir_path = directoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
@ -230,7 +230,7 @@ void DiskCacheWrapper::replaceFile(const String & from_path, const String & to_p
{
if (cache_disk->exists(from_path))
{
auto dir_path = getDirectoryPath(to_path);
auto dir_path = directoryPath(to_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
@ -257,7 +257,7 @@ void DiskCacheWrapper::createHardLink(const String & src_path, const String & ds
{
if (cache_disk->exists(src_path))
{
auto dir_path = getDirectoryPath(dst_path);
auto dir_path = directoryPath(dst_path);
if (!cache_disk->exists(dir_path))
cache_disk->createDirectories(dir_path);
@ -278,11 +278,6 @@ void DiskCacheWrapper::createDirectories(const String & path)
DiskDecorator::createDirectories(path);
}
inline String DiskCacheWrapper::getDirectoryPath(const String & path)
{
return Poco::Path{path}.setFileName("").toString();
}
/// TODO: Current reservation mechanism leaks IDisk abstraction details.
/// This hack is needed to return proper disk pointer (wrapper instead of implementation) from reservation object.
class ReservationDelegate : public IReservation

View File

@ -43,7 +43,6 @@ public:
private:
std::shared_ptr<FileDownloadMetadata> acquireDownloadMetadata(const String & path) const;
static String getDirectoryPath(const String & path);
/// Disk to cache files.
std::shared_ptr<DiskLocal> cache_disk;

View File

@ -103,11 +103,6 @@ void DiskDecorator::replaceFile(const String & from_path, const String & to_path
delegate->replaceFile(from_path, to_path);
}
void DiskDecorator::copyFile(const String & from_path, const String & to_path)
{
delegate->copyFile(from_path, to_path);
}
void DiskDecorator::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
delegate->copy(from_path, to_disk, to_path);

View File

@ -32,7 +32,6 @@ public:
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase>

View File

@ -262,4 +262,11 @@ inline String fileName(const String & path)
{
return Poco::Path(path).getFileName();
}
/// Return directory path for the specified path.
inline String directoryPath(const String & path)
{
return Poco::Path(path).setFileName("").toString();
}
}

View File

@ -41,6 +41,7 @@ namespace ErrorCodes
extern const int INCORRECT_DISK_INDEX;
extern const int NOT_IMPLEMENTED;
extern const int PATH_ACCESS_DENIED;
extern const int LOGICAL_ERROR;
}
@ -849,7 +850,8 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
{
if (send_metadata)
/// We don't need to record hardlinks created to shadow folder.
if (send_metadata && dst_path.find("/shadow/") != String::npos)
{
auto revision = ++revision_counter;
const ObjectMetadata object_metadata {
@ -910,7 +912,7 @@ void DiskS3::shutdown()
void DiskS3::createFileOperationObject(const String & operation_name, UInt64 revision, const DiskS3::ObjectMetadata & metadata)
{
const String key = "meta/r" + revisionToString(revision) + "-" + operation_name;
const String key = "operations/r" + revisionToString(revision) + "-" + operation_name;
WriteBufferFromS3 buffer(client, bucket, s3_root_path + key, min_upload_part_size, max_single_part_upload_size, metadata);
buffer.write('0');
buffer.finalize();
@ -929,9 +931,9 @@ void DiskS3::startup()
{
auto revision = (r - l) >> 1;
auto revision_str = revisionToString(revision);
/// Check that object or metaobject with such revision exists.
/// Check that file or operation with such revision exists.
if (checkObjectExists(s3_root_path + "r" + revision_str)
|| checkObjectExists(s3_root_path + "meta/r" + revision_str))
|| checkObjectExists(s3_root_path + "operations/r" + revision_str))
l = revision;
else
r = revision;
@ -953,25 +955,6 @@ bool DiskS3::checkObjectExists(const String & prefix)
return !outcome.GetResult().GetContents().empty();
}
struct DiskS3::RestoreInformation
{
UInt64 revision = (static_cast<UInt64>(1)) << 63;
String bucket;
String path;
};
void DiskS3::restore()
{
if (!exists(restore_file))
return;
RestoreInformation information;
///TODO: read restore information from restore_file.
restoreFiles(information.bucket, information.path, information.revision);
restoreFileOperations(information.bucket, information.path, information.revision);
}
Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key)
{
Aws::S3::Model::HeadObjectRequest request;
@ -1006,24 +989,102 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
} while (outcome.GetResult().GetIsTruncated());
}
void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision)
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key)
{
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
auto outcome = client->CopyObject(request);
throwIfError(outcome);
}
struct DiskS3::RestoreInformation
{
UInt64 revision = (static_cast<UInt64>(1)) << 63;
String bucket;
String path;
};
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
{
ReadBufferFromFile buffer(metadata_path + restore_file, 512);
buffer.next();
/// Empty file - just restore all metadata.
if (!buffer.hasPendingData())
return;
try
{
readIntText(restore_information.revision, buffer);
assertChar('\n', buffer);
if (!buffer.hasPendingData())
return;
readText(restore_information.bucket, buffer);
assertChar('\n', buffer);
if (!buffer.hasPendingData())
return;
readText(restore_information.path, buffer);
assertChar('\n', buffer);
if (buffer.hasPendingData())
throw Exception("Extra information at the end of restore file", ErrorCodes::UNKNOWN_FORMAT);
}
catch (const Exception & e)
{
throw Exception("Failed to read restore information", e, ErrorCodes::UNKNOWN_FORMAT);
}
}
void DiskS3::restore()
{
if (!exists(restore_file))
return;
try
{
RestoreInformation information;
information.bucket = bucket;
information.path = s3_root_path;
readRestoreInformation(information);
///TODO: Cleanup FS and bucket if previous restore was failed.
restoreFiles(information.bucket, information.path, information.revision);
restoreFileOperations(information.bucket, information.path, information.revision);
}
catch (const Exception & e)
{
throw Exception("Failed to restore disk: " + name, e, ErrorCodes::LOGICAL_ERROR);
}
}
void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore files for disk {}", name);
std::vector<std::future<void>> results;
listObjects(source_bucket, source_path, [this, &source_bucket, &revision, &results](auto list_result) {
listObjects(source_bucket, source_path, [this, &source_bucket, &source_path, &target_revision, &results](auto list_result)
{
std::vector<String> keys;
for (const auto & row : list_result.GetContents())
{
const String & key = row.GetKey();
/// Skip meta objects. They will be processed separately.
if (key.find("/meta/") != String::npos)
/// Skip file operations objects. They will be processed separately.
if (key.find("/operations/") != String::npos)
continue;
auto [revision, _] = extractRevisionAndOperationFromKey(key);
/// Filter early if it's possible to get revision from key.
if (extractRevisionFromKey(key) > revision)
if (revision > target_revision)
continue;
keys.push_back(key);
@ -1031,7 +1092,9 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
if (!keys.empty())
{
auto result = getExecutor().execute([this, &source_bucket, keys]() { processRestoreFiles(source_bucket, keys);
auto result = getExecutor().execute([this, &source_bucket, &source_path, keys]()
{
processRestoreFiles(source_bucket, source_path, keys);
});
results.push_back(std::move(result));
@ -1048,50 +1111,45 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
LOG_INFO(&Poco::Logger::get("DiskS3"), "Files are restored for disk {}", name);
}
inline String getDirectoryPath(const String & path)
{
return Poco::Path{path}.setFileName("").toString();
}
void DiskS3::processRestoreFiles(const String & source_bucket, Strings keys)
void DiskS3::processRestoreFiles(const String & source_bucket, const String & source_path, Strings keys)
{
for (const auto & key : keys)
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(source_bucket);
request.SetKey(key);
auto outcome = client->HeadObject(request);
throwIfError(outcome);
auto object_metadata = outcome.GetResult().GetMetadata();
auto head_result = headObject(source_bucket, key);
auto object_metadata = head_result.GetMetadata();
/// If object has 'path' in metadata then restore it.
auto path = object_metadata.find("path");
if (path == object_metadata.end())
auto path_entry = object_metadata.find("path");
if (path_entry == object_metadata.end())
{
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have path key in metadata", key);
continue;
}
createDirectories(getDirectoryPath(path->second));
auto metadata = createMeta(path->second);
const auto & path = path_entry->second;
/// TODO: shrink common prefix of s3_root_path and key.
auto relative_key = key;
metadata.addObject(relative_key, outcome.GetResult().GetContentLength());
createDirectories(directoryPath(path));
auto metadata = createMeta(path);
/// TODO: Copy object to configured bucket if source_bucket is different.
auto relative_key = shrinkKey(source_path, key);
metadata.addObject(relative_key, head_result.GetContentLength());
/// Copy object to bucket configured for current DiskS3 instance.
if (bucket != source_bucket)
copyObject(source_bucket, key, bucket, s3_root_path + relative_key);
metadata.save();
}
}
void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision)
void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name);
/// Temporarily disable sending metadata.
send_metadata = false;
/// Disable sending metadata if we restore metadata to the same bucket.
send_metadata = bucket != source_bucket;
listObjects(source_bucket, source_path + "meta/", [this, &source_bucket, &revision](auto list_result) {
listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result) {
const String rename = "rename";
const String remove = "remove";
const String hardlink = "hardlink";
@ -1100,12 +1158,22 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
{
const String & key = row.GetKey();
auto [revision, operation] = extractRevisionAndOperationFromKey(key);
if (revision == 0)
{
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} with unknown revision", revision);
continue;
}
/// Stop processing when get revision more than required.
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order.
if (extractRevisionFromKey(key) > revision)
if (revision > target_revision)
return false;
auto operation = extractOperationFromKey(key);
/// Keep original revision if restore to different bucket.
if (send_metadata)
revision_counter = revision - 1;
auto object_metadata = headObject(source_bucket, key).GetMetadata();
if (operation == rename)
{
@ -1122,8 +1190,7 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
{
auto src_path = object_metadata["src_path"];
auto dst_path = object_metadata["dst_path"];
/// Skip hardlinks to shadow (backup) directory.
if (exists(src_path) && dst_path.find("/shadow/") != String::npos)
if (exists(src_path))
createHardLink(src_path, dst_path);
}
}
@ -1136,21 +1203,27 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
LOG_INFO(&Poco::Logger::get("DiskS3"), "File operations restored for disk {}", name);
}
UInt64 DiskS3::extractRevisionFromKey(const String & key)
std::tuple<UInt64, String> DiskS3::extractRevisionAndOperationFromKey(const String & key)
{
/// TODO: Implement.
return 0;
UInt64 revision = 0;
String operation;
re2::RE2::FullMatch(key, key_regexp, &revision, &operation);
return {revision, operation};
}
String DiskS3::extractOperationFromKey(const String & key)
String DiskS3::shrinkKey(const String & path, const String & key)
{
/// TODO: Implement.
return "";
if (!key.starts_with(path))
throw Exception("The key " + key + " prefix mismatch with given " + path, ErrorCodes::LOGICAL_ERROR);
return key.substr(path.length());
}
String DiskS3::revisionToString(UInt64 revision)
{
static constexpr size_t max_digits = 19;
static constexpr size_t max_digits = 19; /// UInt64 max digits in decimal representation.
/// Align revision number with leading zeroes to have strict lexicographical order of them.
auto revision_str = std::to_string(revision);

View File

@ -10,6 +10,7 @@
#include <aws/s3/model/ListObjectsV2Result.h>
#include <Poco/DirectoryIterator.h>
#include <re2/re2.h>
namespace DB
@ -137,16 +138,22 @@ private:
Metadata createMeta(const String & path) const;
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
String revisionToString(UInt64 revision);
bool checkObjectExists(const String & prefix);
static String revisionToString(UInt64 revision);
bool checkObjectExists(const String & prefix);
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision);
void processRestoreFiles(const String & source_bucket, std::vector<String> keys);
void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision);
UInt64 extractRevisionFromKey(const String & key);
String extractOperationFromKey(const String & key);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision);
void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector<String> keys);
void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision);
/// Remove 'path' prefix from 'key' to get relative key.
/// It's needed to store keys to metadata files in RELATIVE_PATHS version.
static String shrinkKey(const String & path, const String & key);
std::tuple<UInt64, String> extractRevisionAndOperationFromKey(const String & key);
private:
const String name;
@ -165,6 +172,8 @@ private:
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter;
/// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
};
}

View File

@ -160,6 +160,9 @@ void registerDiskS3(DiskFactory & factory)
checkRemoveAccess(*s3disk);
}
s3disk->restore();
s3disk->startup();
bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true);
if (cache_enabled)