Implement BACKUP/RESTORE ON CLUSTER.

Vitaly Baranov 2022-04-19 20:15:27 +02:00
parent 030f3e488c
commit 68a020ecea
64 changed files with 1614 additions and 516 deletions

View File

@ -1,4 +1,4 @@
#include <Backups/DistributedBackupCoordination.h>
#include <Backups/BackupCoordinationDistributed.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -78,15 +78,15 @@ namespace
constexpr size_t NUM_ATTEMPTS = 10;
}
DistributedBackupCoordination::DistributedBackupCoordination(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
BackupCoordinationDistributed::BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
: zookeeper_path(zookeeper_path_), get_zookeeper(get_zookeeper_)
{
createRootNodes();
}
DistributedBackupCoordination::~DistributedBackupCoordination() = default;
BackupCoordinationDistributed::~BackupCoordinationDistributed() = default;
void DistributedBackupCoordination::createRootNodes()
void BackupCoordinationDistributed::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
@ -97,13 +97,13 @@ void DistributedBackupCoordination::createRootNodes()
zookeeper->createIfNotExists(zookeeper_path + "/current_archive_suffix", "0");
}
void DistributedBackupCoordination::removeAllNodes()
void BackupCoordinationDistributed::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
}
void DistributedBackupCoordination::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
void BackupCoordinationDistributed::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
{
auto zookeeper = get_zookeeper();
@ -125,7 +125,7 @@ void DistributedBackupCoordination::addFileInfo(const FileInfo & file_info, bool
is_data_file_required = (code == Coordination::Error::ZOK) && (file_info.size > file_info.base_size);
}
void DistributedBackupCoordination::updateFileInfo(const FileInfo & file_info)
void BackupCoordinationDistributed::updateFileInfo(const FileInfo & file_info)
{
if (!file_info.size)
return; /// we don't keep FileInfos for empty files, nothing to update
@ -147,7 +147,7 @@ void DistributedBackupCoordination::updateFileInfo(const FileInfo & file_info)
}
}
std::vector<FileInfo> DistributedBackupCoordination::getAllFileInfos()
std::vector<FileInfo> BackupCoordinationDistributed::getAllFileInfos() const
{
auto zookeeper = get_zookeeper();
std::vector<FileInfo> file_infos;
@ -165,7 +165,7 @@ std::vector<FileInfo> DistributedBackupCoordination::getAllFileInfos()
return file_infos;
}
Strings DistributedBackupCoordination::listFiles(const String & prefix, const String & terminator)
Strings BackupCoordinationDistributed::listFiles(const String & prefix, const String & terminator) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
@ -190,7 +190,7 @@ Strings DistributedBackupCoordination::listFiles(const String & prefix, const St
return elements;
}
std::optional<FileInfo> DistributedBackupCoordination::getFileInfo(const String & file_name)
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
@ -204,7 +204,7 @@ std::optional<FileInfo> DistributedBackupCoordination::getFileInfo(const String
return file_info;
}
std::optional<FileInfo> DistributedBackupCoordination::getFileInfo(const SizeAndChecksum & size_and_checksum)
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const SizeAndChecksum & size_and_checksum) const
{
auto zookeeper = get_zookeeper();
String file_info_str;
@ -213,7 +213,7 @@ std::optional<FileInfo> DistributedBackupCoordination::getFileInfo(const SizeAnd
return deserializeFileInfo(file_info_str);
}
std::optional<SizeAndChecksum> DistributedBackupCoordination::getFileSizeAndChecksum(const String & file_name)
std::optional<SizeAndChecksum> BackupCoordinationDistributed::getFileSizeAndChecksum(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
@ -222,7 +222,7 @@ std::optional<SizeAndChecksum> DistributedBackupCoordination::getFileSizeAndChec
return deserializeSizeAndChecksum(size_and_checksum);
}
String DistributedBackupCoordination::getNextArchiveSuffix()
String BackupCoordinationDistributed::getNextArchiveSuffix()
{
auto zookeeper = get_zookeeper();
for (size_t attempt = 0; attempt != NUM_ATTEMPTS; ++attempt)
@ -245,13 +245,13 @@ String DistributedBackupCoordination::getNextArchiveSuffix()
__builtin_unreachable();
}
Strings DistributedBackupCoordination::getAllArchiveSuffixes()
Strings BackupCoordinationDistributed::getAllArchiveSuffixes() const
{
auto zookeeper = get_zookeeper();
return zookeeper->getChildren(zookeeper_path + "/archive_suffixes");
}
void DistributedBackupCoordination::drop()
void BackupCoordinationDistributed::drop()
{
removeAllNodes();
}

View File

@ -7,24 +7,24 @@
namespace DB
{
/// Stores backup contents information in Zookeeper, useful for distributed backups.
class DistributedBackupCoordination : public IBackupCoordination
/// Stores temporary backup information in ZooKeeper, used to perform BACKUP ON CLUSTER.
class BackupCoordinationDistributed : public IBackupCoordination
{
public:
DistributedBackupCoordination(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~DistributedBackupCoordination() override;
BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~BackupCoordinationDistributed() override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
std::vector<FileInfo> getAllFileInfos() override;
Strings listFiles(const String & prefix, const String & terminator) override;
std::optional<FileInfo> getFileInfo(const String & file_name) override;
std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) override;
std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) override;
std::vector<FileInfo> getAllFileInfos() const override;
Strings listFiles(const String & prefix, const String & terminator) const override;
std::optional<FileInfo> getFileInfo(const String & file_name) const override;
std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const override;
std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const override;
String getNextArchiveSuffix() override;
Strings getAllArchiveSuffixes() override;
Strings getAllArchiveSuffixes() const override;
void drop() override;
@ -32,8 +32,8 @@ private:
void createRootNodes();
void removeAllNodes();
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
const String zookeeper_path;
const zkutil::GetZooKeeper get_zookeeper;
};
}

View File

@ -1,4 +1,4 @@
#include <Backups/LocalBackupCoordination.h>
#include <Backups/BackupCoordinationLocal.h>
#include <fmt/format.h>
@ -7,10 +7,10 @@ namespace DB
using SizeAndChecksum = IBackupCoordination::SizeAndChecksum;
using FileInfo = IBackupCoordination::FileInfo;
LocalBackupCoordination::LocalBackupCoordination() = default;
LocalBackupCoordination::~LocalBackupCoordination() = default;
BackupCoordinationLocal::BackupCoordinationLocal() = default;
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void LocalBackupCoordination::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
void BackupCoordinationLocal::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
{
std::lock_guard lock{mutex};
file_names.emplace(file_info.file_name, std::pair{file_info.size, file_info.checksum});
@ -23,7 +23,7 @@ void LocalBackupCoordination::addFileInfo(const FileInfo & file_info, bool & is_
is_data_file_required = inserted_file_info && (file_info.size > file_info.base_size);
}
void LocalBackupCoordination::updateFileInfo(const FileInfo & file_info)
void BackupCoordinationLocal::updateFileInfo(const FileInfo & file_info)
{
if (!file_info.size)
return; /// we don't keep FileInfos for empty files, nothing to update
@ -33,7 +33,7 @@ void LocalBackupCoordination::updateFileInfo(const FileInfo & file_info)
dest.archive_suffix = file_info.archive_suffix;
}
std::vector<FileInfo> LocalBackupCoordination::getAllFileInfos()
std::vector<FileInfo> BackupCoordinationLocal::getAllFileInfos() const
{
std::lock_guard lock{mutex};
std::vector<FileInfo> res;
@ -49,7 +49,7 @@ std::vector<FileInfo> LocalBackupCoordination::getAllFileInfos()
return res;
}
Strings LocalBackupCoordination::listFiles(const String & prefix, const String & terminator)
Strings BackupCoordinationLocal::listFiles(const String & prefix, const String & terminator) const
{
std::lock_guard lock{mutex};
Strings elements;
@ -70,7 +70,7 @@ Strings LocalBackupCoordination::listFiles(const String & prefix, const String &
return elements;
}
std::optional<FileInfo> LocalBackupCoordination::getFileInfo(const String & file_name)
std::optional<FileInfo> BackupCoordinationLocal::getFileInfo(const String & file_name) const
{
std::lock_guard lock{mutex};
auto it = file_names.find(file_name);
@ -85,7 +85,7 @@ std::optional<FileInfo> LocalBackupCoordination::getFileInfo(const String & file
return info;
}
std::optional<FileInfo> LocalBackupCoordination::getFileInfo(const SizeAndChecksum & size_and_checksum)
std::optional<FileInfo> BackupCoordinationLocal::getFileInfo(const SizeAndChecksum & size_and_checksum) const
{
std::lock_guard lock{mutex};
auto it = file_infos.find(size_and_checksum);
@ -94,7 +94,7 @@ std::optional<FileInfo> LocalBackupCoordination::getFileInfo(const SizeAndChecks
return it->second;
}
std::optional<SizeAndChecksum> LocalBackupCoordination::getFileSizeAndChecksum(const String & file_name)
std::optional<SizeAndChecksum> BackupCoordinationLocal::getFileSizeAndChecksum(const String & file_name) const
{
std::lock_guard lock{mutex};
auto it = file_names.find(file_name);
@ -103,7 +103,7 @@ std::optional<SizeAndChecksum> LocalBackupCoordination::getFileSizeAndChecksum(c
return it->second;
}
String LocalBackupCoordination::getNextArchiveSuffix()
String BackupCoordinationLocal::getNextArchiveSuffix()
{
std::lock_guard lock{mutex};
String new_archive_suffix = fmt::format("{:03}", ++current_archive_suffix); /// Outputs 001, 002, 003, ...
@ -111,7 +111,7 @@ String LocalBackupCoordination::getNextArchiveSuffix()
return new_archive_suffix;
}
Strings LocalBackupCoordination::getAllArchiveSuffixes()
Strings BackupCoordinationLocal::getAllArchiveSuffixes() const
{
std::lock_guard lock{mutex};
return archive_suffixes;

View File

@ -6,27 +6,27 @@ namespace DB
{
/// Stores backup contents information in memory.
class LocalBackupCoordination : public IBackupCoordination
class BackupCoordinationLocal : public IBackupCoordination
{
public:
LocalBackupCoordination();
~LocalBackupCoordination() override;
BackupCoordinationLocal();
~BackupCoordinationLocal() override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
std::vector<FileInfo> getAllFileInfos() override;
Strings listFiles(const String & prefix, const String & terminator) override;
std::vector<FileInfo> getAllFileInfos() const override;
Strings listFiles(const String & prefix, const String & terminator) const override;
std::optional<FileInfo> getFileInfo(const String & file_name) override;
std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) override;
std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) override;
std::optional<FileInfo> getFileInfo(const String & file_name) const override;
std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const override;
std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const override;
String getNextArchiveSuffix() override;
Strings getAllArchiveSuffixes() override;
Strings getAllArchiveSuffixes() const override;
private:
std::mutex mutex;
mutable std::mutex mutex;
std::map<String /* file_name */, SizeAndChecksum> file_names; /// Should be ordered alphabetically, see listFiles(). For empty files we assume checksum = 0.
std::map<SizeAndChecksum, FileInfo> file_infos; /// Information about files. Without empty files.
Strings archive_suffixes;

View File

@ -24,12 +24,15 @@ public:
struct CreateParams
{
OpenMode open_mode = OpenMode::WRITE;
std::optional<UUID> backup_uuid;
BackupInfo backup_info;
std::optional<BackupInfo> base_backup_info;
String compression_method;
int compression_level = -1;
String password;
ContextPtr context;
bool is_internal_backup = false;
String coordination_zk_path;
};
static BackupFactory & instance();
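The new fields above (backup_uuid, is_internal_backup, coordination_zk_path) are what a node taking part in BACKUP ON CLUSTER needs in order to write only its own piece into a shared backup. A minimal sketch of how such a node might fill the params (the Disk engine arguments and variable names are illustrative, and BackupFactory::createBackup is assumed to accept these params; this is not code from this commit):

BackupFactory::CreateParams params;
params.backup_info = BackupInfo::fromString("Disk('backups', 'backup_1')");
params.backup_uuid = backup_uuid;                               /// the same UUID is shared by all nodes
params.context = context;
params.is_internal_backup = true;                               /// this node writes only its own shard/replica part
params.coordination_zk_path = "/clickhouse/backups/backup_1";   /// common ZooKeeper path used for coordination
BackupMutablePtr backup = BackupFactory::instance().createBackup(params);   /// open_mode defaults to WRITE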

View File

@ -1,7 +1,9 @@
#include <Backups/BackupIO_Disk.h>
#include <Common/Exception.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <base/logger_useful.h>
namespace fs = std::filesystem;
@ -49,10 +51,17 @@ std::unique_ptr<WriteBuffer> BackupWriterDisk::writeFile(const String & file_nam
void BackupWriterDisk::removeFilesAfterFailure(const Strings & file_names)
{
for (const auto & file_name : file_names)
disk->removeFile(path / file_name);
if (disk->isDirectory(path) && disk->isDirectoryEmpty(path))
disk->removeDirectory(path);
try
{
for (const auto & file_name : file_names)
disk->removeFileIfExists(path / file_name);
if (disk->isDirectory(path) && disk->isDirectoryEmpty(path))
disk->removeDirectory(path);
}
catch (...)
{
LOG_WARNING(&Poco::Logger::get("BackupWriterDisk"), "RemoveFilesAfterFailure: {}", getCurrentExceptionMessage(false));
}
}
}

View File

@ -1,6 +1,8 @@
#include <Backups/BackupIO_File.h>
#include <Common/Exception.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <base/logger_useful.h>
namespace fs = std::filesystem;
@ -48,10 +50,17 @@ std::unique_ptr<WriteBuffer> BackupWriterFile::writeFile(const String & file_nam
void BackupWriterFile::removeFilesAfterFailure(const Strings & file_names)
{
for (const auto & file_name : file_names)
fs::remove(path / file_name);
if (fs::is_directory(path) && fs::is_empty(path))
fs::remove(path);
try
{
for (const auto & file_name : file_names)
fs::remove(path / file_name);
if (fs::is_directory(path) && fs::is_empty(path))
fs::remove(path);
}
catch (...)
{
LOG_WARNING(&Poco::Logger::get("BackupWriterFile"), "RemoveFilesAfterFailure: {}", getCurrentExceptionMessage(false));
}
}
}

View File

@ -3,10 +3,12 @@
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/BackupIO.h>
#include <Backups/IBackupEntry.h>
#include <Backups/LocalBackupCoordination.h>
#include <Backups/BackupCoordinationLocal.h>
#include <Backups/BackupCoordinationDistributed.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
@ -114,20 +116,18 @@ BackupImpl::BackupImpl(
const String & backup_name_,
const ArchiveParams & archive_params_,
const std::optional<BackupInfo> & base_backup_info_,
std::shared_ptr<IBackupWriter> writer_,
std::shared_ptr<IBackupCoordination> coordination_,
bool is_helper_backup_,
std::shared_ptr<IBackupReader> reader_,
const ContextPtr & context_)
: backup_name(backup_name_)
, archive_params(archive_params_)
, use_archives(!archive_params.archive_name.empty())
, base_backup_info_initial(base_backup_info_)
, open_mode(OpenMode::WRITE)
, writer(std::move(writer_))
, coordination(coordination_ ? coordination_ : std::make_shared<LocalBackupCoordination>())
, is_helper_backup(is_helper_backup_)
, open_mode(OpenMode::READ)
, reader(std::move(reader_))
, is_internal_backup(false)
, coordination(std::make_shared<BackupCoordinationLocal>())
, context(context_)
, version(CURRENT_BACKUP_VERSION)
, version(INITIAL_BACKUP_VERSION)
, base_backup_info(base_backup_info_)
{
open();
}
@ -137,19 +137,27 @@ BackupImpl::BackupImpl(
const String & backup_name_,
const ArchiveParams & archive_params_,
const std::optional<BackupInfo> & base_backup_info_,
std::shared_ptr<IBackupReader> reader_,
const ContextPtr & context_)
std::shared_ptr<IBackupWriter> writer_,
const ContextPtr & context_,
const std::optional<UUID> & backup_uuid_,
bool is_internal_backup_,
const String & coordination_zk_path_)
: backup_name(backup_name_)
, archive_params(archive_params_)
, use_archives(!archive_params.archive_name.empty())
, base_backup_info_initial(base_backup_info_)
, open_mode(OpenMode::READ)
, reader(std::move(reader_))
, coordination(std::make_shared<LocalBackupCoordination>())
, is_helper_backup(false)
, open_mode(OpenMode::WRITE)
, writer(std::move(writer_))
, is_internal_backup(is_internal_backup_)
, context(context_)
, version(INITIAL_BACKUP_VERSION)
, uuid(backup_uuid_)
, version(CURRENT_BACKUP_VERSION)
, base_backup_info(base_backup_info_)
{
if (coordination_zk_path_.empty())
coordination = std::make_shared<BackupCoordinationLocal>();
else
coordination = std::make_shared<BackupCoordinationDistributed>(coordination_zk_path_, [&] { return context->getZooKeeper(); });
open();
}
@ -185,14 +193,16 @@ void BackupImpl::open()
if (open_mode == OpenMode::WRITE)
{
timestamp = std::time(nullptr);
uuid = UUIDHelpers::generateV4();
if (!uuid)
uuid = UUIDHelpers::generateV4();
writing_finalized = false;
}
base_backup_info = base_backup_info_initial;
if (open_mode == OpenMode::READ)
readBackupMetadata();
assert(uuid); /// Backup's UUID must be loaded or generated at this point.
if (base_backup_info)
{
BackupFactory::CreateParams params;
@ -213,17 +223,17 @@ void BackupImpl::close()
{
std::lock_guard lock{mutex};
if (!is_helper_backup && writing_finalized)
if (!is_internal_backup && writing_finalized)
writeBackupMetadata();
archive_readers.clear();
archive_writer_with_empty_suffix.reset();
current_archive_writer.reset();
for (auto & archive_writer : archive_writers)
archive_writer = {"", nullptr};
if (!is_helper_backup && writer && !writing_finalized)
if (!is_internal_backup && writer && !writing_finalized)
removeAllFilesAfterFailure();
if (!is_helper_backup)
if (!is_internal_backup)
coordination->drop();
}
@ -238,7 +248,7 @@ void BackupImpl::writeBackupMetadata()
Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration()};
config->setUInt("version", CURRENT_BACKUP_VERSION);
config->setString("timestamp", toString(LocalDateTime{timestamp}));
config->setString("uuid", toString(uuid));
config->setString("uuid", toString(*uuid));
if (base_backup_info)
{
@ -595,6 +605,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
bool is_data_file_required;
info.data_file_name = info.file_name;
info.archive_suffix = current_archive_suffix;
coordination->addFileInfo(info, is_data_file_required);
if (!is_data_file_required)
return; /// We copy data only if it's a new combination of size & checksum.
@ -617,19 +628,19 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
{
String archive_suffix = current_archive_suffix;
bool next_suffix = false;
if (info.archive_suffix.empty() && is_helper_backup)
if (current_archive_suffix.empty() && is_internal_backup)
next_suffix = true;
/*if (archive_params.max_volume_size && current_archive_writer
&& (current_archive_writer->getTotalSize() + size - base_size > archive_params.max_volume_size))
next_suffix = true;*/
if (next_suffix)
archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != archive_suffix)
current_archive_suffix = coordination->getNextArchiveSuffix();
if (info.archive_suffix != current_archive_suffix)
{
info.archive_suffix = archive_suffix;
info.archive_suffix = current_archive_suffix;
coordination->updateFileInfo(info);
}
out = getArchiveWriter(info.archive_suffix)->writeFile(info.data_file_name);
out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
}
else
{
@ -672,19 +683,19 @@ std::shared_ptr<IArchiveReader> BackupImpl::getArchiveReader(const String & suff
std::shared_ptr<IArchiveWriter> BackupImpl::getArchiveWriter(const String & suffix)
{
if (suffix.empty() && archive_writer_with_empty_suffix)
return archive_writer_with_empty_suffix;
if ((current_archive_suffix == suffix) && current_archive_writer)
return current_archive_writer;
for (const auto & archive_writer : archive_writers)
{
if ((suffix == archive_writer.first) && archive_writer.second)
return archive_writer.second;
}
String archive_name_with_suffix = getArchiveNameWithSuffix(suffix);
auto new_archive_writer = createArchiveWriter(archive_params.archive_name, writer->writeFile(archive_name_with_suffix));
new_archive_writer->setPassword(archive_params.password);
current_archive_writer = new_archive_writer;
current_archive_suffix = suffix;
if (suffix.empty())
archive_writer_with_empty_suffix = new_archive_writer;
size_t pos = suffix.empty() ? 0 : 1;
archive_writers[pos] = {suffix, new_archive_writer};
return new_archive_writer;
}

View File

@ -38,24 +38,25 @@ public:
const String & backup_name_,
const ArchiveParams & archive_params_,
const std::optional<BackupInfo> & base_backup_info_,
std::shared_ptr<IBackupWriter> writer_,
std::shared_ptr<IBackupCoordination> coordination_,
bool is_helper_backup_,
std::shared_ptr<IBackupReader> reader_,
const ContextPtr & context_);
BackupImpl(
const String & backup_name_,
const ArchiveParams & archive_params_,
const std::optional<BackupInfo> & base_backup_info_,
std::shared_ptr<IBackupReader> reader_,
const ContextPtr & context_);
std::shared_ptr<IBackupWriter> writer_,
const ContextPtr & context_,
const std::optional<UUID> & backup_uuid_ = {},
bool is_internal_backup_ = false,
const String & coordination_zk_path_ = {});
~BackupImpl() override;
const String & getName() const override { return backup_name; }
OpenMode getOpenMode() const override { return open_mode; }
time_t getTimestamp() const override;
UUID getUUID() const override { return uuid; }
UUID getUUID() const override { return *uuid; }
Strings listFiles(const String & prefix, const String & terminator) const override;
bool fileExists(const String & file_name) const override;
bool fileExists(const SizeAndChecksum & size_and_checksum) const override;
@ -84,24 +85,22 @@ private:
const String backup_name;
const ArchiveParams archive_params;
const bool use_archives;
const std::optional<BackupInfo> base_backup_info_initial;
const OpenMode open_mode;
std::shared_ptr<IBackupWriter> writer;
std::shared_ptr<IBackupReader> reader;
const bool is_internal_backup;
std::shared_ptr<IBackupCoordination> coordination;
const bool is_helper_backup;
ContextPtr context;
mutable std::mutex mutex;
UUID uuid = {};
std::optional<UUID> uuid;
time_t timestamp = 0;
UInt64 version;
std::optional<BackupInfo> base_backup_info;
std::shared_ptr<const IBackup> base_backup;
std::optional<UUID> base_backup_uuid;
mutable std::unordered_map<String /* archive_suffix */, std::shared_ptr<IArchiveReader>> archive_readers;
std::shared_ptr<IArchiveWriter> archive_writer_with_empty_suffix;
std::shared_ptr<IArchiveWriter> current_archive_writer;
std::pair<String, std::shared_ptr<IArchiveWriter>> archive_writers[2];
String current_archive_suffix;
bool writing_finalized = false;
};

View File

@ -16,6 +16,21 @@ namespace ErrorCodes
}
String BackupInfo::toString() const
{
ASTPtr ast = toAST();
return serializeAST(*ast);
}
BackupInfo BackupInfo::fromString(const String & str)
{
ParserIdentifierWithOptionalParameters parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return fromAST(*ast);
}
ASTPtr BackupInfo::toAST() const
{
auto func = std::make_shared<ASTFunction>();
func->name = backup_engine_name;
@ -32,15 +47,7 @@ String BackupInfo::toString() const
for (const auto & arg : args)
list->children.push_back(std::make_shared<ASTLiteral>(arg));
return serializeAST(*func);
}
BackupInfo BackupInfo::fromString(const String & str)
{
ParserIdentifierWithOptionalParameters parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return fromAST(*ast);
return func;
}

View File

@ -6,6 +6,7 @@
namespace DB
{
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
/// Information about a backup.
struct BackupInfo
@ -16,6 +17,8 @@ struct BackupInfo
String toString() const;
static BackupInfo fromString(const String & str);
ASTPtr toAST() const;
static BackupInfo fromAST(const IAST & ast);
};

View File

@ -1,45 +0,0 @@
#include <Backups/BackupSettings.h>
#include <Backups/BackupInfo.h>
#include <Core/SettingsFields.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
BackupSettings BackupSettings::fromBackupQuery(const ASTBackupQuery & query)
{
BackupSettings res;
if (query.base_backup_name)
res.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
if (query.settings)
{
const auto & settings = query.settings->as<const ASTSetQuery &>().changes;
for (const auto & setting : settings)
{
if (setting.name == "compression_method")
res.compression_method = SettingFieldString{setting.value};
else if (setting.name == "compression_level")
res.compression_level = SettingFieldInt64{setting.value};
else if (setting.name == "password")
res.password = SettingFieldString{setting.value};
else if (setting.name == "structure_only")
res.structure_only = SettingFieldBool{setting.value};
else if (setting.name == "async")
res.async = SettingFieldBool{setting.value};
else
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", setting.name);
}
}
return res;
}
}

View File

@ -1,33 +0,0 @@
#pragma once
#include <Backups/BackupInfo.h>
#include <optional>
namespace DB
{
class ASTBackupQuery;
/// Settings specified in the "SETTINGS" clause of a BACKUP query.
struct BackupSettings
{
/// Base backup, if it's set an incremental backup will be built.
std::optional<BackupInfo> base_backup_info;
/// Compression method and level for writing the backup (when applicable).
String compression_method; /// "" means default method
int compression_level = -1; /// -1 means default level
/// Password used to encrypt the backup.
String password;
/// If this is set to true then only create queries will be written to backup,
/// without the data of tables.
bool structure_only = false;
bool async = false;
static BackupSettings fromBackupQuery(const ASTBackupQuery & query);
};
}

View File

@ -1,15 +1,17 @@
#include <Backups/BackupUtils.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/BackupSettings.h>
#include <Backups/Common/BackupSettings.h>
#include <Backups/DDLCompareUtils.h>
#include <Backups/DDLRenamingVisitor.h>
#include <Backups/IBackup.h>
#include <Backups/formatTableNameOrTemporaryTableName.h>
#include <Backups/replaceTableUUIDWithMacroInReplicatedTableDef.h>
#include <Common/escapeForFileName.h>
#include <Access/Common/AccessFlags.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
@ -26,6 +28,67 @@ namespace ErrorCodes
namespace
{
/// Helper to calculate paths inside a backup.
class PathsInBackup
{
public:
/// Returns the path to metadata in backup.
static String getMetadataPath(const DatabaseAndTableName & table_name, size_t shard_index, size_t replica_index)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
return getPathForShardAndReplica(shard_index, replica_index) + String{"metadata/"} + escapeForFileName(table_name.first) + "/"
+ escapeForFileName(table_name.second) + ".sql";
}
static String getMetadataPath(const String & database_name, size_t shard_index, size_t replica_index)
{
if (database_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name must not be empty");
return getPathForShardAndReplica(shard_index, replica_index) + String{"metadata/"} + escapeForFileName(database_name) + ".sql";
}
static String getMetadataPath(const IAST & create_query, size_t shard_index, size_t replica_index)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (!create.table)
return getMetadataPath(create.getDatabase(), shard_index, replica_index);
if (create.temporary)
return getMetadataPath({DatabaseCatalog::TEMPORARY_DATABASE, create.getTable()}, shard_index, replica_index);
return getMetadataPath({create.getDatabase(), create.getTable()}, shard_index, replica_index);
}
/// Returns the path to table's data in backup.
static String getDataPath(const DatabaseAndTableName & table_name, size_t shard_index, size_t replica_index)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
return getPathForShardAndReplica(shard_index, replica_index) + String{"data/"} + escapeForFileName(table_name.first) + "/"
+ escapeForFileName(table_name.second) + "/";
}
static String getDataPath(const IAST & create_query, size_t shard_index, size_t replica_index)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (!create.table)
return {};
if (create.temporary)
return getDataPath({DatabaseCatalog::TEMPORARY_DATABASE, create.getTable()}, shard_index, replica_index);
return getDataPath({create.getDatabase(), create.getTable()}, shard_index, replica_index);
}
private:
static String getPathForShardAndReplica(size_t shard_index, size_t replica_index)
{
if (shard_index || replica_index)
return fmt::format("shard{}/replica{}/", shard_index, replica_index);
else
return "";
}
};
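/// Illustrative values (not part of this commit): with the helpers above, shard 1 / replica 2
/// and a table db1.tbl1 (no characters that need escaping) map to
///     getMetadataPath({"db1", "tbl1"}, 1, 2)  ->  "shard1/replica2/metadata/db1/tbl1.sql"
///     getDataPath({"db1", "tbl1"}, 1, 2)      ->  "shard1/replica2/data/db1/tbl1/"
/// while shard_index == replica_index == 0 omits the "shardN/replicaM/" prefix entirely.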
using Kind = ASTBackupQuery::Kind;
using Element = ASTBackupQuery::Element;
using Elements = ASTBackupQuery::Elements;
@ -92,7 +155,7 @@ namespace
auto data_backup = info.storage->backupData(context, info.partitions);
if (!data_backup.empty())
{
String data_path = getDataPathInBackup(*info.create_query);
String data_path = PathsInBackup::getDataPath(*info.create_query, backup_settings.shard, backup_settings.replica);
for (auto & [path_in_backup, backup_entry] : data_backup)
res.emplace_back(data_path + path_in_backup, std::move(backup_entry));
}
@ -209,6 +272,7 @@ namespace
ASTPtr query = ast;
::DB::renameInCreateQuery(query, context, renaming_settings);
auto create_query = typeid_cast<std::shared_ptr<ASTCreateQuery>>(query);
replaceTableUUIDWithMacroInReplicatedTableDef(*create_query, create_query->uuid);
create_query->uuid = UUIDHelpers::Nil;
create_query->to_inner_uuid = UUIDHelpers::Nil;
return create_query;
@ -219,10 +283,10 @@ namespace
return (database_name == DatabaseCatalog::SYSTEM_DATABASE) || (database_name == DatabaseCatalog::TEMPORARY_DATABASE);
}
static std::pair<String, BackupEntryPtr> makeBackupEntryForMetadata(const IAST & create_query)
std::pair<String, BackupEntryPtr> makeBackupEntryForMetadata(const IAST & create_query) const
{
auto metadata_entry = std::make_unique<BackupEntryFromMemory>(serializeAST(create_query));
String metadata_path = getMetadataPathInBackup(create_query);
String metadata_path = PathsInBackup::getMetadataPath(create_query, backup_settings.shard, backup_settings.replica);
return {metadata_path, std::move(metadata_entry)};
}
@ -319,47 +383,4 @@ void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries
backup->finalizeWriting();
}
String getDataPathInBackup(const DatabaseAndTableName & table_name)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
assert(!table_name.first.empty() && !table_name.second.empty());
return String{"data/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + "/";
}
String getDataPathInBackup(const IAST & create_query)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (!create.table)
return {};
if (create.temporary)
return getDataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.getTable()});
return getDataPathInBackup({create.getDatabase(), create.getTable()});
}
String getMetadataPathInBackup(const DatabaseAndTableName & table_name)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
return String{"metadata/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + ".sql";
}
String getMetadataPathInBackup(const String & database_name)
{
if (database_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name must not be empty");
return String{"metadata/"} + escapeForFileName(database_name) + ".sql";
}
String getMetadataPathInBackup(const IAST & create_query)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (!create.table)
return getMetadataPathInBackup(create.getDatabase());
if (create.temporary)
return getMetadataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.getTable()});
return getMetadataPathInBackup({create.getDatabase(), create.getTable()});
}
}

View File

@ -6,6 +6,7 @@
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
@ -20,13 +21,4 @@ BackupEntries makeBackupEntries(const ContextPtr & context, const ASTBackupQuery
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads);
/// Returns the path to metadata in backup.
String getMetadataPathInBackup(const DatabaseAndTableName & table_name);
String getMetadataPathInBackup(const String & database_name);
String getMetadataPathInBackup(const IAST & create_query);
/// Returns the path to table's data in backup.
String getDataPathInBackup(const DatabaseAndTableName & table_name);
String getDataPathInBackup(const IAST & create_query);
}

View File

@ -0,0 +1,72 @@
#include <Backups/BackupInfo.h>
#include <Backups/Common/BackupSettings.h>
#include <Core/SettingsFields.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
/// List of backup settings except base_backup_name.
#define LIST_OF_BACKUP_SETTINGS(M) \
M(String, compression_method) \
M(Int64, compression_level) \
M(String, password) \
M(Bool, structure_only) \
M(Bool, async) \
M(UInt64, shard) \
M(UInt64, replica) \
M(Bool, allow_storing_multiple_replicas) \
M(Bool, internal) \
M(String, coordination_zk_path)
BackupSettings BackupSettings::fromBackupQuery(const ASTBackupQuery & query)
{
BackupSettings res;
if (query.base_backup_name)
res.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
if (query.settings)
{
const auto & settings = query.settings->as<const ASTSetQuery &>().changes;
for (const auto & setting : settings)
{
#define GET_SETTINGS_FROM_BACKUP_QUERY_HELPER(TYPE, NAME) \
if (setting.name == #NAME) \
res.NAME = SettingField##TYPE{setting.value}; \
else
LIST_OF_BACKUP_SETTINGS(GET_SETTINGS_FROM_BACKUP_QUERY_HELPER)
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", setting.name);
}
}
return res;
}
void BackupSettings::copySettingsToBackupQuery(ASTBackupQuery & query) const
{
query.base_backup_name = base_backup_info ? base_backup_info->toAST() : nullptr;
auto query_settings = std::make_shared<ASTSetQuery>();
query_settings->is_standalone = false;
static const BackupSettings default_settings;
#define SET_SETTINGS_IN_BACKUP_QUERY_HELPER(TYPE, NAME) \
if (NAME != default_settings.NAME) \
query_settings->changes.emplace_back(#NAME, static_cast<Field>(SettingField##TYPE{NAME}));
LIST_OF_BACKUP_SETTINGS(SET_SETTINGS_IN_BACKUP_QUERY_HELPER)
query.settings = query_settings;
}
}
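The two macros above form an X-macro: every M(TYPE, NAME) entry of LIST_OF_BACKUP_SETTINGS expands into an `if (setting.name == "NAME") ... else` link, so the whole expansion is a single if/else chain whose trailing throw rejects unknown settings. A self-contained sketch of the same pattern with hypothetical names and a simple stand-in for the SettingField wrappers (not code from this commit):

#include <stdexcept>
#include <string>

/// Stand-in for the SettingField##TYPE conversions (illustrative only).
template <typename T> T fromString(const std::string & s);
template <> inline std::string fromString<std::string>(const std::string & s) { return s; }
template <> inline long fromString<long>(const std::string & s) { return std::stol(s); }

struct DemoSettings
{
    std::string compression_method;
    long compression_level = -1;
};

/// The single list of (type, name) pairs; adding a setting means adding one line here.
#define LIST_OF_DEMO_SETTINGS(M) \
    M(std::string, compression_method) \
    M(long, compression_level)

void applyDemoSetting(DemoSettings & res, const std::string & name, const std::string & value)
{
    /// Each expansion ends with a dangling `else`, so the list becomes one if/else-if chain
    /// and the final `throw` handles any unknown setting name.
#define APPLY_DEMO_SETTING(TYPE, NAME) \
    if (name == #NAME) \
        res.NAME = fromString<TYPE>(value); \
    else
    LIST_OF_DEMO_SETTINGS(APPLY_DEMO_SETTING)
        throw std::runtime_error("Unknown setting " + name);
#undef APPLY_DEMO_SETTING
}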

View File

@ -0,0 +1,54 @@
#pragma once
#include <Backups/BackupInfo.h>
#include <optional>
namespace DB
{
class ASTBackupQuery;
/// Settings specified in the "SETTINGS" clause of a BACKUP query.
struct BackupSettings
{
/// Base backup, if it's set an incremental backup will be built.
std::optional<BackupInfo> base_backup_info;
/// Compression method and level for writing the backup (when applicable).
String compression_method; /// "" means default method
int compression_level = -1; /// -1 means default level
/// Password used to encrypt the backup.
String password;
/// If this is set to true then only create queries will be written to backup,
/// without the data of tables.
bool structure_only = false;
/// Whether BACKUP command must return immediately without waiting until the backup is completed.
bool async = false;
/// 1-based shard index to store in the backup. 0 means all shards.
/// Can only be used with BACKUP ON CLUSTER.
size_t shard = 0;
/// 1-based replica index to store in the backup. 0 means all replicas (see also allow_storing_multiple_replicas).
/// Can only be used with BACKUP ON CLUSTER.
size_t replica = 0;
/// Allows storing multiple replicas of the same shard in the backup.
bool allow_storing_multiple_replicas = false;
/// Internal, should not be specified by user.
/// Whether this backup is a part of a distributed backup created by BACKUP ON CLUSTER.
bool internal = false;
/// Internal, should not be specified by user.
/// Path in Zookeeper used to coordinate a distributed backup created by BACKUP ON CLUSTER.
String coordination_zk_path;
static BackupSettings fromBackupQuery(const ASTBackupQuery & query);
void copySettingsToBackupQuery(ASTBackupQuery & query) const;
};
}

View File

@ -0,0 +1,5 @@
include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_common_backups .)
add_library(clickhouse_common_backups ${clickhouse_common_backups_headers} ${clickhouse_common_backups_sources})
target_link_libraries(clickhouse_common_backups PUBLIC clickhouse_common_io)

View File

@ -0,0 +1,122 @@
#include <Backups/BackupInfo.h>
#include <Backups/Common/RestoreSettings.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Core/SettingsFields.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int CANNOT_PARSE_RESTORE_TABLE_CREATION_MODE;
}
namespace
{
struct SettingFieldRestoreTableCreationMode
{
RestoreTableCreationMode value;
explicit SettingFieldRestoreTableCreationMode(const Field & field)
{
if (field.getType() == Field::Types::String)
{
String str = field.get<String>();
if (str == "1" || boost::iequals(str, "true"))
value = RestoreTableCreationMode::kCreate;
else if (str == "0" || boost::iequals(str, "false"))
value = RestoreTableCreationMode::kMustExist;
else if (boost::iequals(str, "if not exists"))
value = RestoreTableCreationMode::kCreateIfNotExists;
else throw Exception("Cannot parse creation mode from string '" + str + "'",
ErrorCodes::CANNOT_PARSE_RESTORE_TABLE_CREATION_MODE);
}
else
{
if (applyVisitor(FieldVisitorConvertToNumber<bool>(), field))
value = RestoreTableCreationMode::kCreate;
else
value = RestoreTableCreationMode::kMustExist;
}
}
explicit operator Field() const
{
switch (value)
{
case RestoreTableCreationMode::kCreate: return Field{true};
case RestoreTableCreationMode::kMustExist: return Field{false};
case RestoreTableCreationMode::kCreateIfNotExists: return Field{"if not exists"};
}
}
operator RestoreTableCreationMode() const { return value; }
};
using SettingFieldRestoreDatabaseCreationMode = SettingFieldRestoreTableCreationMode;
}
/// List of restore settings except base_backup_name.
#define LIST_OF_RESTORE_SETTINGS(M) \
M(String, password) \
M(Bool, structure_only) \
M(RestoreTableCreationMode, create_table) \
M(RestoreDatabaseCreationMode, create_database) \
M(Bool, allow_different_table_def) \
M(Bool, allow_different_database_def) \
M(Bool, async) \
M(UInt64, shard) \
M(UInt64, replica) \
M(UInt64, shard_in_backup) \
M(UInt64, replica_in_backup) \
M(Bool, internal) \
M(String, coordination_zk_path)
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
{
RestoreSettings res;
if (query.base_backup_name)
res.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
if (query.settings)
{
const auto & settings = query.settings->as<const ASTSetQuery &>().changes;
for (const auto & setting : settings)
{
#define GET_SETTINGS_FROM_RESTORE_QUERY_HELPER(TYPE, NAME) \
if (setting.name == #NAME) \
res.NAME = SettingField##TYPE{setting.value}; \
else
LIST_OF_RESTORE_SETTINGS(GET_SETTINGS_FROM_RESTORE_QUERY_HELPER)
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", setting.name);
}
}
return res;
}
void RestoreSettings::copySettingsToRestoreQuery(ASTBackupQuery & query) const
{
query.base_backup_name = base_backup_info ? base_backup_info->toAST() : nullptr;
auto query_settings = std::make_shared<ASTSetQuery>();
query_settings->is_standalone = false;
static const RestoreSettings default_settings;
#define SET_SETTINGS_IN_RESTORE_QUERY_HELPER(TYPE, NAME) \
if (NAME != default_settings.NAME) \
query_settings->changes.emplace_back(#NAME, static_cast<Field>(SettingField##TYPE{NAME}));
LIST_OF_RESTORE_SETTINGS(SET_SETTINGS_IN_RESTORE_QUERY_HELPER)
query.settings = query_settings;
}
}

View File

@ -58,9 +58,36 @@ struct RestoreSettings : public StorageRestoreSettings
/// Set `allow_different_database_def` to true to skip this check.
bool allow_different_database_def = false;
/// Whether the RESTORE command must return immediately, without waiting until the restore process is completed.
bool async = false;
/// 1-based shard index to restore from the backup. 0 means all shards.
/// Can only be used with RESTORE ON CLUSTER.
size_t shard = 0;
/// 1-based replica index to restore from the backup. 0 means all replicas.
/// Can only be used with RESTORE ON CLUSTER.
size_t replica = 0;
/// 1-based index of the shard stored in the backup to get data from.
/// By default it's 0: if the backup contains only one shard, that shard is used,
/// otherwise it means the same as `shard`.
size_t shard_in_backup = 0;
/// 1-based index of the replica stored in the backup to get data from.
/// By default it's 0: if the backup contains only one replica for the current shard, that replica is used,
/// otherwise it means the same as `replica`.
size_t replica_in_backup = 0;
/// Internal, should not be specified by user.
bool internal = false;
/// Internal, should not be specified by user.
/// Path in ZooKeeper used to coordinate the restore process while executing RESTORE ON CLUSTER.
String coordination_zk_path;
static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query);
void copySettingsToRestoreQuery(ASTBackupQuery & query) const;
};
}

View File

@ -0,0 +1,59 @@
#include <Backups/Common/rewriteBackupQueryWithoutOnCluster.h>
#include <Backups/Common/BackupSettings.h>
#include <Backups/Common/RestoreSettings.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
namespace
{
void setDatabaseInElements(ASTBackupQuery::Elements & elements, const String & new_database)
{
for (auto & element : elements)
{
if (element.type == ASTBackupQuery::TABLE)
{
if (element.name.first.empty() && !element.name.second.empty() && !element.name_is_in_temp_db)
element.name.first = new_database;
if (element.new_name.first.empty() && !element.name.second.empty() && !element.name_is_in_temp_db)
element.new_name.first = new_database;
}
}
}
}
std::shared_ptr<ASTBackupQuery>
rewriteBackupQueryWithoutOnCluster(const ASTBackupQuery & backup_query, const WithoutOnClusterASTRewriteParams & params)
{
auto backup_settings = BackupSettings::fromBackupQuery(backup_query);
backup_settings.internal = true;
backup_settings.async = false;
backup_settings.shard = params.shard_index;
backup_settings.replica = params.replica_index;
auto new_query = std::static_pointer_cast<ASTBackupQuery>(backup_query.clone());
new_query->cluster.clear();
backup_settings.copySettingsToBackupQuery(*new_query);
setDatabaseInElements(new_query->elements, params.default_database);
return new_query;
}
std::shared_ptr<ASTBackupQuery>
rewriteRestoreQueryWithoutOnCluster(const ASTBackupQuery & restore_query, const WithoutOnClusterASTRewriteParams & params)
{
auto restore_settings = RestoreSettings::fromRestoreQuery(restore_query);
restore_settings.internal = true;
restore_settings.async = false;
restore_settings.shard = params.shard_index;
restore_settings.replica = params.replica_index;
auto new_query = std::static_pointer_cast<ASTBackupQuery>(restore_query.clone());
new_query->cluster.clear();
restore_settings.copySettingsToRestoreQuery(*new_query);
setDatabaseInElements(new_query->elements, params.default_database);
return new_query;
}
}
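Each node that receives the ON CLUSTER query is expected to rewrite it as above before executing it locally. A hedged sketch of that call (the field names of WithoutOnClusterASTRewriteParams follow their use above; `backup_query`, the indices and the database name are illustrative):

/// `backup_query` is assumed to be the ASTBackupQuery received from the initiating node.
WithoutOnClusterASTRewriteParams params;
params.shard_index = 1;              /// this node's 1-based shard number in the cluster
params.replica_index = 2;            /// this node's 1-based replica number within its shard
params.default_database = "default";
std::shared_ptr<ASTBackupQuery> local_query = rewriteBackupQueryWithoutOnCluster(backup_query, params);
/// local_query no longer has an ON CLUSTER clause; internal=1 and the shard/replica indices are
/// recorded in its SETTINGS, and unqualified table names now refer to "default".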

View File

@ -0,0 +1,19 @@
#pragma once
#include <memory>
namespace DB
{
class ASTBackupQuery;
struct WithoutOnClusterASTRewriteParams;
/// Rewrites a BACKUP ON CLUSTER query after it has been received on a shard or replica.
std::shared_ptr<ASTBackupQuery>
rewriteBackupQueryWithoutOnCluster(const ASTBackupQuery & backup_query, const WithoutOnClusterASTRewriteParams & params);
/// Rewrites a RESTORE ON CLUSTER query after it has been received on a shard or replica.
std::shared_ptr<ASTBackupQuery>
rewriteRestoreQueryWithoutOnCluster(const ASTBackupQuery & restore_query, const WithoutOnClusterASTRewriteParams & params);
}

View File

@ -31,7 +31,7 @@ public:
UInt64 pos_in_archive = static_cast<UInt64>(-1);
};
virtual ~IBackupCoordination() { }
virtual ~IBackupCoordination() = default;
/// Adds file information.
/// If the specified size and checksum are new for this IBackupCoordination, the function sets `is_data_file_required`.
@ -46,20 +46,20 @@ public:
/// Updates some fields (currently only `archive_suffix`) of a stored file's information.
virtual void updateFileInfo(const FileInfo & file_info) = 0;
virtual std::vector<FileInfo> getAllFileInfos() = 0;
virtual Strings listFiles(const String & prefix, const String & terminator) = 0;
virtual std::vector<FileInfo> getAllFileInfos() const = 0;
virtual Strings listFiles(const String & prefix, const String & terminator) const = 0;
using SizeAndChecksum = std::pair<UInt64, UInt128>;
virtual std::optional<FileInfo> getFileInfo(const String & file_name) = 0;
virtual std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) = 0;
virtual std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) = 0;
virtual std::optional<FileInfo> getFileInfo(const String & file_name) const = 0;
virtual std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const = 0;
virtual std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const = 0;
/// Generates a new archive suffix, e.g. "001", "002", "003", ...
virtual String getNextArchiveSuffix() = 0;
/// Returns the list of all the archive suffixes which were generated.
virtual Strings getAllArchiveSuffixes() = 0;
virtual Strings getAllArchiveSuffixes() const = 0;
/// Removes remotely stored information.
virtual void drop() {}

View File

@ -0,0 +1,39 @@
#pragma once
#include <base/types.h>
namespace DB
{
/// Coordinates the restore process between replicas, e.g. to prevent different replicas from restoring the same data in parallel.
class IRestoreCoordination
{
public:
virtual ~IRestoreCoordination() = default;
/// Sets or gets path in the backup for a specified path in ZooKeeper.
virtual void setOrGetPathInBackupForZkPath(const String & zk_path_, String & path_in_backup_) = 0;
/// Sets that this replica is going to restore a partition in a replicated table or a table in a replicated database.
/// This function should be called to prevent other replicas from doing that in parallel.
virtual bool acquireZkPathAndName(const String & zk_path_, const String & name_) = 0;
enum Result
{
SUCCEEDED,
FAILED,
};
/// Sets the result for an acquired path and name.
virtual void setResultForZkPathAndName(const String & zk_path_, const String & name_, Result res_) = 0;
/// Waits for the result set by the replica which acquired the specified path and name.
/// Returns false on timeout.
virtual bool getResultForZkPathAndName(const String & zk_path_, const String & name_, Result & res_, std::chrono::milliseconds timeout_) const = 0;
/// Removes remotely stored information.
virtual void drop() {}
};
}
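The acquire/set-result/get-result methods form a small protocol: exactly one replica wins the right to restore a given piece of data while the others wait for its result. A hedged sketch of how a replica might drive it (the variables and the restorePartLocally() helper are illustrative, not part of this commit; needs <chrono> and <stdexcept> in addition to IRestoreCoordination.h):

void restoreReplicatedData(IRestoreCoordination & coordination, const String & zk_path, const String & part_name)
{
    if (coordination.acquireZkPathAndName(zk_path, part_name))
    {
        /// This replica acquired the work; the other replicas will wait for the result below.
        try
        {
            restorePartLocally(part_name); /// illustrative helper doing the actual restore
            coordination.setResultForZkPathAndName(zk_path, part_name, IRestoreCoordination::SUCCEEDED);
        }
        catch (...)
        {
            coordination.setResultForZkPathAndName(zk_path, part_name, IRestoreCoordination::FAILED);
            throw;
        }
    }
    else
    {
        /// Another replica acquired it first: wait for its result instead of restoring the same data twice.
        IRestoreCoordination::Result res;
        if (!coordination.getResultForZkPathAndName(zk_path, part_name, res, std::chrono::minutes(5)))
            throw std::runtime_error("Timed out waiting for another replica");
        if (res == IRestoreCoordination::FAILED)
            throw std::runtime_error("Another replica failed to restore the same data");
    }
}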

View File

@ -0,0 +1,148 @@
#include <Backups/RestoreCoordinationDistributed.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/escapeForFileName.h>
namespace DB
{
RestoreCoordinationDistributed::RestoreCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
: zookeeper_path(zookeeper_path_), get_zookeeper(get_zookeeper_)
{
createRootNodes();
}
RestoreCoordinationDistributed::~RestoreCoordinationDistributed() = default;
void RestoreCoordinationDistributed::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
zookeeper->createIfNotExists(zookeeper_path + "/paths_in_backup", "");
zookeeper->createIfNotExists(zookeeper_path + "/acquired", "");
}
void RestoreCoordinationDistributed::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
}
void RestoreCoordinationDistributed::setOrGetPathInBackupForZkPath(const String & zk_path_, String & path_in_backup_)
{
{
std::lock_guard lock{mutex};
auto it = paths_in_backup_by_zk_path.find(zk_path_);
if (it != paths_in_backup_by_zk_path.end())
{
path_in_backup_ = it->second;
return;
}
}
auto zookeeper = get_zookeeper();
String combined_path = zookeeper_path + "/paths_in_backup/" + escapeForFileName(zk_path_);
auto code = zookeeper->tryCreate(combined_path, path_in_backup_, zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, combined_path);
if (code == Coordination::Error::ZNODEEXISTS)
path_in_backup_ = zookeeper->get(combined_path);
{
std::lock_guard lock{mutex};
paths_in_backup_by_zk_path[zk_path_] = path_in_backup_;
}
}
bool RestoreCoordinationDistributed::acquireZkPathAndName(const String & zk_path_, const String & name_)
{
std::pair<String, String> key{zk_path_, name_};
{
std::lock_guard lock{mutex};
if (acquired.contains(key))
return true;
}
auto zookeeper = get_zookeeper();
String combined_path = zookeeper_path + "/acquired/" + escapeForFileName(zk_path_) + "|" + escapeForFileName(name_);
auto code = zookeeper->tryCreate(combined_path, "", zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, combined_path);
if (code == Coordination::Error::ZNODEEXISTS)
return false;
{
std::lock_guard lock{mutex};
acquired.emplace(key, std::nullopt);
return true;
}
}
void RestoreCoordinationDistributed::setResultForZkPathAndName(const String & zk_path_, const String & name_, Result res_)
{
auto zookeeper = get_zookeeper();
String combined_path = zookeeper_path + "/acquired/" + escapeForFileName(zk_path_) + "|" + escapeForFileName(name_);
zookeeper->set(combined_path, (res_ == Result::SUCCEEDED) ? "1" : "0");
{
std::lock_guard lock{mutex};
acquired[std::pair{zk_path_, name_}] = res_;
}
}
bool RestoreCoordinationDistributed::getResultForZkPathAndName(const String & zk_path_, const String & name_, Result & res_, std::chrono::milliseconds timeout_) const
{
{
std::lock_guard lock{mutex};
auto value = acquired[std::pair{zk_path_, name_}];
if (value)
{
res_ = *value;
return true;
}
}
auto zookeeper = get_zookeeper();
String combined_path = zookeeper_path + "/acquired/" + escapeForFileName(zk_path_) + "|" + escapeForFileName(name_);
std::atomic<bool> changed = false;
std::condition_variable changed_condvar;
const auto watch = [&changed, &changed_condvar, zk_path_, name_](const Coordination::WatchResponse &)
{
changed = true;
changed_condvar.notify_one();
};
String res_str = zookeeper->getWatch(combined_path, nullptr, watch);
if (res_str.empty())
{
std::mutex dummy_mutex;
std::unique_lock lock{dummy_mutex};
changed_condvar.wait_for(lock, timeout_, [&changed] { return changed.load(); });
res_str = zookeeper->get(combined_path);
}
if (res_str.empty())
return false;
res_ = (res_str == "1") ? Result::SUCCEEDED : Result::FAILED;
{
std::lock_guard lock{mutex};
acquired[std::pair{zk_path_, name_}] = res_;
}
return true;
}
void RestoreCoordinationDistributed::drop()
{
removeAllNodes();
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <Common/ZooKeeper/Common.h>
#include <map>
namespace DB
{
/// Stores temporary restore information in ZooKeeper, used to perform RESTORE ON CLUSTER.
class RestoreCoordinationDistributed : public IRestoreCoordination
{
public:
RestoreCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~RestoreCoordinationDistributed() override;
void setOrGetPathInBackupForZkPath(const String & zk_path_, String & path_in_backup_) override;
bool acquireZkPathAndName(const String & zk_path_, const String & name_) override;
void setResultForZkPathAndName(const String & zk_path_, const String & name_, Result res_) override;
bool getResultForZkPathAndName(const String & zk_path_, const String & name_, Result & res_, std::chrono::milliseconds timeout_) const override;
void drop() override;
private:
void createRootNodes();
void removeAllNodes();
const String zookeeper_path;
const zkutil::GetZooKeeper get_zookeeper;
mutable std::mutex mutex;
mutable std::map<std::pair<String, String>, std::optional<Result>> acquired;
std::unordered_map<String, String> paths_in_backup_by_zk_path;
};
}

View File

@ -0,0 +1,73 @@
#include <Backups/RestoreCoordinationLocal.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setOrGetPathInBackupForZkPath(const String & zk_path_, String & path_in_backup_)
{
std::lock_guard lock{mutex};
auto [it, inserted] = paths_in_backup_by_zk_path.try_emplace(zk_path_, path_in_backup_);
if (!inserted)
path_in_backup_ = it->second;
}
bool RestoreCoordinationLocal::acquireZkPathAndName(const String & path_, const String & name_)
{
std::lock_guard lock{mutex};
acquired.emplace(std::pair{path_, name_}, std::nullopt);
return true;
}
void RestoreCoordinationLocal::setResultForZkPathAndName(const String & zk_path_, const String & name_, Result res_)
{
std::lock_guard lock{mutex};
getResultRef(zk_path_, name_) = res_;
result_changed.notify_all();
}
bool RestoreCoordinationLocal::getResultForZkPathAndName(const String & zk_path_, const String & name_, Result & res_, std::chrono::milliseconds timeout_) const
{
std::unique_lock lock{mutex};
auto value = getResultRef(zk_path_, name_);
if (value)
{
res_ = *value;
return true;
}
bool waited = result_changed.wait_for(lock, timeout_, [this, zk_path_, name_] { return getResultRef(zk_path_, name_).has_value(); });
if (!waited)
return false;
res_ = *getResultRef(zk_path_, name_);
return true;
}
std::optional<IRestoreCoordination::Result> & RestoreCoordinationLocal::getResultRef(const String & zk_path_, const String & name_)
{
auto it = acquired.find(std::pair{zk_path_, name_});
if (it == acquired.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path ({}, {}) is not acquired", zk_path_, name_);
return it->second;
}
const std::optional<IRestoreCoordination::Result> & RestoreCoordinationLocal::getResultRef(const String & zk_path_, const String & name_) const
{
auto it = acquired.find(std::pair{zk_path_, name_});
if (it == acquired.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path ({}, {}) is not acquired", zk_path_, name_);
return it->second;
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <map>
#include <unordered_map>
namespace DB
{
class RestoreCoordinationLocal : public IRestoreCoordination
{
public:
RestoreCoordinationLocal();
~RestoreCoordinationLocal() override;
void setOrGetPathInBackupForZkPath(const String & zk_path_, String & path_in_backup_) override;
bool acquireZkPathAndName(const String & zk_path_, const String & name_) override;
void setResultForZkPathAndName(const String & zk_path_, const String & name_, Result res_) override;
bool getResultForZkPathAndName(const String & zk_path_, const String & name_, Result & res_, std::chrono::milliseconds timeout_) const override;
private:
std::optional<Result> & getResultRef(const String & zk_path_, const String & name_);
const std::optional<Result> & getResultRef(const String & zk_path_, const String & name_) const;
mutable std::mutex mutex;
std::unordered_map<String, String> paths_in_backup_by_zk_path;
std::map<std::pair<String, String>, std::optional<Result>> acquired;
mutable std::condition_variable result_changed;
};
}

View File

@ -1,75 +0,0 @@
#include <Backups/RestoreSettings.h>
#include <Backups/BackupInfo.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Core/SettingsFields.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int CANNOT_PARSE_RESTORE_TABLE_CREATION_MODE;
}
namespace
{
RestoreTableCreationMode parseRestoreTableCreationMode(const Field & field)
{
if (field.getType() == Field::Types::String)
{
String str = field.get<String>();
if (str == "1" || boost::iequals(str, "true"))
return RestoreTableCreationMode::kCreate;
if (str == "0" || boost::iequals(str, "false"))
return RestoreTableCreationMode::kMustExist;
if (boost::iequals(str, "if not exists"))
return RestoreTableCreationMode::kCreateIfNotExists;
throw Exception("Cannot parse creation mode from string '" + str + "'",
ErrorCodes::CANNOT_PARSE_RESTORE_TABLE_CREATION_MODE);
}
if (applyVisitor(FieldVisitorConvertToNumber<bool>(), field))
return RestoreTableCreationMode::kCreate;
else
return RestoreTableCreationMode::kMustExist;
}
}
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
{
RestoreSettings res;
if (query.base_backup_name)
res.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
if (query.settings)
{
const auto & settings = query.settings->as<const ASTSetQuery &>().changes;
for (const auto & setting : settings)
{
if (setting.name == "password")
res.password = SettingFieldString{setting.value};
else if (setting.name == "structure_only")
res.structure_only = SettingFieldBool{setting.value};
else if (setting.name == "create_table")
res.create_table = parseRestoreTableCreationMode(setting.value);
else if (setting.name == "create_database")
res.create_database = parseRestoreTableCreationMode(setting.value);
else if (setting.name == "allow_different_table_def")
res.allow_different_table_def = SettingFieldBool{setting.value};
else if (setting.name == "allow_different_database_def")
res.allow_different_database_def = SettingFieldBool{setting.value};
else if (setting.name == "async")
res.async = SettingFieldBool{setting.value};
else
throw Exception(ErrorCodes::UNKNOWN_SETTING, "Unknown setting {}", setting.name);
}
}
return res;
}
}

View File

@ -1,25 +1,33 @@
#include <Backups/RestoreUtils.h>
#include <Backups/BackupUtils.h>
#include <Backups/Common/RestoreSettings.h>
#include <Backups/DDLCompareUtils.h>
#include <Backups/DDLRenamingVisitor.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Backups/IRestoreTask.h>
#include <Backups/RestoreSettings.h>
#include <Backups/RestoreCoordinationDistributed.h>
#include <Backups/formatTableNameOrTemporaryTableName.h>
#include <Common/escapeForFileName.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseReplicated.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Storages/IStorage.h>
#include <base/chrono_io.h>
#include <base/insertAtEnd.h>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/range/algorithm_ext/erase.hpp>
#include <filesystem>
#include <base/sleep.h>
namespace fs = std::filesystem;
@ -29,10 +37,145 @@ namespace ErrorCodes
{
extern const int CANNOT_RESTORE_TABLE;
extern const int CANNOT_RESTORE_DATABASE;
extern const int BACKUP_ENTRY_NOT_FOUND;
}
namespace
{
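/// Helper for finding files inside a backup that can be laid out per shard and per replica:
/// lookups first try "shard<i>/replica<j>/...", then fall back to "shard<i>/..." and finally to the backup-wide path.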
class PathsInBackup
{
public:
PathsInBackup(const IBackup & backup_) : backup(backup_) {}
std::vector<size_t> getShards() const
{
std::vector<size_t> res;
constexpr std::string_view shard_prefix = "shard";
for (const String & shard_dir : backup.listFiles(""))
{
if (shard_dir.starts_with(shard_prefix))
{
size_t shard_index = parse<UInt64>(shard_dir.substr(shard_prefix.size()));
res.push_back(shard_index);
}
}
if (res.empty())
res.push_back(1);
return res;
}
std::vector<size_t> getReplicas(size_t shard_index) const
{
std::vector<size_t> res;
constexpr std::string_view replica_prefix = "replica";
for (const String & replica_dir : backup.listFiles(fmt::format("shard{}/", shard_index)))
{
if (replica_dir.starts_with(replica_prefix))
{
size_t replica_index = parse<UInt64>(replica_dir.substr(replica_prefix.size()));
res.push_back(replica_index);
}
}
if (res.empty())
res.push_back(1);
return res;
}
std::vector<String> getDatabases(size_t shard_index, size_t replica_index) const
{
std::vector<String> res;
insertAtEnd(res, backup.listFiles(fmt::format("shard{}/replica{}/metadata/", shard_index, replica_index)));
insertAtEnd(res, backup.listFiles(fmt::format("shard{}/metadata/", shard_index)));
insertAtEnd(res, backup.listFiles(fmt::format("metadata/")));
boost::range::remove_erase_if(
res,
[](String & str)
{
if (str.ends_with(".sql"))
{
str.resize(str.length() - strlen(".sql"));
str = unescapeForFileName(str);
return false;
}
return true;
});
std::sort(res.begin(), res.end());
res.erase(std::unique(res.begin(), res.end()), res.end());
return res;
}
std::vector<String> getTables(const String & database_name, size_t shard_index, size_t replica_index) const
{
std::vector<String> res;
String escaped_database_name = escapeForFileName(database_name);
insertAtEnd(res, backup.listFiles(fmt::format("shard{}/replica{}/metadata/{}/", shard_index, replica_index, escaped_database_name)));
insertAtEnd(res, backup.listFiles(fmt::format("shard{}/metadata/{}/", shard_index, escaped_database_name)));
insertAtEnd(res, backup.listFiles(fmt::format("metadata/{}/", escaped_database_name)));
boost::range::remove_erase_if(
res,
[](String & str)
{
if (str.ends_with(".sql"))
{
str.resize(str.length() - strlen(".sql"));
str = unescapeForFileName(str);
return false;
}
return true;
});
std::sort(res.begin(), res.end());
res.erase(std::unique(res.begin(), res.end()), res.end());
return res;
}
/// Returns the path to metadata in backup.
String getMetadataPath(const DatabaseAndTableName & table_name, size_t shard_index, size_t replica_index) const
{
String escaped_table_name = escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second);
String path1 = fmt::format("shard{}/replica{}/metadata/{}.sql", shard_index, replica_index, escaped_table_name);
if (backup.fileExists(path1))
return path1;
String path2 = fmt::format("shard{}/metadata/{}.sql", shard_index, escaped_table_name);
if (backup.fileExists(path2))
return path2;
String path3 = fmt::format("metadata/{}.sql", escaped_table_name);
return path3;
}
String getMetadataPath(const String & database_name, size_t shard_index, size_t replica_index) const
{
String escaped_database_name = escapeForFileName(database_name);
String path1 = fmt::format("shard{}/replica{}/metadata/{}.sql", shard_index, replica_index, escaped_database_name);
if (backup.fileExists(path1))
return path1;
String path2 = fmt::format("shard{}/metadata/{}.sql", shard_index, escaped_database_name);
if (backup.fileExists(path2))
return path2;
String path3 = fmt::format("metadata/{}.sql", escaped_database_name);
return path3;
}
String getDataPath(const DatabaseAndTableName & table_name, size_t shard_index, size_t replica_index) const
{
String escaped_table_name = escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second);
if (backup.fileExists(fmt::format("shard{}/replica{}/metadata/{}.sql", shard_index, replica_index, escaped_table_name)))
return fmt::format("shard{}/replica{}/data/{}/", shard_index, replica_index, escaped_table_name);
if (backup.fileExists(fmt::format("shard{}/metadata/{}.sql", shard_index, escaped_table_name)))
return fmt::format("shard{}/data/{}/", shard_index, escaped_table_name);
return fmt::format("data/{}/", escaped_table_name);
}
private:
const IBackup & backup;
};
using Kind = ASTBackupQuery::Kind;
using Element = ASTBackupQuery::Element;
using Elements = ASTBackupQuery::Elements;
@ -127,10 +270,11 @@ namespace
const ASTs & partitions_,
const BackupPtr & backup_,
const DatabaseAndTableName & table_name_in_backup_,
const RestoreSettingsPtr & restore_settings_)
const RestoreSettingsPtr & restore_settings_,
const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
: context(context_), create_query(typeid_cast<std::shared_ptr<ASTCreateQuery>>(create_query_)),
partitions(partitions_), backup(backup_), table_name_in_backup(table_name_in_backup_),
restore_settings(restore_settings_)
restore_settings(restore_settings_), restore_coordination(restore_coordination_)
{
table_name = DatabaseAndTableName{create_query->getDatabase(), create_query->getTable()};
if (create_query->temporary)
@ -139,9 +283,28 @@ namespace
RestoreTasks run() override
{
createStorage();
getStorage();
checkStorageCreateQuery();
if (acquireTableCreation())
{
try
{
createStorage();
getStorage();
checkStorageCreateQuery();
setTableCreationResult(IRestoreCoordination::Result::SUCCEEDED);
}
catch (...)
{
setTableCreationResult(IRestoreCoordination::Result::FAILED);
throw;
}
}
else
{
waitForTableCreation();
getStorage();
checkStorageCreateQuery();
}
RestoreTasks tasks;
if (auto task = insertData())
tasks.push_back(std::move(task));
@ -151,6 +314,67 @@ namespace
bool isSequential() const override { return true; }
private:
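/// For a table inside a Replicated database only one replica should actually execute CREATE TABLE;
/// returns true if this replica should do the creation (always true when no coordination is needed).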
bool acquireTableCreation()
{
if (restore_settings->create_table == RestoreTableCreationMode::kMustExist)
return true;
auto replicated_db
= typeid_cast<std::shared_ptr<const DatabaseReplicated>>(DatabaseCatalog::instance().getDatabase(table_name.first));
if (!replicated_db)
return true;
use_coordination_for_table_creation = true;
replicated_database_zookeeper_path = replicated_db->getZooKeeperPath();
if (restore_coordination->acquireZkPathAndName(replicated_database_zookeeper_path, table_name.second))
return true;
return false;
}
void setTableCreationResult(IRestoreCoordination::Result res)
{
if (use_coordination_for_table_creation)
restore_coordination->setResultForZkPathAndName(replicated_database_zookeeper_path, table_name.second, res);
}
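/// Called by replicas which did not acquire the creation: waits for the creating replica to report
/// success or failure, then waits until the table definition arrives on this server through the Replicated database.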
void waitForTableCreation()
{
if (!use_coordination_for_table_creation)
return;
IRestoreCoordination::Result res;
const auto & config = context->getConfigRef();
auto timeout = std::chrono::seconds(config.getUInt("backups.create_table_in_replicated_db_timeout", 10));
auto start_time = std::chrono::steady_clock::now();
if (!restore_coordination->getResultForZkPathAndName(replicated_database_zookeeper_path, table_name.second, res, timeout))
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Waited too long ({}) for creating of {} on another replica",
to_string(timeout),
formatTableNameOrTemporaryTableName(table_name));
if (res == IRestoreCoordination::Result::FAILED)
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Failed creating of {} on another replica",
formatTableNameOrTemporaryTableName(table_name));
while (std::chrono::steady_clock::now() - start_time < timeout)
{
if (DatabaseCatalog::instance().tryGetDatabaseAndTable({table_name.first, table_name.second}, context).second)
return;
sleepForMilliseconds(50);
}
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Waited too long ({}) for creating of {} on another replica",
to_string(timeout),
formatTableNameOrTemporaryTableName(table_name));
}
void createStorage()
{
if (restore_settings->create_table == RestoreTableCreationMode::kMustExist)
@ -206,7 +430,7 @@ namespace
if (restore_settings->structure_only)
return false;
data_path_in_backup = getDataPathInBackup(table_name_in_backup);
data_path_in_backup = PathsInBackup{*backup}.getDataPath(table_name_in_backup, restore_settings->shard_in_backup, restore_settings->replica_in_backup);
if (backup->listFiles(data_path_in_backup).empty())
return false;
@ -234,7 +458,11 @@ namespace
{
if (!hasData())
return {};
return storage->restoreData(context, partitions, backup, data_path_in_backup, *restore_settings);
if (restore_settings->replica == 2)
sleepForSeconds(5);
return storage->restoreData(context, partitions, backup, data_path_in_backup, *restore_settings, restore_coordination);
}
ContextMutablePtr context;
@ -244,6 +472,9 @@ namespace
BackupPtr backup;
DatabaseAndTableName table_name_in_backup;
RestoreSettingsPtr restore_settings;
std::shared_ptr<IRestoreCoordination> restore_coordination;
bool use_coordination_for_table_creation = false;
String replicated_database_zookeeper_path;
DatabasePtr database;
StoragePtr storage;
ASTPtr storage_create_query;
@ -258,11 +489,17 @@ namespace
{
public:
RestoreTasksBuilder(ContextMutablePtr context_, const BackupPtr & backup_, const RestoreSettings & restore_settings_)
: context(context_), backup(backup_), restore_settings(restore_settings_) {}
: context(context_), backup(backup_), restore_settings(restore_settings_)
{
if (!restore_settings.coordination_zk_path.empty())
restore_coordination = std::make_shared<RestoreCoordinationDistributed>(restore_settings.coordination_zk_path, [context=context] { return context->getZooKeeper(); });
}
/// Prepares internal structures for making tasks for restoring.
void prepare(const ASTBackupQuery::Elements & elements)
{
adjustIndicesOfSourceShardAndReplicaInBackup();
String current_database = context->getCurrentDatabase();
renaming_settings.setFromBackupQuery(elements, current_database);
@ -307,12 +544,39 @@ namespace
/// TODO: We need to restore tables according to their dependencies.
for (const auto & info : tables | boost::adaptors::map_values)
res.push_back(std::make_unique<RestoreTableTask>(context, info.create_query, info.partitions, backup, info.name_in_backup, restore_settings_ptr));
res.push_back(std::make_unique<RestoreTableTask>(context, info.create_query, info.partitions, backup, info.name_in_backup, restore_settings_ptr, restore_coordination));
return res;
}
private:
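/// If shard_in_backup / replica_in_backup are not set explicitly, picks the only shard/replica present
/// in the backup, otherwise falls back to this server's own shard and replica numbers.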
void adjustIndicesOfSourceShardAndReplicaInBackup()
{
auto shards_in_backup = PathsInBackup{*backup}.getShards();
if (!restore_settings.shard_in_backup)
{
if (shards_in_backup.size() == 1)
restore_settings.shard_in_backup = shards_in_backup[0];
else
restore_settings.shard_in_backup = restore_settings.shard;
}
if (std::find(shards_in_backup.begin(), shards_in_backup.end(), restore_settings.shard_in_backup) == shards_in_backup.end())
throw Exception(ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "No shard #{} in backup", restore_settings.shard_in_backup);
auto replicas_in_backup = PathsInBackup{*backup}.getReplicas(restore_settings.shard_in_backup);
if (!restore_settings.replica_in_backup)
{
if (replicas_in_backup.size() == 1)
restore_settings.replica_in_backup = replicas_in_backup[0];
else
restore_settings.replica_in_backup = restore_settings.replica;
}
if (std::find(replicas_in_backup.begin(), replicas_in_backup.end(), restore_settings.replica_in_backup) == replicas_in_backup.end())
throw Exception(ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "No replica #{} in backup", restore_settings.replica_in_backup);
}
/// Prepares to restore a single table and probably its database's definition.
void prepareToRestoreTable(const DatabaseAndTableName & table_name_, const ASTs & partitions_)
{
@ -339,8 +603,8 @@ namespace
if (databases.contains(new_database_name))
throw Exception(ErrorCodes::CANNOT_RESTORE_DATABASE, "Cannot restore the database {} twice", backQuoteIfNeed(new_database_name));
Strings table_metadata_filenames = backup->listFiles("metadata/" + escapeForFileName(database_name_) + "/", "/");
bool has_tables_in_backup = !table_metadata_filenames.empty();
Strings table_names = PathsInBackup{*backup}.getTables(database_name_, restore_settings.shard_in_backup, restore_settings.replica_in_backup);
bool has_tables_in_backup = !table_names.empty();
bool has_create_query_in_backup = hasCreateQueryInBackup(database_name_);
if (!has_create_query_in_backup && !has_tables_in_backup)
@ -367,9 +631,8 @@ namespace
}
/// Restore tables in this database.
for (const String & table_metadata_filename : table_metadata_filenames)
for (const String & table_name : table_names)
{
String table_name = unescapeForFileName(fs::path{table_metadata_filename}.stem());
if (except_list_.contains(table_name))
continue;
prepareToRestoreTable(DatabaseAndTableName{database_name_, table_name}, ASTs{});
@ -379,10 +642,8 @@ namespace
/// Prepares to restore all the databases contained in the backup.
void prepareToRestoreAllDatabases(const std::set<String> & except_list_)
{
Strings database_metadata_filenames = backup->listFiles("metadata/", "/");
for (const String & database_metadata_filename : database_metadata_filenames)
for (const String & database_name : PathsInBackup{*backup}.getDatabases(restore_settings.shard_in_backup, restore_settings.replica_in_backup))
{
String database_name = unescapeForFileName(fs::path{database_metadata_filename}.stem());
if (except_list_.contains(database_name))
continue;
prepareToRestoreDatabase(database_name, std::set<String>{});
@ -392,7 +653,7 @@ namespace
/// Reads a create query for creating a specified table from the backup.
std::shared_ptr<ASTCreateQuery> readCreateQueryFromBackup(const DatabaseAndTableName & table_name) const
{
String create_query_path = getMetadataPathInBackup(table_name);
String create_query_path = PathsInBackup{*backup}.getMetadataPath(table_name, restore_settings.shard_in_backup, restore_settings.replica_in_backup);
if (!backup->fileExists(create_query_path))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore the {} because there is no such table in the backup",
formatTableNameOrTemporaryTableName(table_name));
@ -407,7 +668,7 @@ namespace
/// Reads a create query for creating a specified database from the backup.
std::shared_ptr<ASTCreateQuery> readCreateQueryFromBackup(const String & database_name) const
{
String create_query_path = getMetadataPathInBackup(database_name);
String create_query_path = PathsInBackup{*backup}.getMetadataPath(database_name, restore_settings.shard_in_backup, restore_settings.replica_in_backup);
if (!backup->fileExists(create_query_path))
throw Exception(ErrorCodes::CANNOT_RESTORE_DATABASE, "Cannot restore the database {} because there is no such database in the backup", backQuoteIfNeed(database_name));
auto read_buffer = backup->readFile(create_query_path)->getReadBuffer();
@ -421,7 +682,7 @@ namespace
/// Whether there is a create query for creating a specified database in the backup.
bool hasCreateQueryInBackup(const String & database_name) const
{
String create_query_path = getMetadataPathInBackup(database_name);
String create_query_path = PathsInBackup{*backup}.getMetadataPath(database_name, restore_settings.shard_in_backup, restore_settings.replica_in_backup);
return backup->fileExists(create_query_path);
}
@ -456,6 +717,7 @@ namespace
ContextMutablePtr context;
BackupPtr backup;
RestoreSettings restore_settings;
std::shared_ptr<IRestoreCoordination> restore_coordination;
DDLRenamingSettings renaming_settings;
std::map<String /* new_db_name */, CreateDatabaseInfo> databases;
std::map<DatabaseAndTableName /* new_table_name */, CreateTableInfo> tables;
@ -612,4 +874,16 @@ void executeRestoreTasks(RestoreTasks && restore_tasks, size_t num_threads)
need_rollback_completed_tasks = false;
}
size_t getMinCountOfReplicas(const IBackup & backup)
{
size_t min_count_of_replicas = static_cast<size_t>(-1);
for (size_t shard_index : PathsInBackup(backup).getShards())
{
size_t count_of_replicas = PathsInBackup(backup).getReplicas(shard_index).size();
min_count_of_replicas = std::min(min_count_of_replicas, count_of_replicas);
}
return min_count_of_replicas;
}
}

View File

@ -13,6 +13,7 @@ using RestoreTaskPtr = std::unique_ptr<IRestoreTask>;
using RestoreTasks = std::vector<RestoreTaskPtr>;
struct RestoreSettings;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
using ContextMutablePtr = std::shared_ptr<Context>;
/// Prepares restore tasks.
@ -21,4 +22,7 @@ RestoreTasks makeRestoreTasks(ContextMutablePtr context, const BackupPtr & backu
/// Executes restore tasks.
void executeRestoreTasks(RestoreTasks && tasks, size_t num_threads);
/// Returns the minimum number of replicas stored in the backup (the minimum is taken over all shards).
size_t getMinCountOfReplicas(const IBackup & backup);
}

View File

@ -5,7 +5,6 @@
#include <Common/quoteString.h>
#include <Disks/IDisk.h>
#include <IO/Archives/hasRegisteredArchiveFileExtension.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
@ -177,7 +176,7 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
writer = std::make_shared<BackupWriterFile>(path);
else
writer = std::make_shared<BackupWriterDisk>(disk, path);
return std::make_unique<BackupImpl>(backup_name, archive_params, params.base_backup_info, writer, nullptr, false, params.context);
return std::make_unique<BackupImpl>(backup_name, archive_params, params.base_backup_info, writer, params.context, params.backup_uuid, params.is_internal_backup, params.coordination_zk_path);
}
};

View File

@ -0,0 +1,37 @@
#include <Backups/replaceTableUUIDWithMacroInReplicatedTableDef.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
void replaceTableUUIDWithMacroInReplicatedTableDef(ASTCreateQuery & create_query, const UUID & table_uuid)
{
if (create_query.getTable().empty() || !create_query.storage || !create_query.storage->engine || (table_uuid == UUIDHelpers::Nil))
return;
auto & engine = *(create_query.storage->engine);
if (!engine.name.starts_with("Replicated") || !engine.arguments)
return;
auto * args = typeid_cast<ASTExpressionList *>(engine.arguments.get());
size_t zookeeper_path_arg_pos = engine.name.starts_with("ReplicatedGraphite") ? 1 : 0;
if (!args || (args->children.size() <= zookeeper_path_arg_pos))
return;
auto * zookeeper_path_arg = typeid_cast<ASTLiteral *>(args->children[zookeeper_path_arg_pos].get());
if (!zookeeper_path_arg || (zookeeper_path_arg->value.getType() != Field::Types::String))
return;
String & zookeeper_path = zookeeper_path_arg->value.get<String>();
String table_uuid_str = toString(table_uuid);
if (size_t uuid_pos = zookeeper_path.find(table_uuid_str); uuid_pos != String::npos)
zookeeper_path.replace(uuid_pos, table_uuid_str.size(), "{uuid}");
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <Core/UUID.h>
namespace DB
{
class ASTCreateQuery;
/// While making a replicated table it replaces "{uuid}" in zookeeper path with the real table UUID.
/// This function reverts this replacement.
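/// For example (the UUID below is purely illustrative):
///     ReplicatedMergeTree('/clickhouse/tables/123e4567-e89b-12d3-a456-426614174000/{shard}', '{replica}')
/// is rewritten to
///     ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')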
void replaceTableUUIDWithMacroInReplicatedTableDef(ASTCreateQuery & create_query, const UUID & table_uuid);
}

View File

@ -215,6 +215,7 @@ if (TARGET ch_contrib::jemalloc)
endif()
add_subdirectory(Access/Common)
add_subdirectory(Backups/Common)
add_subdirectory(Common/ZooKeeper)
add_subdirectory(Common/Config)

View File

@ -55,6 +55,8 @@ public:
String getFullReplicaName() const;
static std::pair<String, String> parseFullReplicaName(const String & name);
const String & getZooKeeperPath() const { return zookeeper_path; }
/// Returns a cluster consisting of the replicas of this database
ClusterPtr getCluster() const;

View File

@ -1,27 +1,30 @@
#include <Interpreters/InterpreterBackupQuery.h>
#include <Backups/Common/BackupSettings.h>
#include <Backups/Common/RestoreSettings.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Backups/IRestoreTask.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupSettings.h>
#include <Backups/BackupUtils.h>
#include <Backups/BackupsWorker.h>
#include <Backups/RestoreSettings.h>
#include <Backups/RestoreUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <base/logger_useful.h>
namespace DB
{
namespace
{
BackupMutablePtr createBackup(const BackupInfo & backup_info, const BackupSettings & backup_settings, const ContextPtr & context)
BackupMutablePtr createBackup(const UUID & backup_uuid, const BackupInfo & backup_info, const BackupSettings & backup_settings, const ContextPtr & context)
{
BackupFactory::CreateParams params;
params.open_mode = IBackup::OpenMode::WRITE;
@ -31,6 +34,9 @@ namespace
params.compression_method = backup_settings.compression_method;
params.compression_level = backup_settings.compression_level;
params.password = backup_settings.password;
params.backup_uuid = backup_uuid;
params.is_internal_backup = backup_settings.internal;
params.coordination_zk_path = backup_settings.coordination_zk_path;
return BackupFactory::instance().createBackup(params);
}
@ -45,84 +51,157 @@ namespace
return BackupFactory::instance().createBackup(params);
}
void executeBackupSync(UInt64 task_id, const ContextPtr & context, const BackupInfo & backup_info, const ASTBackupQuery::Elements & backup_elements, const BackupSettings & backup_settings, bool no_throw = false)
void executeBackupSync(const ASTBackupQuery & query, UInt64 task_id, const ContextPtr & context, const BackupInfo & backup_info, const BackupSettings & backup_settings, bool no_throw = false)
{
auto & worker = BackupsWorker::instance();
bool is_internal_backup = backup_settings.internal;
try
{
BackupMutablePtr backup = createBackup(backup_info, backup_settings, context);
worker.update(task_id, BackupStatus::PREPARING);
auto backup_entries = makeBackupEntries(context, backup_elements, backup_settings);
worker.update(task_id, BackupStatus::MAKING_BACKUP);
writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads);
worker.update(task_id, BackupStatus::BACKUP_COMPLETE);
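/// For a BACKUP ... ON CLUSTER query the initiator allocates a ZooKeeper path under /clickhouse/backups/;
/// every server taking part in the distributed backup then coordinates through it (see BackupCoordinationDistributed).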
UUID backup_uuid = UUIDHelpers::generateV4();
auto new_backup_settings = backup_settings;
if (!query.cluster.empty() && backup_settings.coordination_zk_path.empty())
new_backup_settings.coordination_zk_path = query.cluster.empty() ? "" : ("/clickhouse/backups/backup-" + toString(backup_uuid));
std::shared_ptr<ASTBackupQuery> new_query = std::static_pointer_cast<ASTBackupQuery>(query.clone());
new_backup_settings.copySettingsToBackupQuery(*new_query);
BackupMutablePtr backup = createBackup(backup_uuid, backup_info, new_backup_settings, context);
if (!query.cluster.empty())
{
if (!is_internal_backup)
worker.update(task_id, BackupStatus::MAKING_BACKUP);
DDLQueryOnClusterParams params;
params.shard_index = new_backup_settings.shard;
params.replica_index = new_backup_settings.replica;
params.allow_multiple_replicas = new_backup_settings.allow_storing_multiple_replicas;
auto res = executeDDLQueryOnCluster(new_query, context, params);
PullingPipelineExecutor executor(res.pipeline);
Block block;
while (executor.pull(block));
backup->finalizeWriting();
}
else
{
auto backup_entries = makeBackupEntries(context, new_query->elements, new_backup_settings);
if (!is_internal_backup)
worker.update(task_id, BackupStatus::MAKING_BACKUP);
writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads);
}
if (!is_internal_backup)
worker.update(task_id, BackupStatus::BACKUP_COMPLETE);
}
catch (...)
{
worker.update(task_id, BackupStatus::FAILED_TO_BACKUP, getCurrentExceptionMessage(false));
if (!is_internal_backup)
worker.update(task_id, BackupStatus::FAILED_TO_BACKUP, getCurrentExceptionMessage(false));
if (!no_throw)
throw;
}
}
void executeRestoreSync(UInt64 task_id, ContextMutablePtr context, const BackupInfo & backup_info, const ASTBackupQuery::Elements & restore_elements, const RestoreSettings & restore_settings, bool no_throw = false)
void executeRestoreSync(const ASTBackupQuery & query, UInt64 task_id, ContextMutablePtr context, const BackupInfo & backup_info, const RestoreSettings & restore_settings, bool no_throw = false)
{
auto & worker = BackupsWorker::instance();
bool is_internal_restore = restore_settings.internal;
try
{
BackupPtr backup = openBackup(backup_info, restore_settings, context);
worker.update(task_id, BackupStatus::RESTORING);
auto restore_tasks = makeRestoreTasks(context, backup, restore_elements, restore_settings);
executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads);
worker.update(task_id, BackupStatus::RESTORED);
auto new_restore_settings = restore_settings;
if (!query.cluster.empty() && new_restore_settings.coordination_zk_path.empty())
{
UUID restore_uuid = UUIDHelpers::generateV4();
new_restore_settings.coordination_zk_path
= query.cluster.empty() ? "" : ("/clickhouse/backups/restore-" + toString(restore_uuid));
}
std::shared_ptr<ASTBackupQuery> new_query = std::static_pointer_cast<ASTBackupQuery>(query.clone());
new_restore_settings.copySettingsToRestoreQuery(*new_query);
if (!query.cluster.empty())
{
DDLQueryOnClusterParams params;
params.shard_index = new_restore_settings.shard;
params.replica_index = new_restore_settings.replica;
auto res = executeDDLQueryOnCluster(new_query, context, params);
PullingPipelineExecutor executor(res.pipeline);
Block block;
while (executor.pull(block));
}
else
{
auto restore_tasks = makeRestoreTasks(context, backup, new_query->elements, new_restore_settings);
executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads);
}
if (!is_internal_restore)
worker.update(task_id, BackupStatus::RESTORED);
}
catch (...)
{
worker.update(task_id, BackupStatus::FAILED_TO_RESTORE, getCurrentExceptionMessage(false));
if (!is_internal_restore)
worker.update(task_id, BackupStatus::FAILED_TO_RESTORE, getCurrentExceptionMessage(false));
if (!no_throw)
throw;
}
}
UInt64 executeBackup(const ContextPtr & context, const ASTBackupQuery & query)
UInt64 executeBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
const auto backup_info = BackupInfo::fromAST(*query.backup_name);
auto task_id = BackupsWorker::instance().add(backup_info.toString(), BackupStatus::PREPARING);
const auto backup_settings = BackupSettings::fromBackupQuery(query);
size_t task_id = 0;
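/// `internal` marks the copies of the query which the initiator sends to other servers during
/// BACKUP ... ON CLUSTER; such queries are not registered as separate tasks in BackupsWorker.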
if (!backup_settings.internal)
task_id = BackupsWorker::instance().add(backup_info.toString(), BackupStatus::PREPARING);
if (backup_settings.async)
{
ThreadFromGlobalPool thread{
&executeBackupSync, task_id, context, backup_info, query.elements, backup_settings, /* no_throw = */ true};
&executeBackupSync, query, task_id, context, backup_info, backup_settings, /* no_throw = */ true};
thread.detach(); /// TODO: Remove this !!! Move that thread to BackupsWorker instead
}
else
{
executeBackupSync(task_id, context, backup_info, query.elements, backup_settings, /* no_throw = */ false);
executeBackupSync(query, task_id, context, backup_info, backup_settings, /* no_throw = */ false);
}
return task_id;
}
UInt64 executeRestore(ContextMutablePtr context, const ASTBackupQuery & query)
UInt64 executeRestore(const ASTBackupQuery & query, ContextMutablePtr context)
{
const auto backup_info = BackupInfo::fromAST(*query.backup_name);
const auto restore_settings = RestoreSettings::fromRestoreQuery(query);
auto task_id = BackupsWorker::instance().add(backup_info.toString(), BackupStatus::RESTORING);
size_t task_id = 0;
if (!restore_settings.internal)
task_id = BackupsWorker::instance().add(backup_info.toString(), BackupStatus::RESTORING);
if (restore_settings.async)
{
ThreadFromGlobalPool thread{&executeRestoreSync, task_id, context, backup_info, query.elements, restore_settings, /* no_throw = */ true};
ThreadFromGlobalPool thread{&executeRestoreSync, query, task_id, context, backup_info, restore_settings, /* no_throw = */ true};
thread.detach(); /// TODO: Remove this !!! Move that thread to BackupsWorker instead
}
else
{
executeRestoreSync(task_id, context, backup_info, query.elements, restore_settings, /* no_throw = */ false);
executeRestoreSync(query, task_id, context, backup_info, restore_settings, /* no_throw = */ false);
}
return task_id;
}
Block getResultRow(UInt64 task_id)
{
if (!task_id)
return {};
auto entry = BackupsWorker::instance().getEntry(task_id);
Block res_columns;
@ -146,11 +225,12 @@ namespace
BlockIO InterpreterBackupQuery::execute()
{
const auto & query = query_ptr->as<const ASTBackupQuery &>();
UInt64 task_id;
if (query.kind == ASTBackupQuery::BACKUP)
task_id = executeBackup(context, query);
task_id = executeBackup(query, context);
else
task_id = executeRestore(context, query);
task_id = executeRestore(query, context);
BlockIO res_io;
res_io.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(getResultRow(task_id)));

View File

@ -27,7 +27,7 @@ struct DDLQueryOnClusterParams
/// 1-based index of a shard to execute a query on, 0 means all shards.
size_t shard_index = 0;
/// 1-bases index of a replica to execute a query on, 0 means all replicas (see also allow_multiple_replicas).
/// 1-based index of a replica to execute a query on, 0 means all replicas (see also allow_storing_multiple_replicas).
size_t replica_index = 0;
/// Allows executing a query on multiple replicas.

View File

@ -1,4 +1,5 @@
#include <Parsers/ASTBackupQuery.h>
#include <Backups/Common/rewriteBackupQueryWithoutOnCluster.h>
#include <IO/Operators.h>
#include <Common/quoteString.h>
@ -135,7 +136,6 @@ namespace
format.ostr << ", ";
settings->format(format);
}
}
}
@ -157,6 +157,7 @@ void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, Fo
<< (format.hilite ? hilite_none : "");
formatElements(elements, kind, format);
formatOnCluster(format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO " : " FROM ") << (format.hilite ? hilite_none : "");
backup_name->format(format);
@ -165,4 +166,13 @@ void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, Fo
formatSettings(settings, base_backup_name, format);
}
ASTPtr ASTBackupQuery::getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams & params) const
{
if (kind == ASTBackupQuery::Kind::BACKUP)
return rewriteBackupQueryWithoutOnCluster(*this, params);
else
return rewriteRestoreQueryWithoutOnCluster(*this, params);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
@ -15,6 +16,7 @@ using DatabaseAndTableName = std::pair<String, String>;
* ALL TEMPORARY TABLES [EXCEPT ...] |
* DATABASE database_name [EXCEPT ...] [AS database_name_in_backup] |
* ALL DATABASES [EXCEPT ...] } [,...]
* [ON CLUSTER 'cluster_name']
* TO { File('path/') |
* Disk('disk_name', 'path/')
* [SETTINGS base_backup = {File(...) | Disk(...)}]
@ -25,6 +27,7 @@ using DatabaseAndTableName = std::pair<String, String>;
* ALL TEMPORARY TABLES [EXCEPT ...] |
* DATABASE database_name_in_backup [EXCEPT ...] [INTO database_name] |
* ALL DATABASES [EXCEPT ...] } [,...]
* [ON CLUSTER 'cluster_name']
* FROM {File(...) | Disk(...)}
*
* Notes:
@ -42,7 +45,7 @@ using DatabaseAndTableName = std::pair<String, String>;
* The "WITH BASE" clause allows to set a base backup. Only differences made after the base backup will be
* included in a newly created backup, so this option allows to make an incremental backup.
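* For example (the cluster name 'mycluster' and the disk name 'backups' are illustrative):
*     BACKUP TABLE mydb.mytable ON CLUSTER 'mycluster' TO Disk('backups', 'base.zip');
*     BACKUP TABLE mydb.mytable ON CLUSTER 'mycluster' TO Disk('backups', 'incr.zip')
*         SETTINGS base_backup = Disk('backups', 'base.zip');
*     RESTORE TABLE mydb.mytable ON CLUSTER 'mycluster' FROM Disk('backups', 'incr.zip');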
*/
class ASTBackupQuery : public IAST
class ASTBackupQuery : public IAST, public ASTQueryWithOnCluster
{
public:
enum Kind
@ -84,5 +87,6 @@ public:
String getID(char) const override;
ASTPtr clone() const override;
void formatImpl(const FormatSettings & format, FormatState &, FormatStateStacked) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override;
};
}

View File

@ -5,6 +5,7 @@ add_headers_and_sources(clickhouse_parsers ./Access)
add_headers_and_sources(clickhouse_parsers ./MySQL)
add_library(clickhouse_parsers ${clickhouse_parsers_headers} ${clickhouse_parsers_sources})
target_link_libraries(clickhouse_parsers PUBLIC clickhouse_common_io clickhouse_common_access)
target_link_libraries(clickhouse_parsers PUBLIC clickhouse_common_io clickhouse_common_backups)
if (USE_DEBUG_HELPERS)
# CMake generator expression will do insane quoting when it encounters special character like quotes, spaces, etc.

View File

@ -284,6 +284,14 @@ namespace
return true;
});
}
bool parseOnCluster(IParserBase::Pos & pos, Expected & expected, String & cluster)
{
return IParserBase::wrapParseImpl(pos, [&]
{
return ParserKeyword{"ON"}.ignore(pos, expected) && ASTQueryWithOnCluster::parse(pos, cluster, expected);
});
}
}
@ -301,6 +309,9 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parseElements(pos, expected, elements))
return false;
String cluster;
parseOnCluster(pos, expected, cluster);
if (!ParserKeyword{(kind == Kind::BACKUP) ? "TO" : "FROM"}.ignore(pos, expected))
return false;
@ -320,6 +331,7 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->backup_name = std::move(backup_name);
query->base_backup_name = std::move(base_backup_name);
query->settings = std::move(settings);
query->cluster = std::move(cluster);
return true;
}

View File

@ -221,7 +221,7 @@ BackupEntries IStorage::backupData(ContextPtr, const ASTs &)
throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED);
}
RestoreTaskPtr IStorage::restoreData(ContextMutablePtr, const ASTs &, const BackupPtr &, const String &, const StorageRestoreSettings &)
RestoreTaskPtr IStorage::restoreData(ContextMutablePtr, const ASTs &, const BackupPtr &, const String &, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -75,6 +75,7 @@ using BackupEntries = std::vector<std::pair<String, std::unique_ptr<IBackupEntry
class IRestoreTask;
using RestoreTaskPtr = std::unique_ptr<IRestoreTask>;
struct StorageRestoreSettings;
class IRestoreCoordination;
struct ColumnSize
{
@ -233,7 +234,7 @@ public:
virtual BackupEntries backupData(ContextPtr context, const ASTs & partitions);
/// Extract data from the backup and put it to the storage.
virtual RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings);
virtual RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination);
/// Returns whether the column is virtual - by default all columns are real.
/// Initially reserved virtual column name may be shadowed by real column.

View File

@ -1063,7 +1063,7 @@ private:
ContextMutablePtr context;
};
RestoreTaskPtr StorageLog::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &)
RestoreTaskPtr StorageLog::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
if (!partitions.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());

View File

@ -54,7 +54,7 @@ public:
bool hasDataToBackup() const override { return true; }
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
protected:
/** Attach the table with the appropriate name, along the appropriate path (with / at the end),

View File

@ -440,11 +440,11 @@ BackupEntries StorageMaterializedView::backupData(ContextPtr context_, const AST
return getTargetTable()->backupData(context_, partitions_);
}
RestoreTaskPtr StorageMaterializedView::restoreData(ContextMutablePtr context_, const ASTs & partitions_, const BackupPtr & backup_, const String & data_path_in_backup_, const StorageRestoreSettings & restore_settings_)
RestoreTaskPtr StorageMaterializedView::restoreData(ContextMutablePtr context_, const ASTs & partitions_, const BackupPtr & backup_, const String & data_path_in_backup_, const StorageRestoreSettings & restore_settings_, const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
{
if (!hasInnerTable())
return {};
return getTargetTable()->restoreData(context_, partitions_, backup_, data_path_in_backup_, restore_settings_);
return getTargetTable()->restoreData(context_, partitions_, backup_, data_path_in_backup_, restore_settings_, restore_coordination_);
}
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)

View File

@ -100,7 +100,7 @@ public:
bool hasDataToBackup() const override { return hasInnerTable(); }
BackupEntries backupData(ContextPtr context_, const ASTs & partitions_) override;
RestoreTaskPtr restoreData(ContextMutablePtr context_, const ASTs & partitions_, const BackupPtr & backup, const String & data_path_in_backup_, const StorageRestoreSettings & restore_settings_) override;
RestoreTaskPtr restoreData(ContextMutablePtr context_, const ASTs & partitions_, const BackupPtr & backup, const String & data_path_in_backup_, const StorageRestoreSettings & restore_settings_, const std::shared_ptr<IRestoreCoordination> & restore_coordination_) override;
private:
/// Will be initialized in constructor

View File

@ -553,7 +553,7 @@ private:
};
RestoreTaskPtr StorageMemory::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &)
RestoreTaskPtr StorageMemory::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
if (!partitions.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());

View File

@ -68,7 +68,7 @@ public:
bool hasDataToBackup() const override { return true; }
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;

View File

@ -1786,7 +1786,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
}
RestoreTaskPtr StorageMergeTree::restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &)
RestoreTaskPtr StorageMergeTree::restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
return restoreDataParts(getPartitionIDsFromQuery(partitions, local_context), backup, data_path_in_backup, &increment);
}

View File

@ -99,7 +99,7 @@ public:
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

View File

@ -71,6 +71,7 @@
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Backups/IRestoreTask.h>
#include <Backups/IRestoreCoordination.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/DirectoryIterator.h>
@ -8208,19 +8209,25 @@ public:
const std::shared_ptr<StorageReplicatedMergeTree> & storage_,
const std::unordered_set<String> & partition_ids_,
const BackupPtr & backup_,
const String & data_path_in_backup_)
const String & data_path_in_backup_,
const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
: query_context(query_context_)
, storage(storage_)
, partition_ids(partition_ids_)
, backup(backup_)
, data_path_in_backup(data_path_in_backup_)
, restore_coordination(restore_coordination_)
{
}
RestoreTasks run() override
{
String full_zk_path = storage->getZooKeeperName() + storage->getZooKeeperPath();
String adjusted_data_path_in_backup = data_path_in_backup;
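/// All replicas of this table share one ZooKeeper path, so the first replica to get here fixes which data
/// directory of the backup will be used and the others reuse it; restored parts then reach the remaining replicas through ordinary replication.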
restore_coordination->setOrGetPathInBackupForZkPath(full_zk_path, adjusted_data_path_in_backup);
RestoreTasks restore_part_tasks;
Strings part_names = backup->listFiles(data_path_in_backup);
Strings part_names = backup->listFiles(adjusted_data_path_in_backup);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*storage, metadata_snapshot, 0, 0, 0, false, false, query_context, /*is_attach*/true);
@ -8234,8 +8241,11 @@ public:
if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
continue;
if (!restore_coordination->acquireZkPathAndName(full_zk_path, part_info->partition_id))
continue; /// Other replica is already restoring this partition.
restore_part_tasks.push_back(
std::make_unique<RestorePartTask>(storage, sink, part_name, *part_info, backup, data_path_in_backup));
std::make_unique<RestorePartTask>(storage, sink, part_name, *part_info, backup, adjusted_data_path_in_backup));
}
return restore_part_tasks;
}
@ -8246,6 +8256,7 @@ private:
std::unordered_set<String> partition_ids;
BackupPtr backup;
String data_path_in_backup;
std::shared_ptr<IRestoreCoordination> restore_coordination;
class RestorePartTask : public IRestoreTask
{
@ -8346,14 +8357,16 @@ RestoreTaskPtr StorageReplicatedMergeTree::restoreData(
const ASTs & partitions,
const BackupPtr & backup,
const String & data_path_in_backup,
const StorageRestoreSettings &)
const StorageRestoreSettings &,
const std::shared_ptr<IRestoreCoordination> & restore_coordination)
{
return std::make_unique<ReplicatedMergeTreeRestoreTask>(
local_context,
std::static_pointer_cast<StorageReplicatedMergeTree>(shared_from_this()),
getPartitionIDsFromQuery(partitions, local_context),
backup,
data_path_in_backup);
data_path_in_backup,
restore_coordination);
}
}

View File

@ -226,7 +226,7 @@ public:
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
/// Extract data from the backup and put it to the storage.
RestoreTaskPtr restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
RestoreTaskPtr restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@ -285,6 +285,8 @@ public:
// Return default or custom zookeeper name for table
String getZooKeeperName() const { return zookeeper_name; }
String getZooKeeperPath() const { return zookeeper_path; }
// Return table id, common for different replicas
String getTableSharedID() const override;

View File

@ -637,7 +637,7 @@ private:
};
RestoreTaskPtr StorageStripeLog::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &)
RestoreTaskPtr StorageStripeLog::restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
{
if (!partitions.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());

View File

@ -54,7 +54,7 @@ public:
bool hasDataToBackup() const override { return true; }
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
protected:
StorageStripeLog(

View File

@ -3765,10 +3765,10 @@ class ClickHouseInstance:
if self.external_dirs:
for external_dir in self.external_dirs:
external_dir_abs_path = p.abspath(
p.join(self.path, external_dir.lstrip("/"))
p.join(self.cluster.instances_dir, external_dir.lstrip("/"))
)
logging.info(f"external_dir_abs_path={external_dir_abs_path}")
os.mkdir(external_dir_abs_path)
os.makedirs(external_dir_abs_path, exist_ok=True)
external_dirs_volumes += (
"- " + external_dir_abs_path + ":" + external_dir + "\n"
)

View File

@ -77,14 +77,10 @@ def test_restore_table_into_existing_table(engine):
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query(
f"RESTORE TABLE test.table INTO test.table FROM {backup_name}"
)
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n"
instance.query(
f"RESTORE TABLE test.table INTO test.table FROM {backup_name}"
)
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n"

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,158 @@
from time import sleep
import pytest
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
user_configs=["configs/allow_experimental_database_replicated.xml"],
external_dirs=["/backups/"],
macros={"replica": "node1", "shard": "shard1"},
with_zookeeper=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
user_configs=["configs/allow_experimental_database_replicated.xml"],
external_dirs=["/backups/"],
macros={"replica": "node2", "shard": "shard1"},
with_zookeeper=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_after_test():
try:
yield
finally:
node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
node1.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' NO DELAY")
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}.zip')"
def get_path_to_backup(instance, backup_name):
return os.path.join(
instance.path,
"backups",
backup_name.removeprefix("Disk('backups', '").removesuffix("')"),
)
def test_replicated_table():
node1.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x UInt8, y String"
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
"ORDER BY x"
)
node1.query("INSERT INTO tbl VALUES (1, 'Don''t')")
node2.query("INSERT INTO tbl VALUES (2, 'count')")
node1.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (3, 'your')")
node2.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (4, 'chickens')")
backup_name = new_backup_name()
# Make backup on node 1.
node1.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} SETTINGS replica=1"
)
# Drop table on both nodes.
node1.query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
# Restore from backup on node2.
node2.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
assert node2.query("SELECT * FROM tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
def test_replicated_database():
node1.query(
"CREATE DATABASE mydb ON CLUSTER 'cluster' ENGINE=Replicated('/clickhouse/path/','{shard}','{replica}')"
)
node1.query(
"CREATE TABLE mydb.tbl(x UInt8, y String) ENGINE=ReplicatedMergeTree ORDER BY x"
)
assert node2.query("EXISTS mydb.tbl") == "1\n"
node1.query("INSERT INTO mydb.tbl VALUES (1, 'Don''t')")
node2.query("INSERT INTO mydb.tbl VALUES (2, 'count')")
node1.query("INSERT INTO mydb.tbl VALUES (3, 'your')")
node2.query("INSERT INTO mydb.tbl VALUES (4, 'chickens')")
# Make backup.
backup_name = new_backup_name()
node1.query(
f"BACKUP DATABASE mydb ON CLUSTER 'cluster' TO {backup_name} SETTINGS replica=2"
)
# Drop the database on both nodes.
node1.query("DROP DATABASE mydb ON CLUSTER 'cluster' NO DELAY")
# Restore from backup on node1.
node1.query(f"RESTORE DATABASE mydb ON CLUSTER 'cluster' FROM {backup_name}")
assert node1.query("SELECT * FROM mydb.tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
assert node2.query("SELECT * FROM mydb.tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
def test_different_tables_on_nodes():
node1.query(
"CREATE TABLE tbl (`x` UInt8, `y` String) ENGINE = MergeTree ORDER BY x"
)
node2.query("CREATE TABLE tbl (`w` Int64) ENGINE = MergeTree ORDER BY w")
node1.query(
"INSERT INTO tbl VALUES (1, 'Don''t'), (2, 'count'), (3, 'your'), (4, 'chickens')"
)
node2.query("INSERT INTO tbl VALUES (-333), (-222), (-111), (0), (111)")
backup_name = new_backup_name()
node1.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} SETTINGS allow_storing_multiple_replicas = true"
)
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node2.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
assert node1.query("SELECT * FROM tbl") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
assert node2.query("SELECT * FROM tbl") == TSV([-333, -222, -111, 0, 111])

View File

@ -1,105 +0,0 @@
import pytest
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
external_dirs=["/backups/"],
macros={"replica": "node1"},
with_zookeeper=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
external_dirs=["/backups/"],
macros={"replica": "node2"},
with_zookeeper=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
finally:
cluster.shutdown()
def create_table(instance = None):
on_cluster_clause = "" if instance else "ON CLUSTER 'cluster'"
instance_to_execute = instance if instance else node1
instance_to_execute.query(
"CREATE TABLE tbl " + on_cluster_clause + " ("
"x UInt8, y String"
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
"ORDER BY x"
)
def drop_table(instance = None):
on_cluster_clause = "" if instance else "ON CLUSTER 'cluster'"
instance_to_execute = instance if instance else node1
instance_to_execute.query(f"DROP TABLE tbl {on_cluster_clause} NO DELAY")
def insert_data(instance = None):
instance1_to_execute = instance if instance else node1
instance2_to_execute = instance if instance else node2
instance1_to_execute.query("INSERT INTO tbl VALUES (1, 'Don''t')")
instance2_to_execute.query("INSERT INTO tbl VALUES (2, 'count')")
instance1_to_execute.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (3, 'your')")
instance2_to_execute.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (4, 'chickens')")
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}.zip')"
def get_path_to_backup(instance, backup_name):
return os.path.join(
instance.path,
"backups",
backup_name.removeprefix("Disk('backups', '").removesuffix("')"),
)
def test_backup_and_restore():
create_table()
insert_data()
backup_name = new_backup_name()
# Make backup on node 1.
node1.query(f"BACKUP TABLE tbl TO {backup_name}")
# Drop table on both nodes.
drop_table()
# Restore from backup on node2.
os.link(
get_path_to_backup(node1, backup_name), get_path_to_backup(node2, backup_name)
)
node2.query(f"RESTORE TABLE tbl FROM {backup_name}")
assert node2.query("SELECT * FROM tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)
# Data should be replicated to node1.
create_table(node1)
assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
)