Disk S3 possibility to migrate to restorable schema

This commit is contained in:
Pavel Kovalenko 2021-03-19 16:20:19 +03:00
parent a07dfe328f
commit e378c0bf8a
4 changed files with 250 additions and 15 deletions

View File

@ -648,7 +648,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path)
if (send_metadata) if (send_metadata)
{ {
auto revision = ++revision_counter; auto revision = ++revision_counter;
const DiskS3::ObjectMetadata object_metadata { const ObjectMetadata object_metadata {
{"from_path", from_path}, {"from_path", from_path},
{"to_path", to_path} {"to_path", to_path}
}; };
@ -942,7 +942,16 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name); LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
/// Find last revision. if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();
findLastRevision();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Disk {} started up", name);
}
void DiskS3::findLastRevision()
{
UInt64 l = 0, r = LATEST_REVISION; UInt64 l = 0, r = LATEST_REVISION;
while (l < r) while (l < r)
{ {
@ -954,8 +963,8 @@ void DiskS3::startup()
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
/// Check file or operation with such revision exists. /// Check file or operation with such revision exists.
if (checkObjectExists(s3_root_path + "r" + revision_str) if (checkObjectExists(bucket, s3_root_path + "r" + revision_str)
|| checkObjectExists(s3_root_path + "operations/r" + revision_str)) || checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str))
l = revision; l = revision;
else else
r = revision - 1; r = revision - 1;
@ -964,10 +973,124 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name); LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
} }
bool DiskS3::checkObjectExists(const String & prefix) int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path)
{
int version = 0;
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
ReadBufferFromS3 buffer (client, source_bucket, source_path + SCHEMA_VERSION_OBJECT);
readIntText(version, buffer);
return version;
}
void DiskS3::saveSchemaVersion(const int & version)
{
WriteBufferFromS3 buffer (client, bucket, s3_root_path + SCHEMA_VERSION_OBJECT, min_upload_part_size, max_single_part_upload_size);
writeIntText(version, buffer);
buffer.finalize();
}
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
request.SetKey(key);
request.SetMetadata(metadata);
auto outcome = client->CopyObject(request);
throwIfError(outcome);
}
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
for (const auto & [key, _] : meta.s3_objects)
{
ObjectMetadata metadata {
{"path", path}
};
updateObjectMetadata(s3_root_path + key, metadata);
}
}
void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
if (isDirectory(it->path()))
{
dir_contains_only_files = false;
break;
}
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
auto result = getExecutor().execute([this, path]
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
results.push_back(std::move(result));
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
if (!isDirectory(it->path()))
{
auto source_path = it->path();
auto result = getExecutor().execute([this, source_path]
{
migrateFileToRestorableSchema(source_path);
});
results.push_back(std::move(result));
}
else
migrateToRestorableSchemaRecursive(it->path(), results);
}
}
void DiskS3::migrateToRestorableSchema()
{
try
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Start migration to restorable schema for disk {}", name);
Futures results;
migrateToRestorableSchemaRecursive("data/", results);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
catch (const Exception & e)
{
LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to migrate to restorable schema. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
throw;
}
}
bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix)
{ {
Aws::S3::Model::ListObjectsV2Request request; Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket); request.SetBucket(source_bucket);
request.SetPrefix(prefix); request.SetPrefix(prefix);
request.SetMaxKeys(1); request.SetMaxKeys(1);
@ -1048,7 +1171,7 @@ struct DiskS3::RestoreInformation
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information) void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
{ {
ReadBufferFromFile buffer(metadata_path + restore_file_name, 512); ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
buffer.next(); buffer.next();
/// Empty file - just restore all metadata. /// Empty file - just restore all metadata.
@ -1083,7 +1206,7 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
void DiskS3::restore() void DiskS3::restore()
{ {
if (!exists(restore_file_name)) if (!exists(RESTORE_FILE_NAME))
return; return;
try try
@ -1110,15 +1233,21 @@ void DiskS3::restore()
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS); throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
} }
///TODO: Cleanup FS and bucket if previous restore was failed. if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}", LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path); name, information.revision, information.source_bucket, information.source_path);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Removing old metadata...");
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
removeSharedRecursive("data/", !cleanup_s3);
restoreFiles(information.source_bucket, information.source_path, information.revision); restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision); restoreFileOperations(information.source_bucket, information.source_path, information.revision);
Poco::File restore_file(metadata_path + restore_file_name); Poco::File restore_file(metadata_path + RESTORE_FILE_NAME);
restore_file.remove(); restore_file.remove();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name); LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name);

View File

@ -25,6 +25,7 @@ class DiskS3 : public IDisk
{ {
public: public:
using ObjectMetadata = std::map<std::string, std::string>; using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
friend class DiskS3Reservation; friend class DiskS3Reservation;
@ -148,7 +149,16 @@ private:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata); void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
static String revisionToString(UInt64 revision); static String revisionToString(UInt64 revision);
bool checkObjectExists(const String & prefix); bool checkObjectExists(const String & source_bucket, const String & prefix);
void findLastRevision();
int readSchemaVersion(const String & source_bucket, const String & source_path);
void saveSchemaVersion(const int & version);
void updateObjectMetadata(const String & key, const ObjectMetadata & metadata);
void migrateFileToRestorableSchema(const String & path);
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchema();
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key); Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback); void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key); void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
@ -168,7 +178,7 @@ private:
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration; std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
const String bucket; const String bucket;
const String s3_root_path; const String s3_root_path;
const String metadata_path; String metadata_path;
size_t min_upload_part_size; size_t min_upload_part_size;
size_t max_single_part_upload_size; size_t max_single_part_upload_size;
size_t min_bytes_for_seek; size_t min_bytes_for_seek;
@ -179,16 +189,21 @@ private:
std::mutex reservation_mutex; std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter; std::atomic<UInt64> revision_counter;
static constexpr UInt64 LATEST_REVISION = (static_cast<UInt64>(1)) << 63; static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0; static constexpr UInt64 UNKNOWN_REVISION = 0;
/// File at path {metadata_path}/restore contains metadata restore information /// File at path {metadata_path}/restore contains metadata restore information
const String restore_file_name = "restore"; inline static const String RESTORE_FILE_NAME = "restore";
/// The number of keys listed in one request (1000 is max value) /// The number of keys listed in one request (1000 is max value)
int list_object_keys_size; int list_object_keys_size;
/// Key has format: ../../r{revision}-{operation} /// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"}; const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
/// Object contains information about schema version.
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
/// Version with possibility to backup-restore metadata.
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
}; };
} }

View File

@ -0,0 +1,35 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/another_data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>false</send_metadata>
<list_object_keys_size>1</list_object_keys_size> <!-- To effectively test restore parallelism -->
<retry_attempts>0</retry_attempts>
</s3>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>

View File

@ -1,3 +1,4 @@
import os
import logging import logging
import random import random
import string import string
@ -10,6 +11,20 @@ logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler()) logging.getLogger().addHandler(logging.StreamHandler())
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
def replace_config(old, new):
config = open(CONFIG_PATH, 'r')
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(CONFIG_PATH, 'w')
config.writelines(config_lines)
config.close()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def cluster(): def cluster():
try: try:
@ -26,6 +41,10 @@ def cluster():
"configs/config.d/storage_conf_another_bucket_path.xml", "configs/config.d/storage_conf_another_bucket_path.xml",
"configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True) "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
cluster.add_instance("node_not_restorable", main_configs=[
"configs/config.d/storage_conf_not_restorable.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
logging.info("Starting cluster...") logging.info("Starting cluster...")
cluster.start() cluster.start()
logging.info("Cluster started") logging.info("Cluster started")
@ -103,7 +122,7 @@ def get_revision_counter(node, backup_number):
def drop_table(cluster): def drop_table(cluster):
yield yield
node_names = ["node", "node_another_bucket", "node_another_bucket_path"] node_names = ["node", "node_another_bucket", "node_another_bucket_path", "node_not_restorable"]
for node_name in node_names: for node_name in node_names:
node = cluster.instances[node_name] node = cluster.instances[node_name]
@ -311,3 +330,40 @@ def test_restore_mutations(cluster):
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096)
def test_migrate_to_restorable_schema(cluster):
node = cluster.instances["node_not_restorable"]
create_table(node, "test")
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
replace_config("<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>")
node.restart_clickhouse()
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))
node.query("ALTER TABLE s3.test FREEZE")
revision = get_revision_counter(node, 1)
assert revision != 0
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
# Restore to revision before mutation.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
node_another_bucket.start_clickhouse(10)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)