Updated strategy for handling internal backups & restores to avoid concurrent internal backups & restores - Added settings to disallow concurrent backups and restores

This commit is contained in:
Smita Kulkarni 2023-01-17 22:27:13 +01:00
parent 46b21629ed
commit 6e06af1b25
8 changed files with 138 additions and 54 deletions

View File

@ -6,7 +6,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <Backups/SettingsFieldOptionalUUID.h>
namespace DB
{
@ -16,48 +16,6 @@ namespace ErrorCodes
extern const int WRONG_BACKUP_SETTINGS;
}
namespace
{
struct SettingFieldOptionalUUID
{
std::optional<UUID> value;
explicit SettingFieldOptionalUUID(const std::optional<UUID> & value_) : value(value_) {}
explicit SettingFieldOptionalUUID(const Field & field)
{
if (field.getType() == Field::Types::Null)
{
value = std::nullopt;
return;
}
if (field.getType() == Field::Types::String)
{
const String & str = field.get<const String &>();
if (str.empty())
{
value = std::nullopt;
return;
}
UUID id;
if (tryParse(id, str))
{
value = id;
return;
}
}
throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field);
}
explicit operator Field() const { return Field(value ? toString(*value) : ""); }
};
}
/// List of backup settings except base_backup_name and cluster_host_ids.
#define LIST_OF_BACKUP_SETTINGS(M) \
M(String, id) \

View File

@ -160,8 +160,15 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
else
backup_id = toString(*backup_settings.backup_uuid);
if (!backup_settings.internal && (num_active_backups && !allow_concurrent_backups))
/// Check if there are no concurrent backups
if (num_active_backups && !allow_concurrent_backups)
{
/// If it's an internal backup and we currently have 1 active backup, it could be the original query; validate using backup_uuid
if(!(num_active_backups==1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid)))
{
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
}
}
std::shared_ptr<IBackupCoordination> backup_coordination;
if (backup_settings.internal)
@ -376,6 +383,9 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
if (!restore_settings.backup_uuid)
restore_settings.backup_uuid = UUIDHelpers::generateV4();
/// `restore_id` will be used as a key to the `infos` map, so it should be unique.
OperationID restore_id;
if (restore_settings.internal)
@ -383,10 +393,17 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
else if (!restore_settings.id.empty())
restore_id = restore_settings.id;
else
restore_id = toString(UUIDHelpers::generateV4());
restore_id = toString(*restore_settings.backup_uuid);
if (!restore_settings.internal && (num_active_restores && !allow_concurrent_restores))
/// Check if there are no concurrent restores
if (num_active_restores && !allow_concurrent_restores)
{
/// If it's an internal restore and we currently have 1 active restore, it could be the original query; validate using backup_uuid
if(!(num_active_restores==1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid)))
{
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
}
}
std::shared_ptr<IRestoreCoordination> restore_coordination;
if (restore_settings.internal)
@ -480,6 +497,7 @@ void BackupsWorker::doRestore(
backup_open_params.context = context;
backup_open_params.backup_info = backup_info;
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.backup_uuid = restore_settings.backup_uuid;
backup_open_params.password = restore_settings.password;
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
@ -696,6 +714,30 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllInfos() const
return res_infos;
}
std::vector<BackupsWorker::Info> BackupsWorker::getAllActiveBackupInfos() const
{
std::vector<Info> res_infos;
std::lock_guard lock{infos_mutex};
for (const auto & info : infos | boost::adaptors::map_values)
{
if (info.status==BackupStatus::CREATING_BACKUP)
res_infos.push_back(info);
}
return res_infos;
}
std::vector<BackupsWorker::Info> BackupsWorker::getAllActiveRestoreInfos() const
{
std::vector<Info> res_infos;
std::lock_guard lock{infos_mutex};
for (const auto & info : infos | boost::adaptors::map_values)
{
if (info.status==BackupStatus::RESTORING)
res_infos.push_back(info);
}
return res_infos;
}
void BackupsWorker::shutdown()
{
bool has_active_backups_and_restores = (num_active_backups || num_active_restores);

View File

@ -103,6 +103,8 @@ private:
void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true);
/// Same as setStatus(), but always passes throw_if_error = false.
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }
void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 uncompressed_size, UInt64 compressed_size);
/// Returns copies of the entries of `infos` whose status is BackupStatus::CREATING_BACKUP.
std::vector<Info> getAllActiveBackupInfos() const;
/// Returns copies of the entries of `infos` whose status is BackupStatus::RESTORING.
std::vector<Info> getAllActiveRestoreInfos() const;
ThreadPool backups_thread_pool;
ThreadPool restores_thread_pool;

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTSetQuery.h>
#include <boost/algorithm/string/predicate.hpp>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Backups/SettingsFieldOptionalUUID.h>
namespace DB
@ -162,7 +163,9 @@ namespace
M(RestoreUDFCreationMode, create_function) \
M(Bool, internal) \
M(String, host_id) \
M(String, coordination_zk_path)
M(String, coordination_zk_path) \
M(OptionalUUID, backup_uuid)
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
{

View File

@ -122,6 +122,11 @@ struct RestoreSettings
/// Path in Zookeeper used to coordinate restoring process while executing by RESTORE ON CLUSTER.
String coordination_zk_path;
/// Internal, should not be specified by user.
/// UUID of the backup. If it's not set it will be generated randomly.
/// This is used to validate internal restores when allow_concurrent_restores is turned off
std::optional<UUID> backup_uuid;
static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query);
void copySettingsToQuery(ASTBackupQuery & query) const;
};

View File

@ -0,0 +1,43 @@
#include <Backups/SettingsFieldOptionalUUID.h>
#include <Common/ErrorCodes.h>
#include <Core/SettingsFields.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_BACKUP_SETTINGS;
}
/// Builds the setting from a generic Field.
/// Null and the empty string both produce an unset value; a non-empty string must parse as a UUID.
/// Every other input (wrong field type, unparsable string) throws CANNOT_PARSE_BACKUP_SETTINGS.
SettingFieldOptionalUUID::SettingFieldOptionalUUID(const Field & field)
{
    bool recognized = false;
    const auto field_type = field.getType();

    if (field_type == Field::Types::Null)
    {
        value = std::nullopt;
        recognized = true;
    }
    else if (field_type == Field::Types::String)
    {
        const String & text = field.get<const String &>();
        if (text.empty())
        {
            value = std::nullopt;
            recognized = true;
        }
        else
        {
            UUID parsed;
            recognized = tryParse(parsed, text);
            if (recognized)
                value = parsed;
        }
    }

    if (!recognized)
        throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field);
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <optional>
#include <Core/SettingsFields.h>
namespace DB
{
/// Setting field that holds an optional UUID.
struct SettingFieldOptionalUUID
{
    std::optional<UUID> value;

    /// Wraps an already-parsed optional UUID.
    explicit SettingFieldOptionalUUID(const std::optional<UUID> & value_) : value(value_) {}

    /// Builds the setting from a generic Field (defined out of line).
    explicit SettingFieldOptionalUUID(const Field & field);

    /// Converts back to a Field: the textual UUID, or an empty string when unset.
    explicit operator Field() const
    {
        if (value)
            return Field(toString(*value));
        return Field(String{});
    }
};
}

View File

@ -19,12 +19,25 @@ def generate_cluster_def():
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write("<clickhouse>\n\t<remote_servers>\n\t\t<cluster>\n\t\t\t<shard>\n")
f.write("""
<clickhouse>
<remote_servers>
<cluster>
<shard>
""")
for i in range(num_nodes):
f.write(
f"\t\t\t\t<replica>\n\t\t\t\t\t<host>node{i}</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
)
f.write("\t\t\t</shard>\n\t\t</cluster>\n\t</remote_servers>\n</clickhouse>")
f.write("""
<replica>
<host>node"""+str(i)+"""</host>
<port>9000</port>
</replica>
""")
f.write("""
</shard>
</cluster>
</remote_servers>
</clickhouse>
""")
return path