Merge pull request #23112 from Jokser/disk-s3-restore-to-detached

Vladimir 2021-04-19 10:29:00 +03:00 committed by GitHub
commit e4e2b04c53
9 changed files with 319 additions and 172 deletions

src/Disks/DiskCacheWrapper.cpp

@ -1,7 +1,6 @@
#include "DiskCacheWrapper.h"
#include <IO/copyData.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <condition_variable>
namespace DB
@ -114,7 +113,7 @@ DiskCacheWrapper::readFile(
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Read file {} from cache", backQuote(path));
LOG_DEBUG(log, "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
@ -128,11 +127,11 @@ DiskCacheWrapper::readFile(
{
/// This thread will be responsible for downloading the file to cache.
metadata->status = DOWNLOADING;
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} doesn't exist in cache. Will download it", backQuote(path));
LOG_DEBUG(log, "File {} doesn't exist in cache. Will download it", backQuote(path));
}
else if (metadata->status == DOWNLOADING)
{
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Waiting for file {} download to cache", backQuote(path));
LOG_DEBUG(log, "Waiting for file {} download to cache", backQuote(path));
metadata->condition.wait(lock, [metadata] { return metadata->status == DOWNLOADED || metadata->status == ERROR; });
}
}
@ -157,7 +156,7 @@ DiskCacheWrapper::readFile(
}
cache_disk->moveFile(tmp_path, path);
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} downloaded to cache", backQuote(path));
LOG_DEBUG(log, "File {} downloaded to cache", backQuote(path));
}
catch (...)
{
@ -186,7 +185,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
if (!cache_file_predicate(path))
return DiskDecorator::writeFile(path, buf_size, mode);
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Write file {} to cache", backQuote(path));
LOG_DEBUG(log, "Write file {} to cache", backQuote(path));
auto dir_path = directoryPath(path);
if (!cache_disk->exists(dir_path))

src/Disks/DiskCacheWrapper.h

@ -1,6 +1,7 @@
#pragma once
#include <unordered_map>
#include <common/logger_useful.h>
#include "DiskDecorator.h"
#include "DiskLocal.h"
@ -63,6 +64,8 @@ private:
mutable std::unordered_map<String, std::weak_ptr<FileDownloadMetadata>> file_downloads;
/// Protects concurrent downloading files to cache.
mutable std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("DiskCache");
};
}
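
The change that recurs throughout this commit replaces per-call &Poco::Logger::get("...") lookups with a logger pointer cached as a class member, like the log field added above. A minimal sketch of the pattern, assuming ClickHouse's common/logger_useful.h macros; the ExampleDisk class itself is illustrative and not part of the diff:

#include <cstddef>
#include <Poco/Logger.h>
#include <common/logger_useful.h>

/// Illustrative class (not from this commit) showing the cached-logger pattern
/// applied here to DiskCacheWrapper, DiskLocal and DiskS3.
class ExampleDisk
{
public:
    void reserve(size_t bytes)
    {
        /// The macro receives the cached pointer instead of calling Poco::Logger::get() on every log line.
        LOG_DEBUG(log, "Reserving {} bytes", bytes);
    }

private:
    /// Poco::Logger::get() returns a reference to a logger owned by the global registry,
    /// so storing the pointer for the lifetime of the object is safe.
    Poco::Logger * log = &Poco::Logger::get("ExampleDisk");
};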

src/Disks/DiskLocal.cpp

@ -2,13 +2,13 @@
#include <Common/createHardLink.h>
#include "DiskFactory.h"
#include <Disks/LocalDirectorySyncGuard.h>
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Disks/LocalDirectorySyncGuard.h>
#include <IO/createReadBufferFromFileBase.h>
#include <common/logger_useful.h>
#include <unistd.h>
@ -96,7 +96,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
std::lock_guard lock(DiskLocal::reservation_mutex);
if (bytes == 0)
{
LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving 0 bytes on disk {}", backQuote(name));
LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name));
++reservation_count;
return true;
}
@ -105,7 +105,7 @@ bool DiskLocal::tryReserve(UInt64 bytes)
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving {} on disk {}, having unreserved {}.",
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
@ -339,7 +339,7 @@ DiskLocalReservation::~DiskLocalReservation()
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
@ -347,7 +347,7 @@ DiskLocalReservation::~DiskLocalReservation()
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}

src/Disks/DiskLocal.h

@ -1,5 +1,6 @@
#pragma once
#include <common/logger_useful.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
@ -115,6 +116,8 @@ private:
UInt64 reservation_count = 0;
static std::mutex reservation_mutex;
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
};
}

src/Disks/S3/DiskS3.cpp

@ -2,9 +2,11 @@
#include "Disks/DiskFactory.h"
#include <bitset>
#include <random>
#include <optional>
#include <utility>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
@ -18,7 +20,6 @@
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
@ -491,7 +492,7 @@ public:
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName());
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
@ -499,7 +500,7 @@ public:
}
if (disk->reservation_count == 0)
LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName());
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
@ -535,7 +536,7 @@ public:
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get("DiskS3"), "Failed to run async task");
tryLogCurrentException("DiskS3", "Failed to run async task");
try
{
@ -675,7 +676,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
{
auto metadata = readMeta(path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
@ -711,7 +712,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
/// Save empty metadata to disk so that the file size can be obtained while the buffer is not yet finalized.
metadata.save();
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
LOG_DEBUG(log, "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size);
@ -720,7 +721,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
{
auto metadata = readMeta(path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
LOG_DEBUG(log, "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(
@ -730,7 +731,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path));
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
Poco::File file(metadata_path + path);
@ -762,7 +763,7 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
{
LOG_WARNING(
&Poco::Logger::get("DiskS3"),
log,
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
backQuote(path),
e.nested() ? e.nested()->message() : e.message());
@ -846,7 +847,7 @@ bool DiskS3::tryReserve(UInt64 bytes)
std::lock_guard lock(reservation_mutex);
if (bytes == 0)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk {}", backQuote(name));
LOG_DEBUG(log, "Reserving 0 bytes on s3 disk {}", backQuote(name));
++reservation_count;
return true;
}
@ -855,7 +856,7 @@ bool DiskS3::tryReserve(UInt64 bytes)
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving {} on disk {}, having unreserved {}.",
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
@ -940,40 +941,36 @@ void DiskS3::startup()
if (!send_metadata)
return;
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
LOG_INFO(log, "Starting up disk {}", name);
if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();
findLastRevision();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Disk {} started up", name);
LOG_INFO(log, "Disk {} started up", name);
}
void DiskS3::findLastRevision()
{
UInt64 l = 0, r = LATEST_REVISION;
while (l < r)
/// Construct revision number from high to low bits.
String revision;
revision.reserve(64);
for (int bit = 0; bit < 64; bit++)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r);
auto revision_prefix = revision + "1";
auto revision = l + (r - l + 1) / 2;
if (revision == 0)
break;
LOG_DEBUG(log, "Check object exists with revision prefix {}", revision_prefix);
auto revision_str = revisionToString(revision);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
/// Check file or operation with such revision exists.
if (checkObjectExists(bucket, s3_root_path + "r" + revision_str)
|| checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str))
l = revision;
/// Check whether a file or operation with such a revision prefix exists.
if (checkObjectExists(bucket, s3_root_path + "r" + revision_prefix)
|| checkObjectExists(bucket, s3_root_path + "operations/r" + revision_prefix))
revision += "1";
else
r = revision - 1;
revision += "0";
}
revision_counter = l;
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
revision_counter = static_cast<UInt64>(std::bitset<64>(revision).to_ullong());
LOG_INFO(log, "Found last revision number {} for disk {}", revision_counter, name);
}
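
The rewritten findLastRevision no longer binary-searches over numeric revision bounds; it builds the revision as a 64-character binary string, one bit at a time from the most significant bit, keeping a '1' only if some object with that revision prefix exists. A standalone sketch of the idea, with object existence replaced by a hypothetical predicate (the real code checks both "r<prefix>" and "operations/r<prefix>" keys):

#include <bitset>
#include <cstdint>
#include <functional>
#include <string>

/// Hypothetical stand-in for checkObjectExists() on "r<prefix>" / "operations/r<prefix>" keys.
using RevisionPrefixExists = std::function<bool(const std::string & binary_prefix)>;

uint64_t findLastRevisionSketch(const RevisionPrefixExists & exists)
{
    std::string revision;
    revision.reserve(64);
    for (int bit = 0; bit < 64; ++bit)
    {
        /// Try to set the next bit (most significant first) to 1; keep it only if
        /// some file or operation object with that revision prefix exists.
        if (exists(revision + "1"))
            revision += "1";
        else
            revision += "0";
    }
    /// Revisions are stored as zero-padded 64-bit binary strings, so the accumulated
    /// prefix is exactly the binary representation of the last revision.
    return std::bitset<64>(revision).to_ullong();
}

Checking prefixes rather than exact revision values means the search does not require the midpoint revisions themselves to exist as objects, which the old bounds-based search implicitly assumed.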
int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path)
@ -1010,7 +1007,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate file {} to restorable schema", metadata_path + path);
LOG_DEBUG(log, "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
@ -1027,7 +1024,7 @@ void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & r
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate directory {} to restorable schema", metadata_path + path);
LOG_DEBUG(log, "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it = iterateDirectory(path); it->isValid(); it->next())
@ -1070,7 +1067,7 @@ void DiskS3::migrateToRestorableSchema()
{
try
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Start migration to restorable schema for disk {}", name);
LOG_INFO(log, "Start migration to restorable schema for disk {}", name);
Futures results;
@ -1085,9 +1082,9 @@ void DiskS3::migrateToRestorableSchema()
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
catch (const Exception & e)
catch (const Exception &)
{
LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to migrate to restorable schema. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
tryLogCurrentException(log, fmt::format("Failed to migrate to restorable schema for disk {}", name));
throw;
}
@ -1173,6 +1170,7 @@ struct DiskS3::RestoreInformation
UInt64 revision = LATEST_REVISION;
String source_bucket;
String source_path;
bool detached = false;
};
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
@ -1180,33 +1178,50 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
buffer.next();
/// Empty file - just restore all metadata.
if (!buffer.hasPendingData())
return;
try
{
readIntText(restore_information.revision, buffer);
assertChar('\n', buffer);
std::map<String, String> properties;
if (!buffer.hasPendingData())
return;
while (buffer.hasPendingData())
{
String property;
readText(property, buffer);
assertChar('\n', buffer);
readText(restore_information.source_bucket, buffer);
assertChar('\n', buffer);
auto pos = property.find('=');
if (pos == String::npos || pos == 0 || pos == property.length())
throw Exception(fmt::format("Invalid property {} in restore file", property), ErrorCodes::UNKNOWN_FORMAT);
if (!buffer.hasPendingData())
return;
auto key = property.substr(0, pos);
auto value = property.substr(pos + 1);
readText(restore_information.source_path, buffer);
assertChar('\n', buffer);
auto it = properties.find(key);
if (it != properties.end())
throw Exception(fmt::format("Property key duplication {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT);
if (buffer.hasPendingData())
throw Exception("Extra information at the end of restore file", ErrorCodes::UNKNOWN_FORMAT);
properties[key] = value;
}
for (const auto & [key, value] : properties)
{
ReadBufferFromString value_buffer (value);
if (key == "revision")
readIntText(restore_information.revision, value_buffer);
else if (key == "source_bucket")
readText(restore_information.source_bucket, value_buffer);
else if (key == "source_path")
readText(restore_information.source_path, value_buffer);
else if (key == "detached")
readBoolTextWord(restore_information.detached, value_buffer);
else
throw Exception(fmt::format("Unknown key {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT);
}
}
catch (const Exception & e)
catch (const Exception &)
{
throw Exception("Failed to read restore information", e, ErrorCodes::UNKNOWN_FORMAT);
tryLogCurrentException(log, "Failed to read restore information");
throw;
}
}
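
The restore file switches from a fixed order of lines (revision, then source bucket, then source path) to key=value lines, any subset of which may be present; an empty file still means "restore everything". A restore file requesting a restore from another bucket up to a given revision, with parts placed into 'detached', might look like this (values are illustrative):

revision=42
source_bucket=root
source_path=data/
detached=true

The integration test's create_restore_file helper writes exactly these key=value lines.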
@ -1239,43 +1254,43 @@ void DiskS3::restore()
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
}
LOG_INFO(log, "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path);
if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Removing old metadata...");
LOG_INFO(log, "Removing old metadata...");
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
for (const auto & root : data_roots)
if (exists(root))
removeSharedRecursive(root + '/', !cleanup_s3);
restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision);
restoreFiles(information);
restoreFileOperations(information);
Poco::File restore_file(metadata_path + RESTORE_FILE_NAME);
restore_file.remove();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name);
LOG_INFO(log, "Restore disk {} finished", name);
}
catch (const Exception & e)
catch (const Exception &)
{
LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to restore disk. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
tryLogCurrentException(log, fmt::format("Failed to restore disk {}", name));
throw;
}
}
void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision)
void DiskS3::restoreFiles(const RestoreInformation & restore_information)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore files for disk {}", name);
LOG_INFO(log, "Starting restore files for disk {}", name);
std::vector<std::future<void>> results;
listObjects(source_bucket, source_path, [this, &source_bucket, &source_path, &target_revision, &results](auto list_result)
auto restore_files = [this, &restore_information, &results](auto list_result)
{
std::vector<String> keys;
for (const auto & row : list_result.GetContents())
@ -1288,7 +1303,7 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
const auto [revision, _] = extractRevisionAndOperationFromKey(key);
/// Filter early if it's possible to get revision from key.
if (revision > target_revision)
if (revision > restore_information.revision)
continue;
keys.push_back(key);
@ -1296,23 +1311,26 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
if (!keys.empty())
{
auto result = getExecutor().execute([this, &source_bucket, &source_path, keys]()
auto result = getExecutor().execute([this, &restore_information, keys]()
{
processRestoreFiles(source_bucket, source_path, keys);
processRestoreFiles(restore_information.source_bucket, restore_information.source_path, keys);
});
results.push_back(std::move(result));
}
return true;
});
};
/// Execute.
listObjects(restore_information.source_bucket, restore_information.source_path, restore_files);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Files are restored for disk {}", name);
LOG_INFO(log, "Files are restored for disk {}", name);
}
void DiskS3::processRestoreFiles(const String & source_bucket, const String & source_path, Strings keys)
@ -1327,7 +1345,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
if (path_entry == object_metadata.end())
{
/// Such keys can remain after migration, we can skip them.
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' in metadata", key);
LOG_WARNING(log, "Skip key {} because it doesn't have 'path' in metadata", key);
continue;
}
@ -1344,18 +1362,19 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored file {}", path);
LOG_DEBUG(log, "Restored file {}", path);
}
}
void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision)
void DiskS3::restoreFileOperations(const RestoreInformation & restore_information)
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name);
LOG_INFO(log, "Starting restore file operations for disk {}", name);
/// Enable recording file operations if we restore to a different bucket / path.
send_metadata = bucket != source_bucket || s3_root_path != source_path;
send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path;
listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result)
std::set<String> renames;
auto restore_file_operations = [this, &restore_information, &renames](auto list_result)
{
const String rename = "rename";
const String hardlink = "hardlink";
@ -1367,20 +1386,20 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
const auto [revision, operation] = extractRevisionAndOperationFromKey(key);
if (revision == UNKNOWN_REVISION)
{
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} with unknown revision", key);
LOG_WARNING(log, "Skip key {} with unknown revision", key);
continue;
}
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order (revision order).
/// We can stop processing if revision of the object is already more than required.
if (revision > target_revision)
if (revision > restore_information.revision)
return false;
/// Keep the original revision if restoring to a different bucket / path.
if (send_metadata)
revision_counter = revision - 1;
auto object_metadata = headObject(source_bucket, key).GetMetadata();
auto object_metadata = headObject(restore_information.source_bucket, key).GetMetadata();
if (operation == rename)
{
auto from_path = object_metadata["from_path"];
@ -1388,7 +1407,23 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
if (exists(from_path))
{
moveFile(from_path, to_path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (restore_information.detached && isDirectory(to_path))
{
/// Sometimes directory paths are passed without trailing '/'. We should keep them in one consistent way.
if (!from_path.ends_with('/'))
from_path += '/';
if (!to_path.ends_with('/'))
to_path += '/';
/// Always keep the latest actual directory path to avoid 'detaching' non-existent paths.
auto it = renames.find(from_path);
if (it != renames.end())
renames.erase(it);
renames.insert(to_path);
}
}
}
else if (operation == hardlink)
@ -1399,27 +1434,55 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
{
createDirectories(directoryPath(dst_path));
createHardLink(src_path, dst_path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}
}
return true;
});
};
/// Execute.
listObjects(restore_information.source_bucket, restore_information.source_path + "operations/", restore_file_operations);
if (restore_information.detached)
{
Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"};
for (const auto & path : renames)
{
/// Skip already detached parts.
if (path.find("/detached/") != std::string::npos)
continue;
/// Skip parts that are not finished. They shouldn't be in the 'detached' directory, because ClickHouse wouldn't be able to finish processing them.
Poco::Path directory_path (path);
auto directory_name = directory_path.directory(directory_path.depth() - 1);
auto predicate = [&directory_name](String & prefix) { return directory_name.starts_with(prefix); };
if (std::any_of(not_finished_prefixes.begin(), not_finished_prefixes.end(), predicate))
continue;
auto detached_path = pathToDetached(path);
LOG_DEBUG(log, "Move directory to 'detached' {} -> {}", path, detached_path);
Poco::File(metadata_path + path).moveTo(metadata_path + detached_path);
}
}
send_metadata = true;
LOG_INFO(&Poco::Logger::get("DiskS3"), "File operations restored for disk {}", name);
LOG_INFO(log, "File operations restored for disk {}", name);
}
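
When a detached restore is requested, the rename log is post-processed: only the latest directory path of each part is kept in renames, paths already under 'detached' are left alone, and directories whose name starts with an unfinished-operation prefix are skipped before the move. A condensed sketch of that filter; the free-function form and the name shouldMoveToDetached are illustrative, not part of the diff (starts_with is the C++20 string member already used elsewhere in this commit):

#include <algorithm>
#include <string>
#include <vector>

/// Decide whether a restored part directory should be moved to 'detached'.
/// 'path' is a directory path relative to the disk, e.g. "data/s3/test/20200103_1_1_0/".
bool shouldMoveToDetached(const std::string & path)
{
    /// Parts already under 'detached' stay where they are.
    if (path.find("/detached/") != std::string::npos)
        return false;

    /// Take the last directory component (the part name).
    std::string trimmed = path;
    if (!trimmed.empty() && trimmed.back() == '/')
        trimmed.pop_back();
    std::string part_name = trimmed.substr(trimmed.find_last_of('/') + 1);

    /// Unfinished parts must not end up in 'detached': ClickHouse could not finish processing them.
    static const std::vector<std::string> not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"};
    return std::none_of(not_finished_prefixes.begin(), not_finished_prefixes.end(),
        [&](const std::string & prefix) { return part_name.starts_with(prefix); });
}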
std::tuple<UInt64, String> DiskS3::extractRevisionAndOperationFromKey(const String & key)
{
UInt64 revision = UNKNOWN_REVISION;
String revision_str;
String operation;
re2::RE2::FullMatch(key, key_regexp, &revision, &operation);
re2::RE2::FullMatch(key, key_regexp, &revision_str, &operation);
return {revision, operation};
return {(revision_str.empty() ? UNKNOWN_REVISION : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong())), operation};
}
String DiskS3::shrinkKey(const String & path, const String & key)
@ -1432,15 +1495,12 @@ String DiskS3::shrinkKey(const String & path, const String & key)
String DiskS3::revisionToString(UInt64 revision)
{
static constexpr size_t max_digits = 19; /// UInt64 max digits in decimal representation.
return std::bitset<64>(revision).to_string();
}
/// Align revision number with leading zeroes to have strict lexicographical order of them.
auto revision_str = std::to_string(revision);
auto digits_to_align = max_digits - revision_str.length();
for (size_t i = 0; i < digits_to_align; ++i)
revision_str = "0" + revision_str;
return revision_str;
String DiskS3::pathToDetached(const String & source_path)
{
return Poco::Path(source_path).parent().append(Poco::Path("detached")).toString() + '/';
}
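
Revisions are now serialized as fixed-width 64-character binary strings via std::bitset<64> instead of zero-padded decimal, so the lexicographical order of keys still matches numeric revision order and findLastRevision can extend the prefix bit by bit. A small round-trip sketch of that encoding (standalone program, not part of the diff):

#include <bitset>
#include <cassert>
#include <cstdint>
#include <string>

int main()
{
    uint64_t revision = 42;

    /// Encode: same conversion as the new DiskS3::revisionToString().
    std::string revision_str = std::bitset<64>(revision).to_string();
    assert(revision_str.size() == 64); /// fixed width, leading zeroes

    /// Lexicographical order of the fixed-width strings matches numeric order,
    /// which the S3 key listing relies on.
    assert(revision_str > std::bitset<64>(41).to_string());

    /// Decode: same conversion as used in extractRevisionAndOperationFromKey().
    uint64_t parsed = std::bitset<64>(revision_str).to_ullong();
    assert(parsed == revision);
    return 0;
}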
void DiskS3::onFreeze(const String & path)

src/Disks/S3/DiskS3.h

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <common/logger_useful.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
#include "ProxyConfiguration.h"
@ -148,6 +149,7 @@ private:
Metadata createMeta(const String & path) const;
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
/// Converts revision to binary string with leading zeroes (64 bit).
static String revisionToString(UInt64 revision);
bool checkObjectExists(const String & source_bucket, const String & prefix);
@ -165,15 +167,18 @@ private:
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision);
void restoreFiles(const RestoreInformation & restore_information);
void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector<String> keys);
void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision);
void restoreFileOperations(const RestoreInformation & restore_information);
/// Remove 'path' prefix from 'key' to get relative key.
/// It's needed to store keys to metadata files in RELATIVE_PATHS version.
static String shrinkKey(const String & path, const String & key);
std::tuple<UInt64, String> extractRevisionAndOperationFromKey(const String & key);
/// Forms detached path '../../detached/part_name/' from '../../part_name/'
static String pathToDetached(const String & source_path);
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
@ -207,6 +212,8 @@ private:
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
/// Directories with data.
const std::vector<String> data_roots {"data", "store"};
Poco::Logger * log = &Poco::Logger::get("DiskS3");
};
}

tests/integration/helpers/cluster.py

@ -1124,23 +1124,28 @@ class ClickHouseInstance:
return self.http_query(sql=sql, data=data, params=params, user=user, password=password,
expect_fail_and_get_error=True)
def stop_clickhouse(self, start_wait_sec=5, kill=False):
def stop_clickhouse(self, stop_wait_sec=30, kill=False):
if not self.stay_alive:
raise Exception("clickhouse can be stopped only with stay_alive=True instance")
self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
time.sleep(start_wait_sec)
deadline = time.time() + stop_wait_sec
while time.time() < deadline:
time.sleep(0.5)
if self.get_process_pid("clickhouse") is None:
break
assert self.get_process_pid("clickhouse") is None, "ClickHouse was not stopped"
def start_clickhouse(self, stop_wait_sec=5):
def start_clickhouse(self, start_wait_sec=30):
if not self.stay_alive:
raise Exception("clickhouse can be started again only with stay_alive=True instance")
self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
# wait start
from helpers.test_tools import assert_eq_with_retry
assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_wait_sec / 0.5), sleep_time=0.5)
assert_eq_with_retry(self, "select 1", "1", retry_count=int(start_wait_sec / 0.5), sleep_time=0.5)
def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
def restart_clickhouse(self, stop_start_wait_sec=30, kill=False):
self.stop_clickhouse(stop_start_wait_sec, kill)
self.start_clickhouse(stop_start_wait_sec)

tests/integration/test_merge_tree_s3_restore/configs/config.d/clusters.xml

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<node>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node</host>
<port>9000</port>
</replica>
</shard>
</node>
<node_another_bucket>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_another_bucket</host>
<port>9000</port>
</replica>
</shard>
</node_another_bucket>
</remote_servers>
</yandex>

tests/integration/test_merge_tree_s3_restore/test.py

@ -7,20 +7,21 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/log_conf.xml", "configs/config.d/clusters.xml"]
def replace_config(old, new):
config = open(CONFIG_PATH, 'r')
config = open(NOT_RESTORABLE_CONFIG_PATH, 'r')
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(CONFIG_PATH, 'w')
config = open(NOT_RESTORABLE_CONFIG_PATH, 'w')
config.writelines(config_lines)
config.close()
@ -29,22 +30,22 @@ def replace_config(old, new):
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], with_minio=True, stay_alive=True)
cluster.add_instance("node_another_bucket", main_configs=[
"configs/config.d/storage_conf_another_bucket.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
cluster.add_instance("node_another_bucket_path", main_configs=[
"configs/config.d/storage_conf_another_bucket_path.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
cluster.add_instance("node_not_restorable", main_configs=[
"configs/config.d/storage_conf_not_restorable.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
cluster.add_instance("node",
main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf.xml"],
macros={"cluster": "node", "replica": "0"},
with_minio=True, with_zookeeper=True, stay_alive=True)
cluster.add_instance("node_another_bucket",
main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket.xml"],
macros={"cluster": "node_another_bucket", "replica": "0"},
with_zookeeper=True, stay_alive=True)
cluster.add_instance("node_another_bucket_path",
main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket_path.xml"],
stay_alive=True)
cluster.add_instance("node_not_restorable",
main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_not_restorable.xml"],
stay_alive=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -65,28 +66,26 @@ def generate_values(date_str, count, sign=1):
return ",".join(["('{}',{},'{}',{})".format(x, y, z, 0) for x, y, z in data])
def create_table(node, table_name, additional_settings=None):
def create_table(node, table_name, replicated=False):
node.query("CREATE DATABASE IF NOT EXISTS s3 ENGINE = Ordinary")
create_table_statement = """
CREATE TABLE s3.{} (
CREATE TABLE s3.{table_name} {on_cluster} (
dt Date,
id Int64,
data String,
counter Int64,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
) ENGINE={engine}
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
storage_policy='s3',
old_parts_lifetime=600,
index_granularity=512
""".format(table_name)
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
""".format(table_name=table_name,
on_cluster="ON CLUSTER '{}'".format(node.name) if replicated else "",
engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/test', '{replica}')" if replicated else "MergeTree()")
node.query(create_table_statement)
@ -107,17 +106,23 @@ def drop_shadow_information(node):
node.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/shadow/*'], user='root')
def create_restore_file(node, revision=0, bucket=None, path=None):
add_restore_option = 'echo -en "{}\n" >> /var/lib/clickhouse/disks/s3/restore'
node.exec_in_container(['bash', '-c', add_restore_option.format(revision)], user='root')
def create_restore_file(node, revision=None, bucket=None, path=None, detached=None):
node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root')
add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore'
if revision:
node.exec_in_container(['bash', '-c', add_restore_option.format('revision', revision)], user='root')
if bucket:
node.exec_in_container(['bash', '-c', add_restore_option.format(bucket)], user='root')
node.exec_in_container(['bash', '-c', add_restore_option.format('source_bucket', bucket)], user='root')
if path:
node.exec_in_container(['bash', '-c', add_restore_option.format(path)], user='root')
node.exec_in_container(['bash', '-c', add_restore_option.format('source_path', path)], user='root')
if detached:
node.exec_in_container(['bash', '-c', add_restore_option.format('detached', 'true')], user='root')
def get_revision_counter(node, backup_number):
return int(node.exec_in_container(['bash', '-c', 'cat /var/lib/clickhouse/disks/s3/shadow/{}/revision.txt'.format(backup_number)], user='root'))
return int(node.exec_in_container(
['bash', '-c', 'cat /var/lib/clickhouse/disks/s3/shadow/{}/revision.txt'.format(backup_number)], user='root'))
@pytest.fixture(autouse=True)
@ -128,7 +133,8 @@ def drop_table(cluster):
for node_name in node_names:
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3.test NO DELAY")
node.query("DROP TABLE IF EXISTS s3.test SYNC")
node.query("DROP DATABASE IF EXISTS s3 SYNC")
drop_s3_metadata(node)
drop_shadow_information(node)
@ -138,32 +144,23 @@ def drop_table(cluster):
purge_s3(cluster, bucket)
def test_full_restore(cluster):
@pytest.mark.parametrize(
"replicated", [False, True]
)
def test_full_restore(cluster, replicated):
node = cluster.instances["node"]
create_table(node, "test")
create_table(node, "test", replicated)
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
# To ensure parts have merged
node.query("OPTIMIZE TABLE s3.test")
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
node.stop_clickhouse()
drop_s3_metadata(node)
node.start_clickhouse()
# All data is removed.
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)
node.stop_clickhouse()
create_restore_file(node)
node.start_clickhouse(10)
node.start_clickhouse()
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -191,7 +188,7 @@ def test_restore_another_bucket_path(cluster):
node_another_bucket.stop_clickhouse()
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -202,7 +199,7 @@ def test_restore_another_bucket_path(cluster):
node_another_bucket_path.stop_clickhouse()
create_restore_file(node_another_bucket_path, bucket="root2", path="data")
node_another_bucket_path.start_clickhouse(10)
node_another_bucket_path.start_clickhouse()
assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -244,7 +241,7 @@ def test_restore_different_revisions(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision1, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -255,7 +252,7 @@ def test_restore_different_revisions(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision2, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -266,7 +263,7 @@ def test_restore_different_revisions(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision3, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -298,7 +295,7 @@ def test_restore_mutations(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -309,7 +306,7 @@ def test_restore_mutations(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision_after_mutation, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@ -323,7 +320,7 @@ def test_restore_mutations(cluster):
purge_s3(cluster, cluster.minio_bucket_2)
revision = (revision_before_mutation + revision_after_mutation) // 2
create_restore_file(node_another_bucket, revision=revision, bucket="root")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
# Wait for unfinished mutation completion.
time.sleep(3)
@ -365,7 +362,57 @@ def test_migrate_to_restorable_schema(cluster):
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
node_another_bucket.start_clickhouse(10)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@pytest.mark.parametrize(
"replicated", [False, True]
)
def test_restore_to_detached(cluster, replicated):
node = cluster.instances["node"]
create_table(node, "test", replicated)
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-07', 4096, 0)))
# Add some mutation.
node.query("ALTER TABLE s3.test UPDATE counter = 1 WHERE 1", settings={"mutations_sync": 2})
# Detach some partition.
node.query("ALTER TABLE s3.test DETACH PARTITION '2020-01-07'")
node.query("ALTER TABLE s3.test FREEZE")
revision = get_revision_counter(node, 1)
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test", replicated)
node_another_bucket.stop_clickhouse()
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True)
node_another_bucket.start_clickhouse()
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)
node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-03'")
node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-04'")
node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-05'")
node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-06'")
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
# Attach partition that was already detached before backup-restore.
node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-07'")
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)