Move information about current hosts and list of all hosts to BackupCoordination (#47971)

to simplify the code and help implementing other features.

Co-authored-by: Nikita Mikhaylov <mikhaylovnikitka@gmail.com>
This commit is contained in:
Vitaly Baranov 2023-03-24 17:38:19 +01:00 committed by GitHub
parent fd567e03a5
commit 1badc3cba0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 187 additions and 155 deletions

View File

@ -674,18 +674,16 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
backup_entries_collector.getContext()->getAccessControl());
auto backup_coordination = backup_entries_collector.getBackupCoordination();
String current_host_id = backup_entries_collector.getBackupSettings().host_id;
backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, current_host_id, backup_entry_with_path.first);
backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, backup_entry_with_path.first);
backup_entries_collector.addPostTask(
[backup_entry = backup_entry_with_path.second,
zookeeper_path = zookeeper_path,
type,
current_host_id,
&backup_entries_collector,
backup_coordination]
{
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type, current_host_id))
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type))
backup_entries_collector.addBackupEntry(path, backup_entry);
});
}

View File

@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo;
BackupCoordinationLocal::BackupCoordinationLocal() = default;
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void BackupCoordinationLocal::setStage(const String &, const String &, const String &)
void BackupCoordinationLocal::setStage(const String &, const String &)
{
}
void BackupCoordinationLocal::setError(const String &, const Exception &)
void BackupCoordinationLocal::setError(const Exception &)
{
}
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &)
Strings BackupCoordinationLocal::waitForStage(const String &)
{
return {};
}
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
Strings BackupCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds)
{
return {};
}
@ -70,29 +70,29 @@ Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_sha
}
void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path)
{
std::lock_guard lock{mutex};
replicated_access.addFilePath(access_zk_path, access_entity_type, host_id, file_path);
replicated_access.addFilePath(access_zk_path, access_entity_type, "", file_path);
}
Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const
{
std::lock_guard lock{mutex};
return replicated_access.getFilePaths(access_zk_path, access_entity_type, host_id);
return replicated_access.getFilePaths(access_zk_path, access_entity_type, "");
}
void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path)
void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path)
{
std::lock_guard lock{mutex};
replicated_sql_objects.addDirectory(loader_zk_path, object_type, host_id, dir_path);
replicated_sql_objects.addDirectory(loader_zk_path, object_type, "", dir_path);
}
Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const
Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const
{
std::lock_guard lock{mutex};
return replicated_sql_objects.getDirectories(loader_zk_path, object_type, host_id);
return replicated_sql_objects.getDirectories(loader_zk_path, object_type, "");
}

View File

@ -21,10 +21,10 @@ public:
BackupCoordinationLocal();
~BackupCoordinationLocal() override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
@ -37,11 +37,11 @@ public:
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;

View File

@ -166,17 +166,30 @@ namespace
}
}
size_t BackupCoordinationRemote::findCurrentHostIndex(const Strings & all_hosts, const String & current_host)
{
auto it = std::find(all_hosts.begin(), all_hosts.end(), current_host);
if (it == all_hosts.end())
return 0;
return it - all_hosts.begin();
}
BackupCoordinationRemote::BackupCoordinationRemote(
const BackupKeeperSettings & keeper_settings_,
const String & root_zookeeper_path_,
const String & backup_uuid_,
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const BackupKeeperSettings & keeper_settings_,
const String & backup_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_)
: keeper_settings(keeper_settings_)
: get_zookeeper(get_zookeeper_)
, root_zookeeper_path(root_zookeeper_path_)
, zookeeper_path(root_zookeeper_path_ + "/backup-" + backup_uuid_)
, keeper_settings(keeper_settings_)
, backup_uuid(backup_uuid_)
, get_zookeeper(get_zookeeper_)
, all_hosts(all_hosts_)
, current_host(current_host_)
, current_host_index(findCurrentHostIndex(all_hosts, current_host))
, is_internal(is_internal_)
{
zookeeper_retries_info = ZooKeeperRetriesInfo(
@ -251,22 +264,22 @@ void BackupCoordinationRemote::removeAllNodes()
}
void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
void BackupCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception)
void BackupCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setError(current_host, exception);
}
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait)
{
return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}
@ -403,7 +416,7 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
}
void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path)
{
{
std::lock_guard lock{mutex};
@ -416,15 +429,15 @@ void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access
zk->createIfNotExists(path, "");
path += "/" + AccessEntityTypeInfo::get(access_entity_type).name;
zk->createIfNotExists(path, "");
path += "/" + host_id;
path += "/" + current_host;
zk->createIfNotExists(path, file_path);
}
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const
{
std::lock_guard lock{mutex};
prepareReplicatedAccess();
return replicated_access->getFilePaths(access_zk_path, access_entity_type, host_id);
return replicated_access->getFilePaths(access_zk_path, access_entity_type, current_host);
}
void BackupCoordinationRemote::prepareReplicatedAccess() const
@ -453,7 +466,7 @@ void BackupCoordinationRemote::prepareReplicatedAccess() const
}
}
void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path)
void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path)
{
{
std::lock_guard lock{mutex};
@ -474,15 +487,15 @@ void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_
}
zk->createIfNotExists(path, "");
path += "/" + host_id;
path += "/" + current_host;
zk->createIfNotExists(path, dir_path);
}
Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const
Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const
{
std::lock_guard lock{mutex};
prepareReplicatedSQLObjects();
return replicated_sql_objects->getDirectories(loader_zk_path, object_type, host_id);
return replicated_sql_objects->getDirectories(loader_zk_path, object_type, current_host);
}
void BackupCoordinationRemote::prepareReplicatedSQLObjects() const
@ -827,5 +840,4 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
return false;
}
}

View File

@ -27,17 +27,20 @@ public:
};
BackupCoordinationRemote(
const BackupKeeperSettings & keeper_settings_,
const String & root_zookeeper_path_,
const String & backup_uuid_,
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const BackupKeeperSettings & keeper_settings_,
const String & backup_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_);
~BackupCoordinationRemote() override;
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(
const String & table_shared_id,
@ -58,11 +61,11 @@ public:
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override;
void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
@ -78,6 +81,8 @@ public:
bool hasConcurrentBackups(const std::atomic<size_t> & num_active_backups) const override;
static size_t findCurrentHostIndex(const Strings & all_hosts, const String & current_host);
private:
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperNoLock() const;
@ -91,11 +96,14 @@ private:
void prepareReplicatedAccess() const;
void prepareReplicatedSQLObjects() const;
const BackupKeeperSettings keeper_settings;
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;
const String zookeeper_path;
const BackupKeeperSettings keeper_settings;
const String backup_uuid;
const zkutil::GetZooKeeper get_zookeeper;
const Strings all_hosts;
const String current_host;
const size_t current_host_index;
const bool is_internal;
mutable ZooKeeperRetriesInfo zookeeper_retries_info;

View File

@ -133,22 +133,22 @@ Strings BackupEntriesCollector::setStage(const String & new_stage, const String
LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage)));
current_stage = new_stage;
backup_coordination->setStage(backup_settings.host_id, new_stage, message);
backup_coordination->setStage(new_stage, message);
if (new_stage == Stage::formatGatheringMetadata(1))
{
return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
return backup_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
}
else if (new_stage.starts_with(Stage::GATHERING_METADATA))
{
auto current_time = std::chrono::steady_clock::now();
auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time);
return backup_coordination->waitForStage(
all_hosts, new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
}
else
{
return backup_coordination->waitForStage(all_hosts, new_stage);
return backup_coordination->waitForStage(new_stage);
}
}

View File

@ -38,14 +38,33 @@ namespace Stage = BackupCoordinationStage;
namespace
{
std::shared_ptr<IBackupCoordination> makeBackupCoordination(std::optional<BackupCoordinationRemote::BackupKeeperSettings> keeper_settings, String & root_zk_path, const String & backup_uuid, const ContextPtr & context, bool is_internal_backup)
std::shared_ptr<IBackupCoordination> makeBackupCoordination(const ContextPtr & context, const BackupSettings & backup_settings, bool remote)
{
if (!root_zk_path.empty())
if (remote)
{
if (!keeper_settings.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parameter keeper_settings is empty while root_zk_path is not. This is bug");
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<BackupCoordinationRemote>(*keeper_settings, root_zk_path, backup_uuid, get_zookeeper, is_internal_backup);
BackupCoordinationRemote::BackupKeeperSettings keeper_settings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
return std::make_shared<BackupCoordinationRemote>(
get_zookeeper,
root_zk_path,
keeper_settings,
toString(*backup_settings.backup_uuid),
all_hosts,
backup_settings.host_id,
backup_settings.internal);
}
else
{
@ -53,12 +72,19 @@ namespace
}
}
std::shared_ptr<IRestoreCoordination> makeRestoreCoordination(const String & root_zk_path, const String & restore_uuid, const ContextPtr & context, bool is_internal_backup)
std::shared_ptr<IRestoreCoordination>
makeRestoreCoordination(const ContextPtr & context, const RestoreSettings & restore_settings, bool remote)
{
if (!root_zk_path.empty())
if (remote)
{
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
return std::make_shared<RestoreCoordinationRemote>(root_zk_path, restore_uuid, get_zookeeper, is_internal_backup);
auto all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
return std::make_shared<RestoreCoordinationRemote>(get_zookeeper, root_zk_path, toString(*restore_settings.restore_uuid), all_hosts, restore_settings.host_id, restore_settings.internal);
}
else
{
@ -68,12 +94,12 @@ namespace
/// Sends information about an exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host, const Exception & exception)
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const Exception & exception)
{
try
{
if (coordination)
coordination->setError(current_host, exception);
coordination->setError(exception);
}
catch (...)
{
@ -82,7 +108,7 @@ namespace
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
template <typename CoordinationType>
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination)
{
try
{
@ -90,12 +116,12 @@ namespace
}
catch (const Exception & e)
{
sendExceptionToCoordination(coordination, current_host, e);
sendExceptionToCoordination(coordination, e);
}
catch (...)
{
if (coordination)
coordination->setError(current_host, Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()));
coordination->setError(Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()));
}
}
@ -162,24 +188,13 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
else
backup_id = toString(*backup_settings.backup_uuid);
String root_zk_path;
std::shared_ptr<IBackupCoordination> backup_coordination;
if (backup_settings.internal)
{
/// The following call of makeBackupCoordination() is not essential because doBackup() will later create a backup coordination
/// if it's not created here. However to handle errors better it's better to make a coordination here because this way
/// if an exception will be thrown in startMakingBackup() other hosts will know about that.
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
BackupCoordinationRemote::BackupKeeperSettings keeper_settings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal);
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ true);
}
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
@ -238,7 +253,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
/// Something bad happened, the backup has not built.
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
sendCurrentExceptionToCoordination(backup_coordination);
throw;
}
}
@ -274,19 +289,9 @@ void BackupsWorker::doBackup(
if (!on_cluster)
context->checkAccess(required_access);
String root_zk_path;
std::optional<BackupCoordinationRemote::BackupKeeperSettings> keeper_settings;
ClusterPtr cluster;
if (on_cluster)
{
keeper_settings = BackupCoordinationRemote::BackupKeeperSettings
{
.keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries,
.keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms,
.keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms,
.batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread,
};
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
cluster = context->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
@ -294,7 +299,7 @@ void BackupsWorker::doBackup(
/// Make a backup coordination.
if (!backup_coordination)
backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal);
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ on_cluster);
if (!allow_concurrent_backups && backup_coordination->hasConcurrentBackups(std::ref(num_active_backups)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
@ -330,9 +335,7 @@ void BackupsWorker::doBackup(
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
backup_coordination->waitForStage(all_hosts, Stage::COMPLETED);
backup_coordination->waitForStage(Stage::COMPLETED);
}
else
{
@ -349,7 +352,7 @@ void BackupsWorker::doBackup(
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, "");
backup_coordination->setStage(Stage::COMPLETED, "");
}
size_t num_files = 0;
@ -383,7 +386,7 @@ void BackupsWorker::doBackup(
{
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED);
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
sendCurrentExceptionToCoordination(backup_coordination);
}
else
{
@ -417,8 +420,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
/// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination
/// if it's not created here. However to handle errors better it's better to make a coordination here because this way
/// if an exception will be thrown in startRestoring() other hosts will know about that.
auto root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal);
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ true);
}
try
@ -474,7 +476,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
{
/// Something bad happened, the backup has not built.
setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED);
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
sendCurrentExceptionToCoordination(restore_coordination);
throw;
}
}
@ -509,14 +511,12 @@ void BackupsWorker::doRestore(
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();
String root_zk_path;
/// Checks access rights if this is ON CLUSTER query.
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
@ -539,7 +539,7 @@ void BackupsWorker::doRestore(
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal);
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
@ -561,9 +561,7 @@ void BackupsWorker::doRestore(
executeDDLQueryOnCluster(restore_query, context, params);
/// Wait until all the hosts have written their backup entries.
auto all_hosts = BackupSettings::Util::filterHostIDs(
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
restore_coordination->waitForStage(all_hosts, Stage::COMPLETED);
restore_coordination->waitForStage(Stage::COMPLETED);
}
else
{
@ -581,7 +579,7 @@ void BackupsWorker::doRestore(
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, "");
restore_coordination->setStage(Stage::COMPLETED, "");
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
@ -603,7 +601,7 @@ void BackupsWorker::doRestore(
{
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED);
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
sendCurrentExceptionToCoordination(restore_coordination);
}
else
{

View File

@ -22,10 +22,10 @@ public:
virtual ~IBackupCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
virtual void setError(const String & current_host, const Exception & exception) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
struct PartNameAndChecksum
{
@ -66,12 +66,12 @@ public:
virtual Strings getReplicatedDataPaths(const String & table_shared_id) const = 0;
/// Adds a path to access.txt file keeping access entities of a ReplicatedAccessStorage.
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) = 0;
virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const = 0;
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) = 0;
virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const = 0;
/// Adds a path to a directory with user-defined SQL objects inside the backup.
virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) = 0;
virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const = 0;
virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) = 0;
virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const = 0;
struct FileInfo
{

View File

@ -18,10 +18,10 @@ public:
virtual ~IRestoreCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
virtual void setError(const String & current_host, const Exception & exception) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
static constexpr const char * kErrorStatus = "error";

View File

@ -7,20 +7,20 @@ namespace DB
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStage(const String &, const String &, const String &)
void RestoreCoordinationLocal::setStage(const String &, const String &)
{
}
void RestoreCoordinationLocal::setError(const String &, const Exception &)
void RestoreCoordinationLocal::setError(const Exception &)
{
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &)
Strings RestoreCoordinationLocal::waitForStage(const String &)
{
return {};
}
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
Strings RestoreCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds)
{
return {};
}

View File

@ -19,10 +19,10 @@ public:
~RestoreCoordinationLocal() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -11,11 +11,19 @@ namespace DB
namespace Stage = BackupCoordinationStage;
RestoreCoordinationRemote::RestoreCoordinationRemote(
const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_)
: root_zookeeper_path(root_zookeeper_path_)
, zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_)
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const String & restore_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_)
: get_zookeeper(get_zookeeper_)
, root_zookeeper_path(root_zookeeper_path_)
, restore_uuid(restore_uuid_)
, get_zookeeper(get_zookeeper_)
, zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_)
, all_hosts(all_hosts_)
, current_host(current_host_)
, current_host_index(BackupCoordinationRemote::findCurrentHostIndex(all_hosts, current_host))
, is_internal(is_internal_)
{
createRootNodes();
@ -63,22 +71,22 @@ void RestoreCoordinationRemote::createRootNodes()
}
void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception)
void RestoreCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setError(current_host, exception);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait)
{
return stage_sync->wait(all_hosts, stage_to_wait);
}
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout)
{
return stage_sync->waitFor(all_hosts, stage_to_wait, timeout);
}

View File

@ -11,14 +11,21 @@ namespace DB
class RestoreCoordinationRemote : public IRestoreCoordination
{
public:
RestoreCoordinationRemote(const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_);
RestoreCoordinationRemote(
zkutil::GetZooKeeper get_zookeeper_,
const String & root_zookeeper_path_,
const String & restore_uuid_,
const Strings & all_hosts_,
const String & current_host_,
bool is_internal_);
~RestoreCoordinationRemote() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & current_host, const String & new_stage, const String & message) override;
void setError(const String & current_host, const Exception & exception) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void setStage(const String & new_stage, const String & message) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@ -44,10 +51,13 @@ private:
class ReplicatedDatabasesMetadataSync;
const String root_zookeeper_path;
const String zookeeper_path;
const String restore_uuid;
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;
const String restore_uuid;
const String zookeeper_path;
const Strings all_hosts;
const String current_host;
const size_t current_host_index;
const bool is_internal;
std::optional<BackupCoordinationStageSync> stage_sync;

View File

@ -150,11 +150,11 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa
if (restore_coordination)
{
restore_coordination->setStage(restore_settings.host_id, new_stage, message);
restore_coordination->setStage(new_stage, message);
if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
else
restore_coordination->waitForStage(all_hosts, new_stage);
restore_coordination->waitForStage(new_stage);
}
}

View File

@ -48,10 +48,9 @@ void backupUserDefinedSQLObjects(
}
String replication_id = loader.getReplicationID();
String current_host_id = backup_entries_collector.getBackupSettings().host_id;
auto backup_coordination = backup_entries_collector.getBackupCoordination();
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, current_host_id, data_path_in_backup);
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup);
// On the stage of running post tasks, all directories will already be added to the backup coordination object.
// They will only be returned for one of the hosts below, for the rest an empty list.
@ -60,11 +59,10 @@ void backupUserDefinedSQLObjects(
[backup_entries = std::move(backup_entries),
replication_id = std::move(replication_id),
object_type,
current_host_id = std::move(current_host_id),
&backup_entries_collector,
backup_coordination]
{
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type, current_host_id);
auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type);
for (const auto & dir : dirs)
{