Merge pull request #22070 from Jokser/disk-s3-migration

Vladimir 2021-03-31 11:06:40 +03:00 committed by GitHub
commit 57eb5f8772
4 changed files with 269 additions and 16 deletions

src/Disks/S3/DiskS3.cpp

@ -648,7 +648,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path)
if (send_metadata)
{
auto revision = ++revision_counter;
- const DiskS3::ObjectMetadata object_metadata {
+ const ObjectMetadata object_metadata {
{"from_path", from_path},
{"to_path", to_path}
};
@ -942,20 +942,32 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
/// Find last revision.
if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();
findLastRevision();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Disk {} started up", name);
}
void DiskS3::findLastRevision()
{
UInt64 l = 0, r = LATEST_REVISION;
while (l < r)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r);
auto revision = l + (r - l + 1) / 2;
if (revision == 0)
break;
auto revision_str = revisionToString(revision);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
/// Check file or operation with such revision exists.
- if (checkObjectExists(s3_root_path + "r" + revision_str)
- || checkObjectExists(s3_root_path + "operations/r" + revision_str))
+ if (checkObjectExists(bucket, s3_root_path + "r" + revision_str)
+ || checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str))
l = revision;
else
r = revision - 1;
@ -964,10 +976,127 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
}
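The loop above is a "find the last true" binary search over an existence check, assuming every revision from 1 up to the last one left either a file marker or an operation marker in the bucket. A minimal Python sketch of the same idea, for illustration only (not part of this commit; the predicate and numbers are made up):

    def find_last_revision(revision_exists, latest_revision):
        l, r = 0, latest_revision
        while l < r:
            revision = l + (r - l + 1) // 2   # bias to the upper half so the range always shrinks
            if revision == 0:
                break
            if revision_exists(revision):
                l = revision                  # a marker exists, so the answer is >= revision
            else:
                r = revision - 1              # no marker, so the answer is < revision
        return l

    # Fake store in which revisions 1..42 left markers:
    print(find_last_revision(lambda rev: 1 <= rev <= 42, 2**64 - 1))  # -> 42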
- bool DiskS3::checkObjectExists(const String & prefix)
int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path)
{
int version = 0;
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
ReadBufferFromS3 buffer (client, source_bucket, source_path + SCHEMA_VERSION_OBJECT);
readIntText(version, buffer);
return version;
}
void DiskS3::saveSchemaVersion(const int & version)
{
WriteBufferFromS3 buffer (client, bucket, s3_root_path + SCHEMA_VERSION_OBJECT, min_upload_part_size, max_single_part_upload_size);
writeIntText(version, buffer);
buffer.finalize();
}
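readSchemaVersion() and saveSchemaVersion() above treat the schema marker as a tiny object named {path}.SCHEMA_VERSION whose body is just the integer version; a missing object means version 0, i.e. the old, non-restorable layout. A rough boto3 equivalent, for illustration only (the commit itself goes through ReadBufferFromS3/WriteBufferFromS3 and a ListObjects-based existence check; the bucket and root path below are placeholders):

    import boto3
    from botocore.exceptions import ClientError

    s3 = boto3.client("s3")              # placeholder client and credentials
    bucket, root_path = "root", "data/"  # placeholder names

    def read_schema_version():
        try:
            body = s3.get_object(Bucket=bucket, Key=root_path + ".SCHEMA_VERSION")["Body"].read()
        except ClientError:
            return 0                     # no marker object: schema version 0 (not restorable yet)
        return int(body)

    def save_schema_version(version):
        s3.put_object(Bucket=bucket, Key=root_path + ".SCHEMA_VERSION", Body=str(version).encode())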
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
request.SetKey(key);
request.SetMetadata(metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
auto outcome = client->CopyObject(request);
throwIfError(outcome);
}
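updateObjectMetadata() relies on the fact that S3 object metadata cannot be edited in place: the object is copied onto itself with MetadataDirective=REPLACE, which rewrites the user metadata while keeping the data. The same trick in boto3, for illustration only (names are placeholders; the commit uses the AWS C++ SDK's CopyObjectRequest shown above):

    import boto3

    s3 = boto3.client("s3")  # placeholder client

    def update_object_metadata(bucket, key, metadata):
        # Server-side self-copy: the object body is unchanged, only the user
        # metadata (e.g. {"path": "..."} written by the migration) is replaced.
        s3.copy_object(
            Bucket=bucket,
            Key=key,
            CopySource={"Bucket": bucket, "Key": key},
            Metadata=metadata,
            MetadataDirective="REPLACE",
        )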
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
for (const auto & [key, _] : meta.s3_objects)
{
ObjectMetadata metadata {
{"path", path}
};
updateObjectMetadata(s3_root_path + key, metadata);
}
}
void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (isDirectory(it->path()))
{
dir_contains_only_files = false;
break;
}
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
auto result = getExecutor().execute([this, path]
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
results.push_back(std::move(result));
}
else
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (!isDirectory(it->path()))
{
auto source_path = it->path();
auto result = getExecutor().execute([this, source_path]
{
migrateFileToRestorableSchema(source_path);
});
results.push_back(std::move(result));
}
else
migrateToRestorableSchemaRecursive(it->path(), results);
}
}
void DiskS3::migrateToRestorableSchema()
{
try
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Start migration to restorable schema for disk {}", name);
Futures results;
for (const auto & root : data_roots)
if (exists(root))
migrateToRestorableSchemaRecursive(root + '/', results);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
catch (const Exception & e)
{
LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to migrate to restorable schema. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
throw;
}
}
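The migration scheduling above can be read as: walk every data root; a directory whose entries are all files becomes one asynchronous task, otherwise its files are submitted individually and its subdirectories are recursed into; finally all futures are waited on and re-checked for exceptions. A compact Python sketch of that pattern, for illustration only (concurrent.futures stands in for DiskS3's executor, and migrate_file is a placeholder for migrateFileToRestorableSchema):

    import os
    from concurrent.futures import ThreadPoolExecutor

    def migrate_file(path):
        ...  # placeholder: tag every S3 object referenced by this metadata file with its path

    def migrate_recursive(path, executor, futures):
        entries = list(os.scandir(path))
        if not any(e.is_dir() for e in entries):
            # Leaf directory: migrate all of its files as a single task.
            futures.append(executor.submit(lambda p=path: [migrate_file(e.path) for e in os.scandir(p)]))
            return
        for e in entries:
            if e.is_dir():
                migrate_recursive(e.path, executor, futures)
            else:
                futures.append(executor.submit(migrate_file, e.path))

    def migrate(data_roots):
        futures = []
        with ThreadPoolExecutor() as executor:
            for root in data_roots:
                if os.path.exists(root):
                    migrate_recursive(root, executor, futures)
            for f in futures:
                f.result()  # like wait() + get() above: surfaces the first failure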
+ bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix)
{
Aws::S3::Model::ListObjectsV2Request request;
- request.SetBucket(bucket);
+ request.SetBucket(source_bucket);
request.SetPrefix(prefix);
request.SetMaxKeys(1);
@ -1048,7 +1177,7 @@ struct DiskS3::RestoreInformation
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
{
- ReadBufferFromFile buffer(metadata_path + restore_file_name, 512);
+ ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
buffer.next();
/// Empty file - just restore all metadata.
@ -1083,7 +1212,7 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
void DiskS3::restore()
{
- if (!exists(restore_file_name))
+ if (!exists(RESTORE_FILE_NAME))
return;
try
@ -1110,17 +1239,27 @@ void DiskS3::restore()
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
}
///TODO: Cleanup FS and bucket if previous restore was failed.
if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Removing old metadata...");
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
for (const auto & root : data_roots)
if (exists(root))
removeSharedRecursive(root + '/', !cleanup_s3);
restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision);
- Poco::File restore_file(metadata_path + restore_file_name);
+ Poco::File restore_file(metadata_path + RESTORE_FILE_NAME);
restore_file.remove();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name);
}
catch (const Exception & e)
@ -1186,7 +1325,11 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
/// Restore file if object has 'path' in metadata.
auto path_entry = object_metadata.find("path");
if (path_entry == object_metadata.end())
- throw Exception("Failed to restore key " + key + " because it doesn't have 'path' in metadata", ErrorCodes::S3_ERROR);
+ {
+ /// Such keys can remain after migration, we can skip them.
+ LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' in metadata", key);
+ continue;
+ }
const auto & path = path_entry->second;

src/Disks/S3/DiskS3.h

@ -25,6 +25,7 @@ class DiskS3 : public IDisk
{
public:
using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
friend class DiskS3Reservation;
@ -149,7 +150,16 @@ private:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
static String revisionToString(UInt64 revision);
- bool checkObjectExists(const String & prefix);
+ bool checkObjectExists(const String & source_bucket, const String & prefix);
void findLastRevision();
int readSchemaVersion(const String & source_bucket, const String & source_path);
void saveSchemaVersion(const int & version);
void updateObjectMetadata(const String & key, const ObjectMetadata & metadata);
void migrateFileToRestorableSchema(const String & path);
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchema();
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
@ -169,7 +179,7 @@ private:
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
- const String metadata_path;
+ String metadata_path;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
@ -180,16 +190,23 @@ private:
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter;
- static constexpr UInt64 LATEST_REVISION = (static_cast<UInt64>(1)) << 63;
+ static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0;
/// File at path {metadata_path}/restore contains metadata restore information
- const String restore_file_name = "restore";
+ inline static const String RESTORE_FILE_NAME = "restore";
/// The number of keys listed in one request (1000 is max value)
int list_object_keys_size;
/// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
/// Object contains information about schema version.
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
/// Version with possibility to backup-restore metadata.
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
/// Directories with data.
const std::vector<String> data_roots {"data", "store"};
};
}
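The key_regexp member above documents the naming scheme used for revision-tagged keys: r{revision}-{operation}. A quick illustrative check of that pattern with Python's re module instead of re2 (the sample key is invented, not taken from a real bucket):

    import re

    # Same pattern as key_regexp in the header above.
    m = re.match(r".*/r(\d+)-(\w+).*", "another_data/operations/r0000000000000000042-rename")
    if m:
        print(int(m.group(1)), m.group(2))  # -> 42 rename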

configs/config.d/storage_conf_not_restorable.xml (new file)

@ -0,0 +1,35 @@
<yandex>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/another_data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <send_metadata>false</send_metadata>
                <list_object_keys_size>1</list_object_keys_size> <!-- To effectively test restore parallelism -->
                <retry_attempts>0</retry_attempts>
            </s3>
            <hdd>
                <type>local</type>
                <path>/</path>
            </hdd>
        </disks>
        <policies>
            <s3>
                <volumes>
                    <main>
                        <disk>s3</disk>
                    </main>
                    <external>
                        <disk>hdd</disk>
                    </external>
                </volumes>
            </s3>
        </policies>
    </storage_configuration>

    <merge_tree>
        <min_bytes_for_wide_part>0</min_bytes_for_wide_part>
    </merge_tree>
</yandex>
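Besides send_metadata, the only non-default setting here is list_object_keys_size which, per the header comment above, is the number of keys returned by one listing request; setting it to 1 appears to force one key per listing page so a restore has to walk many pages and its parallel processing is actually exercised. For illustration only, paginated listing with a page size of 1 looks roughly like this in boto3 (bucket and prefix are placeholders):

    import boto3

    s3 = boto3.client("s3")  # placeholder client
    kwargs = {"Bucket": "root", "Prefix": "another_data/", "MaxKeys": 1}

    while True:
        page = s3.list_objects_v2(**kwargs)   # each page carries at most one key
        for obj in page.get("Contents", []):
            print(obj["Key"])
        if not page.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = page["NextContinuationToken"]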

test.py (S3 restore integration test)

@ -1,3 +1,4 @@
import os
import logging
import random
import string
@ -10,6 +11,20 @@ logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
def replace_config(old, new):
    config = open(CONFIG_PATH, 'r')
    config_lines = config.readlines()
    config.close()
    config_lines = [line.replace(old, new) for line in config_lines]
    config = open(CONFIG_PATH, 'w')
    config.writelines(config_lines)
    config.close()

@pytest.fixture(scope="module")
def cluster():
    try:
@ -26,6 +41,10 @@ def cluster():
            "configs/config.d/storage_conf_another_bucket_path.xml",
            "configs/config.d/bg_processing_pool_conf.xml",
            "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
+       cluster.add_instance("node_not_restorable", main_configs=[
+           "configs/config.d/storage_conf_not_restorable.xml",
+           "configs/config.d/bg_processing_pool_conf.xml",
+           "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")
@ -75,6 +94,8 @@ def create_table(node, table_name, additional_settings=None):
def purge_s3(cluster, bucket):
    minio = cluster.minio_client
    for obj in list(minio.list_objects(bucket, recursive=True)):
+       if str(obj.object_name).find(".SCHEMA_VERSION") != -1:
+           continue
        minio.remove_object(bucket, obj.object_name)
@ -103,7 +124,7 @@ def get_revision_counter(node, backup_number):
def drop_table(cluster):
    yield
-   node_names = ["node", "node_another_bucket", "node_another_bucket_path"]
+   node_names = ["node", "node_another_bucket", "node_another_bucket_path", "node_not_restorable"]
    for node_name in node_names:
        node = cluster.instances[node_name]
@ -311,3 +332,40 @@ def test_restore_mutations(cluster):
    assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
    assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
    assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096)


def test_migrate_to_restorable_schema(cluster):
    node = cluster.instances["node_not_restorable"]

    create_table(node, "test")

    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))

    replace_config("<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>")
    node.restart_clickhouse()

    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096)))
    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))

    node.query("ALTER TABLE s3.test FREEZE")
    revision = get_revision_counter(node, 1)
    assert revision != 0

    node_another_bucket = cluster.instances["node_another_bucket"]
    create_table(node_another_bucket, "test")

    # Restore to revision before mutation.
    node_another_bucket.stop_clickhouse()
    drop_s3_metadata(node_another_bucket)
    purge_s3(cluster, cluster.minio_bucket_2)
    create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
    node_another_bucket.start_clickhouse(10)

    assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
    assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)