Merge pull request #39455 from vitlibar/fix-locks-add-tests

Improve synchronization between hosts in distributed backup and fix locks
Commit 16a60b5e93 by Vitaly Baranov, 2022-07-27 09:02:58 +02:00, committed by GitHub
28 changed files with 1468 additions and 817 deletions

src/Backups/BackupCoordinationLocal.cpp

@@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo;
BackupCoordinationLocal::BackupCoordinationLocal() = default;
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void BackupCoordinationLocal::setStatus(const String &, const String &, const String &)
void BackupCoordinationLocal::setStage(const String &, const String &, const String &)
{
}
void BackupCoordinationLocal::setErrorStatus(const String &, const Exception &)
void BackupCoordinationLocal::setError(const String &, const Exception &)
{
}
Strings BackupCoordinationLocal::waitStatus(const Strings &, const String &)
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &)
{
return {};
}
Strings BackupCoordinationLocal::waitStatusFor(const Strings &, const String &, UInt64)
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
{
return {};
}

src/Backups/BackupCoordinationLocal.h

@@ -20,10 +20,10 @@ public:
BackupCoordinationLocal();
~BackupCoordinationLocal() override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
void setErrorStatus(const String & current_host, const Exception & exception) override;
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;

src/Backups/BackupCoordinationRemote.cpp

@@ -165,55 +165,94 @@ namespace
constexpr size_t NUM_ATTEMPTS = 10;
}
BackupCoordinationRemote::BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
BackupCoordinationRemote::BackupCoordinationRemote(
const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("BackupCoordination"))
, remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_)
{
createRootNodes();
stage_sync.emplace(
zookeeper_path_ + "/stage", [this] { return getZooKeeper(); }, &Poco::Logger::get("BackupCoordination"));
}
BackupCoordinationRemote::~BackupCoordinationRemote() = default;
BackupCoordinationRemote::~BackupCoordinationRemote()
{
try
{
if (remove_zk_nodes_in_destructor)
removeAllNodes();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
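
The destructor above follows the standard rule that cleanup in a destructor must be best-effort and must never let an exception escape (an exception leaving a destructor during stack unwinding calls std::terminate). A minimal standalone sketch of the same pattern, not ClickHouse code:

#include <functional>
#include <iostream>

/// Runs a cleanup action on scope exit; any failure is logged and swallowed,
/// mirroring the tryLogCurrentException(__PRETTY_FUNCTION__) call above.
class ScopedCleanup
{
public:
    explicit ScopedCleanup(std::function<void()> cleanup_) : cleanup(std::move(cleanup_)) {}

    ~ScopedCleanup()
    {
        try
        {
            if (cleanup)
                cleanup();
        }
        catch (...)
        {
            std::cerr << "cleanup failed, ignoring\n";
        }
    }

private:
    std::function<void()> cleanup;
};
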
zkutil::ZooKeeperPtr BackupCoordinationRemote::getZooKeeper() const
{
std::lock_guard lock{mutex};
return getZooKeeperNoLock();
}
zkutil::ZooKeeperPtr BackupCoordinationRemote::getZooKeeperNoLock() const
{
if (!zookeeper || zookeeper->expired())
{
zookeeper = get_zookeeper();
/// It's possible that we connected to a different [Zoo]Keeper instance,
/// so we may read slightly stale state.
zookeeper->sync(zookeeper_path);
}
return zookeeper;
}
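
getZooKeeper() and getZooKeeperNoLock() implement a cached session that is recreated on expiry and synced before reuse. A minimal sketch of the pattern, with a stand-in Session type instead of the real zkutil API:

#include <memory>
#include <mutex>
#include <string>

struct Session
{
    bool expired() const { return false; }     /// stand-in
    void sync(const std::string & /*path*/) {} /// stand-in: catch up to the latest state
};

class SessionCache
{
public:
    std::shared_ptr<Session> get()
    {
        std::lock_guard lock{mutex};
        return getNoLock();
    }

private:
    /// For callers that already hold `mutex`.
    std::shared_ptr<Session> getNoLock()
    {
        if (!session || session->expired())
        {
            session = std::make_shared<Session>();
            /// A fresh session may land on a different server, so sync first
            /// to avoid reading stale state.
            session->sync("/");
        }
        return session;
    }

    std::mutex mutex;
    std::shared_ptr<Session> session;
};

The NoLock variant exists, presumably, so that methods which already hold the mutex (such as the prepare*() helpers below) can reuse the session without deadlocking on a non-recursive std::mutex.
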
void BackupCoordinationRemote::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_part_names", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_mutations", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_data_paths", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_access", "");
zookeeper->createIfNotExists(zookeeper_path + "/file_names", "");
zookeeper->createIfNotExists(zookeeper_path + "/file_infos", "");
zookeeper->createIfNotExists(zookeeper_path + "/archive_suffixes", "");
auto zk = getZooKeeper();
zk->createAncestors(zookeeper_path);
zk->createIfNotExists(zookeeper_path, "");
zk->createIfNotExists(zookeeper_path + "/repl_part_names", "");
zk->createIfNotExists(zookeeper_path + "/repl_mutations", "");
zk->createIfNotExists(zookeeper_path + "/repl_data_paths", "");
zk->createIfNotExists(zookeeper_path + "/repl_access", "");
zk->createIfNotExists(zookeeper_path + "/file_names", "");
zk->createIfNotExists(zookeeper_path + "/file_infos", "");
zk->createIfNotExists(zookeeper_path + "/archive_suffixes", "");
}
void BackupCoordinationRemote::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
/// Usually this function is called by the initiator when a backup is complete, so we don't need the coordination anymore.
///
/// However, there is a rare situation when this function is called after an error occurs on the initiator of a query
/// while some hosts are still making the backup. Removing all the nodes will remove the parent node of the backup coordination
/// at `zookeeper_path`, which might cause such hosts to stop with a "ZNONODE" exception, or they might do some useless part
/// of their backup work before that. Either way the backup won't be finalized (because only the initiator can do that).
auto zk = getZooKeeper();
zk->removeRecursive(zookeeper_path);
}
void BackupCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
{
status_sync.set(current_host, new_status, message);
stage_sync->set(current_host, new_stage, message);
}
void BackupCoordinationRemote::setErrorStatus(const String & current_host, const Exception & exception)
void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception)
{
status_sync.setError(current_host, exception);
stage_sync->setError(current_host, exception);
}
Strings BackupCoordinationRemote::waitStatus(const Strings & all_hosts, const String & status_to_wait)
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
{
return status_sync.wait(all_hosts, status_to_wait);
return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings BackupCoordinationRemote::waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return status_sync.waitFor(all_hosts, status_to_wait, timeout_ms);
return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}
@@ -229,11 +268,11 @@ void BackupCoordinationRemote::addReplicatedPartNames(
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedPartNames() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_part_names/" + escapeForFileName(table_shared_id);
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(replica_name);
zookeeper->create(path, ReplicatedPartNames::serialize(part_names_and_checksums, table_name_for_logs), zkutil::CreateMode::Persistent);
zk->create(path, ReplicatedPartNames::serialize(part_names_and_checksums, table_name_for_logs), zkutil::CreateMode::Persistent);
}
Strings BackupCoordinationRemote::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
@@ -255,11 +294,11 @@ void BackupCoordinationRemote::addReplicatedMutations(
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedMutations() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_mutations/" + escapeForFileName(table_shared_id);
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(replica_name);
zookeeper->create(path, ReplicatedMutations::serialize(mutations, table_name_for_logs), zkutil::CreateMode::Persistent);
zk->create(path, ReplicatedMutations::serialize(mutations, table_name_for_logs), zkutil::CreateMode::Persistent);
}
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationRemote::getReplicatedMutations(const String & table_shared_id, const String & replica_name) const
@@ -279,11 +318,11 @@ void BackupCoordinationRemote::addReplicatedDataPath(
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedDataPath() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_data_paths/" + escapeForFileName(table_shared_id);
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(data_path);
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
}
Strings BackupCoordinationRemote::getReplicatedDataPaths(const String & table_shared_id) const
@@ -300,18 +339,18 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
return;
replicated_tables.emplace();
auto zookeeper = get_zookeeper();
auto zk = getZooKeeperNoLock();
{
String path = zookeeper_path + "/repl_part_names";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
for (const String & escaped_table_shared_id : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_replica_name : zookeeper->getChildren(path2))
for (const String & escaped_replica_name : zk->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto part_names = ReplicatedPartNames::deserialize(zookeeper->get(path2 + "/" + escaped_replica_name));
auto part_names = ReplicatedPartNames::deserialize(zk->get(path2 + "/" + escaped_replica_name));
replicated_tables->addPartNames(table_shared_id, part_names.table_name_for_logs, replica_name, part_names.part_names_and_checksums);
}
}
@@ -319,14 +358,14 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
{
String path = zookeeper_path + "/repl_mutations";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
for (const String & escaped_table_shared_id : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_replica_name : zookeeper->getChildren(path2))
for (const String & escaped_replica_name : zk->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto mutations = ReplicatedMutations::deserialize(zookeeper->get(path2 + "/" + escaped_replica_name));
auto mutations = ReplicatedMutations::deserialize(zk->get(path2 + "/" + escaped_replica_name));
replicated_tables->addMutations(table_shared_id, mutations.table_name_for_logs, replica_name, mutations.mutations);
}
}
@@ -334,11 +373,11 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
{
String path = zookeeper_path + "/repl_data_paths";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
for (const String & escaped_table_shared_id : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_data_path : zookeeper->getChildren(path2))
for (const String & escaped_data_path : zk->getChildren(path2))
{
String data_path = unescapeForFileName(escaped_data_path);
replicated_tables->addDataPath(table_shared_id, data_path);
@@ -356,13 +395,13 @@ void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedAccessFilePath() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_access/" + escapeForFileName(access_zk_path);
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
path += "/" + AccessEntityTypeInfo::get(access_entity_type).name;
zookeeper->createIfNotExists(path, "");
zk->createIfNotExists(path, "");
path += "/" + host_id;
zookeeper->createIfNotExists(path, file_path);
zk->createIfNotExists(path, file_path);
}
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
@@ -378,20 +417,20 @@ void BackupCoordinationRemote::prepareReplicatedAccess() const
return;
replicated_access.emplace();
auto zookeeper = get_zookeeper();
auto zk = getZooKeeperNoLock();
String path = zookeeper_path + "/repl_access";
for (const String & escaped_access_zk_path : zookeeper->getChildren(path))
for (const String & escaped_access_zk_path : zk->getChildren(path))
{
String access_zk_path = unescapeForFileName(escaped_access_zk_path);
String path2 = path + "/" + escaped_access_zk_path;
for (const String & type_str : zookeeper->getChildren(path2))
for (const String & type_str : zk->getChildren(path2))
{
AccessEntityType type = AccessEntityTypeInfo::parseType(type_str);
String path3 = path2 + "/" + type_str;
for (const String & host_id : zookeeper->getChildren(path3))
for (const String & host_id : zk->getChildren(path3))
{
String file_path = zookeeper->get(path3 + "/" + host_id);
String file_path = zk->get(path3 + "/" + host_id);
replicated_access->addFilePath(access_zk_path, type, host_id, file_path);
}
}
@@ -401,11 +440,11 @@ void BackupCoordinationRemote::prepareReplicatedAccess() const
void BackupCoordinationRemote::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String full_path = zookeeper_path + "/file_names/" + escapeForFileName(file_info.file_name);
String size_and_checksum = serializeSizeAndChecksum(std::pair{file_info.size, file_info.checksum});
zookeeper->create(full_path, size_and_checksum, zkutil::CreateMode::Persistent);
zk->create(full_path, size_and_checksum, zkutil::CreateMode::Persistent);
if (!file_info.size)
{
@@ -414,7 +453,7 @@ void BackupCoordinationRemote::addFileInfo(const FileInfo & file_info, bool & is
}
full_path = zookeeper_path + "/file_infos/" + size_and_checksum;
auto code = zookeeper->tryCreate(full_path, serializeFileInfo(file_info), zkutil::CreateMode::Persistent);
auto code = zk->tryCreate(full_path, serializeFileInfo(file_info), zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, full_path);
@@ -426,15 +465,15 @@ void BackupCoordinationRemote::updateFileInfo(const FileInfo & file_info)
if (!file_info.size)
return; /// we don't keep FileInfos for empty files, nothing to update
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String size_and_checksum = serializeSizeAndChecksum(std::pair{file_info.size, file_info.checksum});
String full_path = zookeeper_path + "/file_infos/" + size_and_checksum;
for (size_t attempt = 0; attempt < NUM_ATTEMPTS; ++attempt)
{
Coordination::Stat stat;
auto new_info = deserializeFileInfo(zookeeper->get(full_path, &stat));
auto new_info = deserializeFileInfo(zk->get(full_path, &stat));
new_info.archive_suffix = file_info.archive_suffix;
auto code = zookeeper->trySet(full_path, serializeFileInfo(new_info), stat.version);
auto code = zk->trySet(full_path, serializeFileInfo(new_info), stat.version);
if (code == Coordination::Error::ZOK)
return;
bool is_last_attempt = (attempt == NUM_ATTEMPTS - 1);
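
updateFileInfo() is an optimistic-concurrency loop: read the node together with its version, modify the value, and write it back only if the version hasn't changed, retrying up to NUM_ATTEMPTS times on conflict. A self-contained sketch of the same compare-and-set pattern, with an in-memory versioned value standing in for ZooKeeper's get()/trySet():

#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

class VersionedValue
{
public:
    std::pair<std::string, int> get() const
    {
        std::lock_guard lock{mutex};
        return {value, version};
    }

    bool trySet(const std::string & new_value, int expected_version)
    {
        std::lock_guard lock{mutex};
        if (version != expected_version)
            return false;   /// a concurrent writer won the race
        value = new_value;
        ++version;
        return true;
    }

private:
    mutable std::mutex mutex;
    std::string value;
    int version = 0;
};

void updateWithRetries(VersionedValue & store, const std::string & suffix)
{
    constexpr size_t NUM_ATTEMPTS = 10;
    for (size_t attempt = 0; attempt < NUM_ATTEMPTS; ++attempt)
    {
        auto [current, version] = store.get();
        if (store.trySet(current + suffix, version))
            return;   /// nobody changed the value in between
    }
    throw std::runtime_error("too many concurrent updates");
}
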
@@ -445,16 +484,16 @@ void BackupCoordinationRemote::updateFileInfo(const FileInfo & file_info)
std::vector<FileInfo> BackupCoordinationRemote::getAllFileInfos() const
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
std::vector<FileInfo> file_infos;
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
Strings escaped_names = zk->getChildren(zookeeper_path + "/file_names");
for (const String & escaped_name : escaped_names)
{
String size_and_checksum = zookeeper->get(zookeeper_path + "/file_names/" + escaped_name);
String size_and_checksum = zk->get(zookeeper_path + "/file_names/" + escaped_name);
UInt64 size = deserializeSizeAndChecksum(size_and_checksum).first;
FileInfo file_info;
if (size) /// we don't keep FileInfos for empty files
file_info = deserializeFileInfo(zookeeper->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info = deserializeFileInfo(zk->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info.file_name = unescapeForFileName(escaped_name);
file_infos.emplace_back(std::move(file_info));
}
@@ -463,8 +502,8 @@ std::vector<FileInfo> BackupCoordinationRemote::getAllFileInfos() const
Strings BackupCoordinationRemote::listFiles(const String & directory, bool recursive) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
auto zk = getZooKeeper();
Strings escaped_names = zk->getChildren(zookeeper_path + "/file_names");
String prefix = directory;
if (!prefix.empty() && !prefix.ends_with('/'))
@@ -496,8 +535,8 @@ Strings BackupCoordinationRemote::listFiles(const String & directory, bool recur
bool BackupCoordinationRemote::hasFiles(const String & directory) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
auto zk = getZooKeeper();
Strings escaped_names = zk->getChildren(zookeeper_path + "/file_names");
String prefix = directory;
if (!prefix.empty() && !prefix.ends_with('/'))
@@ -515,42 +554,42 @@ bool BackupCoordinationRemote::hasFiles(const String & directory) const
std::optional<FileInfo> BackupCoordinationRemote::getFileInfo(const String & file_name) const
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String size_and_checksum;
if (!zookeeper->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
if (!zk->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
return std::nullopt;
UInt64 size = deserializeSizeAndChecksum(size_and_checksum).first;
FileInfo file_info;
if (size) /// we don't keep FileInfos for empty files
file_info = deserializeFileInfo(zookeeper->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info = deserializeFileInfo(zk->get(zookeeper_path + "/file_infos/" + size_and_checksum));
file_info.file_name = file_name;
return file_info;
}
std::optional<FileInfo> BackupCoordinationRemote::getFileInfo(const SizeAndChecksum & size_and_checksum) const
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String file_info_str;
if (!zookeeper->tryGet(zookeeper_path + "/file_infos/" + serializeSizeAndChecksum(size_and_checksum), file_info_str))
if (!zk->tryGet(zookeeper_path + "/file_infos/" + serializeSizeAndChecksum(size_and_checksum), file_info_str))
return std::nullopt;
return deserializeFileInfo(file_info_str);
}
std::optional<SizeAndChecksum> BackupCoordinationRemote::getFileSizeAndChecksum(const String & file_name) const
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String size_and_checksum;
if (!zookeeper->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
if (!zk->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
return std::nullopt;
return deserializeSizeAndChecksum(size_and_checksum);
}
String BackupCoordinationRemote::getNextArchiveSuffix()
{
auto zookeeper = get_zookeeper();
auto zk = getZooKeeper();
String path = zookeeper_path + "/archive_suffixes/a";
String path_created;
auto code = zookeeper->tryCreate(path, "", zkutil::CreateMode::PersistentSequential, path_created);
auto code = zk->tryCreate(path, "", zkutil::CreateMode::PersistentSequential, path_created);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, path);
return formatArchiveSuffix(extractCounterFromSequentialNodeName(path_created));
@@ -558,16 +597,11 @@ String BackupCoordinationRemote::getNextArchiveSuffix()
Strings BackupCoordinationRemote::getAllArchiveSuffixes() const
{
auto zookeeper = get_zookeeper();
Strings node_names = zookeeper->getChildren(zookeeper_path + "/archive_suffixes");
auto zk = getZooKeeper();
Strings node_names = zk->getChildren(zookeeper_path + "/archive_suffixes");
for (auto & node_name : node_names)
node_name = formatArchiveSuffix(extractCounterFromSequentialNodeName(node_name));
return node_names;
}
void BackupCoordinationRemote::drop()
{
removeAllNodes();
}
}

src/Backups/BackupCoordinationRemote.h

@@ -3,7 +3,7 @@
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Backups/BackupCoordinationStatusSync.h>
#include <Backups/BackupCoordinationStageSync.h>
namespace DB
@@ -13,13 +13,13 @@ namespace DB
class BackupCoordinationRemote : public IBackupCoordination
{
public:
BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_);
~BackupCoordinationRemote() override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
void setErrorStatus(const String & current_host, const Exception & exception) override;
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(
const String & table_shared_id,
@@ -56,9 +56,9 @@ public:
String getNextArchiveSuffix() override;
Strings getAllArchiveSuffixes() const override;
void drop() override;
private:
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperNoLock() const;
void createRootNodes();
void removeAllNodes();
void prepareReplicatedTables() const;
@@ -66,10 +66,12 @@ private:
const String zookeeper_path;
const zkutil::GetZooKeeper get_zookeeper;
const bool remove_zk_nodes_in_destructor;
BackupCoordinationStatusSync status_sync;
std::optional<BackupCoordinationStageSync> stage_sync;
mutable std::mutex mutex;
mutable zkutil::ZooKeeperPtr zookeeper;
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables;
mutable std::optional<BackupCoordinationReplicatedAccess> replicated_access;
};

src/Backups/BackupCoordinationStage.cpp

@@ -0,0 +1,13 @@
#include <Backups/BackupCoordinationStage.h>
#include <fmt/format.h>
namespace DB
{
String BackupCoordinationStage::formatGatheringMetadata(size_t pass)
{
return fmt::format("{} ({})", GATHERING_METADATA, pass);
}
}
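
For reference, formatGatheringMetadata(2) returns the string "gathering metadata (2)". BackupEntriesCollector below relies on both forms: it matches the exact per-pass string when synchronizing a particular pass, and matches the bare GATHERING_METADATA prefix to recognize the stage whatever the pass number.
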

src/Backups/BackupCoordinationStage.h

@@ -0,0 +1,41 @@
#pragma once
#include <base/types.h>
namespace DB
{
namespace BackupCoordinationStage
{
/// Finding all tables and databases which we're going to put into the backup and collecting their metadata.
constexpr const char * GATHERING_METADATA = "gathering metadata";
String formatGatheringMetadata(size_t pass);
/// Making temporary hard links and preparing backup entries.
constexpr const char * EXTRACTING_DATA_FROM_TABLES = "extracting data from tables";
/// Running special tasks for replicated tables which can also prepare some backup entries.
constexpr const char * RUNNING_POST_TASKS = "running post-tasks";
/// Writing backup entries to the backup and removing temporary hard links.
constexpr const char * WRITING_BACKUP = "writing backup";
/// Finding databases and tables in the backup which we're going to restore.
constexpr const char * FINDING_TABLES_IN_BACKUP = "finding tables in backup";
/// Creating databases or finding them and checking their definitions.
constexpr const char * CREATING_DATABASES = "creating databases";
/// Creating tables or finding them and checking their definition.
constexpr const char * CREATING_TABLES = "creating tables";
/// Inserting restored data to tables.
constexpr const char * INSERTING_DATA_TO_TABLES = "inserting data to tables";
/// Coordination stage meaning that a host finished its work.
constexpr const char * COMPLETED = "completed";
}
}

src/Backups/BackupCoordinationStageSync.cpp

@@ -0,0 +1,201 @@
#include <Backups/BackupCoordinationStageSync.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <base/chrono_io.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
}
BackupCoordinationStageSync::BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, log(log_)
{
createRootNodes();
}
void BackupCoordinationStageSync::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
}
void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message)
{
auto zookeeper = get_zookeeper();
/// Make an ephemeral node so the initiator can track if the current host is still working.
String alive_node_path = zookeeper_path + "/alive|" + current_host;
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
throw zkutil::KeeperException(code, alive_node_path);
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
zookeeper->create(zookeeper_path + "/current|" + current_host + "|" + new_stage, message, zkutil::CreateMode::Persistent);
}
void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception)
{
auto zookeeper = get_zookeeper();
WriteBufferFromOwnString buf;
writeStringBinary(current_host, buf);
writeException(exception, buf, true);
zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str());
}
Strings BackupCoordinationStageSync::wait(const Strings & all_hosts, const String & stage_to_wait)
{
return waitImpl(all_hosts, stage_to_wait, {});
}
Strings BackupCoordinationStageSync::waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return waitImpl(all_hosts, stage_to_wait, timeout);
}
namespace
{
struct UnreadyHostState
{
bool started = false;
bool alive = false;
};
}
struct BackupCoordinationStageSync::State
{
Strings results;
std::map<String, UnreadyHostState> unready_hosts;
std::optional<std::pair<String, Exception>> error;
std::optional<String> host_terminated;
};
BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState(
zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const
{
std::unordered_set<std::string_view> zk_nodes_set{zk_nodes.begin(), zk_nodes.end()};
State state;
if (zk_nodes_set.contains("error"))
{
ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")};
String host;
readStringBinary(host, buf);
state.error = std::make_pair(host, readException(buf, fmt::format("Got error from {}", host)));
return state;
}
for (const auto & host : all_hosts)
{
if (!zk_nodes_set.contains("current|" + host + "|" + stage_to_wait))
{
UnreadyHostState unready_host_state;
unready_host_state.started = zk_nodes_set.contains("started|" + host);
unready_host_state.alive = zk_nodes_set.contains("alive|" + host);
state.unready_hosts.emplace(host, unready_host_state);
if (!unready_host_state.alive && unready_host_state.started && !state.host_terminated)
state.host_terminated = host;
}
}
if (state.host_terminated || !state.unready_hosts.empty())
return state;
state.results.reserve(all_hosts.size());
for (const auto & host : all_hosts)
state.results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
return state;
}
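
readCurrentState() derives each host's condition from three kinds of zk nodes: "started|<host>" (persistent, created once when the host begins), "alive|<host>" (ephemeral, so it disappears when the host's ZooKeeper session dies) and "current|<host>|<stage>" (persistent, one per reached stage). A condensed sketch of that classification logic:

#include <set>
#include <string>

enum class HostState { NotStartedYet, Working, ReachedStage, Terminated };

HostState classifyHost(const std::set<std::string> & zk_nodes,
                       const std::string & host, const std::string & stage)
{
    if (zk_nodes.count("current|" + host + "|" + stage))
        return HostState::ReachedStage;
    bool started = zk_nodes.count("started|" + host);
    bool alive = zk_nodes.count("alive|" + host);
    if (started && !alive)
        return HostState::Terminated;   /// started once, session gone: the host crashed
    return started ? HostState::Working : HostState::NotStartedYet;
}
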
Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const
{
if (all_hosts.empty())
return {};
/// Wait until all hosts are ready, an error happens, or the timeout expires.
auto zookeeper = get_zookeeper();
/// Set by ZooKeeper when the list of zk nodes has changed.
auto watch = std::make_shared<Poco::Event>();
bool use_timeout = timeout.has_value();
std::chrono::steady_clock::time_point end_of_timeout;
if (use_timeout)
end_of_timeout = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(*timeout);
State state;
String previous_unready_host; /// Used for logging: we don't want to log the same unready host again.
for (;;)
{
/// Get zk nodes and subscribe to their changes.
Strings zk_nodes = zookeeper->getChildren(zookeeper_path, nullptr, watch);
/// Read and analyze the current state of zk nodes.
state = readCurrentState(zookeeper, zk_nodes, all_hosts, stage_to_wait);
if (state.error || state.host_terminated || state.unready_hosts.empty())
break; /// Error happened or everything is ready.
/// Log that we will wait for another host.
const auto & unready_host = state.unready_hosts.begin()->first;
if (unready_host != previous_unready_host)
{
LOG_TRACE(log, "Waiting for host {}", unready_host);
previous_unready_host = unready_host;
}
/// Wait until the `watch` event is signaled by ZooKeeper, meaning that the zk nodes have changed.
{
if (use_timeout)
{
auto current_time = std::chrono::steady_clock::now();
if ((current_time > end_of_timeout)
|| !watch->tryWait(std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time).count()))
break;
}
else
{
watch->wait();
}
}
}
/// Rethrow an error raised originally on another host.
if (state.error)
state.error->second.rethrow();
/// Another host terminated without errors.
if (state.host_terminated)
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Host {} suddenly stopped working", *state.host_terminated);
/// Some host is still unready; the timeout probably wasn't long enough.
if (!state.unready_hosts.empty())
{
const auto & [unready_host, unready_host_state] = *state.unready_hosts.begin();
throw Exception(
ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"Waited for host {} too long (> {}){}",
unready_host,
to_string(*timeout),
unready_host_state.started ? "" : ": Operation didn't start");
}
return state.results;
}
}
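
The waiting pattern in waitImpl() converts the optional timeout into an absolute steady_clock deadline once, then recomputes the remaining budget on every wake-up, so spurious wake-ups and repeated watch events cannot extend the total wait. A sketch of the same idea using standard primitives instead of Poco::Event:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>

/// Returns true if `done` became true before the (optional) deadline.
bool waitUntilDone(std::condition_variable & cv, std::mutex & m,
                   bool & done, std::optional<std::chrono::milliseconds> timeout)
{
    std::unique_lock lock{m};
    if (!timeout)
    {
        cv.wait(lock, [&] { return done; });
        return true;
    }
    /// Fix the deadline once; every retry gets only the remaining time.
    auto deadline = std::chrono::steady_clock::now() + *timeout;
    return cv.wait_until(lock, deadline, [&] { return done; });
}
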

src/Backups/BackupCoordinationStageSync.h

@@ -0,0 +1,39 @@
#pragma once
#include <Common/ZooKeeper/Common.h>
namespace DB
{
/// Used to coordinate hosts so that all of them reach a specific stage at around the same time.
class BackupCoordinationStageSync
{
public:
BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
/// Sets the stage of the current host and signals other hosts if they were waiting for it.
void set(const String & current_host, const String & new_stage, const String & message);
void setError(const String & current_host, const Exception & exception);
/// Waits until all hosts come to the specified stage.
/// The function returns the messages all hosts set when they came to the required stage.
Strings wait(const Strings & all_hosts, const String & stage_to_wait);
/// Almost the same as wait() but this one stops waiting and throws an exception after the specified timeout.
Strings waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout);
private:
void createRootNodes();
struct State;
State readCurrentState(zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
Strings waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const;
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
Poco::Logger * log;
};
}

src/Backups/BackupCoordinationStatusSync.cpp

@@ -1,182 +0,0 @@
#include <Backups/BackupCoordinationStatusSync.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <base/chrono_io.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
}
BackupCoordinationStatusSync::BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, log(log_)
{
createRootNodes();
}
void BackupCoordinationStatusSync::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
}
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message)
{
auto zookeeper = get_zookeeper();
zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message);
}
void BackupCoordinationStatusSync::setError(const String & current_host, const Exception & exception)
{
auto zookeeper = get_zookeeper();
Exception exception2 = exception;
exception2.addMessage("Host {}", current_host);
WriteBufferFromOwnString buf;
writeException(exception2, buf, true);
zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str());
}
Strings BackupCoordinationStatusSync::wait(const Strings & all_hosts, const String & status_to_wait)
{
return waitImpl(all_hosts, status_to_wait, {});
}
Strings BackupCoordinationStatusSync::waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
{
return waitImpl(all_hosts, status_to_wait, timeout_ms);
}
Strings BackupCoordinationStatusSync::waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional<UInt64> timeout_ms)
{
if (all_hosts.empty())
return {};
/// Wait for other hosts.
Strings ready_hosts_results;
ready_hosts_results.resize(all_hosts.size());
std::map<String, std::vector<size_t> /* index in `ready_hosts_results` */> unready_hosts;
for (size_t i = 0; i != all_hosts.size(); ++i)
unready_hosts[all_hosts[i]].push_back(i);
std::optional<Exception> error;
auto zookeeper = get_zookeeper();
/// Process ZooKeeper's nodes and set `all_hosts_ready` or `unready_host` or `error_message`.
auto process_zk_nodes = [&](const Strings & zk_nodes)
{
for (const String & zk_node : zk_nodes)
{
if (zk_node.starts_with("remove_watch-"))
continue;
if (zk_node == "error")
{
ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")};
error = readException(buf, "", true);
break;
}
size_t separator_pos = zk_node.find('|');
if (separator_pos == String::npos)
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node);
String host = zk_node.substr(0, separator_pos);
String status = zk_node.substr(separator_pos + 1);
auto it = unready_hosts.find(host);
if ((it != unready_hosts.end()) && (status == status_to_wait))
{
String result = zookeeper->get(zookeeper_path + "/" + zk_node);
for (size_t i : it->second)
ready_hosts_results[i] = result;
unready_hosts.erase(it);
}
}
};
/// Wait until all hosts are ready or an error happens or time is out.
std::atomic<bool> watch_set = false;
std::condition_variable watch_triggered_event;
auto watch_callback = [&](const Coordination::WatchResponse &)
{
watch_set = false; /// After it's triggered it's not set until we call getChildrenWatch() again.
watch_triggered_event.notify_all();
};
auto watch_triggered = [&] { return !watch_set; };
bool use_timeout = timeout_ms.has_value();
std::chrono::milliseconds timeout{timeout_ms.value_or(0)};
std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
std::chrono::steady_clock::duration elapsed;
std::mutex dummy_mutex;
String previous_unready_host;
while (!unready_hosts.empty() && !error)
{
watch_set = true;
Strings nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
process_zk_nodes(nodes);
if (!unready_hosts.empty() && !error)
{
const auto & unready_host = unready_hosts.begin()->first;
if (unready_host != previous_unready_host)
{
LOG_TRACE(log, "Waiting for host {}", unready_host);
previous_unready_host = unready_host;
}
std::unique_lock dummy_lock{dummy_mutex};
if (use_timeout)
{
elapsed = std::chrono::steady_clock::now() - start_time;
if ((elapsed > timeout) || !watch_triggered_event.wait_for(dummy_lock, timeout - elapsed, watch_triggered))
break;
}
else
watch_triggered_event.wait(dummy_lock, watch_triggered);
}
}
if (watch_set)
{
/// Remove watch by triggering it.
zookeeper->create(zookeeper_path + "/remove_watch-", "", zkutil::CreateMode::EphemeralSequential);
std::unique_lock dummy_lock{dummy_mutex};
watch_triggered_event.wait(dummy_lock, watch_triggered);
}
if (error)
error->rethrow();
if (!unready_hosts.empty())
{
throw Exception(
ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"Waited for host {} too long ({})",
unready_hosts.begin()->first,
to_string(elapsed));
}
return ready_hosts_results;
}
}

src/Backups/BackupCoordinationStatusSync.h

@@ -1,37 +0,0 @@
#pragma once
#include <Common/ZooKeeper/Common.h>
namespace DB
{
/// Used to coordinate hosts so all hosts would come to a specific status at around the same time.
class BackupCoordinationStatusSync
{
public:
BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
/// Sets the status of the current host and signal other hosts if there were other hosts waiting for that.
void set(const String & current_host, const String & new_status, const String & message);
void setError(const String & current_host, const Exception & exception);
/// Sets the status of the current host and waits until all hosts come to the same status.
/// The function returns the messages all hosts set when they come to the required status.
Strings wait(const Strings & all_hosts, const String & status_to_wait);
/// Almost the same as wait() but this one stops waiting and throws an exception after a specific amount of time.
Strings waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms);
static constexpr const char * kErrorStatus = "error";
private:
void createRootNodes();
Strings waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional<UInt64> timeout_ms);
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
Poco::Logger * log;
};
}

src/Backups/BackupEntriesCollector.cpp

@@ -1,6 +1,7 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationStage.h>
#include <Backups/BackupUtils.h>
#include <Backups/DDLAdjustingForBackupVisitor.h>
#include <Databases/IDatabase.h>
@@ -31,20 +32,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace Stage = BackupCoordinationStage;
namespace
{
/// Finding all tables and databases which we're going to put into the backup and collecting their metadata.
constexpr const char * kGatheringMetadataStatus = "gathering metadata";
/// Making temporary hard links and preparing backup entries.
constexpr const char * kExtractingDataFromTablesStatus = "extracting data from tables";
/// Running special tasks for replicated tables which can also prepare some backup entries.
constexpr const char * kRunningPostTasksStatus = "running post-tasks";
/// Writing backup entries to the backup and removing temporary hard links.
constexpr const char * kWritingBackupStatus = "writing backup";
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
{
@@ -90,7 +82,8 @@ BackupEntriesCollector::BackupEntriesCollector(
, backup_settings(backup_settings_)
, backup_coordination(backup_coordination_)
, context(context_)
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 300000))
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000))
, log(&Poco::Logger::get("BackupEntriesCollector"))
{
}
@@ -100,7 +93,7 @@ BackupEntriesCollector::~BackupEntriesCollector() = default;
BackupEntries BackupEntriesCollector::run()
{
/// run() can be called only once.
if (!current_status.empty())
if (!current_stage.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already making backup entries");
/// Find other hosts working along with us to execute this ON CLUSTER query.
@@ -123,36 +116,40 @@ BackupEntries BackupEntriesCollector::run()
makeBackupEntriesForTablesDefs();
/// Make backup entries for the data of the found tables.
setStatus(kExtractingDataFromTablesStatus);
setStage(Stage::EXTRACTING_DATA_FROM_TABLES);
makeBackupEntriesForTablesData();
/// Run all the tasks added with addPostCollectingTask().
setStatus(kRunningPostTasksStatus);
setStage(Stage::RUNNING_POST_TASKS);
runPostTasks();
/// No more backup entries or tasks are allowed after this point.
setStatus(kWritingBackupStatus);
setStage(Stage::WRITING_BACKUP);
return std::move(backup_entries);
}
Strings BackupEntriesCollector::setStatus(const String & new_status, const String & message)
Strings BackupEntriesCollector::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, "{}", toUpperFirst(new_status));
current_status = new_status;
LOG_TRACE(log, "{}", toUpperFirst(new_stage));
current_stage = new_stage;
backup_coordination->setStatus(backup_settings.host_id, new_status, message);
backup_coordination->setStage(backup_settings.host_id, new_stage, message);
if (new_status.starts_with(kGatheringMetadataStatus))
if (new_stage == Stage::formatGatheringMetadata(1))
{
auto now = std::chrono::steady_clock::now();
auto end_of_timeout = std::max(now, consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout);
return backup_coordination->waitStatusFor(
all_hosts, new_status, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - now).count());
return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
}
else if (new_stage.starts_with(Stage::GATHERING_METADATA))
{
auto current_time = std::chrono::steady_clock::now();
auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time);
return backup_coordination->waitForStage(
all_hosts, new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
}
else
{
return backup_coordination->waitStatus(all_hosts, new_status);
return backup_coordination->waitForStage(all_hosts, new_stage);
}
}
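
The timeout arithmetic above gives all "gathering metadata" passes one shared absolute end time; each wait receives whatever budget remains, clamped via std::max so an already-expired deadline produces an immediate timeout rather than a negative duration. A sketch of just that computation:

#include <algorithm>
#include <chrono>

std::chrono::milliseconds remainingBudget(std::chrono::steady_clock::time_point end_time)
{
    auto now = std::chrono::steady_clock::now();
    auto end = std::max(now, end_time);   /// clamp: never negative
    return std::chrono::duration_cast<std::chrono::milliseconds>(end - now);
}
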
@@ -173,18 +170,18 @@ void BackupEntriesCollector::calculateRootPathInBackup()
/// Finds databases and tables which we will put to the backup.
void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
{
consistent_metadata_snapshot_start_time = std::chrono::steady_clock::now();
auto end_of_timeout = consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout;
setStatus(fmt::format("{} ({})", kGatheringMetadataStatus, 1));
setStage(Stage::formatGatheringMetadata(1));
consistent_metadata_snapshot_end_time = std::chrono::steady_clock::now() + consistent_metadata_snapshot_timeout;
for (size_t pass = 1;; ++pass)
{
String new_status = fmt::format("{} ({})", kGatheringMetadataStatus, pass + 1);
String next_stage = Stage::formatGatheringMetadata(pass + 1);
std::optional<Exception> inconsistency_error;
if (tryGatherMetadataAndCompareWithPrevious(inconsistency_error))
{
/// Gathered metadata and checked consistency, cool! But we have to check that the other hosts coped with that too.
auto all_hosts_results = setStatus(new_status, "consistent");
auto all_hosts_results = setStage(next_stage, "consistent");
std::optional<String> host_with_inconsistency;
std::optional<String> inconsistency_error_on_other_host;
@@ -210,13 +207,13 @@ void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
else
{
/// Failed to gather metadata or something wasn't consistent. We'll let other hosts know that and try again.
setStatus(new_status, inconsistency_error->displayText());
setStage(next_stage, inconsistency_error->displayText());
}
/// Two passes are the minimum (we need to compare the table names with the previous ones to be sure we don't miss anything).
if (pass >= 2)
{
if (std::chrono::steady_clock::now() > end_of_timeout)
if (std::chrono::steady_clock::now() > consistent_metadata_snapshot_end_time)
inconsistency_error->rethrow();
else
LOG_WARNING(log, "{}", inconsistency_error->displayText());
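
The loop above re-gathers metadata until two consecutive passes agree (hence the two-pass minimum), rethrowing the last inconsistency error once the shared deadline is exceeded. Stripped of the cross-host coordination, the core retry loop looks like this sketch, where `collect` stands in for gathering the database and table metadata:

#include <chrono>
#include <functional>
#include <set>
#include <stdexcept>
#include <string>

std::set<std::string> gatherUntilStable(
    const std::function<std::set<std::string>()> & collect,
    std::chrono::steady_clock::time_point deadline)
{
    auto previous = collect();            /// pass 1
    for (;;)
    {
        auto current = collect();         /// pass 2, 3, ...
        if (current == previous)
            return current;               /// two consecutive passes agreed
        if (std::chrono::steady_clock::now() > deadline)
            throw std::runtime_error("could not get a consistent metadata snapshot");
        previous = std::move(current);    /// remember this pass and try again
    }
}
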
@@ -239,6 +236,7 @@ bool BackupEntriesCollector::tryGatherMetadataAndCompareWithPrevious(std::option
table_infos.clear();
gatherDatabasesMetadata();
gatherTablesMetadata();
lockTablesForReading();
}
catch (Exception & e)
{
@@ -526,12 +524,11 @@ void BackupEntriesCollector::lockTablesForReading()
for (auto & [table_name, table_info] : table_infos)
{
auto storage = table_info.storage;
TableLockHolder table_lock;
if (storage)
{
try
{
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
table_info.table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
}
catch (Exception & e)
{
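
This hunk is the lock fix referenced in the PR title: previously the TableLockHolder was a loop-local variable, so the shared lock was released as soon as each iteration ended; storing it in table_info keeps every table locked for reading until the backup finishes. A standalone sketch of the lifetime difference, with stand-ins for the real StoragePtr/TableLockHolder types:

#include <map>
#include <memory>
#include <shared_mutex>
#include <string>

using LockHolder = std::shared_ptr<std::shared_lock<std::shared_mutex>>;

struct TableInfo
{
    std::shared_ptr<std::shared_mutex> storage_mutex = std::make_shared<std::shared_mutex>();
    LockHolder table_lock;   /// lives as long as TableInfo does
};

void lockTablesForReading(std::map<std::string, TableInfo> & table_infos)
{
    for (auto & [name, info] : table_infos)
    {
        /// With a local `LockHolder lock = ...` the lock would be dropped
        /// right here, at the end of the iteration. Stored in the map entry,
        /// it is held until table_infos itself is destroyed.
        info.table_lock = std::make_shared<std::shared_lock<std::shared_mutex>>(*info.storage_mutex);
    }
}
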
@@ -712,7 +709,7 @@ void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableN
void BackupEntriesCollector::addBackupEntry(const String & file_name, BackupEntryPtr backup_entry)
{
if (current_status == kWritingBackupStatus)
if (current_stage == Stage::WRITING_BACKUP)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding backup entries is not allowed");
backup_entries.emplace_back(file_name, backup_entry);
}
@@ -724,21 +721,21 @@ void BackupEntriesCollector::addBackupEntry(const std::pair<String, BackupEntryP
void BackupEntriesCollector::addBackupEntries(const BackupEntries & backup_entries_)
{
if (current_status == kWritingBackupStatus)
if (current_stage == Stage::WRITING_BACKUP)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
insertAtEnd(backup_entries, backup_entries_);
}
void BackupEntriesCollector::addBackupEntries(BackupEntries && backup_entries_)
{
if (current_status == kWritingBackupStatus)
if (current_stage == Stage::WRITING_BACKUP)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
insertAtEnd(backup_entries, std::move(backup_entries_));
}
void BackupEntriesCollector::addPostTask(std::function<void()> task)
{
if (current_status == kWritingBackupStatus)
if (current_stage == Stage::WRITING_BACKUP)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of post tasks is not allowed");
post_tasks.push(std::move(task));
}

src/Backups/BackupEntriesCollector.h

@@ -86,12 +86,13 @@ private:
void runPostTasks();
Strings setStatus(const String & new_status, const String & message = "");
Strings setStage(const String & new_stage, const String & message = "");
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
ContextPtr context;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds consistent_metadata_snapshot_timeout;
Poco::Logger * log;
@@ -129,8 +130,8 @@ private:
std::optional<ASTs> partitions;
};
String current_status;
std::chrono::steady_clock::time_point consistent_metadata_snapshot_start_time;
String current_stage;
std::chrono::steady_clock::time_point consistent_metadata_snapshot_end_time;
std::unordered_map<String, DatabaseInfo> database_infos;
std::unordered_map<QualifiedTableName, TableInfo> table_infos;
std::vector<std::pair<String, String>> previous_databases_metadata;

src/Backups/BackupsWorker.cpp

@@ -5,6 +5,7 @@
#include <Backups/BackupUtils.h>
#include <Backups/IBackupEntry.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupCoordinationStage.h>
#include <Backups/BackupCoordinationRemote.h>
#include <Backups/BackupCoordinationLocal.h>
#include <Backups/RestoreCoordinationRemote.h>
@@ -18,7 +19,6 @@
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
@@ -30,25 +30,79 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace Stage = BackupCoordinationStage;
namespace
{
/// Coordination status meaning that a host finished its work.
constexpr const char * kCompletedCoordinationStatus = "completed";
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendErrorToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
std::shared_ptr<IBackupCoordination> makeBackupCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup)
{
if (!coordination_zk_path.empty())
{
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<BackupCoordinationRemote>(coordination_zk_path, get_zookeeper, !is_internal_backup);
}
else
{
return std::make_shared<BackupCoordinationLocal>();
}
}
std::shared_ptr<IRestoreCoordination> makeRestoreCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup)
{
if (!coordination_zk_path.empty())
{
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<RestoreCoordinationRemote>(coordination_zk_path, get_zookeeper, !is_internal_backup);
}
else
{
return std::make_shared<RestoreCoordinationLocal>();
}
}
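
Note the `!is_internal_backup` argument in both factories: it is passed as remove_zk_nodes_in_destructor_, so only the initiator (the host running the non-internal query) removes the coordination nodes in ZooKeeper when its coordination object is destroyed, while the hosts executing the internal parts of the ON CLUSTER query leave them in place.
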
/// Sends information about an exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host, const Exception & exception)
{
if (!coordination)
return;
try
{
coordination->setErrorStatus(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
if (coordination)
coordination->setError(current_host, exception);
}
catch (...)
{
}
}
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
{
try
{
throw;
}
catch (const Exception & e)
{
sendExceptionToCoordination(coordination, current_host, e);
}
catch (...)
{
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
}
}
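
sendCurrentExceptionToCoordination() relies on the `try { throw; }` idiom: rethrowing the currently handled exception inside a function called from a catch block lets it dispatch on the concrete exception type without taking the exception object as a parameter. A self-contained sketch:

#include <iostream>
#include <stdexcept>

void reportCurrentException()
{
    try
    {
        throw;   /// rethrows the exception currently being handled
    }
    catch (const std::runtime_error & e)
    {
        std::cerr << "runtime_error: " << e.what() << '\n';
    }
    catch (...)
    {
        std::cerr << "unknown exception\n";
    }
}

int main()
{
    try
    {
        throw std::runtime_error("boom");
    }
    catch (...)
    {
        reportCurrentException();   /// no exception object passed explicitly
    }
}
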
/// Used to change num_active_backups.
size_t getNumActiveBackupsChange(BackupStatus status)
{
return status == BackupStatus::MAKING_BACKUP;
}
/// Used to change num_active_restores.
size_t getNumActiveRestoresChange(BackupStatus status)
{
return status == BackupStatus::RESTORING;
}
}
@@ -60,7 +114,8 @@ BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threa
/// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now.
}
UUID BackupsWorker::start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context)
std::pair<UUID, bool> BackupsWorker::start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context)
{
const ASTBackupQuery & backup_query = typeid_cast<const ASTBackupQuery &>(*backup_or_restore_query);
if (backup_query.kind == ASTBackupQuery::Kind::BACKUP)
@@ -70,316 +125,402 @@ UUID BackupsWorker::start(const ASTPtr & backup_or_restore_query, ContextMutable
}
UUID BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & context)
std::pair<UUID, bool> BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & context)
{
auto backup_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
auto backup_settings = BackupSettings::fromBackupQuery(*backup_query);
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
bool on_cluster = !backup_query->cluster.empty();
if (!backup_settings.backup_uuid)
backup_settings.backup_uuid = UUIDHelpers::generateV4();
UUID backup_uuid = *backup_settings.backup_uuid;
/// Prepare context to use.
ContextPtr context_in_use = context;
ContextMutablePtr mutable_context;
if (on_cluster || backup_settings.async)
std::shared_ptr<IBackupCoordination> backup_coordination;
if (backup_settings.internal)
{
/// For ON CLUSTER queries we will need to change some settings.
/// For ASYNC queries we have to clone the context anyway.
context_in_use = mutable_context = Context::createCopy(context);
/// The following call of makeBackupCoordination() is not essential because doBackup() will create a backup coordination later
/// if it wasn't created here. However, it's better to create the coordination here for error handling: this way,
/// if an exception is thrown in startMakingBackup(), the other hosts will know about it.
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
}
addInfo(backup_uuid, backup_info.toString(), BackupStatus::MAKING_BACKUP, backup_settings.internal);
try
{
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
addInfo(backup_uuid, backup_settings.internal, backup_info.toString(), BackupStatus::MAKING_BACKUP);
auto job = [this,
/// Prepare context to use.
ContextPtr context_in_use = context;
ContextMutablePtr mutable_context;
bool on_cluster = !backup_query->cluster.empty();
if (on_cluster || backup_settings.async)
{
/// For ON CLUSTER queries we will need to change some settings.
/// For ASYNC queries we have to clone the context anyway.
context_in_use = mutable_context = Context::createCopy(context);
}
if (backup_settings.async)
{
backups_thread_pool.scheduleOrThrowOnError(
[this, backup_uuid, backup_query, backup_settings, backup_info, backup_coordination, context_in_use, mutable_context] {
doBackup(
backup_uuid,
backup_query,
backup_settings,
backup_info,
backup_coordination,
context_in_use,
mutable_context,
/* called_async= */ true);
});
}
else
{
doBackup(
backup_uuid,
backup_query,
backup_settings,
backup_info,
on_cluster,
backup_coordination,
context_in_use,
mutable_context](bool async) mutable
mutable_context,
/* called_async= */ false);
}
return {backup_uuid, backup_settings.internal};
}
catch (...)
{
std::optional<CurrentThread::QueryScope> query_scope;
std::shared_ptr<IBackupCoordination> backup_coordination;
SCOPE_EXIT_SAFE(if (backup_coordination && !backup_settings.internal) backup_coordination->drop(););
try
{
if (async)
{
query_scope.emplace(mutable_context);
setThreadName("BackupWorker");
}
/// Checks access rights if this is not an ON CLUSTER query.
/// (If this is an ON CLUSTER query, executeDDLQueryOnCluster() will check access rights later.)
auto required_access = getRequiredAccessToBackup(backup_query->elements);
if (!on_cluster)
context_in_use->checkAccess(required_access);
ClusterPtr cluster;
if (on_cluster)
{
backup_query->cluster = context_in_use->getMacros()->expand(backup_query->cluster);
cluster = context_in_use->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
if (backup_settings.coordination_zk_path.empty())
{
String root_zk_path = context_in_use->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid);
}
}
/// Make a backup coordination.
if (!backup_settings.coordination_zk_path.empty())
{
backup_coordination = std::make_shared<BackupCoordinationRemote>(
backup_settings.coordination_zk_path,
[global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); });
}
else
{
backup_coordination = std::make_shared<BackupCoordinationLocal>();
}
/// Opens a backup for writing.
BackupFactory::CreateParams backup_create_params;
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
backup_create_params.context = context_in_use;
backup_create_params.backup_info = backup_info;
backup_create_params.base_backup_info = backup_settings.base_backup_info;
backup_create_params.compression_method = backup_settings.compression_method;
backup_create_params.compression_level = backup_settings.compression_level;
backup_create_params.password = backup_settings.password;
backup_create_params.is_internal_backup = backup_settings.internal;
backup_create_params.backup_coordination = backup_coordination;
backup_create_params.backup_uuid = backup_uuid;
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = backup_settings.shard_num;
params.only_replica_num = backup_settings.replica_num;
params.access_to_check = required_access;
backup_settings.copySettingsToQuery(*backup_query);
// executeDDLQueryOnCluster() will return without waiting for completion
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
backup_coordination->waitStatus(all_hosts, kCompletedCoordinationStatus);
}
else
{
backup_query->setCurrentDatabase(context_in_use->getCurrentDatabase());
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context_in_use};
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStatus(backup_settings.host_id, kCompletedCoordinationStatus, "");
}
/// Finalize backup (write its metadata).
if (!backup_settings.internal)
backup->finalizeWriting();
/// Close the backup.
backup.reset();
setStatus(backup_uuid, BackupStatus::BACKUP_COMPLETE);
}
catch (...)
{
/// Something bad happened, the backup has not built.
setStatus(backup_uuid, BackupStatus::FAILED_TO_BACKUP);
sendErrorToCoordination(backup_coordination, backup_settings.host_id);
if (!async)
throw;
}
};
if (backup_settings.async)
backups_thread_pool.scheduleOrThrowOnError([job]() mutable { job(true); });
else
job(false);
return backup_uuid;
/// Something bad happened, the backup has not built.
setStatus(backup_uuid, backup_settings.internal, BackupStatus::FAILED_TO_BACKUP);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
throw;
}
}
UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context)
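// NOTE (sketch, not part of this diff): makeBackupCoordination() is defined outside this hunk.
// Based on the old inline logic removed above, it plausibly looks like the following; the exact
// signature and the `!is_internal_backup` flag passed as remove_zk_nodes_in_destructor are
// assumptions, not confirmed by this patch:
std::shared_ptr<IBackupCoordination> makeBackupCoordination_sketch(
    const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup)
{
    if (!coordination_zk_path.empty())
    {
        auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
        /// Only the initiator (the non-internal query) should remove the ZooKeeper nodes in the destructor.
        return std::make_shared<BackupCoordinationRemote>(coordination_zk_path, get_zookeeper, !is_internal_backup);
    }
    return std::make_shared<BackupCoordinationLocal>();
}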
void BackupsWorker::doBackup(
const UUID & backup_uuid,
const std::shared_ptr<ASTBackupQuery> & backup_query,
BackupSettings backup_settings,
const BackupInfo & backup_info,
std::shared_ptr<IBackupCoordination> backup_coordination,
const ContextPtr & context,
ContextMutablePtr mutable_context,
bool called_async)
{
std::optional<CurrentThread::QueryScope> query_scope;
try
{
if (called_async)
{
query_scope.emplace(mutable_context);
setThreadName("BackupWorker");
}
bool on_cluster = !backup_query->cluster.empty();
assert(mutable_context || (!on_cluster && !called_async));
/// Checks access rights if this is not ON CLUSTER query.
/// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.)
auto required_access = getRequiredAccessToBackup(backup_query->elements);
if (!on_cluster)
context->checkAccess(required_access);
ClusterPtr cluster;
if (on_cluster)
{
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
cluster = context->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
if (backup_settings.coordination_zk_path.empty())
{
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid);
}
}
/// Make a backup coordination.
if (!backup_coordination)
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
/// Opens a backup for writing.
BackupFactory::CreateParams backup_create_params;
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
backup_create_params.context = context;
backup_create_params.backup_info = backup_info;
backup_create_params.base_backup_info = backup_settings.base_backup_info;
backup_create_params.compression_method = backup_settings.compression_method;
backup_create_params.compression_level = backup_settings.compression_level;
backup_create_params.password = backup_settings.password;
backup_create_params.is_internal_backup = backup_settings.internal;
backup_create_params.backup_coordination = backup_coordination;
backup_create_params.backup_uuid = backup_uuid;
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = backup_settings.shard_num;
params.only_replica_num = backup_settings.replica_num;
params.access_to_check = required_access;
backup_settings.copySettingsToQuery(*backup_query);
// executeDDLQueryOnCluster() will return without waiting for completion
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
backup_coordination->waitForStage(all_hosts, Stage::COMPLETED);
}
else
{
backup_query->setCurrentDatabase(context->getCurrentDatabase());
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context};
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, "");
}
/// Finalize backup (write its metadata).
if (!backup_settings.internal)
backup->finalizeWriting();
/// Close the backup.
backup.reset();
LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_info.toString());
setStatus(backup_uuid, backup_settings.internal, BackupStatus::BACKUP_COMPLETE);
}
catch (...)
{
        /// Something bad happened, the backup has not been built.
if (called_async)
{
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
setStatus(backup_uuid, backup_settings.internal, BackupStatus::FAILED_TO_BACKUP);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
}
else
{
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startMakingBackup().
throw;
}
}
}
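// NOTE (sketch, not part of this diff): sendCurrentExceptionToCoordination() is defined elsewhere.
// A minimal version consistent with the catch-blocks above could look like this; it must only be
// called while an exception is being handled (otherwise the bare `throw;` would terminate):
void sendCurrentExceptionToCoordination_sketch(
    const std::shared_ptr<IBackupCoordination> & coordination, const String & current_host)
{
    if (!coordination)
        return;
    try
    {
        throw; /// rethrow the exception currently being handled
    }
    catch (const Exception & e)
    {
        /// Other hosts blocked in waitForStage() will see this error and stop waiting.
        coordination->setError(current_host, e);
    }
    catch (...)
    {
        coordination->setError(current_host, Exception(getCurrentExceptionMessage(true), getCurrentExceptionCode()));
    }
}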
std::pair<UUID, bool> BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context)
{
    UUID restore_uuid = UUIDHelpers::generateV4();
    auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
    auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
    /// Prepare context to use.
    ContextMutablePtr context_in_use = context;
    bool on_cluster = !restore_query->cluster.empty();
    if (restore_settings.async || on_cluster)
    {
        /// For ON CLUSTER queries we will need to change some settings.
        /// For ASYNC queries we have to clone the context anyway.
        context_in_use = Context::createCopy(context);
    }
    std::shared_ptr<IRestoreCoordination> restore_coordination;
    if (restore_settings.internal)
    {
        /// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination
        /// if it's not created here. However, to handle errors better, it's better to make a coordination here because this way
        /// if an exception is thrown in startRestoring() other hosts will know about that.
        restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
    }
    try
    {
        auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
        addInfo(restore_uuid, restore_settings.internal, backup_info.toString(), BackupStatus::RESTORING);
        if (restore_settings.async)
        {
            backups_thread_pool.scheduleOrThrowOnError(
                [this, restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use] {
                    doRestore(
                        restore_uuid,
                        restore_query,
                        restore_settings,
                        backup_info,
                        restore_coordination,
                        context_in_use,
                        /* called_async= */ true);
                });
        }
        else
        {
            doRestore(
                restore_uuid,
                restore_query,
                restore_settings,
                backup_info,
                restore_coordination,
                context_in_use,
                /* called_async= */ false);
        }
        return {restore_uuid, restore_settings.internal};
    }
    catch (...)
    {
        /// Something bad happened, the restore has failed.
        setStatus(restore_uuid, restore_settings.internal, BackupStatus::FAILED_TO_RESTORE);
        sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
        throw;
    }
}
void BackupsWorker::doRestore(
const UUID & restore_uuid,
const std::shared_ptr<ASTBackupQuery> & restore_query,
RestoreSettings restore_settings,
const BackupInfo & backup_info,
std::shared_ptr<IRestoreCoordination> restore_coordination,
ContextMutablePtr context,
bool called_async)
{
std::optional<CurrentThread::QueryScope> query_scope;
try
{
if (called_async)
{
query_scope.emplace(context);
setThreadName("RestoreWorker");
}
/// Open the backup for reading.
BackupFactory::CreateParams backup_open_params;
backup_open_params.open_mode = IBackup::OpenMode::READ;
backup_open_params.context = context;
backup_open_params.backup_info = backup_info;
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.password = restore_settings.password;
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();
/// Checks access rights if this is ON CLUSTER query.
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
/// because different replicas can contain different set of tables and so the required access rights can differ too.
/// So the right way is pass through the entire cluster and check access for each host.
auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
for (const auto * address : addresses)
{
restore_settings.host_id = address->toString();
auto restore_elements = restore_query->elements;
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
/// Make a restore coordination.
if (on_cluster && restore_settings.coordination_zk_path.empty())
{
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_uuid);
}
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
/// Do RESTORE.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = restore_settings.shard_num;
params.only_replica_num = restore_settings.replica_num;
restore_settings.copySettingsToQuery(*restore_query);
// executeDDLQueryOnCluster() will return without waiting for completion
context->setSetting("distributed_ddl_task_timeout", Field{0});
context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(restore_query, context, params);
        /// Wait until all the hosts have finished their restore work.
auto all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
restore_coordination->waitForStage(all_hosts, Stage::COMPLETED);
}
else
{
restore_query->setCurrentDatabase(current_database);
/// Restore metadata and prepare data restoring tasks.
DataRestoreTasks data_restore_tasks;
{
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}
/// Execute the data restoring tasks.
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, "");
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString());
setStatus(restore_uuid, restore_settings.internal, BackupStatus::RESTORED);
}
catch (...)
{
        /// Something bad happened, the restore has failed.
if (called_async)
{
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
setStatus(restore_uuid, restore_settings.internal, BackupStatus::FAILED_TO_RESTORE);
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
}
else
{
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startRestoring().
throw;
}
}
}
void BackupsWorker::addInfo(const UUID & uuid, bool internal, const String & backup_name, BackupStatus status)
{
Info info;
info.uuid = uuid;
@ -387,48 +528,41 @@ void BackupsWorker::addInfo(const UUID & uuid, const String & backup_name, Backu
info.status = status;
info.status_changed_time = time(nullptr);
info.internal = internal;
std::lock_guard lock{infos_mutex};
    bool inserted = infos.try_emplace({uuid, internal}, std::move(info)).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pair of UUID={} and internal={} is already in use", uuid, internal);
num_active_backups += getNumActiveBackupsChange(status);
num_active_restores += getNumActiveRestoresChange(status);
}
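// NOTE (illustration, not part of this diff): keying `infos` by the pair {uuid, internal} lets the
// same UUID legitimately appear twice on one host during an ON CLUSTER query: once for the
// user-visible entry created by the initiator and once for the internal query executed on this
// host. The info names below are hypothetical:
//
//     infos.try_emplace({uuid, /* internal= */ false}, info_for_user);      // shown as NOT internal in system.backups
//     infos.try_emplace({uuid, /* internal= */ true}, info_for_internal);   // entry of the internal ON CLUSTER query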
void BackupsWorker::setStatus(const UUID & uuid, bool internal, BackupStatus status)
{
std::lock_guard lock{infos_mutex};
auto it = infos.find({uuid, internal});
if (it == infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown pair of UUID={} and internal={}", uuid, internal);
auto & info = it->second;
auto old_status = info.status;
info.status = status;
info.status_changed_time = time(nullptr);
    if ((status == BackupStatus::FAILED_TO_BACKUP) || (status == BackupStatus::FAILED_TO_RESTORE))
    {
        info.error_message = getCurrentExceptionMessage(false);
        info.exception = std::current_exception();
    }
num_active_backups += getNumActiveBackupsChange(status) - getNumActiveBackupsChange(old_status);
num_active_restores += getNumActiveRestoresChange(status) - getNumActiveRestoresChange(old_status);
}
void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool internal, bool rethrow_exception)
{
std::unique_lock lock{infos_mutex};
status_changed.wait(lock, [&]
{
        auto it = infos.find({backup_or_restore_uuid, internal});
        if (it == infos.end())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown pair of UUID={} and internal={}", backup_or_restore_uuid, internal);
const auto & info = it->second;
auto current_status = info.status;
if (rethrow_exception && ((current_status == BackupStatus::FAILED_TO_BACKUP) || (current_status == BackupStatus::FAILED_TO_RESTORE)))
@ -437,12 +571,12 @@ void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool rethrow_excep
});
}
BackupsWorker::Info BackupsWorker::getInfo(const UUID & backup_or_restore_uuid, bool internal) const
{
std::lock_guard lock{infos_mutex};
    auto it = infos.find({backup_or_restore_uuid, internal});
    if (it == infos.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown pair of UUID={} and internal={}", backup_or_restore_uuid, internal);
return it->second;
}
@ -457,14 +591,15 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllInfos() const
void BackupsWorker::shutdown()
{
size_t num_active_backups = backups_thread_pool.active();
size_t num_active_restores = restores_thread_pool.active();
    bool has_active_backups_or_restores = (num_active_backups || num_active_restores);
    if (has_active_backups_or_restores)
        LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);
backups_thread_pool.wait();
restores_thread_pool.wait();
    if (has_active_backups_or_restores)
        LOG_INFO(log, "All backup and restore tasks have finished");
}
}

View File

@ -11,6 +11,13 @@ namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class ASTBackupQuery;
struct BackupSettings;
struct RestoreSettings;
struct BackupInfo;
class IBackupCoordination;
class IRestoreCoordination;
/// Manager of backups and restores: executes backup and restore tasks on background threads.
/// Keeps information about backups and restores started in this session.
class BackupsWorker
@ -22,11 +29,11 @@ public:
void shutdown();
    /// Starts executing a BACKUP or RESTORE query. Returns the UUID of the operation and whether it's an internal query.
    std::pair<UUID, bool> start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context);
/// Waits until a BACKUP or RESTORE query started by start() is finished.
/// The function returns immediately if the operation is already finished.
    void wait(const UUID & backup_or_restore_uuid, bool internal, bool rethrow_exception = true);
/// Information about executing a BACKUP or RESTORE query started by calling start().
struct Info
@ -47,21 +54,32 @@ public:
bool internal = false;
};
    Info getInfo(const UUID & backup_or_restore_uuid, bool internal) const;
std::vector<Info> getAllInfos() const;
private:
    std::pair<UUID, bool> startMakingBackup(const ASTPtr & query, const ContextPtr & context);
    void doBackup(const UUID & backup_uuid, const std::shared_ptr<ASTBackupQuery> & backup_query, BackupSettings backup_settings,
                  const BackupInfo & backup_info, std::shared_ptr<IBackupCoordination> backup_coordination, const ContextPtr & context,
                  ContextMutablePtr mutable_context, bool called_async);
    std::pair<UUID, bool> startRestoring(const ASTPtr & query, ContextMutablePtr context);
    void doRestore(const UUID & restore_uuid, const std::shared_ptr<ASTBackupQuery> & restore_query, RestoreSettings restore_settings,
                   const BackupInfo & backup_info, std::shared_ptr<IRestoreCoordination> restore_coordination, ContextMutablePtr context,
                   bool called_async);
    void addInfo(const UUID & uuid, bool internal, const String & backup_name, BackupStatus status);
    void setStatus(const UUID & uuid, bool internal, BackupStatus status);
ThreadPool backups_thread_pool;
ThreadPool restores_thread_pool;
    std::map<std::pair<UUID, bool>, Info> infos;
std::condition_variable status_changed;
std::atomic<size_t> num_active_backups = 0;
std::atomic<size_t> num_active_restores = 0;
mutable std::mutex infos_mutex;
Poco::Logger * log;
};
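// NOTE (usage sketch, not part of this diff): the intended call sequence for a caller of the new
// API; the first and last calls are exactly what InterpreterBackupQuery below does, while wait()
// is for callers that need to block until completion:
//
//     auto [uuid, internal] = backups_worker.start(query_ptr, context);
//     backups_worker.wait(uuid, internal, /* rethrow_exception= */ true);
//     BackupsWorker::Info info = backups_worker.getInfo(uuid, internal);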

View File

@ -18,11 +18,11 @@ class IBackupCoordination
public:
virtual ~IBackupCoordination() = default;
    /// Sets the current stage and waits for other hosts to come to this stage too.
    virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
    virtual void setError(const String & current_host, const Exception & exception) = 0;
    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
struct PartNameAndChecksum
{
@ -115,9 +115,6 @@ public:
/// Returns the list of all the archive suffixes which were generated.
virtual Strings getAllArchiveSuffixes() const = 0;
};
}
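// NOTE (call-pattern sketch, not part of this diff): a host publishes its own stage, then blocks
// until every host reaches it. RestorerFromBackup::setStage() later in this patch follows exactly
// this pattern; `first_on_cluster_sync` is a hypothetical flag standing in for the check against
// Stage::FINDING_TABLES_IN_BACKUP:
//
//     coordination->setStage(host_id, new_stage, /* message= */ "");
//     if (first_on_cluster_sync)
//         coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
//     else
//         coordination->waitForStage(all_hosts, new_stage);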

View File

@ -16,11 +16,11 @@ class IRestoreCoordination
public:
virtual ~IRestoreCoordination() = default;
    /// Sets the current stage and waits for other hosts to come to this stage too.
    virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
    virtual void setError(const String & current_host, const Exception & exception) = 0;
    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
static constexpr const char * kErrorStatus = "error";
@ -34,9 +34,6 @@ public:
/// Sets that this replica is going to restore a ReplicatedAccessStorage.
/// The function returns false if this access storage is being already restored by another replica.
virtual bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) = 0;
};
}

View File

@ -7,20 +7,20 @@ namespace DB
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStage(const String &, const String &, const String &)
{
}
void RestoreCoordinationLocal::setError(const String &, const Exception &)
{
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &)
{
return {};
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
{
return {};
}

View File

@ -18,11 +18,11 @@ public:
RestoreCoordinationLocal();
~RestoreCoordinationLocal() override;
    /// Sets the current stage and waits for other hosts to come to this stage too.
    void setStage(const String & current_host, const String & new_stage, const String & message) override;
    void setError(const String & current_host, const Exception & exception) override;
    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -6,57 +6,86 @@
namespace DB
{
RestoreCoordinationRemote::RestoreCoordinationRemote(
    const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
    , remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_)
{
createRootNodes();
stage_sync.emplace(
zookeeper_path_ + "/stage", [this] { return getZooKeeper(); }, &Poco::Logger::get("RestoreCoordination"));
}
RestoreCoordinationRemote::~RestoreCoordinationRemote()
{
try
{
if (remove_zk_nodes_in_destructor)
removeAllNodes();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
zkutil::ZooKeeperPtr RestoreCoordinationRemote::getZooKeeper() const
{
std::lock_guard lock{mutex};
if (!zookeeper || zookeeper->expired())
{
zookeeper = get_zookeeper();
        /// It's possible that we connected to a different [Zoo]Keeper instance,
        /// so we may read a bit stale state.
zookeeper->sync(zookeeper_path);
}
return zookeeper;
}
void RestoreCoordinationRemote::createRootNodes()
{
    auto zk = getZooKeeper();
    zk->createAncestors(zookeeper_path);
    zk->createIfNotExists(zookeeper_path, "");
    zk->createIfNotExists(zookeeper_path + "/repl_databases_tables_acquired", "");
    zk->createIfNotExists(zookeeper_path + "/repl_tables_data_acquired", "");
    zk->createIfNotExists(zookeeper_path + "/repl_access_storages_acquired", "");
}
void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
{
    stage_sync->set(current_host, new_stage, message);
}
void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception)
{
    stage_sync->setError(current_host, exception);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
{
    return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
{
    return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}
bool RestoreCoordinationRemote::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)
{
    auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_databases_tables_acquired/" + escapeForFileName(database_zk_path);
    zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(table_name);
    auto code = zk->tryCreate(path, "", zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
@ -65,10 +94,10 @@ bool RestoreCoordinationRemote::acquireCreatingTableInReplicatedDatabase(const S
bool RestoreCoordinationRemote::acquireInsertingDataIntoReplicatedTable(const String & table_zk_path)
{
    auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_tables_data_acquired/" + escapeForFileName(table_zk_path);
    auto code = zk->tryCreate(path, "", zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
@ -77,10 +106,10 @@ bool RestoreCoordinationRemote::acquireInsertingDataIntoReplicatedTable(const St
bool RestoreCoordinationRemote::acquireReplicatedAccessStorage(const String & access_storage_zk_path)
{
    auto zk = getZooKeeper();
String path = zookeeper_path + "/repl_access_storages_acquired/" + escapeForFileName(access_storage_zk_path);
    auto code = zk->tryCreate(path, "", zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
@ -89,13 +118,15 @@ bool RestoreCoordinationRemote::acquireReplicatedAccessStorage(const String & ac
/// Usually this function is called by the initiator when a restore operation is complete so we don't need the coordination anymore.
///
/// However there can be a rare situation when this function is called after an error occurs on the initiator of a query
/// while some hosts are still restoring something. Removing all the nodes will remove the parent node of the restore coordination
/// at `zookeeper_path` which might cause such hosts to stop with exception "ZNONODE". Or such hosts might still do some part
/// of their restore work before that.
void RestoreCoordinationRemote::removeAllNodes()
{
    auto zk = getZooKeeper();
    zk->removeRecursive(zookeeper_path);
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupCoordinationStageSync.h>
namespace DB
@ -11,14 +11,14 @@ namespace DB
class RestoreCoordinationRemote : public IRestoreCoordination
{
public:
    RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_);
~RestoreCoordinationRemote() override;
    /// Sets the current stage and waits for other hosts to come to this stage too.
    void setStage(const String & current_host, const String & new_stage, const String & message) override;
    void setError(const String & current_host, const Exception & exception) override;
    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@ -31,10 +31,8 @@ public:
/// The function returns false if this access storage is being already restored by another replica.
bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) override;
private:
zkutil::ZooKeeperPtr getZooKeeper() const;
void createRootNodes();
void removeAllNodes();
@ -42,7 +40,12 @@ private:
const String zookeeper_path;
const zkutil::GetZooKeeper get_zookeeper;
const bool remove_zk_nodes_in_destructor;
std::optional<BackupCoordinationStageSync> stage_sync;
mutable std::mutex mutex;
mutable zkutil::ZooKeeperPtr zookeeper;
};
}
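// NOTE (inferred interface, not part of this diff): BackupCoordinationStageSync replaces the old
// BackupCoordinationStatusSync. Its shape below is inferred from the call sites in this patch
// (stage_sync.emplace(path, get_zookeeper, logger), set(), setError(), wait(), waitFor()):
class BackupCoordinationStageSync_sketch
{
public:
    BackupCoordinationStageSync_sketch(const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper, Poco::Logger * log);
    void set(const String & current_host, const String & new_stage, const String & message);
    void setError(const String & current_host, const Exception & exception);
    Strings wait(const Strings & all_hosts, const String & stage_to_wait);
    Strings waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout);
};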

View File

@ -1,5 +1,6 @@
#include <Backups/RestorerFromBackup.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupCoordinationStage.h>
#include <Backups/BackupSettings.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
@ -38,20 +39,10 @@ namespace ErrorCodes
}
namespace Stage = BackupCoordinationStage;
namespace
{
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
{
@ -102,6 +93,7 @@ RestorerFromBackup::RestorerFromBackup(
, restore_coordination(restore_coordination_)
, backup(backup_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(&Poco::Logger::get("RestorerFromBackup"))
{
@ -112,7 +104,7 @@ RestorerFromBackup::~RestorerFromBackup() = default;
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
{
    /// run() can be called only once.
    if (!current_stage.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already restoring");
/// Find other hosts working along with us to execute this ON CLUSTER query.
@ -126,7 +118,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
findRootPathsInBackup();
/// Find all the databases and tables which we will read from the backup.
    setStage(Stage::FINDING_TABLES_IN_BACKUP);
findDatabasesAndTablesInBackup();
/// Check access rights.
@ -136,27 +128,31 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
return {};
/// Create databases using the create queries read from the backup.
    setStage(Stage::CREATING_DATABASES);
createDatabases();
/// Create tables using the create queries read from the backup.
    setStage(Stage::CREATING_TABLES);
createTables();
/// All what's left is to insert data to tables.
/// No more data restoring tasks are allowed after this point.
    setStage(Stage::INSERTING_DATA_TO_TABLES);
return getDataRestoreTasks();
}
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
{
    LOG_TRACE(log, "{}", toUpperFirst(new_stage));
    current_stage = new_stage;
if (restore_coordination)
{
        restore_coordination->setStage(restore_settings.host_id, new_stage, message);
        if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
            restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
        else
            restore_coordination->waitForStage(all_hosts, new_stage);
}
}
@ -814,14 +810,14 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
{
    if (current_stage == Stage::INSERTING_DATA_TO_TABLES)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
data_restore_tasks.push_back(std::move(new_task));
}
void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
{
    if (current_stage == Stage::INSERTING_DATA_TO_TABLES)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
insertAtEnd(data_restore_tasks, std::move(new_tasks));
}

View File

@ -73,6 +73,7 @@ private:
std::shared_ptr<IRestoreCoordination> restore_coordination;
BackupPtr backup;
ContextMutablePtr context;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds create_table_timeout;
Poco::Logger * log;
@ -100,7 +101,7 @@ private:
DataRestoreTasks getDataRestoreTasks();
    void setStage(const String & new_stage, const String & message = "");
struct DatabaseInfo
{
@ -124,7 +125,7 @@ private:
std::vector<QualifiedTableName> findTablesWithoutDependencies() const;
    String current_stage;
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<QualifiedTableName, TableInfo> table_infos;
std::vector<DataRestoreTask> data_restore_tasks;

View File

@ -19,18 +19,17 @@ namespace
{
Block getResultRow(const BackupsWorker::Info & info)
{
    auto column_uuid = ColumnUUID::create();
    auto column_backup_name = ColumnString::create();
    auto column_status = ColumnInt8::create();
    column_uuid->insert(info.uuid);
    column_backup_name->insert(info.backup_name);
    column_status->insert(static_cast<Int8>(info.status));
    Block res_columns;
    res_columns.insert(0, {std::move(column_uuid), std::make_shared<DataTypeUUID>(), "uuid"});
    res_columns.insert(1, {std::move(column_backup_name), std::make_shared<DataTypeString>(), "backup_name"});
    res_columns.insert(2, {std::move(column_status), std::make_shared<DataTypeEnum8>(getBackupStatusEnumValues()), "status"});
return res_columns;
@ -40,9 +39,9 @@ namespace
BlockIO InterpreterBackupQuery::execute()
{
auto & backups_worker = context->getBackupsWorker();
    auto [uuid, internal] = backups_worker.start(query_ptr, context);
BlockIO res_io;
    res_io.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(getResultRow(backups_worker.getInfo(uuid, internal))));
return res_io;
}

View File

@ -304,11 +304,13 @@ def test_async():
[id, _, status] = instance.query(
f"BACKUP TABLE test.table TO {backup_name} ASYNC"
).split("\t")
assert status == "MAKING_BACKUP\n" or status == "BACKUP_COMPLETE\n"
    assert_eq_with_retry(
        instance,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}'",
        TSV([["BACKUP_COMPLETE", ""]]),
)
instance.query("DROP TABLE test.table")
@ -316,9 +318,13 @@ def test_async():
[id, _, status] = instance.query(
f"RESTORE TABLE test.table FROM {backup_name} ASYNC"
).split("\t")
assert status == "RESTORING\n" or status == "RESTORED\n"
assert_eq_with_retry(
        instance,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}'",
        TSV([["RESTORED", ""]]),
)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
@ -341,14 +347,13 @@ def test_async_backups_to_same_destination(interface):
assert_eq_with_retry(
        instance,
        f"SELECT status FROM system.backups WHERE uuid IN ['{id1}', '{id2}'] AND status == 'MAKING_BACKUP'",
        "",
)
    assert instance.query(
        f"SELECT status, error FROM system.backups WHERE uuid='{id1}'"
    ) == TSV([["BACKUP_COMPLETE", ""]])
assert (
instance.query(f"SELECT status FROM system.backups WHERE uuid='{id2}'")
@ -747,24 +752,26 @@ def test_system_users_async():
instance.query("CREATE USER u1 IDENTIFIED BY 'qwe123' SETTINGS custom_c = 3")
backup_name = new_backup_name()
    id = instance.query(
        f"BACKUP DATABASE default, TABLE system.users, TABLE system.roles, TABLE system.settings_profiles, TABLE system.row_policies, TABLE system.quotas TO {backup_name} ASYNC"
    ).split("\t")[0]
assert_eq_with_retry(
        instance,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}'",
        TSV([["BACKUP_COMPLETE", ""]]),
)
instance.query("DROP USER u1")
    id = instance.query(
        f"RESTORE DATABASE default, TABLE system.users, TABLE system.roles, TABLE system.settings_profiles, TABLE system.row_policies, TABLE system.quotas FROM {backup_name} ASYNC"
    ).split("\t")[0]
assert_eq_with_retry(
        instance,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}'",
        TSV([["RESTORED", ""]]),
)
assert (
@ -889,8 +896,8 @@ def test_mutation():
)
instance.query("ALTER TABLE test.table UPDATE x=x+1 WHERE 1")
    instance.query("ALTER TABLE test.table UPDATE x=x+1+sleep(3) WHERE 1")
    instance.query("ALTER TABLE test.table UPDATE x=x+1+sleep(3) WHERE 1")
backup_name = new_backup_name()
instance.query(f"BACKUP TABLE test.table TO {backup_name}")

View File

@ -2,6 +2,7 @@
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_deprecated_database_ordinary>1</allow_deprecated_database_ordinary>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
<backups>
<on_cluster_first_sync_timeout>1000</on_cluster_first_sync_timeout>
<consistent_metadata_snapshot_timeout>10000</consistent_metadata_snapshot_timeout>
<create_table_timeout>1000</create_table_timeout>
</backups>
</clickhouse>

View File

@ -1,5 +1,6 @@
from time import sleep
import pytest
import re
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
@ -11,10 +12,11 @@ main_configs = [
"configs/remote_servers.xml",
"configs/replicated_access_storage.xml",
"configs/backups_disk.xml",
"configs/lesser_timeouts.xml", # Default timeouts are quite big (a few minutes), the tests don't need them to be that big.
]
user_configs = [
"configs/allow_experimental_database_replicated.xml",
"configs/allow_database_types.xml",
]
node1 = cluster.add_instance(
@ -33,6 +35,7 @@ node2 = cluster.add_instance(
external_dirs=["/backups/"],
macros={"replica": "node2", "shard": "shard1"},
with_zookeeper=True,
    stay_alive=True,  # Necessary for the "test_stop_other_host_during_backup" test
)
@ -401,8 +404,8 @@ def test_replicated_database_async():
assert_eq_with_retry(
        node1,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}' AND NOT internal",
        TSV([["BACKUP_COMPLETE", ""]]),
)
node1.query("DROP DATABASE mydb ON CLUSTER 'cluster' NO DELAY")
@ -414,7 +417,9 @@ def test_replicated_database_async():
assert status == "RESTORING\n" or status == "RESTORED\n"
assert_eq_with_retry(
        node1,
        f"SELECT status, error FROM system.backups WHERE uuid='{id}' AND NOT internal",
        TSV([["RESTORED", ""]]),
)
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")
@ -457,8 +462,8 @@ def test_async_backups_to_same_destination(interface, on_cluster):
for i in range(len(nodes)):
assert_eq_with_retry(
            nodes[i],
            f"SELECT status FROM system.backups WHERE uuid='{ids[i]}' AND status == 'MAKING_BACKUP'",
            "",
)
num_completed_backups = sum(
@ -466,7 +471,7 @@ def test_async_backups_to_same_destination(interface, on_cluster):
int(
nodes[i]
                .query(
                    f"SELECT count() FROM system.backups WHERE uuid='{ids[i]}' AND status == 'BACKUP_COMPLETE' AND NOT internal"
)
.strip()
)
@ -474,7 +479,16 @@ def test_async_backups_to_same_destination(interface, on_cluster):
]
)
if num_completed_backups != 1:
for i in range(len(nodes)):
print(
nodes[i].query(
f"SELECT status, error FROM system.backups WHERE uuid='{ids[i]}' AND NOT internal"
)
)
assert num_completed_backups == 1
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node1.query(f"RESTORE TABLE tbl FROM {backup_name}")
assert node1.query("SELECT * FROM tbl") == "1\n"
@ -749,8 +763,8 @@ def test_mutation():
node1.query("INSERT INTO tbl SELECT number, toString(number) FROM numbers(10, 5)")
node1.query("ALTER TABLE tbl UPDATE x=x+1 WHERE 1")
    node1.query("ALTER TABLE tbl UPDATE x=x+1+sleep(3) WHERE 1")
    node1.query("ALTER TABLE tbl UPDATE x=x+1+sleep(3) WHERE 1")
backup_name = new_backup_name()
node1.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}")
@ -763,3 +777,67 @@ def test_mutation():
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node1.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
def test_get_error_from_other_host():
node1.query("CREATE TABLE tbl (`x` UInt8) ENGINE = MergeTree ORDER BY x")
node1.query("INSERT INTO tbl VALUES (3)")
backup_name = new_backup_name()
expected_error = "Got error from node2.*Table default.tbl was not found"
assert re.search(
expected_error,
node1.query_and_get_error(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}"
),
)
@pytest.mark.parametrize("kill", [False, True])
def test_stop_other_host_during_backup(kill):
node1.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x UInt8"
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
"ORDER BY x"
)
node1.query("INSERT INTO tbl VALUES (3)")
node2.query("INSERT INTO tbl VALUES (5)")
backup_name = new_backup_name()
id = node1.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC"
).split("\t")[0]
# If kill=False the pending backup must be completed
# If kill=True the pending backup might be completed or failed
node2.stop_clickhouse(kill=kill)
assert_eq_with_retry(
node1,
f"SELECT status FROM system.backups WHERE uuid='{id}' AND status == 'MAKING_BACKUP' AND NOT internal",
"",
retry_count=100,
)
status = node1.query(
f"SELECT status FROM system.backups WHERE uuid='{id}' AND NOT internal"
).strip()
if kill:
assert status in ["BACKUP_COMPLETE", "FAILED_TO_BACKUP"]
else:
assert status == "BACKUP_COMPLETE"
node2.start_clickhouse()
if status == "BACKUP_COMPLETE":
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node1.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV([3, 5])
elif status == "FAILED_TO_BACKUP":
assert not os.path.exists(
os.path.join(get_path_to_backup(backup_name), ".backup")
)
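The stop/kill test above polls system.backups until the transient MAKING_BACKUP state clears, then reads the terminal status. A minimal sketch of that pattern factored into a reusable helper (the name wait_for_backup is hypothetical; assert_eq_with_retry is the same helper these tests already import from helpers.test_tools):
def wait_for_backup(node, backup_id, retry_count=100):
    # Poll until no row for this backup is left in the transient
    # 'MAKING_BACKUP' state (i.e. the query returns an empty result).
    assert_eq_with_retry(
        node,
        f"SELECT status FROM system.backups WHERE uuid='{backup_id}' "
        "AND status == 'MAKING_BACKUP' AND NOT internal",
        "",
        retry_count=retry_count,
    )
    # Return the terminal status: 'BACKUP_COMPLETE' or 'FAILED_TO_BACKUP'.
    return node.query(
        f"SELECT status FROM system.backups WHERE uuid='{backup_id}' AND NOT internal"
    ).strip()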

View File

@@ -0,0 +1,271 @@
from random import randint
import pytest
import os.path
import time
import concurrent.futures
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
num_nodes = 10
def generate_cluster_def():
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/cluster_for_concurrency_test.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write("<clickhouse>\n\t<remote_servers>\n\t\t<cluster>\n\t\t\t<shard>\n")
for i in range(num_nodes):
f.write(
f"\t\t\t\t<replica>\n\t\t\t\t\t<host>node{i}</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
)
f.write("\t\t\t</shard>\n\t\t</cluster>\n\t</remote_servers>\n</clickhouse>")
return path
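For illustration, with num_nodes set to 2 the writer above would produce the following file contents (shown as an equivalent Python string literal; purely illustrative):
expected_xml = (
    "<clickhouse>\n"
    "\t<remote_servers>\n"
    "\t\t<cluster>\n"
    "\t\t\t<shard>\n"
    "\t\t\t\t<replica>\n\t\t\t\t\t<host>node0</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
    "\t\t\t\t<replica>\n\t\t\t\t\t<host>node1</host>\n\t\t\t\t\t<port>9000</port>\n\t\t\t\t</replica>\n"
    "\t\t\t</shard>\n\t\t</cluster>\n\t</remote_servers>\n</clickhouse>"
)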
main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
user_configs = ["configs/allow_database_types.xml"]
nodes = []
for i in range(num_nodes):
nodes.append(
cluster.add_instance(
f"node{i}",
main_configs=main_configs,
user_configs=user_configs,
external_dirs=["/backups/"],
macros={"replica": f"node{i}", "shard": "shard1"},
with_zookeeper=True,
)
)
node0 = nodes[0]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_after_test():
try:
yield
finally:
node0.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
node0.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' NO DELAY")
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}')"
def create_and_fill_table():
node0.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x Int32"
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
"ORDER BY x"
)
for i in range(num_nodes):
nodes[i].query(f"INSERT INTO tbl VALUES ({i})")
expected_sum = num_nodes * (num_nodes - 1) // 2
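Each node i inserts the single value i, so the replicated table ends up holding 0 through num_nodes - 1, and expected_sum is the corresponding triangular number; for num_nodes = 10 that is 45. A quick sanity check of the arithmetic:
assert sum(range(10)) == 10 * (10 - 1) // 2 == 45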
def test_replicated_table():
create_and_fill_table()
backup_name = new_backup_name()
node0.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}")
node0.query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node0.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
node0.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
for i in range(num_nodes):
assert nodes[i].query("SELECT sum(x) FROM tbl") == TSV([expected_sum])
num_concurrent_backups = 4
def test_concurrent_backups_on_same_node():
create_and_fill_table()
backup_names = [new_backup_name() for _ in range(num_concurrent_backups)]
ids = []
for backup_name in backup_names:
id = node0.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC"
).split("\t")[0]
ids.append(id)
ids_list = "[" + ", ".join([f"'{id}'" for id in ids]) + "]"
assert_eq_with_retry(
node0,
f"SELECT status FROM system.backups WHERE status == 'MAKING_BACKUP' AND uuid IN {ids_list}",
"",
)
assert node0.query(
f"SELECT status, error FROM system.backups WHERE uuid IN {ids_list} AND NOT internal"
) == TSV([["BACKUP_COMPLETE", ""]] * num_concurrent_backups)
for backup_name in backup_names:
node0.query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node0.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
node0.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
for i in range(num_nodes):
assert nodes[i].query("SELECT sum(x) FROM tbl") == TSV([expected_sum])
def test_concurrent_backups_on_different_nodes():
create_and_fill_table()
assert num_concurrent_backups <= num_nodes
backup_names = [new_backup_name() for _ in range(num_concurrent_backups)]
ids = []
for i in range(num_concurrent_backups):
id = (
nodes[i]
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_names[i]} ASYNC")
.split("\t")[0]
)
ids.append(id)
for i in range(num_concurrent_backups):
assert_eq_with_retry(
nodes[i],
f"SELECT status FROM system.backups WHERE status == 'MAKING_BACKUP' AND uuid = '{ids[i]}'",
"",
)
for i in range(num_concurrent_backups):
assert nodes[i].query(
f"SELECT status, error FROM system.backups WHERE uuid = '{ids[i]}' AND NOT internal"
) == TSV([["BACKUP_COMPLETE", ""]])
for i in range(num_concurrent_backups):
nodes[i].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
nodes[i].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_names[i]}")
nodes[i].query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
for j in range(num_nodes):
assert nodes[j].query("SELECT sum(x) FROM tbl") == TSV([expected_sum])
@pytest.mark.parametrize(
"db_engine, table_engine",
[("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")],
)
def test_create_or_drop_tables_during_backup(db_engine, table_engine):
if db_engine == "Replicated":
db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
if table_engine.endswith("MergeTree"):
table_engine += " ORDER BY tuple()"
node0.query(f"CREATE DATABASE mydb ON CLUSTER 'cluster' ENGINE={db_engine}")
# Run this test for 60 seconds
start_time = time.time()
end_time = start_time + 60
def create_table():
while time.time() < end_time:
node = nodes[randint(0, num_nodes - 1)]
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node.query(
f"CREATE TABLE IF NOT EXISTS {table_name}(x Int32) ENGINE={table_engine}"
)
node.query_and_get_answer_with_error(
f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
)
def drop_table():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
def rename_table():
while time.time() < end_time:
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query_and_get_answer_with_error(
f"RENAME TABLE {table_name1} TO {table_name2}"
)
def make_backup():
ids = []
while time.time() < end_time:
time.sleep(
5
) # 1 minute total at around 5 seconds per backup => around 12 backups should be created
backup_name = new_backup_name()
id = node0.query(
f"BACKUP DATABASE mydb ON CLUSTER 'cluster' TO {backup_name} ASYNC"
).split("\t")[0]
ids.append(id)
return ids
ids = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
ids_future = executor.submit(make_backup)
futures.append(ids_future)
futures.append(executor.submit(create_table))
futures.append(executor.submit(drop_table))
futures.append(executor.submit(rename_table))
for future in futures:
future.result()
ids = ids_future.result()
ids_list = "[" + ", ".join([f"'{id}'" for id in ids]) + "]"
for node in nodes:
assert_eq_with_retry(
node,
f"SELECT status from system.backups WHERE uuid IN {ids_list} AND (status == 'MAKING_BACKUP')",
"",
)
for node in nodes:
assert_eq_with_retry(
node,
f"SELECT status, error from system.backups WHERE uuid IN {ids_list} AND (status == 'FAILED_TO_BACKUP')",
"",
)
backup_names = {}
for node in nodes:
for id in ids:
backup_name = node.query(
f"SELECT backup_name FROM system.backups WHERE uuid='{id}' FORMAT RawBLOB"
).strip()
if backup_name:
backup_names[id] = backup_name
for id in ids:
node0.query("DROP DATABASE mydb ON CLUSTER 'cluster'")
node0.query(
f"RESTORE DATABASE mydb ON CLUSTER 'cluster' FROM {backup_names[id]}"
)
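A possible follow-up check after each RESTORE above (not part of the committed test, just a sketch): confirm the restored database has its tables registered and queryable. system.tables is a standard ClickHouse system table; which of tbl1..tbl10 exist depends on what the create/drop/rename workers left behind at backup time.
table_names = node0.query(
    "SELECT name FROM system.tables WHERE database = 'mydb'"
).splitlines()
for table_name in table_names:
    # Each restored table should be queryable; a broken restore would raise here.
    node0.query(f"SELECT count() FROM mydb.`{table_name}`")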