Fix restorable schema

This commit is contained in:
alesapin 2022-05-13 17:00:47 +02:00
parent d8580c8cb8
commit eba60ff38f
12 changed files with 100 additions and 48 deletions

View File

@ -103,7 +103,7 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
checkRemoveAccess(*azure_blob_storage_disk);
}
azure_blob_storage_disk->startup();
azure_blob_storage_disk->startup(context);
if (config.getBool(config_prefix + ".cache_enabled", true))
{

View File

@ -211,9 +211,9 @@ void DiskDecorator::shutdown()
delegate->shutdown();
}
void DiskDecorator::startup()
void DiskDecorator::startup(ContextPtr context)
{
delegate->startup();
delegate->startup(context);
}
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)

View File

@ -71,7 +71,7 @@ public:
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override;
void startup() override;
void startup(ContextPtr context) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
std::vector<String> getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); }

View File

@ -484,7 +484,7 @@ DiskLocal::DiskLocal(
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
void DiskLocal::startup()
void DiskLocal::startup(ContextPtr)
{
try
{
@ -672,7 +672,7 @@ void registerDiskLocal(DiskFactory & factory)
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup();
disk->startup(context);
return std::make_shared<DiskRestartProxy>(disk);
};
factory.registerDiskType("local", creator);

View File

@ -110,7 +110,7 @@ public:
bool isBroken() const override { return broken; }
void startup() override;
void startup(ContextPtr) override;
void shutdown() override;

View File

@ -32,6 +32,12 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
/// Renders `revision` as a 64-character string of '0'/'1' digits (its binary representation,
/// most significant bit first), so every revision produces a string of the same length.
static String revisionToString(UInt64 revision)
{
    const std::bitset<64> revision_bits(revision);
    return revision_bits.to_string();
}
DiskObjectStorage::Metadata DiskObjectStorage::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
{
Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
@ -340,16 +346,35 @@ size_t DiskObjectStorage::getFileSize(const String & path) const
return readMetadata(path).total_size;
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
/// Moves the metadata file `from_path` to `to_path` on the metadata disk.
///
/// When `should_send_metadata` is true, a "rename" file-operation object carrying both paths
/// is created under a freshly allocated revision before the move itself is performed.
///
/// Throws FILE_ALREADY_EXISTS if `to_path` already exists.
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path, bool should_send_metadata)
{
    LOG_DEBUG(&Poco::Logger::get("DEBUG"), "MOVE FILE");

    if (exists(to_path))
        throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);

    if (should_send_metadata)
    {
        /// Allocate the revision with a single read-modify-write. Reading the counter and
        /// bumping it in two separate steps lets concurrent callers observe the same value
        /// and record different operations under one revision.
        const auto revision = ++metadata_helper->revision_counter;

        const ObjectAttributes object_metadata {
            {"from_path", from_path},
            {"to_path", to_path}
        };
        metadata_helper->createFileOperationObject("rename", revision, object_metadata);
    }

    metadata_disk->moveFile(from_path, to_path);
}
/// Two-argument overload: forwards to the three-argument moveFile, passing the disk's
/// `send_metadata` member as the `should_send_metadata` flag.
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
{
moveFile(from_path, to_path, send_metadata);
}
void DiskObjectStorage::replaceFile(const String & from_path, const String & to_path)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "REPLACE FILE");
if (exists(to_path))
{
const String tmp_path = to_path + ".old";
@ -363,6 +388,7 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_
void DiskObjectStorage::removeSharedFile(const String & path, bool delete_metadata_only)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Remove shared file");
std::vector<String> paths_to_remove;
removeMetadata(path, paths_to_remove);
@ -372,6 +398,7 @@ void DiskObjectStorage::removeSharedFile(const String & path, bool delete_metada
/// Removes the objects listed in `paths` from the underlying object storage.
void DiskObjectStorage::removeFromRemoteFS(const std::vector<String> & paths)
{
    /// The message names the operation actually performed: this method removes objects, it reads nothing.
    LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Remove from remote FS");
    object_storage->removeObjects(paths);
}
@ -416,17 +443,35 @@ bool DiskObjectStorage::checkUniqueId(const String & id) const
return checkObjectExists(id);
}
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path)
/// Creates a hard link `dst_path` to the metadata file `src_path`.
///
/// The reference counter stored in the source metadata is incremented first. When
/// `should_send_metadata` is true and `dst_path` does not start with "shadow/", a "hardlink"
/// file-operation object carrying both paths is created under a freshly allocated revision.
/// Finally the hard link itself is created on the metadata disk.
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path, bool should_send_metadata)
{
    LOG_DEBUG(&Poco::Logger::get("DEBUG"), "HARDLINK FILE");

    readUpdateAndStoreMetadata(src_path, false, [](Metadata & metadata) { metadata.ref_count++; return true; });

    if (should_send_metadata && !dst_path.starts_with("shadow/"))
    {
        /// Allocate the revision with a single read-modify-write. Reading the counter and
        /// bumping it in two separate steps lets concurrent callers observe the same value
        /// and record different operations under one revision.
        const auto revision = ++metadata_helper->revision_counter;

        const ObjectAttributes object_metadata {
            {"src_path", src_path},
            {"dst_path", dst_path}
        };
        metadata_helper->createFileOperationObject("hardlink", revision, object_metadata);
    }

    /// Create FS hardlink to metadata file.
    metadata_disk->createHardLink(src_path, dst_path);
}
/// Two-argument overload: forwards to the three-argument createHardLink, passing the disk's
/// `send_metadata` member as the `should_send_metadata` flag.
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path)
{
createHardLink(src_path, dst_path, send_metadata);
}
void DiskObjectStorage::setReadOnly(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "set readonly");
/// We should store read only flag inside metadata file (instead of using FS flag),
/// because we modify metadata file when create hard-links from it.
readUpdateAndStoreMetadata(path, false, [](Metadata & metadata) { metadata.read_only = true; return true; });
@ -560,15 +605,19 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
/// Shuts the disk down by shutting down the underlying object storage.
/// An INFO message is logged before and after the object storage call.
void DiskObjectStorage::shutdown()
{
LOG_INFO(log, "Shutting down disk {}", name);
object_storage->shutdown();
LOG_INFO(log, "Disk {} shut down", name);
}
void DiskObjectStorage::startup()
/// Starts the disk: starts the underlying object storage, then calls restoreMetadataIfNeeded
/// with the configuration taken from `context`.
/// An INFO message is logged before and after these steps.
void DiskObjectStorage::startup(ContextPtr context)
{
LOG_INFO(log, "Starting up disk {}", name);
object_storage->startup();
/// NOTE(review): the config prefix is assembled here from the fixed root
/// "storage_configuration.disks." plus the disk name — confirm it always matches
/// the prefix the disk was registered with.
/// NOTE(review): `context` is dereferenced unconditionally — presumably callers never pass null; verify.
restoreMetadataIfNeeded(context->getConfigRef(), "storage_configuration.disks." + name, context);
LOG_INFO(log, "Disk {} started up", name);
}
@ -649,13 +698,24 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
auto blob_name = getRandomASCIIString();
std::optional<ObjectAttributes> object_attributes;
if (send_metadata)
{
auto revision = metadata_helper->revision_counter + 1;
metadata_helper->revision_counter++;
object_attributes = {
{"path", path}
};
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{
readOrCreateUpdateAndStoreMetadata(path, mode, false,
[blob_name, count] (DiskObjectStorage::Metadata & metadata) { metadata.addObject(blob_name, count); return true; });
};
return object_storage->writeObject(fs::path(remote_fs_root_path) / blob_name, {}, create_metadata_callback, buf_size, settings);
return object_storage->writeObject(fs::path(remote_fs_root_path) / blob_name, object_attributes, create_metadata_callback, buf_size, settings);
}
@ -725,10 +785,6 @@ DiskObjectStorageReservation::~DiskObjectStorageReservation()
}
}
static String revisionToString(UInt64 revision)
{
return std::bitset<64>(revision).to_string();
}
void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
{
@ -877,8 +933,11 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchema()
void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
LOG_INFO(disk->log, "Restore operation for disk {} called", disk->name);
if (!disk->exists(RESTORE_FILE_NAME))
{
LOG_INFO(disk->log, "No restore file '{}' exists, finishing restore", RESTORE_FILE_NAME);
return;
}
@ -925,6 +984,7 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur
if (disk->exists(root))
disk->removeSharedRecursive(root + '/', !cleanup_s3, {});
LOG_INFO(disk->log, "Old metadata removed, restoring new one");
restoreFiles(source_object_storage, information);
restoreFileOperations(source_object_storage, information);
@ -1024,6 +1084,9 @@ void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_objec
std::vector<String> keys_names;
for (const auto & [key, size] : keys)
{
LOG_INFO(disk->log, "Calling restore for key for disk {}", key);
/// Skip file operations objects. They will be processed separately.
if (key.find("/operations/") != String::npos)
continue;
@ -1051,6 +1114,7 @@ void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_objec
BlobsPathToSize children;
source_object_storage->listPrefix(restore_information.source_path, children);
restore_files(children);
for (auto & result : results)
@ -1091,7 +1155,7 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
auto relative_key = shrinkKey(source_path, key);
/// Copy object if we restore to different bucket / path.
if (disk->remote_fs_root_path != source_path)
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != source_path)
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage);
auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata)
@ -1107,6 +1171,14 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
}
void DiskObjectStorage::onFreeze(const String & path)
{
createDirectories(path);
auto revision_file_buf = metadata_disk->writeFile(path + "revision.txt", 32);
writeIntText(metadata_helper->revision_counter.load(), *revision_file_buf);
revision_file_buf->finalize();
}
static String pathToDetached(const String & source_path)
{
if (source_path.ends_with('/'))
@ -1150,16 +1222,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
auto to_path = object_attributes["to_path"];
if (disk->exists(from_path))
{
disk->moveFile(from_path, to_path);
if (send_metadata)
{
auto next_revision = ++revision_counter;
const ObjectAttributes object_metadata {
{"from_path", from_path},
{"to_path", to_path}
};
createFileOperationObject("rename", next_revision, object_attributes);
}
disk->moveFile(from_path, to_path, send_metadata);
LOG_TRACE(disk->log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
@ -1187,16 +1250,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
if (disk->exists(src_path))
{
disk->createDirectories(directoryPath(dst_path));
if (send_metadata && !dst_path.starts_with("shadow/"))
{
auto next_revision = ++revision_counter;
const ObjectAttributes object_metadata {
{"src_path", src_path},
{"dst_path", dst_path}
};
createFileOperationObject("hardlink", next_revision, object_attributes);
}
disk->createHardLink(src_path, dst_path);
disk->createHardLink(src_path, dst_path, send_metadata);
LOG_TRACE(disk->log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}

View File

@ -6,8 +6,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int DEADLOCK_AVOIDED;
{extern const int DEADLOCK_AVOIDED;
}
using Millis = std::chrono::milliseconds;
@ -329,7 +328,7 @@ void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector<
return DiskDecorator::getRemotePathsRecursive(path, paths_map);
}
void DiskRestartProxy::restart()
void DiskRestartProxy::restart(ContextPtr context)
{
/// Speed up processing unhealthy requests.
DiskDecorator::shutdown();
@ -352,7 +351,7 @@ void DiskRestartProxy::restart()
LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName());
DiskDecorator::startup();
DiskDecorator::startup(context);
LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName());
}

View File

@ -68,7 +68,7 @@ public:
std::vector<String> getRemotePaths(const String & path) const override;
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
void restart();
void restart(ContextPtr context);
private:
friend class RestartAwareReadBuffer;

View File

@ -297,7 +297,7 @@ public:
virtual void shutdown() {}
/// Performs action on disk startup.
virtual void startup() {}
virtual void startup(ContextPtr) {}
/// Returns a unique string for the file; overridden for IDiskRemote.
/// Required to distinguish different copies of the same part on a remote disk.

View File

@ -36,7 +36,8 @@ WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS()
void WriteIndirectBufferFromRemoteFS::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();
create_metadata_callback(count());
if (create_metadata_callback)
create_metadata_callback(count());
}

View File

@ -96,9 +96,7 @@ void registerDiskS3(DiskFactory & factory)
checkRemoveAccess(*s3disk);
}
s3disk->startup();
s3disk->restoreMetadataIfNeeded(config, config_prefix, context);
s3disk->startup(context);
std::shared_ptr<IDisk> disk_result = s3disk;

View File

@ -780,7 +780,7 @@ void InterpreterSystemQuery::restartDisk(String & name)
auto disk = getContext()->getDisk(name);
if (DiskRestartProxy * restart_proxy = dynamic_cast<DiskRestartProxy*>(disk.get()))
restart_proxy->restart();
restart_proxy->restart(getContext());
else
throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS);
}