diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index f101de340f1..95034b8e107 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -1,7 +1,6 @@ #include "DiskCacheWrapper.h" #include #include -#include #include namespace DB @@ -114,7 +113,7 @@ DiskCacheWrapper::readFile( if (!cache_file_predicate(path)) return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache); - LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Read file {} from cache", backQuote(path)); + LOG_DEBUG(log, "Read file {} from cache", backQuote(path)); if (cache_disk->exists(path)) return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache); @@ -128,11 +127,11 @@ DiskCacheWrapper::readFile( { /// This thread will responsible for file downloading to cache. metadata->status = DOWNLOADING; - LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} doesn't exist in cache. Will download it", backQuote(path)); + LOG_DEBUG(log, "File {} doesn't exist in cache. Will download it", backQuote(path)); } else if (metadata->status == DOWNLOADING) { - LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Waiting for file {} download to cache", backQuote(path)); + LOG_DEBUG(log, "Waiting for file {} download to cache", backQuote(path)); metadata->condition.wait(lock, [metadata] { return metadata->status == DOWNLOADED || metadata->status == ERROR; }); } } @@ -157,7 +156,7 @@ DiskCacheWrapper::readFile( } cache_disk->moveFile(tmp_path, path); - LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} downloaded to cache", backQuote(path)); + LOG_DEBUG(log, "File {} downloaded to cache", backQuote(path)); } catch (...) { @@ -186,7 +185,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode if (!cache_file_predicate(path)) return DiskDecorator::writeFile(path, buf_size, mode); - LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Write file {} to cache", backQuote(path)); + LOG_DEBUG(log, "Write file {} to cache", backQuote(path)); auto dir_path = directoryPath(path); if (!cache_disk->exists(dir_path)) diff --git a/src/Disks/DiskCacheWrapper.h b/src/Disks/DiskCacheWrapper.h index 7e627b0c3c3..6d58394640f 100644 --- a/src/Disks/DiskCacheWrapper.h +++ b/src/Disks/DiskCacheWrapper.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "DiskDecorator.h" #include "DiskLocal.h" @@ -63,6 +64,8 @@ private: mutable std::unordered_map> file_downloads; /// Protects concurrent downloading files to cache. 
mutable std::mutex mutex; + + Poco::Logger * log = &Poco::Logger::get("DiskCache"); }; } diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 9d2bad0373d..d0cf6a00344 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -2,13 +2,13 @@ #include #include "DiskFactory.h" +#include #include #include #include -#include #include -#include + #include @@ -96,7 +96,7 @@ bool DiskLocal::tryReserve(UInt64 bytes) std::lock_guard lock(DiskLocal::reservation_mutex); if (bytes == 0) { - LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving 0 bytes on disk {}", backQuote(name)); + LOG_DEBUG(log, "Reserving 0 bytes on disk {}", backQuote(name)); ++reservation_count; return true; } @@ -105,7 +105,7 @@ bool DiskLocal::tryReserve(UInt64 bytes) UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); if (unreserved_space >= bytes) { - LOG_DEBUG(&Poco::Logger::get("DiskLocal"), "Reserving {} on disk {}, having unreserved {}.", + LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.", ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); ++reservation_count; reserved_bytes += bytes; @@ -339,7 +339,7 @@ DiskLocalReservation::~DiskLocalReservation() if (disk->reserved_bytes < size) { disk->reserved_bytes = 0; - LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName()); + LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName()); } else { @@ -347,7 +347,7 @@ DiskLocalReservation::~DiskLocalReservation() } if (disk->reservation_count == 0) - LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName()); + LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName()); else --disk->reservation_count; } diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index d957fc6f847..567ca24eb50 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -115,6 +116,8 @@ private: UInt64 reservation_count = 0; static std::mutex reservation_mutex; + + Poco::Logger * log = &Poco::Logger::get("DiskLocal"); }; } diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 1de4ab843ac..5a2f938fcd9 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -2,9 +2,11 @@ #include "Disks/DiskFactory.h" +#include #include #include #include +#include #include #include #include @@ -18,7 +20,6 @@ #include #include #include -#include #include #include @@ -491,7 +492,7 @@ public: if (disk->reserved_bytes < size) { disk->reserved_bytes = 0; - LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservations size for disk '{}'.", disk->getName()); + LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName()); } else { @@ -499,7 +500,7 @@ public: } if (disk->reservation_count == 0) - LOG_ERROR(&Poco::Logger::get("DiskLocal"), "Unbalanced reservation count for disk '{}'.", disk->getName()); + LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName()); else --disk->reservation_count; } @@ -535,7 +536,7 @@ public: } catch (...) { - tryLogCurrentException(&Poco::Logger::get("DiskS3"), "Failed to run async task"); + tryLogCurrentException("DiskS3", "Failed to run async task"); try { @@ -675,7 +676,7 @@ std::unique_ptr DiskS3::readFile(const String & path, si { auto metadata = readMeta(path); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. 
Existing S3 objects: {}", + LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}", backQuote(metadata_path + path), metadata.s3_objects.size()); auto reader = std::make_unique(client, bucket, metadata, buf_size); @@ -711,7 +712,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. metadata.save(); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path); + LOG_DEBUG(log, "Write to file by path: {}. New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path); return std::make_unique( client, bucket, metadata, s3_path, object_metadata, min_upload_part_size, max_single_part_upload_size, buf_size); @@ -720,7 +721,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, { auto metadata = readMeta(path); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.", + LOG_DEBUG(log, "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.", backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size()); return std::make_unique( @@ -730,7 +731,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) { - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path)); + LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path)); Poco::File file(metadata_path + path); @@ -762,7 +763,7 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) if (e.code() == ErrorCodes::UNKNOWN_FORMAT) { LOG_WARNING( - &Poco::Logger::get("DiskS3"), + log, "Metadata file {} can't be read by reason: {}. Removing it forcibly.", backQuote(path), e.nested() ? e.nested()->message() : e.message()); @@ -846,7 +847,7 @@ bool DiskS3::tryReserve(UInt64 bytes) std::lock_guard lock(reservation_mutex); if (bytes == 0) { - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk {}", backQuote(name)); + LOG_DEBUG(log, "Reserving 0 bytes on s3 disk {}", backQuote(name)); ++reservation_count; return true; } @@ -855,7 +856,7 @@ bool DiskS3::tryReserve(UInt64 bytes) UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); if (unreserved_space >= bytes) { - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Reserving {} on disk {}, having unreserved {}.", + LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.", ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); ++reservation_count; reserved_bytes += bytes; @@ -940,40 +941,36 @@ void DiskS3::startup() if (!send_metadata) return; - LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name); + LOG_INFO(log, "Starting up disk {}", name); if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION) migrateToRestorableSchema(); findLastRevision(); - LOG_INFO(&Poco::Logger::get("DiskS3"), "Disk {} started up", name); + LOG_INFO(log, "Disk {} started up", name); } void DiskS3::findLastRevision() { - UInt64 l = 0, r = LATEST_REVISION; - while (l < r) + /// Construct revision number from high to low bits. 
+ String revision; + revision.reserve(64); + for (int bit = 0; bit < 64; bit++) { - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r); + auto revision_prefix = revision + "1"; - auto revision = l + (r - l + 1) / 2; - if (revision == 0) - break; + LOG_DEBUG(log, "Check object exists with revision prefix {}", revision_prefix); - auto revision_str = revisionToString(revision); - - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision); - - /// Check file or operation with such revision exists. - if (checkObjectExists(bucket, s3_root_path + "r" + revision_str) - || checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str)) - l = revision; + /// Check file or operation with such revision prefix exists. + if (checkObjectExists(bucket, s3_root_path + "r" + revision_prefix) + || checkObjectExists(bucket, s3_root_path + "operations/r" + revision_prefix)) + revision += "1"; else - r = revision - 1; + revision += "0"; } - revision_counter = l; - LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name); + revision_counter = static_cast(std::bitset<64>(revision).to_ullong()); + LOG_INFO(log, "Found last revision number {} for disk {}", revision_counter, name); } int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path) @@ -1010,7 +1007,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met void DiskS3::migrateFileToRestorableSchema(const String & path) { - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate file {} to restorable schema", metadata_path + path); + LOG_DEBUG(log, "Migrate file {} to restorable schema", metadata_path + path); auto meta = readMeta(path); @@ -1027,7 +1024,7 @@ void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & r { checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks. - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate directory {} to restorable schema", metadata_path + path); + LOG_DEBUG(log, "Migrate directory {} to restorable schema", metadata_path + path); bool dir_contains_only_files = true; for (auto it = iterateDirectory(path); it->isValid(); it->next()) @@ -1070,7 +1067,7 @@ void DiskS3::migrateToRestorableSchema() { try { - LOG_INFO(&Poco::Logger::get("DiskS3"), "Start migration to restorable schema for disk {}", name); + LOG_INFO(log, "Start migration to restorable schema for disk {}", name); Futures results; @@ -1085,9 +1082,9 @@ void DiskS3::migrateToRestorableSchema() saveSchemaVersion(RESTORABLE_SCHEMA_VERSION); } - catch (const Exception & e) + catch (const Exception &) { - LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to migrate to restorable schema. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString()); + tryLogCurrentException(log, fmt::format("Failed to migrate to restorable schema for disk {}", name)); throw; } @@ -1173,6 +1170,7 @@ struct DiskS3::RestoreInformation UInt64 revision = LATEST_REVISION; String source_bucket; String source_path; + bool detached = false; }; void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information) @@ -1180,33 +1178,50 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512); buffer.next(); - /// Empty file - just restore all metadata. 
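
The hunk above replaces the old bounded binary search over decimal revision numbers with fixed-width binary strings: revisionToString() (changed further down in this diff) now renders a revision as a 64-character string of '0'/'1', so lexicographic order of object keys matches numeric order and the last revision can be recovered with at most 64 prefix-existence checks. A minimal standalone sketch of the idea, in which a std::set of keys and a prefix_exists lambda stand in for the S3 bucket and the checkObjectExists() calls:

#include <bitset>
#include <cstdint>
#include <functional>
#include <iostream>
#include <set>
#include <string>

// Fixed-width binary encoding, matching what DiskS3::revisionToString() now returns.
static std::string revisionToString(uint64_t revision) { return std::bitset<64>(revision).to_string(); }

// Recover the highest stored revision with at most 64 prefix probes, as the new findLastRevision() does.
// `prefix_exists` is a stand-in for the checkObjectExists() calls against the bucket.
static uint64_t findLastRevision(const std::function<bool(const std::string &)> & prefix_exists)
{
    std::string revision;
    revision.reserve(64);
    for (int bit = 0; bit < 64; ++bit)
        revision += prefix_exists(revision + "1") ? '1' : '0';
    return std::bitset<64>(revision).to_ullong();
}

int main()
{
    // Pretend these two revisions were written to the bucket.
    std::set<std::string> keys{revisionToString(5), revisionToString(42)};
    auto prefix_exists = [&](const std::string & prefix)
    {
        auto it = keys.lower_bound(prefix);
        return it != keys.end() && it->compare(0, prefix.size(), prefix) == 0;
    };
    std::cout << findLastRevision(prefix_exists) << '\n'; // prints 42
}

Unlike the old bounded binary search, the prefix walk does not rely on every intermediate revision object existing; it only needs the fixed-width encoding to keep key order consistent with revision order.
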
- if (!buffer.hasPendingData()) - return; - try { - readIntText(restore_information.revision, buffer); - assertChar('\n', buffer); + std::map properties; - if (!buffer.hasPendingData()) - return; + while (buffer.hasPendingData()) + { + String property; + readText(property, buffer); + assertChar('\n', buffer); - readText(restore_information.source_bucket, buffer); - assertChar('\n', buffer); + auto pos = property.find('='); + if (pos == String::npos || pos == 0 || pos == property.length()) + throw Exception(fmt::format("Invalid property {} in restore file", property), ErrorCodes::UNKNOWN_FORMAT); - if (!buffer.hasPendingData()) - return; + auto key = property.substr(0, pos); + auto value = property.substr(pos + 1); - readText(restore_information.source_path, buffer); - assertChar('\n', buffer); + auto it = properties.find(key); + if (it != properties.end()) + throw Exception(fmt::format("Property key duplication {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT); - if (buffer.hasPendingData()) - throw Exception("Extra information at the end of restore file", ErrorCodes::UNKNOWN_FORMAT); + properties[key] = value; + } + + for (const auto & [key, value] : properties) + { + ReadBufferFromString value_buffer (value); + + if (key == "revision") + readIntText(restore_information.revision, value_buffer); + else if (key == "source_bucket") + readText(restore_information.source_bucket, value_buffer); + else if (key == "source_path") + readText(restore_information.source_path, value_buffer); + else if (key == "detached") + readBoolTextWord(restore_information.detached, value_buffer); + else + throw Exception(fmt::format("Unknown key {} in restore file", key), ErrorCodes::UNKNOWN_FORMAT); + } } - catch (const Exception & e) + catch (const Exception &) { - throw Exception("Failed to read restore information", e, ErrorCodes::UNKNOWN_FORMAT); + tryLogCurrentException(log, "Failed to read restore information"); + throw; } } @@ -1239,43 +1254,43 @@ void DiskS3::restore() throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS); } + LOG_INFO(log, "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}", + name, information.revision, information.source_bucket, information.source_path); + if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION) throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS); - LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. 
Revision: {}, Source bucket: {}, Source path: {}", - name, information.revision, information.source_bucket, information.source_path); - - LOG_INFO(&Poco::Logger::get("DiskS3"), "Removing old metadata..."); + LOG_INFO(log, "Removing old metadata..."); bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path; for (const auto & root : data_roots) if (exists(root)) removeSharedRecursive(root + '/', !cleanup_s3); - restoreFiles(information.source_bucket, information.source_path, information.revision); - restoreFileOperations(information.source_bucket, information.source_path, information.revision); + restoreFiles(information); + restoreFileOperations(information); Poco::File restore_file(metadata_path + RESTORE_FILE_NAME); restore_file.remove(); saveSchemaVersion(RESTORABLE_SCHEMA_VERSION); - LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name); + LOG_INFO(log, "Restore disk {} finished", name); } - catch (const Exception & e) + catch (const Exception &) { - LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to restore disk. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString()); + tryLogCurrentException(log, fmt::format("Failed to restore disk {}", name)); throw; } } -void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision) +void DiskS3::restoreFiles(const RestoreInformation & restore_information) { - LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore files for disk {}", name); + LOG_INFO(log, "Starting restore files for disk {}", name); std::vector> results; - listObjects(source_bucket, source_path, [this, &source_bucket, &source_path, &target_revision, &results](auto list_result) + auto restore_files = [this, &restore_information, &results](auto list_result) { std::vector keys; for (const auto & row : list_result.GetContents()) @@ -1288,7 +1303,7 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa const auto [revision, _] = extractRevisionAndOperationFromKey(key); /// Filter early if it's possible to get revision from key. - if (revision > target_revision) + if (revision > restore_information.revision) continue; keys.push_back(key); @@ -1296,23 +1311,26 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa if (!keys.empty()) { - auto result = getExecutor().execute([this, &source_bucket, &source_path, keys]() + auto result = getExecutor().execute([this, &restore_information, keys]() { - processRestoreFiles(source_bucket, source_path, keys); + processRestoreFiles(restore_information.source_bucket, restore_information.source_path, keys); }); results.push_back(std::move(result)); } return true; - }); + }; + + /// Execute. + listObjects(restore_information.source_bucket, restore_information.source_path, restore_files); for (auto & result : results) result.wait(); for (auto & result : results) result.get(); - LOG_INFO(&Poco::Logger::get("DiskS3"), "Files are restored for disk {}", name); + LOG_INFO(log, "Files are restored for disk {}", name); } void DiskS3::processRestoreFiles(const String & source_bucket, const String & source_path, Strings keys) @@ -1327,7 +1345,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so if (path_entry == object_metadata.end()) { /// Such keys can remain after migration, we can skip them. 
- LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' in metadata", key); + LOG_WARNING(log, "Skip key {} because it doesn't have 'path' in metadata", key); continue; } @@ -1344,18 +1362,19 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so metadata.addObject(relative_key, head_result.GetContentLength()); metadata.save(); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored file {}", path); + LOG_DEBUG(log, "Restored file {}", path); } } -void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision) +void DiskS3::restoreFileOperations(const RestoreInformation & restore_information) { - LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name); + LOG_INFO(log, "Starting restore file operations for disk {}", name); /// Enable recording file operations if we restore to different bucket / path. - send_metadata = bucket != source_bucket || s3_root_path != source_path; + send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path; - listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result) + std::set renames; + auto restore_file_operations = [this, &restore_information, &renames](auto list_result) { const String rename = "rename"; const String hardlink = "hardlink"; @@ -1367,20 +1386,20 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String & const auto [revision, operation] = extractRevisionAndOperationFromKey(key); if (revision == UNKNOWN_REVISION) { - LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} with unknown revision", key); + LOG_WARNING(log, "Skip key {} with unknown revision", key); continue; } /// S3 ensures that keys will be listed in ascending UTF-8 bytes order (revision order). /// We can stop processing if revision of the object is already more than required. - if (revision > target_revision) + if (revision > restore_information.revision) return false; /// Keep original revision if restore to different bucket / path. if (send_metadata) revision_counter = revision - 1; - auto object_metadata = headObject(source_bucket, key).GetMetadata(); + auto object_metadata = headObject(restore_information.source_bucket, key).GetMetadata(); if (operation == rename) { auto from_path = object_metadata["from_path"]; @@ -1388,7 +1407,23 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String & if (exists(from_path)) { moveFile(from_path, to_path); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Revision {}. Restored rename {} -> {}", revision, from_path, to_path); + LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path); + + if (restore_information.detached && isDirectory(to_path)) + { + /// Sometimes directory paths are passed without trailing '/'. We should keep them in one consistent way. + if (!from_path.ends_with('/')) + from_path += '/'; + if (!to_path.ends_with('/')) + to_path += '/'; + + /// Always keep latest actual directory path to avoid 'detaching' not existing paths. 
+ auto it = renames.find(from_path); + if (it != renames.end()) + renames.erase(it); + + renames.insert(to_path); + } } } else if (operation == hardlink) @@ -1399,27 +1434,55 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String & { createDirectories(directoryPath(dst_path)); createHardLink(src_path, dst_path); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path); + LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path); } } } return true; - }); + }; + + /// Execute. + listObjects(restore_information.source_bucket, restore_information.source_path + "operations/", restore_file_operations); + + if (restore_information.detached) + { + Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"}; + + for (const auto & path : renames) + { + /// Skip already detached parts. + if (path.find("/detached/") != std::string::npos) + continue; + + /// Skip not finished parts. They shouldn't be in 'detached' directory, because CH wouldn't be able to finish processing them. + Poco::Path directory_path (path); + auto directory_name = directory_path.directory(directory_path.depth() - 1); + auto predicate = [&directory_name](String & prefix) { return directory_name.starts_with(prefix); }; + if (std::any_of(not_finished_prefixes.begin(), not_finished_prefixes.end(), predicate)) + continue; + + auto detached_path = pathToDetached(path); + + LOG_DEBUG(log, "Move directory to 'detached' {} -> {}", path, detached_path); + + Poco::File(metadata_path + path).moveTo(metadata_path + detached_path); + } + } send_metadata = true; - LOG_INFO(&Poco::Logger::get("DiskS3"), "File operations restored for disk {}", name); + LOG_INFO(log, "File operations restored for disk {}", name); } std::tuple DiskS3::extractRevisionAndOperationFromKey(const String & key) { - UInt64 revision = UNKNOWN_REVISION; + String revision_str; String operation; - re2::RE2::FullMatch(key, key_regexp, &revision, &operation); + re2::RE2::FullMatch(key, key_regexp, &revision_str, &operation); - return {revision, operation}; + return {(revision_str.empty() ? UNKNOWN_REVISION : static_cast(std::bitset<64>(revision_str).to_ullong())), operation}; } String DiskS3::shrinkKey(const String & path, const String & key) @@ -1432,15 +1495,12 @@ String DiskS3::shrinkKey(const String & path, const String & key) String DiskS3::revisionToString(UInt64 revision) { - static constexpr size_t max_digits = 19; /// UInt64 max digits in decimal representation. + return std::bitset<64>(revision).to_string(); +} - /// Align revision number with leading zeroes to have strict lexicographical order of them. 
- auto revision_str = std::to_string(revision); - auto digits_to_align = max_digits - revision_str.length(); - for (size_t i = 0; i < digits_to_align; ++i) - revision_str = "0" + revision_str; - - return revision_str; +String DiskS3::pathToDetached(const String & source_path) +{ + return Poco::Path(source_path).parent().append(Poco::Path("detached")).toString() + '/'; } void DiskS3::onFreeze(const String & path) diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index 87aab71fc44..9db0f47f55a 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "Disks/DiskFactory.h" #include "Disks/Executor.h" #include "ProxyConfiguration.h" @@ -148,6 +149,7 @@ private: Metadata createMeta(const String & path) const; void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata); + /// Converts revision to binary string with leading zeroes (64 bit). static String revisionToString(UInt64 revision); bool checkObjectExists(const String & source_bucket, const String & prefix); @@ -165,15 +167,18 @@ private: void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key); void readRestoreInformation(RestoreInformation & restore_information); - void restoreFiles(const String & source_bucket, const String & source_path, UInt64 target_revision); + void restoreFiles(const RestoreInformation & restore_information); void processRestoreFiles(const String & source_bucket, const String & source_path, std::vector keys); - void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 target_revision); + void restoreFileOperations(const RestoreInformation & restore_information); /// Remove 'path' prefix from 'key' to get relative key. /// It's needed to store keys to metadata files in RELATIVE_PATHS version. static String shrinkKey(const String & path, const String & key); std::tuple extractRevisionAndOperationFromKey(const String & key); + /// Forms detached path '../../detached/part_name/' from '../../part_name/' + static String pathToDetached(const String & source_path); + const String name; std::shared_ptr client; std::shared_ptr proxy_configuration; @@ -207,6 +212,8 @@ private: static constexpr int RESTORABLE_SCHEMA_VERSION = 1; /// Directories with data. 
const std::vector data_roots {"data", "store"}; + + Poco::Logger * log = &Poco::Logger::get("DiskS3"); }; } diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 3d85f494676..69a66a50b6d 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1124,23 +1124,28 @@ class ClickHouseInstance: return self.http_query(sql=sql, data=data, params=params, user=user, password=password, expect_fail_and_get_error=True) - def stop_clickhouse(self, start_wait_sec=5, kill=False): + def stop_clickhouse(self, stop_wait_sec=30, kill=False): if not self.stay_alive: raise Exception("clickhouse can be stopped only with stay_alive=True instance") self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root') - time.sleep(start_wait_sec) + deadline = time.time() + stop_wait_sec + while time.time() < deadline: + time.sleep(0.5) + if self.get_process_pid("clickhouse") is None: + break + assert self.get_process_pid("clickhouse") is None, "ClickHouse was not stopped" - def start_clickhouse(self, stop_wait_sec=5): + def start_clickhouse(self, start_wait_sec=30): if not self.stay_alive: raise Exception("clickhouse can be started again only with stay_alive=True instance") self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid())) # wait start from helpers.test_tools import assert_eq_with_retry - assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_wait_sec / 0.5), sleep_time=0.5) + assert_eq_with_retry(self, "select 1", "1", retry_count=int(start_wait_sec / 0.5), sleep_time=0.5) - def restart_clickhouse(self, stop_start_wait_sec=5, kill=False): + def restart_clickhouse(self, stop_start_wait_sec=30, kill=False): self.stop_clickhouse(stop_start_wait_sec, kill) self.start_clickhouse(stop_start_wait_sec) diff --git a/tests/integration/test_merge_tree_s3_restore/configs/config.d/clusters.xml b/tests/integration/test_merge_tree_s3_restore/configs/config.d/clusters.xml new file mode 100644 index 00000000000..4808ae4bc4a --- /dev/null +++ b/tests/integration/test_merge_tree_s3_restore/configs/config.d/clusters.xml @@ -0,0 +1,23 @@ + + + + + + true + + node + 9000 + + + + + + true + + node_another_bucket + 9000 + + + + + diff --git a/tests/integration/test_merge_tree_s3_restore/test.py b/tests/integration/test_merge_tree_s3_restore/test.py index c0ebce68480..0781f0b9ce9 100644 --- a/tests/integration/test_merge_tree_s3_restore/test.py +++ b/tests/integration/test_merge_tree_s3_restore/test.py @@ -7,20 +7,21 @@ import time import pytest from helpers.cluster import ClickHouseCluster + logging.getLogger().setLevel(logging.INFO) logging.getLogger().addHandler(logging.StreamHandler()) - SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml') +NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml') +COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/log_conf.xml", "configs/config.d/clusters.xml"] def replace_config(old, new): - config = open(CONFIG_PATH, 'r') + config = open(NOT_RESTORABLE_CONFIG_PATH, 'r') config_lines = config.readlines() config.close() config_lines = [line.replace(old, new) for line in config_lines] - config = open(CONFIG_PATH, 'w') + config = open(NOT_RESTORABLE_CONFIG_PATH, 'w') 
config.writelines(config_lines) config.close() @@ -29,22 +30,22 @@ def replace_config(old, new): def cluster(): try: cluster = ClickHouseCluster(__file__) - cluster.add_instance("node", main_configs=[ - "configs/config.d/storage_conf.xml", - "configs/config.d/bg_processing_pool_conf.xml", - "configs/config.d/log_conf.xml"], user_configs=[], with_minio=True, stay_alive=True) - cluster.add_instance("node_another_bucket", main_configs=[ - "configs/config.d/storage_conf_another_bucket.xml", - "configs/config.d/bg_processing_pool_conf.xml", - "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True) - cluster.add_instance("node_another_bucket_path", main_configs=[ - "configs/config.d/storage_conf_another_bucket_path.xml", - "configs/config.d/bg_processing_pool_conf.xml", - "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True) - cluster.add_instance("node_not_restorable", main_configs=[ - "configs/config.d/storage_conf_not_restorable.xml", - "configs/config.d/bg_processing_pool_conf.xml", - "configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True) + + cluster.add_instance("node", + main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf.xml"], + macros={"cluster": "node", "replica": "0"}, + with_minio=True, with_zookeeper=True, stay_alive=True) + cluster.add_instance("node_another_bucket", + main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket.xml"], + macros={"cluster": "node_another_bucket", "replica": "0"}, + with_zookeeper=True, stay_alive=True) + cluster.add_instance("node_another_bucket_path", + main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket_path.xml"], + stay_alive=True) + cluster.add_instance("node_not_restorable", + main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_not_restorable.xml"], + stay_alive=True) + logging.info("Starting cluster...") cluster.start() logging.info("Cluster started") @@ -65,28 +66,26 @@ def generate_values(date_str, count, sign=1): return ",".join(["('{}',{},'{}',{})".format(x, y, z, 0) for x, y, z in data]) -def create_table(node, table_name, additional_settings=None): +def create_table(node, table_name, replicated=False): node.query("CREATE DATABASE IF NOT EXISTS s3 ENGINE = Ordinary") create_table_statement = """ - CREATE TABLE s3.{} ( + CREATE TABLE s3.{table_name} {on_cluster} ( dt Date, id Int64, data String, counter Int64, INDEX min_max (id) TYPE minmax GRANULARITY 3 - ) ENGINE=MergeTree() + ) ENGINE={engine} PARTITION BY dt ORDER BY (dt, id) SETTINGS storage_policy='s3', old_parts_lifetime=600, index_granularity=512 - """.format(table_name) - - if additional_settings: - create_table_statement += "," - create_table_statement += additional_settings + """.format(table_name=table_name, + on_cluster="ON CLUSTER '{}'".format(node.name) if replicated else "", + engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/test', '{replica}')" if replicated else "MergeTree()") node.query(create_table_statement) @@ -107,17 +106,23 @@ def drop_shadow_information(node): node.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/shadow/*'], user='root') -def create_restore_file(node, revision=0, bucket=None, path=None): - add_restore_option = 'echo -en "{}\n" >> /var/lib/clickhouse/disks/s3/restore' - node.exec_in_container(['bash', '-c', add_restore_option.format(revision)], user='root') +def create_restore_file(node, revision=None, bucket=None, path=None, detached=None): + node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], 
user='root') + + add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore' + if revision: + node.exec_in_container(['bash', '-c', add_restore_option.format('revision', revision)], user='root') if bucket: - node.exec_in_container(['bash', '-c', add_restore_option.format(bucket)], user='root') + node.exec_in_container(['bash', '-c', add_restore_option.format('source_bucket', bucket)], user='root') if path: - node.exec_in_container(['bash', '-c', add_restore_option.format(path)], user='root') + node.exec_in_container(['bash', '-c', add_restore_option.format('source_path', path)], user='root') + if detached: + node.exec_in_container(['bash', '-c', add_restore_option.format('detached', 'true')], user='root') def get_revision_counter(node, backup_number): - return int(node.exec_in_container(['bash', '-c', 'cat /var/lib/clickhouse/disks/s3/shadow/{}/revision.txt'.format(backup_number)], user='root')) + return int(node.exec_in_container( + ['bash', '-c', 'cat /var/lib/clickhouse/disks/s3/shadow/{}/revision.txt'.format(backup_number)], user='root')) @pytest.fixture(autouse=True) @@ -128,7 +133,8 @@ def drop_table(cluster): for node_name in node_names: node = cluster.instances[node_name] - node.query("DROP TABLE IF EXISTS s3.test NO DELAY") + node.query("DROP TABLE IF EXISTS s3.test SYNC") + node.query("DROP DATABASE IF EXISTS s3 SYNC") drop_s3_metadata(node) drop_shadow_information(node) @@ -138,32 +144,23 @@ def drop_table(cluster): purge_s3(cluster, bucket) -def test_full_restore(cluster): +@pytest.mark.parametrize( + "replicated", [False, True] +) +def test_full_restore(cluster, replicated): node = cluster.instances["node"] - create_table(node, "test") + create_table(node, "test", replicated) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) - # To ensure parts have merged - node.query("OPTIMIZE TABLE s3.test") - - assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) - assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) - node.stop_clickhouse() drop_s3_metadata(node) - node.start_clickhouse() - - # All data is removed. 
- assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0) - - node.stop_clickhouse() create_restore_file(node) - node.start_clickhouse(10) + node.start_clickhouse() assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -191,7 +188,7 @@ def test_restore_another_bucket_path(cluster): node_another_bucket.stop_clickhouse() create_restore_file(node_another_bucket, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -202,7 +199,7 @@ def test_restore_another_bucket_path(cluster): node_another_bucket_path.stop_clickhouse() create_restore_file(node_another_bucket_path, bucket="root2", path="data") - node_another_bucket_path.start_clickhouse(10) + node_another_bucket_path.start_clickhouse() assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -244,7 +241,7 @@ def test_restore_different_revisions(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision1, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -255,7 +252,7 @@ def test_restore_different_revisions(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision2, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -266,7 +263,7 @@ def test_restore_different_revisions(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision3, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -298,7 +295,7 @@ def test_restore_mutations(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -309,7 +306,7 @@ def test_restore_mutations(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, 
revision=revision_after_mutation, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -323,7 +320,7 @@ def test_restore_mutations(cluster): purge_s3(cluster, cluster.minio_bucket_2) revision = (revision_before_mutation + revision_after_mutation) // 2 create_restore_file(node_another_bucket, revision=revision, bucket="root") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() # Wait for unfinished mutation completion. time.sleep(3) @@ -365,7 +362,57 @@ def test_migrate_to_restorable_schema(cluster): drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data") - node_another_bucket.start_clickhouse(10) + node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) + + +@pytest.mark.parametrize( + "replicated", [False, True] +) +def test_restore_to_detached(cluster, replicated): + node = cluster.instances["node"] + + create_table(node, "test", replicated) + + node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) + node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) + node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1))) + node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-07', 4096, 0))) + + # Add some mutation. + node.query("ALTER TABLE s3.test UPDATE counter = 1 WHERE 1", settings={"mutations_sync": 2}) + + # Detach some partition. + node.query("ALTER TABLE s3.test DETACH PARTITION '2020-01-07'") + + node.query("ALTER TABLE s3.test FREEZE") + revision = get_revision_counter(node, 1) + + node_another_bucket = cluster.instances["node_another_bucket"] + + create_table(node_another_bucket, "test", replicated) + + node_another_bucket.stop_clickhouse() + create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True) + node_another_bucket.start_clickhouse() + + assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0) + + node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-03'") + node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-04'") + node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-05'") + node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-06'") + + assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) + assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) + assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) + + # Attach partition that was already detached before backup-restore. 
+ node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-07'") + + assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 5) + assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) + assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)