Ability to backup-restore metadata files for DiskS3 (WIP)

Pavel Kovalenko 2020-12-23 18:11:37 +03:00
parent cc3b5958b0
commit 2848b32af1
3 changed files with 56 additions and 33 deletions

src/Disks/S3/DiskS3.cpp

@@ -504,17 +504,17 @@ private:
     CurrentMetrics::Increment metric_increment;
 };
-/// Runs tasks asynchronously using global thread pool.
+/// Runs tasks asynchronously using thread pool.
 class AsyncExecutor : public Executor
 {
 public:
-    explicit AsyncExecutor() = default;
+    explicit AsyncExecutor(int thread_pool_size) : pool(ThreadPool(thread_pool_size)) { }
     std::future<void> execute(std::function<void()> task) override
     {
         auto promise = std::make_shared<std::promise<void>>();
-        GlobalThreadPool::instance().scheduleOrThrowOnError(
+        pool.scheduleOrThrowOnError(
             [promise, task]()
             {
                 try
@@ -535,6 +535,9 @@ public:
         return promise->get_future();
     }
+
+private:
+    ThreadPool pool;
 };
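
For readability, here is the whole executor as the two hunks above leave it. The body of the try block is truncated in the diff, so its completion below (run the task, fulfil the promise, forward exceptions) is an assumption based on the usual promise pattern, not part of the commit:

/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
public:
    explicit AsyncExecutor(int thread_pool_size) : pool(ThreadPool(thread_pool_size)) { }

    std::future<void> execute(std::function<void()> task) override
    {
        auto promise = std::make_shared<std::promise<void>>();
        pool.scheduleOrThrowOnError(
            [promise, task]()
            {
                try
                {
                    task();                 /// assumed body: run the task ...
                    promise->set_value();   /// ... and fulfil the promise
                }
                catch (...)
                {
                    promise->set_exception(std::current_exception());
                }
            });
        return promise->get_future();
    }

private:
    ThreadPool pool;
};

Each DiskS3 instance now owns a fixed-size pool instead of sharing the global one, so the new thread_pool_size setting bounds a disk's background work independently of other disks.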
@@ -548,8 +551,10 @@ DiskS3::DiskS3(
     size_t min_upload_part_size_,
     size_t max_single_part_upload_size_,
     size_t min_bytes_for_seek_,
-    bool send_metadata_)
-    : IDisk(std::make_unique<AsyncExecutor>())
+    bool send_metadata_,
+    int thread_pool_size_,
+    int list_object_keys_size_)
+    : IDisk(std::make_unique<AsyncExecutor>(thread_pool_size_))
     , name(std::move(name_))
     , client(std::move(client_))
     , proxy_configuration(std::move(proxy_configuration_))
@@ -560,6 +565,7 @@ DiskS3::DiskS3(
     , max_single_part_upload_size(max_single_part_upload_size_)
     , min_bytes_for_seek(min_bytes_for_seek_)
     , send_metadata(send_metadata_)
+    , list_object_keys_size(list_object_keys_size_)
 {
 }
@@ -727,15 +733,6 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
     }
     else /// In other case decrement number of references, save metadata and delete file.
     {
-        if (send_metadata)
-        {
-            auto revision = ++revision_counter;
-            const ObjectMetadata object_metadata {
-                {"path", path}
-            };
-            createFileOperationObject("remove", revision, object_metadata);
-        }
-
         --metadata.ref_count;
         metadata.save();
         file.remove();
@@ -926,7 +923,7 @@ void DiskS3::startup()
     LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
 
     /// Find last revision.
-    UInt64 l = 0, r = (static_cast<UInt64>(1)) << 63;
+    UInt64 l = 0, r = LATEST_REVISION;
     while (r - l > 1)
    {
         auto revision = (r - l) >> 1;
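
startup() bisects the revision space [0, LATEST_REVISION] to find the last revision recorded in S3. A standalone sketch of that search under stated assumptions: revision_exists is a hypothetical monotone probe (the actual S3 check lies outside this hunk), and the midpoint is written in the overflow-safe form:

#include <cstdint>
#include <functional>

/// Bisect [0, LATEST_REVISION] for the last revision whose objects exist in S3.
/// Assumes revision_exists(x) is monotone: true up to the answer, false after it.
uint64_t findLastRevision(const std::function<bool(uint64_t)> & revision_exists)
{
    const uint64_t LATEST_REVISION = static_cast<uint64_t>(1) << 63;
    uint64_t l = 0, r = LATEST_REVISION;
    while (r - l > 1)
    {
        uint64_t revision = l + ((r - l) >> 1); /// midpoint of [l, r] without overflow
        if (revision_exists(revision))
            l = revision; /// answer is >= revision
        else
            r = revision; /// answer is < revision
    }
    return l;
}

Covering the full 2^63 revision space this way costs at most 63 probes against S3.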
@@ -1002,7 +999,7 @@ void DiskS3::copyObject(const String & src_bucket, const String & src_key, const
 struct DiskS3::RestoreInformation
 {
-    UInt64 revision = (static_cast<UInt64>(1)) << 63;
+    UInt64 revision = LATEST_REVISION;
     String bucket;
     String path;
 };
@@ -1054,6 +1051,20 @@ void DiskS3::restore()
     information.path = s3_root_path;
 
     readRestoreInformation(information);
+    if (information.revision == 0)
+        information.revision = LATEST_REVISION;
+
+    if (information.bucket == bucket)
+    {
+        /// Restoring to the same path would additionally require cleaning up S3 objects with a later revision;
+        /// it is simpler to restore to a different path instead.
+        if (information.path == s3_root_path && information.revision != LATEST_REVISION)
+            throw Exception("Restoring to the same bucket and path is allowed only if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS);
+
+        /// Overlapping paths would complicate S3 cleanup if the restore fails.
+        if (information.path != s3_root_path && (information.path.starts_with(s3_root_path) || s3_root_path.starts_with(information.path)))
+            throw Exception("Restoring to the same bucket is allowed only if restore paths are the same or are not prefixes of each other", ErrorCodes::BAD_ARGUMENTS);
+    }
+
     ///TODO: Cleanup FS and bucket if previous restore was failed.
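
The two guards only constrain restores within the same bucket. A standalone restatement with a few worked cases (restoreTargetIsValid and its bool return are local to this sketch; DiskS3 throws BAD_ARGUMENTS instead; requires C++20 for starts_with):

#include <cassert>
#include <cstdint>
#include <string>

/// Sketch of the validity rules enforced by DiskS3::restore() above.
bool restoreTargetIsValid(const std::string & info_bucket, const std::string & info_path, uint64_t info_revision,
                          const std::string & bucket, const std::string & s3_root_path, uint64_t latest)
{
    if (info_bucket != bucket)
        return true; /// restoring from a different bucket is always allowed
    if (info_path == s3_root_path)
        return info_revision == latest; /// same bucket and path: only a full (latest) restore
    /// Same bucket, different path: paths must not be prefixes of each other.
    return !(info_path.starts_with(s3_root_path) || s3_root_path.starts_with(info_path));
}

int main()
{
    const uint64_t latest = static_cast<uint64_t>(1) << 63;
    assert(restoreTargetIsValid("b", "data/", latest, "b", "data/", latest));     /// same place, latest revision
    assert(!restoreTargetIsValid("b", "data/", 42, "b", "data/", latest));        /// same place, older revision
    assert(!restoreTargetIsValid("b", "data/backup/", 42, "b", "data/", latest)); /// paths are prefixes of each other
    assert(restoreTargetIsValid("b", "restored/", 42, "b", "data/", latest));     /// disjoint path in the same bucket
}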
@@ -1122,7 +1133,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
     auto path_entry = object_metadata.find("path");
     if (path_entry == object_metadata.end())
     {
-        LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have path key in metadata", key);
+        LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' key in metadata", key);
         continue;
     }
@@ -1134,11 +1145,13 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
     auto relative_key = shrinkKey(source_path, key);
     metadata.addObject(relative_key, head_result.GetContentLength());
 
-    /// Copy object to bucket configured for current DiskS3 instance.
-    if (bucket != source_bucket)
+    /// Copy object if we restore to a different bucket / path.
+    if (bucket != source_bucket || s3_root_path != source_path)
         copyObject(source_bucket, key, bucket, s3_root_path + relative_key);
 
     metadata.save();
+
+    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored file {}", path);
     }
 }
@@ -1146,12 +1159,12 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
 {
     LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name);
 
-    /// Disable sending metadata if we restore metadata to the same bucket.
-    send_metadata = bucket != source_bucket;
+    /// Enable recording of file operations if we restore to a different bucket / path.
+    send_metadata = bucket != source_bucket || s3_root_path != source_path;
 
-    listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result) {
+    listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result)
+    {
         const String rename = "rename";
         const String remove = "remove";
         const String hardlink = "hardlink";
 
         for (const auto & row : list_result.GetContents())
@@ -1170,7 +1183,7 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
             if (revision > target_revision)
                 return false;
 
-            /// Keep original revision if restore to different bucket.
+            /// Keep original revision if restore to different bucket / path.
             if (send_metadata)
                 revision_counter = revision - 1;
@@ -1180,18 +1193,20 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
                 auto from_path = object_metadata["from_path"];
                 auto to_path = object_metadata["to_path"];
                 if (exists(from_path))
+                {
                     moveFile(from_path, to_path);
-            }
-            else if (operation == remove)
-            {
-                removeIfExists(object_metadata["path"]);
+                    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored rename {} -> {}", from_path, to_path);
+                }
             }
             else if (operation == hardlink)
             {
                 auto src_path = object_metadata["src_path"];
                 auto dst_path = object_metadata["dst_path"];
                 if (exists(src_path))
+                {
                     createHardLink(src_path, dst_path);
+                    LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored hardlink {} -> {}", src_path, dst_path);
+                }
             }
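
listObjects itself is not part of this diff, but the new list_object_keys_size setting bounds each page of its listing loop. A sketch of how such a paged listing would look with the AWS C++ SDK, following the callback contract used above (return false to stop); the helper name and error handling are illustrative, not DiskS3's actual code:

#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <functional>
#include <stdexcept>

/// Page through keys under `prefix`, at most list_object_keys_size keys per request.
void listObjectsPaged(Aws::S3::S3Client & client, const Aws::String & bucket, const Aws::String & prefix,
                      int list_object_keys_size,
                      const std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> & callback)
{
    Aws::S3::Model::ListObjectsV2Request request;
    request.WithBucket(bucket).WithPrefix(prefix).WithMaxKeys(list_object_keys_size);

    while (true)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            throw std::runtime_error(outcome.GetError().GetMessage().c_str());

        const auto & result = outcome.GetResult();
        /// Stop when the callback asks to, or when the last page has been consumed.
        if (!callback(result) || !result.GetIsTruncated())
            break;

        request.SetContinuationToken(result.GetNextContinuationToken());
    }
}

The 1000 default below matches S3's hard cap on MaxKeys; smaller values trade more round trips for smaller responses.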

src/Disks/S3/DiskS3.h

@@ -24,9 +24,6 @@ namespace DB
 class DiskS3 : public IDisk
 {
 public:
-    /// File contains restore information
-    const String restore_file = "restore";
-
     using ObjectMetadata = std::map<std::string, std::string>;
 
     friend class DiskS3Reservation;
@@ -45,7 +42,9 @@ public:
     size_t min_upload_part_size_,
     size_t max_single_part_upload_size_,
     size_t min_bytes_for_seek_,
-    bool send_metadata_);
+    bool send_metadata_,
+    int thread_pool_size_,
+    int list_object_keys_size_);
 
     const String & getName() const override { return name; }
@@ -172,6 +171,13 @@ private:
     std::mutex reservation_mutex;
 
     std::atomic<UInt64> revision_counter;
+    static constexpr UInt64 LATEST_REVISION = (static_cast<UInt64>(1)) << 63;
+
+    /// File contains restore information
+    const String restore_file = "restore";
+    /// The number of keys listed in one request (1000 is max value).
+    int list_object_keys_size;
+    /// Key has format: ../../r{revision}-{operation}
+    const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
 };
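
key_regexp encodes the operation-log naming scheme, e.g. a key like "disk/operations/r42-rename". An illustrative parse with RE2 (the function name is local to this sketch; DiskS3 applies the same pattern inside restoreFileOperations):

#include <re2/re2.h>
#include <cstdint>
#include <string>

/// Extract revision and operation from an operation-log key.
/// Returns false if the key does not match the r{revision}-{operation} scheme.
bool parseOperationKey(const std::string & key, uint64_t & revision, std::string & operation)
{
    static const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
    return re2::RE2::FullMatch(key, key_regexp, &revision, &operation);
}

For "disk/operations/r42-rename" this yields revision = 42 and operation = "rename".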

src/Disks/S3/registerDiskS3.cpp

@@ -150,7 +150,9 @@ void registerDiskS3(DiskFactory & factory)
             context.getSettingsRef().s3_min_upload_part_size,
             context.getSettingsRef().s3_max_single_part_upload_size,
             config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
-            config.getBool(config_prefix + ".send_object_metadata", false));
+            config.getBool(config_prefix + ".send_object_metadata", false),
+            config.getInt(config_prefix + ".thread_pool_size", 16),
+            config.getInt(config_prefix + ".list_object_keys_size", 1000));
 
         /// This code is used only to check access to the corresponding disk.
         if (!config.getBool(config_prefix + ".skip_access_check", false))