DiskS3 restore file format change and minor improvements.

This commit is contained in:
Pavel Kovalenko 2021-04-16 17:43:16 +03:00
parent 84a420b806
commit 760a2ccedf
3 changed files with 72 additions and 51 deletions

View File

@@ -6,6 +6,7 @@
#include <random>
#include <optional>
#include <utility>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
@@ -954,6 +955,7 @@ void DiskS3::findLastRevision()
{
/// Construct revision number from high to low bits.
String revision;
revision.reserve(64);
for (int bit = 0; bit < 64; bit++)
{
auto revision_prefix = revision + "1";
@@ -1176,39 +1178,50 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
buffer.next();
/// Empty file - just restore all metadata.
if (!buffer.hasPendingData())
return;
try
{
readIntText(restore_information.revision, buffer);
assertChar('\n', buffer);
std::map<String, String> properties;
if (!buffer.hasPendingData())
return;
while (buffer.hasPendingData())
{
String property;
readText(property, buffer);
assertChar('\n', buffer);
readText(restore_information.source_bucket, buffer);
assertChar('\n', buffer);
auto pos = property.find('=');
if (pos == String::npos || pos == 0 || pos == property.length())
throw Exception(fmt::format("Invalid property {} in restore file", property), ErrorCodes::UNKNOWN_FORMAT);
if (!buffer.hasPendingData())
return;
auto key = property.substr(0, pos);
auto value = property.substr(pos + 1);
readText(restore_information.source_path, buffer);
assertChar('\n', buffer);
auto it = properties.find(key);
if (it != properties.end())
throw Exception(fmt::format("Property key duplication {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT);
if (!buffer.hasPendingData())
return;
properties[key] = value;
}
readBoolTextWord(restore_information.detached, buffer);
assertChar('\n', buffer);
for (const auto & [key, value] : properties)
{
ReadBufferFromString value_buffer (value);
if (buffer.hasPendingData())
throw Exception("Extra information at the end of restore file", ErrorCodes::UNKNOWN_FORMAT);
if (key == "revision")
readIntText(restore_information.revision, value_buffer);
else if (key == "source_bucket")
readText(restore_information.source_bucket, value_buffer);
else if (key == "source_path")
readText(restore_information.source_path, value_buffer);
else if (key == "detached")
readBoolTextWord(restore_information.detached, value_buffer);
else
throw Exception(fmt::format("Unknown key {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT);
}
}
catch (const Exception & e)
catch (const Exception &)
{
throw Exception("Failed to read restore information", e, ErrorCodes::UNKNOWN_FORMAT);
tryLogCurrentException(log, "Failed to read restore information");
throw;
}
}
@@ -1241,12 +1254,12 @@ void DiskS3::restore()
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
}
if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(log, "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path);
if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(log, "Removing old metadata...");
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
@@ -1254,8 +1267,8 @@ void DiskS3::restore()
if (exists(root))
removeSharedRecursive(root + '/', !cleanup_s3);
restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision, information.detached);
restoreFiles(information);
restoreFileOperations(information);
Poco::File restore_file(metadata_path + RESTORE_FILE_NAME);
restore_file.remove();
@@ -1272,12 +1285,12 @@ void DiskS3::restore()
}
}
void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision)
void DiskS3::restoreFiles(const RestoreInformation & restore_information)
{
LOG_INFO(log, "Starting restore files for disk {}", name);
std::vector<std::future<void>> results;
listObjects(source_bucket, source_path, [this, &source_bucket, &source_path, &target_revision, &results](auto list_result)
auto restore_files = [this, &restore_information, &results](auto list_result)
{
std::vector<String> keys;
for (const auto & row : list_result.GetContents())
@@ -1290,7 +1303,7 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
const auto [revision, _] = extractRevisionAndOperationFromKey(key);
/// Filter early if it's possible to get revision from key.
if (revision > target_revision)
if (revision > restore_information.revision)
continue;
keys.push_back(key);
@@ -1298,16 +1311,19 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
if (!keys.empty())
{
auto result = getExecutor().execute([this, &source_bucket, &source_path, keys]()
auto result = getExecutor().execute([this, &restore_information, keys]()
{
processRestoreFiles(source_bucket, source_path, keys);
processRestoreFiles(restore_information.source_bucket, restore_information.source_path, keys);
});
results.push_back(std::move(result));
}
return true;
});
};
/// Execute.
listObjects(restore_information.source_bucket, restore_information.source_path, restore_files);
for (auto & result : results)
result.wait();
@@ -1350,16 +1366,15 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
}
}
void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision, bool detached)
void DiskS3::restoreFileOperations(const RestoreInformation & restore_information)
{
LOG_INFO(log, "Starting restore file operations for disk {}", name);
/// Enable recording file operations if we restore to different bucket / path.
send_metadata = bucket != source_bucket || s3_root_path != source_path;
send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
std::set<String> renames;
listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision, &detached, &renames](auto list_result)
auto restore_file_operations = [this, &restore_information, &renames](auto list_result)
{
const String rename = "rename";
const String hardlink = "hardlink";
@@ -1377,14 +1392,14 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order (revision order).
/// We can stop processing if revision of the object is already more than required.
if (revision > target_revision)
if (revision > restore_information.revision)
return false;
/// Keep original revision if restore to different bucket / path.
if (send_metadata)
revision_counter = revision - 1;
auto object_metadata = headObject(source_bucket, key).GetMetadata();
auto object_metadata = headObject(restore_information.source_bucket, key).GetMetadata();
if (operation == rename)
{
auto from_path = object_metadata["from_path"];
@@ -1394,7 +1409,7 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
moveFile(from_path, to_path);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (detached && isDirectory(to_path))
if (restore_information.detached && isDirectory(to_path))
{
/// Sometimes directory paths are passed without trailing '/'. We should keep them in one consistent way.
if (!from_path.ends_with('/'))
@@ -1425,9 +1440,12 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
}
return true;
});
};
if (detached)
/// Execute.
listObjects(restore_information.source_bucket, restore_information.source_path + "operations/", restore_file_operations);
if (restore_information.detached)
{
Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"};

View File

@@ -167,9 +167,9 @@ private:
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision);
void restoreFiles(const RestoreInformation & restore_information);
void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector<String> keys);
void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision, bool detached);
void restoreFileOperations(const RestoreInformation & restore_information);
/// Remove 'path' prefix from 'key' to get relative key.
/// It's needed to store keys to metadata files in RELATIVE_PATHS version.
@@ -204,7 +204,7 @@ private:
int list_object_keys_size;
/// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\w+)-(\\w+).*"};
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
/// Object contains information about schema version.
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";

View File

@@ -106,15 +106,18 @@ def drop_shadow_information(node):
node.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/shadow/*'], user='root')
def create_restore_file(node, revision=0, bucket=None, path=None, detached=None):
add_restore_option = 'echo -en "{}\n" >> /var/lib/clickhouse/disks/s3/restore'
node.exec_in_container(['bash', '-c', add_restore_option.format(revision)], user='root')
def create_restore_file(node, revision=None, bucket=None, path=None, detached=None):
node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root')
add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore'
if revision:
node.exec_in_container(['bash', '-c', add_restore_option.format('revision', revision)], user='root')
if bucket:
node.exec_in_container(['bash', '-c', add_restore_option.format(bucket)], user='root')
node.exec_in_container(['bash', '-c', add_restore_option.format('source_bucket', bucket)], user='root')
if path:
node.exec_in_container(['bash', '-c', add_restore_option.format(path)], user='root')
node.exec_in_container(['bash', '-c', add_restore_option.format('source_path', path)], user='root')
if detached:
node.exec_in_container(['bash', '-c', add_restore_option.format('true')], user='root')
node.exec_in_container(['bash', '-c', add_restore_option.format('detached', 'true')], user='root')
def get_revision_counter(node, backup_number):