New syntax for BACKUP/RESTORE: set backup engine explicitly.

This commit is contained in:
Vitaly Baranov 2021-11-05 17:18:23 +03:00
parent 78e9af8f0a
commit cf77c0b3fc
20 changed files with 405 additions and 214 deletions

View File

@ -703,10 +703,6 @@ if (ThreadFuzzer::instance().isEffective())
setupTmpPath(log, disk->getPath());
}
/// Storage keeping all the backups.
fs::create_directories(path / "backups");
global_context->setBackupsVolume(config().getString("backups_path", path / "backups"), config().getString("backups_policy", ""));
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.
* Flags may be cleared automatically after being applied by the server.
* Examples: do repair of local data; clone all replicated tables from replica.

View File

@ -1,65 +1,41 @@
#include <Backups/BackupFactory.h>
#include <Backups/BackupInDirectory.h>
#include <Interpreters/Context.h>
#include <Disks/IVolume.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int NOT_ENOUGH_SPACE;
extern const int BACKUP_ENGINE_NOT_FOUND;
extern const int LOGICAL_ERROR;
}
BackupFactory & BackupFactory::instance()
{
static BackupFactory the_instance;
return the_instance;
}
void BackupFactory::setBackupsVolume(VolumePtr backups_volume_)
BackupMutablePtr BackupFactory::createBackup(const CreateParams & params) const
{
backups_volume = backups_volume_;
const String & engine_name = params.backup_info.backup_engine_name;
auto it = creators.find(engine_name);
if (it == creators.end())
throw Exception(ErrorCodes::BACKUP_ENGINE_NOT_FOUND, "Not found backup engine {}", engine_name);
return (it->second)(params);
}
BackupMutablePtr BackupFactory::createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup) const
void BackupFactory::registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn)
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(backup_name));
}
auto reservation = backups_volume->reserve(estimated_backup_size);
if (!reservation)
throw Exception(
ErrorCodes::NOT_ENOUGH_SPACE,
"Couldn't reserve {} bytes of free space for new backup {}",
estimated_backup_size,
quoteString(backup_name));
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::WRITE, reservation->getDisk(), backup_name, base_backup);
if (creators.contains(engine_name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup engine {} was registered twice", engine_name);
creators[engine_name] = creator_fn;
}
BackupPtr BackupFactory::openBackup(const String & backup_name, const BackupPtr & base_backup) const
void registerBackupEngines(BackupFactory & factory);
BackupFactory::BackupFactory()
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::READ, disk, backup_name, base_backup);
}
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(backup_name));
registerBackupEngines(*this);
}
}

View File

@ -1,38 +1,46 @@
#pragma once
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <optional>
#include <unordered_map>
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class Context;
using ContextMutablePtr = std::shared_ptr<Context>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using ContextPtr = std::shared_ptr<const Context>;
/// Factory for implementations of the IBackup interface.
class BackupFactory : boost::noncopyable
{
public:
using OpenMode = IBackup::OpenMode;
struct CreateParams
{
OpenMode open_mode = OpenMode::WRITE;
BackupInfo backup_info;
std::optional<BackupInfo> base_backup_info;
ContextPtr context;
};
static BackupFactory & instance();
/// Must be called to initialize the backup factory.
void setBackupsVolume(VolumePtr backups_volume_);
/// Creates a new backup or opens it.
BackupMutablePtr createBackup(const CreateParams & params) const;
/// Creates a new backup and open it for writing.
BackupMutablePtr createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup = {}) const;
/// Opens an existing backup for reading.
BackupPtr openBackup(const String & backup_name, const BackupPtr & base_backup = {}) const;
using CreatorFn = std::function<BackupMutablePtr(const CreateParams & params)>;
void registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn);
private:
VolumePtr backups_volume;
BackupFactory();
std::unordered_map<String, CreatorFn> creators;
};
}

View File

@ -9,6 +9,7 @@
#include <Common/quoteString.h>
#include <Disks/DiskSelector.h>
#include <Disks/IDisk.h>
#include <Disks/DiskLocal.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
@ -32,6 +33,7 @@ namespace ErrorCodes
extern const int BACKUP_ENTRY_ALREADY_EXISTS;
extern const int BACKUP_ENTRY_NOT_FOUND;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
@ -40,12 +42,21 @@ namespace
const UInt64 BACKUP_VERSION = 1;
}
BackupInDirectory::BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_)
: open_mode(open_mode_), disk(disk_), path(path_), path_with_sep(path_), base_backup(base_backup_)
BackupInDirectory::BackupInDirectory(const String & backup_name_, OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const ContextPtr & context_, const std::optional<BackupInfo> & base_backup_info_)
: backup_name(backup_name_), open_mode(open_mode_), disk(disk_), path(path_), context(context_), base_backup_info(base_backup_info_)
{
if (!path_with_sep.ends_with('/'))
path_with_sep += '/';
trimRight(path, '/');
if (!path.empty() && (path.back() != '/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Backup {}: Path to backup must end with '/', but {} doesn't.", getName(), path);
if (!disk)
{
if (path.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Backup {}: Path to backup must not be empty.", getName());
disk = std::make_shared<DiskLocal>("internal", path, 0);
path = "";
}
open();
}
@ -58,19 +69,32 @@ void BackupInDirectory::open()
{
if (open_mode == OpenMode::WRITE)
{
if (disk->exists(path))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(path));
disk->createDirectories(path);
directory_was_created = true;
writePathToBaseBackup();
if (disk->exists(path + ".contents"))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", getName());
if (!path.empty() && !disk->isDirectory(path))
{
disk->createDirectories(path);
directory_was_created = true;
}
writeBaseBackupInfo();
}
if (open_mode == OpenMode::READ)
{
if (!disk->isDirectory(path))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(path));
if (!path.empty() && !disk->isDirectory(path))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", getName());
readContents();
readPathToBaseBackup();
readBaseBackupInfo();
}
if (base_backup_info)
{
BackupFactory::CreateParams params;
params.backup_info = *base_backup_info;
params.open_mode = OpenMode::READ;
params.context = context;
base_backup = BackupFactory::instance().createBackup(params);
}
}
@ -87,36 +111,34 @@ void BackupInDirectory::close()
}
}
void BackupInDirectory::writePathToBaseBackup()
void BackupInDirectory::writeBaseBackupInfo()
{
String file_path = path_with_sep + ".base_backup";
if (!base_backup)
String file_path = path + ".base_backup";
if (!base_backup_info)
{
disk->removeFileIfExists(file_path);
return;
}
auto out = disk->writeFile(file_path);
writeString(base_backup->getPath(), *out);
writeString(base_backup_info->toString(), *out);
}
void BackupInDirectory::readPathToBaseBackup()
void BackupInDirectory::readBaseBackupInfo()
{
if (base_backup)
if (base_backup_info)
return;
String file_path = path_with_sep + ".base_backup";
String file_path = path + ".base_backup";
if (!disk->exists(file_path))
return;
auto in = disk->readFile(file_path);
String base_backup_path;
readStringUntilEOF(base_backup_path, *in);
if (base_backup_path.empty())
return;
base_backup = BackupFactory::instance().openBackup(base_backup_path);
String str;
readStringUntilEOF(str, *in);
base_backup_info.emplace(BackupInfo::fromString(str));
}
void BackupInDirectory::writeContents()
{
auto out = disk->writeFile(path_with_sep + ".contents");
auto out = disk->writeFile(path + ".contents");
writeVarUInt(BACKUP_VERSION, *out);
writeVarUInt(infos.size(), *out);
@ -136,11 +158,11 @@ void BackupInDirectory::writeContents()
void BackupInDirectory::readContents()
{
auto in = disk->readFile(path_with_sep + ".contents");
auto in = disk->readFile(path + ".contents");
UInt64 version;
readVarUInt(version, *in);
if (version != BACKUP_VERSION)
throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", quoteString(path), version);
throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", getName(), version);
size_t num_infos;
readVarUInt(num_infos, *in);
@ -164,16 +186,6 @@ void BackupInDirectory::readContents()
}
}
IBackup::OpenMode BackupInDirectory::getOpenMode() const
{
return open_mode;
}
String BackupInDirectory::getPath() const
{
return path;
}
Strings BackupInDirectory::list(const String & prefix, const String & terminator) const
{
if (!prefix.ends_with('/') && !prefix.empty())
@ -209,7 +221,7 @@ size_t BackupInDirectory::getSize(const String & name) const
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(name));
return it->second.size;
}
@ -219,7 +231,7 @@ UInt128 BackupInDirectory::getChecksum(const String & name) const
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(name));
return it->second.checksum;
}
@ -230,7 +242,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(name));
const auto & info = it->second;
if (!info.size)
@ -242,7 +254,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size, info.checksum);
return std::make_unique<BackupEntryFromImmutableFile>(disk, path + name, info.size, info.checksum);
}
if (info.size < info.base_size)
@ -250,7 +262,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
throw Exception(
ErrorCodes::BACKUP_DAMAGED,
"Backup {}: Entry {} has its data size less than in the base backup {}: {} < {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), info.size, info.base_size);
getName(), quoteString(name), base_backup->getName(), info.size, info.base_size);
}
if (!base_backup)
@ -258,7 +270,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
quoteString(path), quoteString(name));
getName(), quoteString(name));
}
if (!base_backup->exists(name))
@ -266,7 +278,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
quoteString(path), quoteString(name));
getName(), quoteString(name));
}
auto base_entry = base_backup->read(name);
@ -276,7 +288,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected size in the base backup {}: {} (expected size: {})",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), base_size, info.base_size);
getName(), quoteString(name), base_backup->getName(), base_size, info.base_size);
}
auto base_checksum = base_entry->getChecksum();
@ -285,7 +297,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected checksum in the base backup {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()));
getName(), quoteString(name), base_backup->getName());
}
if (info.size == info.base_size)
@ -298,7 +310,7 @@ BackupEntryPtr BackupInDirectory::read(const String & name) const
/// and the ending goes from this backup.
return std::make_unique<BackupEntryConcat>(
std::move(base_entry),
std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size - info.base_size),
std::make_unique<BackupEntryFromImmutableFile>(disk, path + name, info.size - info.base_size),
info.checksum);
}
@ -311,7 +323,7 @@ void BackupInDirectory::write(const String & name, BackupEntryPtr entry)
if (infos.contains(name))
throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", quoteString(path), quoteString(name));
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", getName(), quoteString(name));
UInt64 size = entry->getSize();
std::optional<UInt128> checksum = entry->getChecksum();
@ -421,7 +433,7 @@ void BackupInDirectory::write(const String & name, BackupEntryPtr entry)
maybe_hashing_read_buffer = &hashing_read_buffer.emplace(*read_buffer);
/// Copy the entry's data after `copy_pos`.
String out_file_path = path_with_sep + name;
String out_file_path = path + name;
disk->createDirectories(directoryPath(out_file_path));
auto out = disk->writeFile(out_file_path);
@ -451,4 +463,46 @@ void BackupInDirectory::finalizeWriting()
finalized = true;
}
void registerBackupEngineFile(BackupFactory & factory)
{
auto creator_fn = [](const BackupFactory::CreateParams & params)
{
String backup_name = params.backup_info.toString();
const String & engine_name = params.backup_info.backup_engine_name;
const auto & args = params.backup_info.args;
DiskPtr disk;
String path;
if (engine_name == "File")
{
if (args.size() != 1)
{
throw Exception(
"Backup engine 'File' requires 1 argument (path)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
path = args[0].safeGet<String>();
}
else if (engine_name == "Disk")
{
if (args.size() < 1 || args.size() > 2)
{
throw Exception(
"Backup engine 'Disk' requires 1 or 2 arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
String disk_name = args[0].safeGet<String>();
disk = params.context->getDisk(disk_name);
if (args.size() >= 2)
path = args[1].safeGet<String>();
}
return std::make_shared<BackupInDirectory>(backup_name, params.open_mode, disk, path, params.context, params.base_backup_info);
};
factory.registerBackupEngine("File", creator_fn);
factory.registerBackupEngine("Disk", creator_fn);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <map>
#include <mutex>
@ -9,23 +10,29 @@ namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Represents a backup stored on a disk.
/// A backup is stored as a directory, each entry is stored as a file in that directory.
/// Also three system files are stored:
/// 1) ".base" is an XML file with information about the base backup.
/// 1) ".base_backup" is a text file with information about the base backup.
/// 2) ".contents" is a binary file containing a list of all entries along with their sizes
/// and checksums and information whether the base backup should be used for each entry
/// 3) ".write_lock" is a temporary empty file which is created before writing of a backup
/// and deleted after finishing that writing.
class BackupInDirectory : public IBackup
{
public:
BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_ = {});
BackupInDirectory(
const String & backup_name_,
OpenMode open_mode_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info = {});
~BackupInDirectory() override;
OpenMode getOpenMode() const override;
String getPath() const override;
const String & getName() const override { return backup_name; }
OpenMode getOpenMode() const override { return open_mode; }
Strings list(const String & prefix, const String & terminator) const override;
bool exists(const String & name) const override;
size_t getSize(const String & name) const override;
@ -37,8 +44,8 @@ public:
private:
void open();
void close();
void writePathToBaseBackup();
void readPathToBaseBackup();
void writeBaseBackupInfo();
void readBaseBackupInfo();
void writeContents();
void readContents();
@ -52,10 +59,12 @@ private:
UInt128 base_checksum{0, 0};
};
const String backup_name;
const OpenMode open_mode;
const DiskPtr disk;
DiskPtr disk;
String path;
String path_with_sep;
ContextPtr context;
std::optional<BackupInfo> base_backup_info;
std::shared_ptr<const IBackup> base_backup;
std::map<String, EntryInfo> infos;
bool directory_was_created = false;

View File

@ -0,0 +1,66 @@
#include <Backups/BackupInfo.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
namespace DB
{
String BackupInfo::toString() const
{
auto func = std::make_shared<ASTFunction>();
func->name = backup_engine_name;
func->no_empty_args = true;
auto list = std::make_shared<ASTExpressionList>();
func->arguments = list;
func->children.push_back(list);
list->children.reserve(args.size());
for (const auto & arg : args)
list->children.push_back(std::make_shared<ASTLiteral>(arg));
return serializeAST(*func);
}
BackupInfo BackupInfo::fromString(const String & str)
{
ParserIdentifierWithOptionalParameters parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return fromAST(*ast);
}
BackupInfo BackupInfo::fromAST(const IAST & ast)
{
const auto * func = ast.as<const ASTFunction>();
if (!func)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected function, got {}", serializeAST(ast));
BackupInfo res;
res.backup_engine_name = func->name;
if (func->arguments)
{
const auto * list = func->arguments->as<const ASTExpressionList>();
if (!list)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected list, got {}", serializeAST(*func->arguments));
res.args.reserve(list->children.size());
for (const auto & elem : list->children)
{
const auto * lit = elem->as<const ASTLiteral>();
if (!lit)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected literal, got {}", serializeAST(*elem));
res.args.push_back(lit->value);
}
}
return res;
}
}

21
src/Backups/BackupInfo.h Normal file
View File

@ -0,0 +1,21 @@
#pragma once
#include <Core/Field.h>
namespace DB
{
class IAST;
/// Information about a backup.
struct BackupInfo
{
String backup_engine_name;
std::vector<Field> args;
String toString() const;
static BackupInfo fromString(const String & str);
static BackupInfo fromAST(const IAST & ast);
};
}

View File

@ -7,7 +7,7 @@ namespace DB
{
#define LIST_OF_BACKUP_SETTINGS(M) \
M(String, base_backup, "", "Name of the base backup. Only differences made after the base backup will be included in a newly created backup, so this option allows to make an incremental backup.", 0) \
M(Bool, dummy, false, "", 0) \
DECLARE_SETTINGS_TRAITS_ALLOW_CUSTOM_SETTINGS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS)

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Common/TypePromotion.h>
#include <memory>
@ -12,11 +13,15 @@ using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
/// the data of the entries which are not changed compared to its base backup.
class IBackup
class IBackup : public std::enable_shared_from_this<IBackup>, public TypePromotion<IBackup>
{
public:
IBackup() {}
virtual ~IBackup() = default;
/// Name of the backup.
virtual const String & getName() const = 0;
enum class OpenMode
{
READ,
@ -26,9 +31,6 @@ public:
/// A backup can be open either in READ or WRITE mode.
virtual OpenMode getOpenMode() const = 0;
/// Returns the path to the backup.
virtual String getPath() const = 0;
/// Returns names of entries stored in the backup.
/// If `prefix` isn't empty the function will return only the names starting with
/// the prefix (but without the prefix itself).

View File

@ -0,0 +1,14 @@
namespace DB
{
class BackupFactory;
void registerBackupEngineFile(BackupFactory &);
void registerBackupEngines(BackupFactory & factory)
{
registerBackupEngineFile(factory);
}
}

View File

@ -594,6 +594,7 @@
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
M(626, CANNOT_SKIP_UNKNOWN_FIELD) \
M(627, BACKUP_ENGINE_NOT_FOUND) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -572,35 +572,6 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
return shared->tmp_volume;
}
void Context::setBackupsVolume(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
if (policy_name.empty())
{
String path_with_separator = path;
if (!path_with_separator.ends_with('/'))
path_with_separator += '/';
auto disk = std::make_shared<DiskLocal>("_backups_default", path_with_separator, 0);
shared->backups_volume = std::make_shared<SingleDiskVolume>("_backups_default", disk, 0);
}
else
{
StoragePolicyPtr policy = getStoragePolicySelector(lock)->get(policy_name);
if (policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used for backups, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->backups_volume = policy->getVolume(0);
}
BackupFactory::instance().setBackupsVolume(shared->backups_volume);
}
VolumePtr Context::getBackupsVolume() const
{
std::lock_guard lock(shared->storage_policies_mutex);
return shared->backups_volume;
}
void Context::setFlagsPath(const String & path)
{
auto lock = getLock();

View File

@ -354,9 +354,6 @@ public:
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
void setBackupsVolume(const String & path, const String & policy_name = "");
VolumePtr getBackupsVolume() const;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.

View File

@ -12,40 +12,39 @@ namespace DB
{
namespace
{
BackupSettings getBackupSettings(const ASTBackupQuery & query)
BackupMutablePtr createBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
BackupSettings settings;
BackupFactory::CreateParams params;
params.open_mode = (query.kind == ASTBackupQuery::BACKUP) ? IBackup::OpenMode::WRITE : IBackup::OpenMode::READ;
params.context = context;
params.backup_info = BackupInfo::fromAST(*query.backup_name);
if (query.base_backup_name)
params.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
return BackupFactory::instance().createBackup(params);
}
#if 0
void getBackupSettings(const ASTBackupQuery & query, BackupSettings & settings, std::optional<BaseBackupInfo> & base_backup)
{
settings = {};
if (query.settings)
settings.applyChanges(query.settings->as<const ASTSetQuery &>().changes);
return settings;
}
BackupPtr getBaseBackup(const BackupSettings & settings)
{
const String & base_backup_name = settings.base_backup;
if (base_backup_name.empty())
return nullptr;
return BackupFactory::instance().openBackup(base_backup_name);
}
#endif
void executeBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
BackupMutablePtr backup = createBackup(query, context);
auto backup_entries = makeBackupEntries(query.elements, context);
UInt64 estimated_backup_size = estimateBackupSize(backup_entries, base_backup);
auto backup = BackupFactory::instance().createBackup(query.backup_name, estimated_backup_size, base_backup);
writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads);
}
void executeRestore(const ASTBackupQuery & query, ContextMutablePtr context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
auto backup = BackupFactory::instance().openBackup(query.backup_name, base_backup);
BackupPtr backup = createBackup(query, context);
auto restore_tasks = makeRestoreTasks(query.elements, context, backup);
executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads);
}

View File

@ -94,10 +94,25 @@ namespace
}
}
void formatSettings(const IAST & settings, const IAST::FormatSettings & format)
void formatSettings(const ASTPtr & settings, const ASTPtr & base_backup_name, const IAST::FormatSettings & format)
{
if (!settings && !base_backup_name)
return;
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " SETTINGS " << (format.hilite ? IAST::hilite_none : "");
settings.format(format);
bool empty = true;
if (base_backup_name)
{
format.ostr << "base_backup = ";
base_backup_name->format(format);
empty = false;
}
if (settings)
{
if (!empty)
format.ostr << ", ";
settings->format(format);
}
}
}
@ -120,11 +135,11 @@ void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, Fo
formatElements(elements, kind, format);
if (settings)
formatSettings(*settings, format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO " : " FROM ") << (format.hilite ? hilite_none : "");
backup_name->format(format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO" : " FROM") << (format.hilite ? hilite_none : "");
format.ostr << " " << quoteString(backup_name);
if (settings || base_backup_name)
formatSettings(settings, base_backup_name, format);
}
}

View File

@ -16,8 +16,9 @@ using DatabaseAndTableName = std::pair<String, String>;
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* SETTINGS base_backup='base_backup_name'
* TO { File('path/') |
* Disk('disk_name', 'path/')
* [SETTINGS base_backup = {File(...) | Disk(...)}]
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
@ -26,7 +27,7 @@ using DatabaseAndTableName = std::pair<String, String>;
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
* FROM {File(...) | Disk(...)}
*
* Notes:
* RESTORE doesn't drop any data, it either creates a table or appends an existing table with restored data.
@ -76,7 +77,11 @@ public:
using Elements = std::vector<Element>;
Elements elements;
String backup_name;
ASTPtr backup_name;
/// Base backup. Only differences made after the base backup will be included in a newly created backup,
/// so this setting allows to make an incremental backup.
ASTPtr base_backup_name;
ASTPtr settings;

View File

@ -148,18 +148,47 @@ namespace
});
}
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings)
bool parseBackupName(IParser::Pos & pos, Expected & expected, ASTPtr & backup_name)
{
return ParserIdentifierWithOptionalParameters{}.parse(pos, backup_name, expected);
}
bool parseBaseBackupSetting(IParser::Pos & pos, Expected & expected, ASTPtr & base_backup_name)
{
return IParserBase::wrapParseImpl(pos, [&]
{
return ParserKeyword{"base_backup"}.ignore(pos, expected)
&& ParserToken(TokenType::Equals).ignore(pos, expected)
&& parseBackupName(pos, expected, base_backup_name);
});
}
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings, ASTPtr & base_backup_name)
{
return IParserBase::wrapParseImpl(pos, [&]
{
if (!ParserKeyword{"SETTINGS"}.ignore(pos, expected))
return false;
ASTPtr result;
if (!ParserSetQuery{true}.parse(pos, result, expected))
ASTPtr res_settings;
ASTPtr res_base_backup_name;
auto parse_setting = [&]
{
if (!res_settings && ParserSetQuery{true}.parse(pos, res_settings, expected))
return true;
if (!res_base_backup_name && parseBaseBackupSetting(pos, expected, res_base_backup_name))
return true;
return false;
};
if (!ParserList::parseUtil(pos, expected, parse_setting, false))
return false;
settings = std::move(result);
settings = std::move(res_settings);
base_backup_name = std::move(res_base_backup_name);
return true;
});
}
@ -182,13 +211,14 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!ParserKeyword{(kind == Kind::BACKUP) ? "TO" : "FROM"}.ignore(pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
ASTPtr backup_name;
if (!parseBackupName(pos, expected, backup_name))
return false;
String backup_name = ast->as<ASTLiteral &>().value.safeGet<String>();
ASTPtr settings;
parseSettings(pos, expected, settings);
ASTPtr base_backup_name;
parseSettings(pos, expected, settings, base_backup_name);
auto query = std::make_shared<ASTBackupQuery>();
node = query;
@ -196,6 +226,7 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->kind = kind;
query->elements = std::move(elements);
query->backup_name = std::move(backup_name);
query->base_backup_name = std::move(base_backup_name);
query->settings = std::move(settings);
return true;

View File

@ -13,8 +13,9 @@ namespace DB
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* [SETTINGS base_backup = 'base_backup_name']
* TO { File('path/') |
* Disk('disk_name', 'path/')
* [SETTINGS base_backup = {FILE(...) | DISK(...)}]
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
@ -23,7 +24,7 @@ namespace DB
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
* FROM {File(...) | Disk(...)}
*/
class ParserBackupQuery : public IParserBase
{

View File

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/var/lib/clickhouse/backups/</path>
</backups>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -3,7 +3,7 @@ import re
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
instance = cluster.add_instance('instance', main_configs=["configs/backups_disk.xml"])
def create_and_fill_table(engine="MergeTree"):
@ -18,6 +18,7 @@ def create_and_fill_table(engine="MergeTree"):
def start_cluster():
try:
cluster.start()
cluster.exec_in_container(instance.docker_id, ["mkdir", "-p", "/var/lib/clickhouse/backups/"])
yield cluster
finally:
cluster.shutdown()
@ -35,8 +36,7 @@ backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"test-backup-{backup_id_counter}"
return f"Disk('backups', '{backup_id_counter}/')"
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
@ -45,12 +45,12 @@ def test_restore_table(engine):
create_and_fill_table(engine=engine)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
@ -60,12 +60,12 @@ def test_restore_table_into_existing_table(engine):
create_and_fill_table(engine=engine)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n"
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n"
@ -74,11 +74,11 @@ def test_restore_table_under_another_name():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
@ -87,11 +87,11 @@ def test_backup_table_under_another_name():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table AS test.table2 TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table2 FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
@ -101,14 +101,14 @@ def test_incremental_backup():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
instance.query(f"BACKUP TABLE test.table TO '{incremental_backup_name}' SETTINGS base_backup = '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {incremental_backup_name} SETTINGS base_backup = {backup_name}")
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM '{incremental_backup_name}'")
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {incremental_backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
@ -116,10 +116,24 @@ def test_backup_not_found_or_already_exists():
backup_name = new_backup_name()
expected_error = "Backup .* not found"
assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM '{backup_name}'"))
assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}"))
create_and_fill_table()
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
expected_error = "Backup .* already exists"
assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO '{backup_name}'"))
assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO {backup_name}"))
def test_file_engine():
backup_name = f"File('/var/lib/clickhouse/backups/file/')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"