mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
Use cluster state data to check concurrent backup/restore
Implementation: * BackupWorker checks the if any backup/restore which has a path in zookeeper has status not completed, if yes, new backup/restore is stopped. * For not on cluster only active backup / restore is checked. * Removed restore_uuid from RestoreSettings, as it is no longer used.
This commit is contained in:
parent
3475f84d02
commit
ef54683386
@ -160,9 +160,6 @@ namespace
|
||||
{
|
||||
return fmt::format("{:03}", counter); /// Outputs 001, 002, 003, ...
|
||||
}
|
||||
|
||||
/// We try to store data to zookeeper several times due to possible version conflicts.
|
||||
constexpr size_t NUM_ATTEMPTS = 10;
|
||||
}
|
||||
|
||||
BackupCoordinationRemote::BackupCoordinationRemote(
|
||||
@ -468,7 +465,7 @@ void BackupCoordinationRemote::updateFileInfo(const FileInfo & file_info)
|
||||
auto zk = getZooKeeper();
|
||||
String size_and_checksum = serializeSizeAndChecksum(std::pair{file_info.size, file_info.checksum});
|
||||
String full_path = zookeeper_path + "/file_infos/" + size_and_checksum;
|
||||
for (size_t attempt = 0; attempt < NUM_ATTEMPTS; ++attempt)
|
||||
for (size_t attempt = 0; attempt < MAX_ZOOKEEPER_ATTEMPTS; ++attempt)
|
||||
{
|
||||
Coordination::Stat stat;
|
||||
auto new_info = deserializeFileInfo(zk->get(full_path, &stat));
|
||||
@ -476,7 +473,7 @@ void BackupCoordinationRemote::updateFileInfo(const FileInfo & file_info)
|
||||
auto code = zk->trySet(full_path, serializeFileInfo(new_info), stat.version);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
return;
|
||||
bool is_last_attempt = (attempt == NUM_ATTEMPTS - 1);
|
||||
bool is_last_attempt = (attempt == MAX_ZOOKEEPER_ATTEMPTS - 1);
|
||||
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
|
||||
throw zkutil::KeeperException(code, full_path);
|
||||
}
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <Backups/BackupCoordinationReplicatedTables.h>
|
||||
#include <Backups/BackupCoordinationStageSync.h>
|
||||
|
||||
/// We try to store data to zookeeper several times due to possible version conflicts.
|
||||
constexpr size_t MAX_ZOOKEEPER_ATTEMPTS = 10;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -8,6 +8,10 @@ namespace DB
|
||||
|
||||
namespace BackupCoordinationStage
|
||||
{
|
||||
/// This stage is set after concurrency check so ensure we dont start other backup/restores
|
||||
/// when concurrent backup/restores are not allowed
|
||||
constexpr const char * SCHEDULED_TO_START = "scheduled to start";
|
||||
|
||||
/// Finding all tables and databases which we're going to put to the backup and collecting their metadata.
|
||||
constexpr const char * GATHERING_METADATA = "gathering metadata";
|
||||
|
||||
|
@ -173,13 +173,6 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
|
||||
String backup_name_for_logging = backup_info.toStringForLogging();
|
||||
try
|
||||
{
|
||||
if (!allow_concurrent_backups && hasConcurrentBackups(backup_settings))
|
||||
{
|
||||
/// addInfo is called here to record the failed backup details
|
||||
addInfo(backup_id, backup_name_for_logging, backup_settings.internal, BackupStatus::BACKUP_FAILED);
|
||||
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
|
||||
}
|
||||
|
||||
addInfo(backup_id, backup_name_for_logging, backup_settings.internal, BackupStatus::CREATING_BACKUP);
|
||||
|
||||
/// Prepare context to use.
|
||||
@ -259,6 +252,7 @@ void BackupsWorker::doBackup(
|
||||
}
|
||||
|
||||
bool on_cluster = !backup_query->cluster.empty();
|
||||
|
||||
assert(mutable_context || (!on_cluster && !called_async));
|
||||
|
||||
/// Checks access rights if this is not ON CLUSTER query.
|
||||
@ -284,6 +278,9 @@ void BackupsWorker::doBackup(
|
||||
if (!backup_coordination)
|
||||
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
|
||||
|
||||
if (!allow_concurrent_backups && !backup_settings.internal && hasConcurrentBackups(backup_id, context, on_cluster))
|
||||
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
|
||||
|
||||
/// Opens a backup for writing.
|
||||
BackupFactory::CreateParams backup_create_params;
|
||||
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
|
||||
@ -384,9 +381,6 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
||||
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
|
||||
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
|
||||
|
||||
if (!restore_settings.restore_uuid)
|
||||
restore_settings.restore_uuid = UUIDHelpers::generateV4();
|
||||
|
||||
/// `restore_id` will be used as a key to the `infos` map, so it should be unique.
|
||||
OperationID restore_id;
|
||||
if (restore_settings.internal)
|
||||
@ -394,7 +388,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
||||
else if (!restore_settings.id.empty())
|
||||
restore_id = restore_settings.id;
|
||||
else
|
||||
restore_id = toString(*restore_settings.restore_uuid);
|
||||
restore_id = toString(UUIDHelpers::generateV4());
|
||||
|
||||
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
||||
if (restore_settings.internal)
|
||||
@ -410,13 +404,6 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
||||
auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
|
||||
String backup_name_for_logging = backup_info.toStringForLogging();
|
||||
|
||||
if (!allow_concurrent_restores && hasConcurrentRestores(restore_settings))
|
||||
{
|
||||
/// addInfo is called here to record the failed restore details
|
||||
addInfo(restore_id, backup_name_for_logging, restore_settings.internal, BackupStatus::RESTORING);
|
||||
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
|
||||
}
|
||||
|
||||
addInfo(restore_id, backup_name_for_logging, restore_settings.internal, BackupStatus::RESTORING);
|
||||
|
||||
/// Prepare context to use.
|
||||
@ -496,7 +483,6 @@ void BackupsWorker::doRestore(
|
||||
backup_open_params.context = context;
|
||||
backup_open_params.backup_info = backup_info;
|
||||
backup_open_params.base_backup_info = restore_settings.base_backup_info;
|
||||
backup_open_params.backup_uuid = restore_settings.restore_uuid;
|
||||
backup_open_params.password = restore_settings.password;
|
||||
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
|
||||
|
||||
@ -532,12 +518,15 @@ void BackupsWorker::doRestore(
|
||||
if (on_cluster && restore_settings.coordination_zk_path.empty())
|
||||
{
|
||||
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
|
||||
restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(UUIDHelpers::generateV4());
|
||||
restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_id);
|
||||
}
|
||||
|
||||
if (!restore_coordination)
|
||||
restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
|
||||
|
||||
if (!allow_concurrent_restores && !restore_settings.internal && hasConcurrentRestores(restore_id, context, on_cluster))
|
||||
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
|
||||
|
||||
/// Do RESTORE.
|
||||
if (on_cluster)
|
||||
{
|
||||
@ -744,32 +733,108 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllActiveRestoreInfos() const
|
||||
return res_infos;
|
||||
}
|
||||
|
||||
bool BackupsWorker::hasConcurrentBackups(const BackupSettings & backup_settings) const
|
||||
bool BackupsWorker::hasConcurrentBackups(const OperationID & backup_id, const ContextPtr & context, bool on_cluster) const
|
||||
{
|
||||
/// Check if there are no concurrent backups
|
||||
if (num_active_backups)
|
||||
if (on_cluster)
|
||||
{
|
||||
/// If its an internal backup and we currently have 1 active backup, it could be the original query, validate using backup_uuid
|
||||
if (!(num_active_backups == 1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid)))
|
||||
String common_backup_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups") ;
|
||||
auto zookeeper = context->getGlobalContext()->getZooKeeper();
|
||||
std::string backup_stage_path = common_backup_path + "/backup-" + toString(backup_id) +"/stage";
|
||||
|
||||
if (!zookeeper->exists(common_backup_path))
|
||||
zookeeper->createAncestors(common_backup_path);
|
||||
|
||||
for (size_t attempt = 0; attempt < MAX_ZOOKEEPER_ATTEMPTS; ++attempt)
|
||||
{
|
||||
return true;
|
||||
Coordination::Stat stat;
|
||||
zookeeper->get(common_backup_path, &stat);
|
||||
Strings existing_backup_paths = zookeeper->getChildren(common_backup_path);
|
||||
|
||||
for (const auto & existing_backup_path : existing_backup_paths)
|
||||
{
|
||||
if (startsWith(existing_backup_path, "restore-"))
|
||||
continue;
|
||||
|
||||
String existing_backup_id = existing_backup_path;
|
||||
existing_backup_id.erase(0, String("backup-").size());
|
||||
|
||||
if (existing_backup_id == toString(backup_id))
|
||||
continue;
|
||||
|
||||
const auto status = zookeeper->get(common_backup_path + "/" + existing_backup_path + "/stage");
|
||||
if (status != Stage::COMPLETED)
|
||||
return true;
|
||||
}
|
||||
|
||||
zookeeper->createIfNotExists(backup_stage_path, "");
|
||||
auto code = zookeeper->trySet(backup_stage_path, Stage::SCHEDULED_TO_START, stat.version);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
break;
|
||||
bool is_last_attempt = (attempt == MAX_ZOOKEEPER_ATTEMPTS - 1);
|
||||
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
|
||||
throw zkutil::KeeperException(code, backup_stage_path);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (num_active_backups == 1)
|
||||
return false;
|
||||
else
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool BackupsWorker::hasConcurrentRestores(const RestoreSettings & restore_settings) const
|
||||
bool BackupsWorker::hasConcurrentRestores(const OperationID & restore_id, const ContextPtr & context, bool on_cluster) const
|
||||
{
|
||||
/// Check if there are no concurrent restores
|
||||
if (num_active_restores)
|
||||
if (on_cluster)
|
||||
{
|
||||
/// If its an internal restore and we currently have 1 active restore, it could be the original query, validate using iz
|
||||
if (!(num_active_restores == 1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.restore_uuid)))
|
||||
String common_restore_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups") ;
|
||||
auto zookeeper = context->getGlobalContext()->getZooKeeper();
|
||||
std::string path = common_restore_path + "/restore-" + toString(restore_id) +"/stage";
|
||||
|
||||
if (!zookeeper->exists(common_restore_path))
|
||||
zookeeper->createAncestors(common_restore_path);
|
||||
|
||||
for (size_t attempt = 0; attempt < MAX_ZOOKEEPER_ATTEMPTS; ++attempt)
|
||||
{
|
||||
return true;
|
||||
Coordination::Stat stat;
|
||||
zookeeper->get(common_restore_path, &stat);
|
||||
Strings existing_restore_paths = zookeeper->getChildren(common_restore_path);
|
||||
for (const auto & existing_restore_path : existing_restore_paths)
|
||||
{
|
||||
if (startsWith(existing_restore_path, "backup-"))
|
||||
continue;
|
||||
|
||||
String existing_restore_id = existing_restore_path;
|
||||
existing_restore_id.erase(0, String("restore-").size());
|
||||
|
||||
if (existing_restore_id == toString(restore_id))
|
||||
continue;
|
||||
|
||||
|
||||
const auto status = zookeeper->get(common_restore_path + "/" + existing_restore_path + "/stage");
|
||||
if (status != Stage::COMPLETED)
|
||||
return true;
|
||||
}
|
||||
|
||||
zookeeper->createIfNotExists(path, "");
|
||||
auto code = zookeeper->trySet(path, Stage::SCHEDULED_TO_START, stat.version);
|
||||
if (code == Coordination::Error::ZOK)
|
||||
break;
|
||||
bool is_last_attempt = (attempt == MAX_ZOOKEEPER_ATTEMPTS - 1);
|
||||
if ((code != Coordination::Error::ZBADVERSION) || is_last_attempt)
|
||||
throw zkutil::KeeperException(code, path);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (num_active_restores == 1)
|
||||
return false;
|
||||
else
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void BackupsWorker::shutdown()
|
||||
|
@ -113,8 +113,8 @@ private:
|
||||
void setNumFilesAndSize(const OperationID & id, size_t num_files, size_t num_processed_files, UInt64 processed_files_size, UInt64 uncompressed_size, UInt64 compressed_size);
|
||||
std::vector<Info> getAllActiveBackupInfos() const;
|
||||
std::vector<Info> getAllActiveRestoreInfos() const;
|
||||
bool hasConcurrentBackups(const BackupSettings & backup_settings) const;
|
||||
bool hasConcurrentRestores(const RestoreSettings & restore_settings) const;
|
||||
bool hasConcurrentBackups(const OperationID & backup_id, const ContextPtr & context, bool on_cluster) const;
|
||||
bool hasConcurrentRestores(const OperationID & restore_id, const ContextPtr & context, bool on_cluster) const;
|
||||
|
||||
ThreadPool backups_thread_pool;
|
||||
ThreadPool restores_thread_pool;
|
||||
|
@ -163,8 +163,7 @@ namespace
|
||||
M(RestoreUDFCreationMode, create_function) \
|
||||
M(Bool, internal) \
|
||||
M(String, host_id) \
|
||||
M(String, coordination_zk_path) \
|
||||
M(OptionalUUID, restore_uuid)
|
||||
M(String, coordination_zk_path)
|
||||
|
||||
|
||||
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
|
||||
|
@ -122,11 +122,6 @@ struct RestoreSettings
|
||||
/// Path in Zookeeper used to coordinate restoring process while executing by RESTORE ON CLUSTER.
|
||||
String coordination_zk_path;
|
||||
|
||||
/// Internal, should not be specified by user.
|
||||
/// UUID of the restore. If it's not set it will be generated randomly.
|
||||
/// This is used to validate internal restores when allow_concurrent_restores is turned off
|
||||
std::optional<UUID> restore_uuid;
|
||||
|
||||
static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query);
|
||||
void copySettingsToQuery(ASTBackupQuery & query) const;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user