Improve gathering metadata for backup - part 5.

Vitaly Baranov 2022-06-23 20:49:44 +02:00
parent aaf7f66549
commit 6ca400fd89
17 changed files with 317 additions and 252 deletions

View File

@ -157,19 +157,19 @@ void BackupCoordinationDistributed::removeAllNodes()
}
void BackupCoordinationDistributed::setStatus(const String & current_host, const String & new_status)
void BackupCoordinationDistributed::setStatus(const String & current_host, const String & new_status, const String & message)
{
status_sync.set(current_host, new_status);
status_sync.set(current_host, new_status, message);
}
void BackupCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts)
Strings BackupCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
status_sync.setAndWait(current_host, new_status, other_hosts);
return status_sync.setAndWait(current_host, new_status, message, all_hosts);
}
void BackupCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms)
Strings BackupCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
status_sync.setAndWaitFor(current_host, new_status, other_hosts, timeout_ms);
return status_sync.setAndWaitFor(current_host, new_status, message, all_hosts, timeout_ms);
}

View File

@ -14,9 +14,9 @@ public:
BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~BackupCoordinationDistributed() override;
void setStatus(const String & current_host, const String & new_status) override;
void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) override;
void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts) override;
Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms) override;
void addReplicatedPartNames(
const String & table_zk_path,

View File

@ -258,51 +258,45 @@ void BackupCoordinationStatusSync::createRootNodes()
zookeeper->createIfNotExists(zookeeper_path, "");
}
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status)
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message)
{
setImpl(current_host, new_status, {}, {});
setImpl(current_host, new_status, message, {}, {});
}
void BackupCoordinationStatusSync::setAndWait(const String & current_host, const String & new_status, const Strings & other_hosts)
Strings BackupCoordinationStatusSync::setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
setImpl(current_host, new_status, other_hosts, {});
return setImpl(current_host, new_status, message, all_hosts, {});
}
void BackupCoordinationStatusSync::setAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms)
Strings BackupCoordinationStatusSync::setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
setImpl(current_host, new_status, other_hosts, timeout_ms);
return setImpl(current_host, new_status, message, all_hosts, timeout_ms);
}
void BackupCoordinationStatusSync::setImpl(const String & current_host, const String & new_status, const Strings & other_hosts, const std::optional<UInt64> & timeout_ms)
Strings BackupCoordinationStatusSync::setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms)
{
/// Put new status to ZooKeeper.
auto zookeeper = get_zookeeper();
zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message);
String result_status = new_status;
String message;
std::string_view error_prefix = "error: ";
bool is_error_status = new_status.starts_with(error_prefix);
if (is_error_status)
{
message = new_status.substr(error_prefix.length());
result_status = "error";
}
zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + result_status, message);
if (other_hosts.empty() || ((other_hosts.size() == 1) && (other_hosts.front() == current_host)) || is_error_status)
return;
if (all_hosts.empty() || (new_status == kErrorStatus))
return {};
if ((all_hosts.size() == 1) && (all_hosts.front() == current_host))
return {message};
/// Wait for other hosts.
/// Current stages of all hosts.
Strings ready_hosts_results;
ready_hosts_results.resize(all_hosts.size());
std::map<String, std::vector<size_t> /* index in `ready_hosts_results` */> unready_hosts;
for (size_t i = 0; i != all_hosts.size(); ++i)
unready_hosts[all_hosts[i]].push_back(i);
std::optional<String> host_with_error;
std::optional<String> error_message;
std::map<String, String> unready_hosts;
for (const String & host : other_hosts)
unready_hosts.emplace(host, "");
/// Process ZooKeeper's nodes and set `all_hosts_ready` or `unready_host` or `error_message`.
auto process_zk_nodes = [&](const Strings & zk_nodes)
{
@ -316,18 +310,19 @@ void BackupCoordinationStatusSync::setImpl(const String & current_host, const St
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node);
String host = zk_node.substr(0, separator_pos);
String status = zk_node.substr(separator_pos + 1);
if (status == "error")
if (status == kErrorStatus)
{
host_with_error = host;
error_message = zookeeper->get(zookeeper_path + "/" + zk_node);
return;
}
auto it = unready_hosts.find(host);
if (it != unready_hosts.end())
if ((it != unready_hosts.end()) && (status == new_status))
{
it->second = status;
if (status == result_status)
unready_hosts.erase(it);
String result = zookeeper->get(zookeeper_path + "/" + zk_node);
for (size_t i : it->second)
ready_hosts_results[i] = result;
unready_hosts.erase(it);
}
}
};
@ -390,6 +385,8 @@ void BackupCoordinationStatusSync::setImpl(const String & current_host, const St
unready_hosts.begin()->first,
to_string(elapsed));
}
return ready_hosts_results;
}
}
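For readers tracking the coordination change above: each host now publishes a node named "<host>|<status>" whose value is a free-form message, and setAndWait() returns the messages of all hosts once they reach the same status (an "error" status short-circuits the wait). Below is a rough standalone model of that exchange, not ClickHouse code: FakeZooKeeper, setAndWaitModel and the host names are invented for illustration, and no real waiting or timeout handling is modelled.

#include <iostream>
#include <map>
#include <string>
#include <vector>

using Strings = std::vector<std::string>;
static constexpr const char * kErrorStatus = "error";

// Stand-in for zkutil::ZooKeeper: just a path -> value map.
struct FakeZooKeeper
{
    std::map<std::string, std::string> nodes;
    void createIfNotExists(const std::string & path, const std::string & value) { nodes.emplace(path, value); }
};

// One call to setAndWait(): publish our status and message, then collect the
// messages of every host that reached the same status (no blocking here).
Strings setAndWaitModel(FakeZooKeeper & zk, const std::string & root,
                        const std::string & current_host, const std::string & new_status,
                        const std::string & message, const Strings & all_hosts)
{
    zk.createIfNotExists(root + "/" + current_host + "|" + new_status, message);
    if (all_hosts.empty() || new_status == kErrorStatus)
        return {};
    Strings results;
    for (const auto & host : all_hosts)
    {
        auto it = zk.nodes.find(root + "/" + host + "|" + new_status);
        results.push_back(it == zk.nodes.end() ? "<not ready>" : it->second);
    }
    return results;
}

int main()
{
    FakeZooKeeper zk;
    Strings hosts{"host1", "host2"};
    setAndWaitModel(zk, "/backup", "host2", "gathering metadata (1)", "consistent", hosts);
    for (const auto & result : setAndWaitModel(zk, "/backup", "host1", "gathering metadata (1)", "consistent", hosts))
        std::cout << result << '\n';   // prints "consistent" twice
}

Returning the per-host messages is what lets BackupEntriesCollector further down check whether every host reported "consistent" for a given metadata-gathering pass.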

View File

@ -63,13 +63,15 @@ class BackupCoordinationStatusSync
public:
BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
void set(const String & current_host, const String & new_status);
void setAndWait(const String & current_host, const String & new_status, const Strings & other_hosts);
void setAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms);
void set(const String & current_host, const String & new_status, const String & message);
Strings setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts);
Strings setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms);
static constexpr const char * kErrorStatus = "error";
private:
void createRootNodes();
void setImpl(const String & current_host, const String & new_status, const Strings & other_hosts, const std::optional<UInt64> & timeout_ms);
Strings setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms);
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;

View File

@ -13,16 +13,18 @@ using FileInfo = IBackupCoordination::FileInfo;
BackupCoordinationLocal::BackupCoordinationLocal() = default;
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void BackupCoordinationLocal::setStatus(const String &, const String &)
void BackupCoordinationLocal::setStatus(const String &, const String &, const String &)
{
}
void BackupCoordinationLocal::setStatusAndWait(const String &, const String &, const Strings &)
Strings BackupCoordinationLocal::setStatusAndWait(const String &, const String &, const String &, const Strings &)
{
return {};
}
void BackupCoordinationLocal::setStatusAndWaitFor(const String &, const String &, const Strings &, UInt64)
Strings BackupCoordinationLocal::setStatusAndWaitFor(const String &, const String &, const String &, const Strings &, UInt64)
{
return {};
}
void BackupCoordinationLocal::addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name, const std::vector<PartNameAndChecksum> & part_names_and_checksums)

View File

@ -19,9 +19,9 @@ public:
BackupCoordinationLocal();
~BackupCoordinationLocal() override;
void setStatus(const String & current_host, const String & new_status) override;
void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) override;
void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts) override;
Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms) override;
void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;

View File

@ -1,6 +1,7 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupUtils.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
@ -9,6 +10,7 @@
#include <Storages/IStorage.h>
#include <base/chrono_io.h>
#include <base/insertAtEnd.h>
#include <base/sleep.h>
#include <Common/escapeForFileName.h>
#include <boost/range/algorithm/copy.hpp>
#include <filesystem>
@ -29,23 +31,20 @@ namespace ErrorCodes
namespace
{
/// Initial status.
constexpr const char kPreparingStatus[] = "preparing";
/// Finding all tables and databases which we're going to put to the backup and collecting their metadata.
constexpr const char kGatheringMetadataStatus[] = "gathering metadata";
constexpr const char * kGatheringMetadataStatus = "gathering metadata";
/// Making temporary hard links and prepare backup entries.
constexpr const char kExtractingDataFromTablesStatus[] = "extracting data from tables";
constexpr const char * kExtractingDataFromTablesStatus = "extracting data from tables";
/// Running special tasks for replicated tables which can also prepare some backup entries.
constexpr const char kRunningPostTasksStatus[] = "running post-tasks";
constexpr const char * kRunningPostTasksStatus = "running post-tasks";
/// Writing backup entries to the backup and removing temporary hard links.
constexpr const char kWritingBackupStatus[] = "writing backup";
constexpr const char * kWritingBackupStatus = "writing backup";
/// Prefix for error statuses.
constexpr const char kErrorStatus[] = "error: ";
/// Error status.
constexpr const char * kErrorStatus = BackupCoordinationStatusSync::kErrorStatus;
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
@ -67,6 +66,19 @@ namespace
str[0] = std::toupper(str[0]);
return str;
}
/// How long we should sleep after finding an inconsistency error.
std::chrono::milliseconds getSleepTimeAfterInconsistencyError(size_t pass)
{
size_t ms;
if (pass == 1) /* pass is 1-based */
ms = 0;
else if ((pass % 10) != 1)
ms = 0;
else
ms = 1000;
return std::chrono::milliseconds{ms};
}
}
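As a quick sanity check of the back-off above (the helper is copied verbatim from the hunk; the driver around it is only an illustration): the schedule is no sleep on pass 1, no sleep while pass % 10 != 1, and a one-second pause on passes 11, 21, and so on.

#include <chrono>
#include <cstddef>
#include <cstdio>

// Copied from the hunk above.
std::chrono::milliseconds getSleepTimeAfterInconsistencyError(size_t pass)
{
    size_t ms;
    if (pass == 1) /* pass is 1-based */
        ms = 0;
    else if ((pass % 10) != 1)
        ms = 0;
    else
        ms = 1000;
    return std::chrono::milliseconds{ms};
}

int main()
{
    for (size_t pass = 1; pass <= 22; ++pass)
        std::printf("pass %zu -> sleep %lld ms\n", pass,
                    static_cast<long long>(getSleepTimeAfterInconsistencyError(pass).count()));
    // Only passes 11 and 21 sleep here; every other pass retries immediately.
}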
@ -74,36 +86,37 @@ BackupEntriesCollector::BackupEntriesCollector(
const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ContextPtr & context_,
std::chrono::seconds timeout_)
const ContextPtr & context_)
: backup_query_elements(backup_query_elements_)
, backup_settings(backup_settings_)
, backup_coordination(backup_coordination_)
, context(context_)
, timeout(timeout_)
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 300000))
, log(&Poco::Logger::get("BackupEntriesCollector"))
, current_status(kPreparingStatus)
{
}
BackupEntriesCollector::~BackupEntriesCollector() = default;
BackupEntries BackupEntriesCollector::getBackupEntries()
BackupEntries BackupEntriesCollector::run()
{
try
{
/// getBackupEntries() must not be called multiple times.
if (current_status != kPreparingStatus)
/// run() can be called only once.
if (!current_status.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already making backup entries");
/// Calculate the root path for collecting backup entries, it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
calculateRootPathInBackup();
/// Find other hosts working along with us to execute this ON CLUSTER query.
all_hosts
= BackupSettings::Util::filterHostIDs(backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
/// Do renaming in the create queries according to the renaming config.
renaming_map = makeRenamingMapFromBackupQuery(backup_query_elements);
/// Calculate the root path for collecting backup entries, it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
calculateRootPathInBackup();
/// Find databases and tables which we're going to put to the backup.
setStatus(kGatheringMetadataStatus);
gatherMetadataAndCheckConsistency();
/// Make backup entries for the definitions of the found databases.
@ -129,7 +142,7 @@ BackupEntries BackupEntriesCollector::getBackupEntries()
{
try
{
setStatus(kErrorStatus + getCurrentExceptionMessage(false));
setStatus(kErrorStatus, getCurrentExceptionMessage(false));
}
catch (...)
{
@ -138,21 +151,34 @@ BackupEntries BackupEntriesCollector::getBackupEntries()
}
}
void BackupEntriesCollector::setStatus(const String & new_status)
Strings BackupEntriesCollector::setStatus(const String & new_status, const String & message)
{
bool is_error_status = new_status.starts_with(kErrorStatus);
if (is_error_status)
if (new_status == kErrorStatus)
{
LOG_ERROR(log, "{} failed with {}", toUpperFirst(current_status), new_status);
backup_coordination->setStatus(backup_settings.host_id, new_status);
LOG_ERROR(log, "{} failed with error: {}", toUpperFirst(current_status), message);
backup_coordination->setStatus(backup_settings.host_id, new_status, message);
return {};
}
else
{
LOG_TRACE(log, "{}", toUpperFirst(new_status));
current_status = new_status;
auto all_hosts
= BackupSettings::Util::filterHostIDs(backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
backup_coordination->setStatusAndWait(backup_settings.host_id, new_status, all_hosts);
if (new_status.starts_with(kGatheringMetadataStatus))
{
auto now = std::chrono::steady_clock::now();
auto end_of_timeout = std::max(now, consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout);
return backup_coordination->setStatusAndWaitFor(
backup_settings.host_id,
new_status,
message,
all_hosts,
std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - now).count());
}
else
{
return backup_coordination->setStatusAndWait(backup_settings.host_id, new_status, message, all_hosts);
}
}
}
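The timeout handed to setStatusAndWaitFor() above is whatever remains of the backups.consistent_metadata_snapshot_timeout window (300000 ms by default, per the constructor change), clamped to zero through std::max. A minimal sketch of that chrono arithmetic with made-up numbers; the 100 seconds of already-elapsed work is an assumption for the example.

#include <algorithm>
#include <chrono>
#include <iostream>

int main()
{
    using namespace std::chrono;

    // Default of backups.consistent_metadata_snapshot_timeout, in milliseconds.
    const milliseconds consistent_metadata_snapshot_timeout{300000};

    // Pretend the first "gathering metadata" pass started 100 seconds ago.
    const auto consistent_metadata_snapshot_start_time = steady_clock::now() - seconds{100};

    const auto now = steady_clock::now();
    const auto end_of_timeout = std::max(now, consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout);

    // This is the value passed as timeout_ms to setStatusAndWaitFor(); it is never negative.
    const auto timeout_ms = duration_cast<milliseconds>(end_of_timeout - now).count();
    std::cout << "will wait for other hosts at most " << timeout_ms << " ms\n";   // roughly 200000
}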
@ -173,45 +199,87 @@ void BackupEntriesCollector::calculateRootPathInBackup()
/// Finds databases and tables which we will put to the backup.
void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
{
bool use_timeout = (timeout.count() >= 0);
auto start_time = std::chrono::steady_clock::now();
consistent_metadata_snapshot_start_time = std::chrono::steady_clock::now();
auto end_of_timeout = consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout;
setStatus(fmt::format("{} ({})", kGatheringMetadataStatus, 1));
for (size_t pass = 1;; ++pass)
{
try
String new_status = fmt::format("{} ({})", kGatheringMetadataStatus, pass + 1);
std::optional<Exception> inconsistency_error;
if (tryGatherMetadataAndCompareWithPrevious(inconsistency_error))
{
/// Collect information about databases and tables specified in the BACKUP query.
database_infos.clear();
table_infos.clear();
gatherDatabasesMetadata();
gatherTablesMetadata();
/// We have to check consistency of collected information to protect from the case when some table or database is
/// renamed during this collecting making the collected information invalid.
auto comparing_error = compareWithPrevious();
if (!comparing_error)
break; /// no error, everything's fine
/// Gathered metadata and checked consistency, cool! But we have to check that other hosts cope with that too.
auto all_hosts_results = setStatus(new_status, "consistent");
if (pass >= 2) /// Two passes is minimum (we need to compare with table names with previous ones to be sure we don't miss anything).
throw *comparing_error;
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP)
throw;
std::optional<String> host_with_inconsistency;
std::optional<String> inconsistency_error_on_other_host;
for (size_t i = 0; i != all_hosts.size(); ++i)
{
if ((i < all_hosts_results.size()) && (all_hosts_results[i] != "consistent"))
{
host_with_inconsistency = all_hosts[i];
inconsistency_error_on_other_host = all_hosts_results[i];
break;
}
}
auto elapsed = std::chrono::steady_clock::now() - start_time;
e.addMessage("Couldn't gather tables and databases to make a backup (pass #{}, elapsed {})", pass, to_string(elapsed));
if (use_timeout && (elapsed > timeout))
throw;
else
LOG_WARNING(log, "{}", e.displayText());
if (!host_with_inconsistency)
break; /// All hosts managed to gather metadata and everything is consistent, so we can go further to writing the backup.
inconsistency_error = Exception{
ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP,
"Found inconsistency on host {}: {}",
*host_with_inconsistency,
*inconsistency_error_on_other_host};
}
else
{
/// Failed to gather metadata or something wasn't consistent. We'll let other hosts know that and try again.
setStatus(new_status, inconsistency_error->displayText());
}
/// Two passes is the minimum (we need to compare table names with the previous ones to be sure we don't miss anything).
if (pass >= 2)
{
if (std::chrono::steady_clock::now() > end_of_timeout)
inconsistency_error->rethrow();
else
LOG_WARNING(log, "{}", inconsistency_error->displayText());
}
auto sleep_time = getSleepTimeAfterInconsistencyError(pass);
if (sleep_time.count() > 0)
sleepForNanoseconds(std::chrono::duration_cast<std::chrono::nanoseconds>(sleep_time).count());
}
LOG_INFO(log, "Will backup {} databases and {} tables", database_infos.size(), table_infos.size());
}
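In outline, every pass publishes a "gathering metadata (N)" status whose message is either "consistent" or the text of the inconsistency error, looks at what the other hosts published, and leaves the loop only once nobody reported a problem. A compressed single-process model of that control flow; gatherOnce() and otherHostsConsistent() are invented stand-ins for the real gathering, comparison and coordination calls, and the timeout and sleep handling is omitted.

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>

// Stand-in for gatherDatabasesMetadata() + gatherTablesMetadata() + compareWithPrevious():
// returns an inconsistency message, or nothing once the snapshot is stable.
std::optional<std::string> gatherOnce(size_t pass)
{
    if (pass <= 2)
        return "test.t was renamed during scanning";
    return std::nullopt;
}

// Stand-in for inspecting the messages returned by the coordination for the other hosts.
bool otherHostsConsistent(size_t pass)
{
    return pass >= 3;   // pretend the other hosts also settle on pass 3
}

int main()
{
    for (size_t pass = 1;; ++pass)
    {
        std::cout << "status: gathering metadata (" << pass << ")\n";
        const auto inconsistency_error = gatherOnce(pass);
        if (!inconsistency_error && otherHostsConsistent(pass))
        {
            std::cout << "all hosts consistent after pass " << pass << ", proceeding to write the backup\n";
            break;
        }
        if (inconsistency_error)
            std::cout << "retrying, reason: " << *inconsistency_error << "\n";
    }
}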
bool BackupEntriesCollector::tryGatherMetadataAndCompareWithPrevious(std::optional<Exception> & inconsistency_error)
{
try
{
/// Collect information about databases and tables specified in the BACKUP query.
database_infos.clear();
table_infos.clear();
gatherDatabasesMetadata();
gatherTablesMetadata();
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP)
throw;
inconsistency_error = e;
return false;
}
/// We have to check consistency of collected information to protect from the case when some table or database is
/// renamed during this collection, making the collected information invalid.
return compareWithPrevious(inconsistency_error);
}
void BackupEntriesCollector::gatherDatabasesMetadata()
{
/// Collect information about databases and tables specified in the BACKUP query.
@ -465,7 +533,7 @@ void BackupEntriesCollector::lockTablesForReading()
}
/// Check consistency of collected information about databases and tables.
std::optional<Exception> BackupEntriesCollector::compareWithPrevious()
bool BackupEntriesCollector::compareWithPrevious(std::optional<Exception> & inconsistency_error)
{
/// We need to scan tables at least twice to be sure that we haven't missed any table which could be renamed
/// while we were scanning.
@ -476,60 +544,64 @@ std::optional<Exception> BackupEntriesCollector::compareWithPrevious()
if (previous_database_names != database_names)
{
std::optional<Exception> comparing_error;
bool error_message_ready = false;
for (const auto & database_name : database_names)
{
if (!previous_database_names.contains(database_name))
{
comparing_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Database {} were added during scanning", backQuoteIfNeed(database_name)};
inconsistency_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Database {} were added during scanning", backQuoteIfNeed(database_name)};
error_message_ready = true;
break;
}
}
if (!comparing_error)
if (!error_message_ready)
{
for (const auto & database_name : previous_database_names)
{
if (!database_names.contains(database_name))
{
comparing_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Database {} were removed during scanning", backQuoteIfNeed(database_name)};
inconsistency_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Database {} were removed during scanning", backQuoteIfNeed(database_name)};
error_message_ready = true;
break;
}
}
}
assert(comparing_error);
assert(error_message_ready);
previous_database_names = std::move(database_names);
previous_table_names = std::move(table_names);
return comparing_error;
return false;
}
if (previous_table_names != table_names)
{
std::optional<Exception> comparing_error;
bool error_message_ready = false;
for (const auto & table_name : table_names)
{
if (!previous_table_names.contains(table_name))
{
comparing_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} were added during scanning", tableNameWithTypeToString(table_name.database, table_name.table, true)};
inconsistency_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} were added during scanning", tableNameWithTypeToString(table_name.database, table_name.table, true)};
error_message_ready = true;
break;
}
}
if (!comparing_error)
if (!error_message_ready)
{
for (const auto & table_name : previous_table_names)
{
if (!table_names.contains(table_name))
{
comparing_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} were removed during scanning", tableNameWithTypeToString(table_name.database, table_name.table, true)};
inconsistency_error = Exception{ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} were removed during scanning", tableNameWithTypeToString(table_name.database, table_name.table, true)};
error_message_ready = true;
break;
}
}
}
assert(comparing_error);
assert(error_message_ready);
previous_table_names = std::move(table_names);
return comparing_error;
return false;
}
return {};
return true;
}
/// Make backup entries for all the definitions of all the databases found.

View File

@ -27,26 +27,25 @@ public:
BackupEntriesCollector(const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ContextPtr & context_,
std::chrono::seconds timeout_ = std::chrono::seconds(-1) /* no timeout */);
const ContextPtr & context_);
~BackupEntriesCollector();
/// Collects backup entries and returns the result.
/// This function first generates a list of databases and then call IDatabase::backup() for each database from this list.
/// At this moment IDatabase::backup() calls IStorage::backup() and they both call addBackupEntry() to build a list of backup entries.
BackupEntries getBackupEntries();
/// This function first generates a list of databases and then calls IDatabase::getTablesForBackup() for each database from this list.
/// Then it calls IStorage::backupData() to build a list of backup entries.
BackupEntries run();
const BackupSettings & getBackupSettings() const { return backup_settings; }
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
ContextPtr getContext() const { return context; }
/// Adds a backup entry which will be later returned by getBackupEntries().
/// These function can be called by implementations of IStorage::backup() in inherited storage classes.
/// Adds a backup entry which will be later returned by run().
/// These functions can be called by implementations of IStorage::backupData() in inherited storage classes.
void addBackupEntry(const String & file_name, BackupEntryPtr backup_entry);
void addBackupEntries(const BackupEntries & backup_entries_);
void addBackupEntries(BackupEntries && backup_entries_);
/// Adds a function which must be called after all IStorage::backup() have finished their work on all hosts.
/// Adds a function which must be called after all IStorage::backupData() have finished their work on all hosts.
/// This function is designed to help make the backup consistent in some complex cases like
/// 1) we need to join (in a backup) the data of replicated tables gathered on different hosts.
void addPostTask(std::function<void()> task);
@ -59,6 +58,8 @@ private:
void gatherMetadataAndCheckConsistency();
bool tryGatherMetadataAndCompareWithPrevious(std::optional<Exception> & inconsistency_error);
void gatherDatabasesMetadata();
void gatherDatabaseMetadata(
@ -73,25 +74,25 @@ private:
void gatherTablesMetadata();
void lockTablesForReading();
std::optional<Exception> compareWithPrevious();
bool compareWithPrevious(std::optional<Exception> & inconsistency_error);
void makeBackupEntriesForDatabasesDefs();
void makeBackupEntriesForTablesDefs();
void makeBackupEntriesForTablesData();
void runPostTasks();
void setStatus(const String & new_status);
Strings setStatus(const String & new_status, const String & message = "");
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
ContextPtr context;
std::chrono::seconds timeout;
std::chrono::milliseconds consistent_metadata_snapshot_timeout;
Poco::Logger * log;
String current_status;
std::filesystem::path root_path_in_backup;
Strings all_hosts;
DDLRenamingMap renaming_map;
std::filesystem::path root_path_in_backup;
struct DatabaseInfo
{
@ -122,6 +123,8 @@ private:
std::optional<ASTs> partitions;
};
String current_status;
std::chrono::steady_clock::time_point consistent_metadata_snapshot_start_time;
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<QualifiedTableName, TableInfo> table_infos;
std::set<String> previous_database_names;

View File

@ -166,9 +166,8 @@ UUID BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & c
BackupEntries backup_entries;
{
auto timeout = std::chrono::seconds{context_in_use->getConfigRef().getInt("backups.backup_prepare_timeout", -1)};
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context_in_use, timeout};
backup_entries = backup_entries_collector.getBackupEntries();
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context_in_use};
backup_entries = backup_entries_collector.run();
}
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
@ -272,8 +271,8 @@ UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr conte
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context_in_use, {}};
dummy_restorer.checkAccessOnly();
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context_in_use};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
@ -325,11 +324,9 @@ UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr conte
DataRestoreTasks data_restore_tasks;
{
auto timeout = std::chrono::seconds{context_in_use->getConfigRef().getInt("backups.restore_metadata_timeout", -1)};
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context_in_use, timeout};
restorer.restoreMetadata();
data_restore_tasks = restorer.getDataRestoreTasks();
backup, context_in_use};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);

View File

@ -14,9 +14,9 @@ public:
virtual ~IBackupCoordination() = default;
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
virtual void setStatus(const String & current_host, const String & new_status) = 0;
virtual void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) = 0;
virtual void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) = 0;
virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
virtual Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts) = 0;
virtual Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts, UInt64 timeout_ms) = 0;
struct PartNameAndChecksum
{

View File

@ -14,9 +14,9 @@ public:
virtual ~IRestoreCoordination() = default;
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
virtual void setStatus(const String & current_host, const String & new_status) = 0;
virtual void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) = 0;
virtual void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) = 0;
virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
virtual Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts) = 0;
virtual Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts, UInt64 timeout_ms) = 0;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
virtual bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) = 0;

View File

@ -26,19 +26,19 @@ void RestoreCoordinationDistributed::createRootNodes()
zookeeper->createIfNotExists(zookeeper_path + "/repl_access_storages_acquired", "");
}
void RestoreCoordinationDistributed::setStatus(const String & current_host, const String & new_status)
void RestoreCoordinationDistributed::setStatus(const String & current_host, const String & new_status, const String & message)
{
status_sync.set(current_host, new_status);
status_sync.set(current_host, new_status, message);
}
void RestoreCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts)
Strings RestoreCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
status_sync.setAndWait(current_host, new_status, other_hosts);
return status_sync.setAndWait(current_host, new_status, message, all_hosts);
}
void RestoreCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms)
Strings RestoreCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
status_sync.setAndWaitFor(current_host, new_status, other_hosts, timeout_ms);
return status_sync.setAndWaitFor(current_host, new_status, message, all_hosts, timeout_ms);
}
bool RestoreCoordinationDistributed::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)

View File

@ -15,9 +15,9 @@ public:
~RestoreCoordinationDistributed() override;
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
void setStatus(const String & current_host, const String & new_status) override;
void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) override;
void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts) override;
Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -7,16 +7,18 @@ namespace DB
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStatus(const String &, const String &)
void RestoreCoordinationLocal::setStatus(const String &, const String &, const String &)
{
}
void RestoreCoordinationLocal::setStatusAndWait(const String &, const String &, const Strings &)
Strings RestoreCoordinationLocal::setStatusAndWait(const String &, const String &, const String &, const Strings &)
{
return {};
}
void RestoreCoordinationLocal::setStatusAndWaitFor(const String &, const String &, const Strings &, UInt64)
Strings RestoreCoordinationLocal::setStatusAndWaitFor(const String &, const String &, const String &, const Strings &, UInt64)
{
return {};
}
bool RestoreCoordinationLocal::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)

View File

@ -18,9 +18,9 @@ public:
~RestoreCoordinationLocal() override;
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
void setStatus(const String & current_host, const String & new_status) override;
void setStatusAndWait(const String & current_host, const String & new_status, const Strings & other_hosts) override;
void setStatusAndWaitFor(const String & current_host, const String & new_status, const Strings & other_hosts, UInt64 timeout_ms) override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts) override;
Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -1,5 +1,6 @@
#include <Backups/RestorerFromBackup.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupSettings.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
@ -39,23 +40,20 @@ namespace ErrorCodes
namespace
{
/// Initial status.
constexpr const char kPreparingStatus[] = "preparing";
/// Finding databases and tables in the backup which we're going to restore.
constexpr const char kFindingTablesInBackupStatus[] = "finding tables in backup";
constexpr const char * kFindingTablesInBackupStatus = "finding tables in backup";
/// Creating databases or finding them and checking their definitions.
constexpr const char kCreatingDatabasesStatus[] = "creating databases";
constexpr const char * kCreatingDatabasesStatus = "creating databases";
/// Creating tables or finding them and checking their definition.
constexpr const char kCreatingTablesStatus[] = "creating tables";
constexpr const char * kCreatingTablesStatus = "creating tables";
/// Inserting restored data to tables.
constexpr const char kInsertingDataToTablesStatus[] = "inserting data to tables";
constexpr const char * kInsertingDataToTablesStatus = "inserting data to tables";
/// Prefix for error statuses.
constexpr const char kErrorStatus[] = "error: ";
/// Error status.
constexpr const char * kErrorStatus = BackupCoordinationStatusSync::kErrorStatus;
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
@ -107,46 +105,37 @@ RestorerFromBackup::RestorerFromBackup(
const RestoreSettings & restore_settings_,
std::shared_ptr<IRestoreCoordination> restore_coordination_,
const BackupPtr & backup_,
const ContextMutablePtr & context_,
std::chrono::seconds timeout_)
const ContextMutablePtr & context_)
: restore_query_elements(restore_query_elements_)
, restore_settings(restore_settings_)
, restore_coordination(restore_coordination_)
, backup(backup_)
, context(context_)
, timeout(timeout_)
, create_table_timeout_ms(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(&Poco::Logger::get("RestorerFromBackup"))
, current_status(kPreparingStatus)
{
}
RestorerFromBackup::~RestorerFromBackup() = default;
void RestorerFromBackup::restoreMetadata()
{
run(/* only_check_access= */ false);
}
void RestorerFromBackup::checkAccessOnly()
{
run(/* only_check_access= */ true);
}
void RestorerFromBackup::run(bool only_check_access)
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
{
try
{
/// restoreMetadata() must not be called multiple times.
if (current_status != kPreparingStatus)
/// run() can be called only once.
if (!current_status.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already restoring");
/// Calculate the root path in the backup for restoring, it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
findRootPathsInBackup();
/// Find other hosts working along with us to execute this ON CLUSTER query.
all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
/// Do renaming in the create queries according to the renaming config.
renaming_map = makeRenamingMapFromBackupQuery(restore_query_elements);
/// Calculate the root path in the backup for restoring, it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
findRootPathsInBackup();
/// Find all the databases and tables which we will read from the backup.
setStatus(kFindingTablesInBackupStatus);
findDatabasesAndTablesInBackup();
@ -154,8 +143,8 @@ void RestorerFromBackup::run(bool only_check_access)
/// Check access rights.
checkAccessForObjectsFoundInBackup();
if (only_check_access)
return;
if (mode == Mode::CHECK_ACCESS_ONLY)
return {};
/// Create databases using the create queries read from the backup.
setStatus(kCreatingDatabasesStatus);
@ -168,13 +157,14 @@ void RestorerFromBackup::run(bool only_check_access)
/// All what's left is to insert data to tables.
/// No more data restoring tasks are allowed after this point.
setStatus(kInsertingDataToTablesStatus);
return getDataRestoreTasks();
}
catch (...)
{
try
{
/// Other hosts should know that we've encountered an error.
setStatus(kErrorStatus + getCurrentExceptionMessage(false));
setStatus(kErrorStatus, getCurrentExceptionMessage(false));
}
catch (...)
{
@ -183,55 +173,20 @@ void RestorerFromBackup::run(bool only_check_access)
}
}
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::getDataRestoreTasks()
void RestorerFromBackup::setStatus(const String & new_status, const String & message)
{
if (current_status != kInsertingDataToTablesStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata wasn't restored");
if (data_restore_tasks.empty() && !access_restore_task)
return {};
LOG_TRACE(log, "Will insert data to tables");
/// Storages and table locks must exist while we're executing data restoring tasks.
auto storages = std::make_shared<std::vector<StoragePtr>>();
auto table_locks = std::make_shared<std::vector<TableLockHolder>>();
storages->reserve(table_infos.size());
table_locks->reserve(table_infos.size());
for (const auto & table_info : table_infos | boost::adaptors::map_values)
if (new_status == kErrorStatus)
{
storages->push_back(table_info.storage);
table_locks->push_back(table_info.table_lock);
}
DataRestoreTasks res_tasks;
for (const auto & task : data_restore_tasks)
res_tasks.push_back([task, storages, table_locks] { task(); });
if (access_restore_task)
res_tasks.push_back([task = access_restore_task, access_control = &context->getAccessControl()] { task->restore(*access_control); });
return res_tasks;
}
void RestorerFromBackup::setStatus(const String & new_status)
{
bool is_error_status = new_status.starts_with(kErrorStatus);
if (is_error_status)
{
LOG_ERROR(log, "{} failed with {}", toUpperFirst(current_status), new_status);
LOG_ERROR(log, "{} failed with {}", toUpperFirst(current_status), message);
if (restore_coordination)
restore_coordination->setStatus(restore_settings.host_id, new_status);
restore_coordination->setStatus(restore_settings.host_id, new_status, message);
}
else
{
LOG_TRACE(log, "{}", toUpperFirst(new_status));
current_status = new_status;
auto all_hosts
= BackupSettings::Util::filterHostIDs(restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
if (restore_coordination)
restore_coordination->setStatusAndWait(restore_settings.host_id, new_status, all_hosts);
restore_coordination->setStatusAndWait(restore_settings.host_id, new_status, message, all_hosts);
}
}
@ -677,13 +632,18 @@ void RestorerFromBackup::createTables()
create_table_query = create_table_query->clone();
create_table_query->as<ASTCreateQuery &>().if_not_exists = true;
}
LOG_TRACE(
log,
"Creating {}: {}",
tableNameWithTypeToString(table_name.database, table_name.table, false),
serializeAST(*create_table_query));
database->createTableRestoredFromBackup(create_table_query, context, restore_coordination, create_table_timeout_ms);
database->createTableRestoredFromBackup(
create_table_query,
context,
restore_coordination,
std::chrono::duration_cast<std::chrono::milliseconds>(create_table_timeout).count());
}
table_info.created = true;
@ -799,6 +759,34 @@ void RestorerFromBackup::checkPathInBackupIsRegisteredToRestoreAccess(const Stri
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path to restore access was not added");
}
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::getDataRestoreTasks()
{
if (data_restore_tasks.empty() && !access_restore_task)
return {};
LOG_TRACE(log, "Will insert data to tables");
/// Storages and table locks must exist while we're executing data restoring tasks.
auto storages = std::make_shared<std::vector<StoragePtr>>();
auto table_locks = std::make_shared<std::vector<TableLockHolder>>();
storages->reserve(table_infos.size());
table_locks->reserve(table_infos.size());
for (const auto & table_info : table_infos | boost::adaptors::map_values)
{
storages->push_back(table_info.storage);
table_locks->push_back(table_info.table_lock);
}
DataRestoreTasks res_tasks;
for (const auto & task : data_restore_tasks)
res_tasks.push_back([task, storages, table_locks] { task(); });
if (access_restore_task)
res_tasks.push_back([task = access_restore_task, access_control = &context->getAccessControl()] { task->restore(*access_control); });
return res_tasks;
}
void RestorerFromBackup::throwPartitionsNotSupported(const StorageID & storage_id, const String & table_engine)
{
throw Exception(

View File

@ -26,27 +26,29 @@ public:
const RestoreSettings & restore_settings_,
std::shared_ptr<IRestoreCoordination> restore_coordination_,
const BackupPtr & backup_,
const ContextMutablePtr & context_,
std::chrono::seconds timeout_);
const ContextMutablePtr & context_);
~RestorerFromBackup();
/// Restores the definition of databases and tables and prepares tasks to restore the data of the tables.
/// restoreMetadata() checks access rights internally so checkAccessRightsOnly() shouldn't be called first.
void restoreMetadata();
enum Mode
{
/// Restores databases and tables.
RESTORE,
/// Only checks access rights without restoring anything.
void checkAccessOnly();
/// Only checks access rights without restoring anything.
CHECK_ACCESS_ONLY
};
using DataRestoreTask = std::function<void()>;
using DataRestoreTasks = std::vector<DataRestoreTask>;
DataRestoreTasks getDataRestoreTasks();
/// Restores the metadata of databases and tables and returns tasks to restore the data of tables.
DataRestoreTasks run(Mode mode);
BackupPtr getBackup() const { return backup; }
const RestoreSettings & getRestoreSettings() const { return restore_settings; }
bool isNonEmptyTableAllowed() const { return getRestoreSettings().allow_non_empty_tables; }
std::shared_ptr<IRestoreCoordination> getRestoreCoordination() const { return restore_coordination; }
std::chrono::seconds getTimeout() const { return timeout; }
ContextMutablePtr getContext() const { return context; }
/// Adds a data restore task which will be later returned by getDataRestoreTasks().
@ -69,15 +71,12 @@ private:
std::shared_ptr<IRestoreCoordination> restore_coordination;
BackupPtr backup;
ContextMutablePtr context;
std::chrono::seconds timeout;
UInt64 create_table_timeout_ms;
std::chrono::milliseconds create_table_timeout;
Poco::Logger * log;
String current_status;
std::vector<std::filesystem::path> root_paths_in_backup;
Strings all_hosts;
DDLRenamingMap renaming_map;
void run(bool only_check_access);
std::vector<std::filesystem::path> root_paths_in_backup;
void findRootPathsInBackup();
@ -91,7 +90,9 @@ private:
void createDatabases();
void createTables();
void setStatus(const String & new_status);
DataRestoreTasks getDataRestoreTasks();
void setStatus(const String & new_status, const String & message = "");
struct DatabaseInfo
{
@ -111,6 +112,7 @@ private:
std::vector<QualifiedTableName> findTablesWithoutDependencies() const;
String current_status;
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<QualifiedTableName, TableInfo> table_infos;
std::vector<DataRestoreTask> data_restore_tasks;