diff --git a/src/Access/ReplicatedAccessStorage.cpp b/src/Access/ReplicatedAccessStorage.cpp index a7cb2b6e08e..ddc5e8bfed1 100644 --- a/src/Access/ReplicatedAccessStorage.cpp +++ b/src/Access/ReplicatedAccessStorage.cpp @@ -674,18 +674,16 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col backup_entries_collector.getContext()->getAccessControl()); auto backup_coordination = backup_entries_collector.getBackupCoordination(); - String current_host_id = backup_entries_collector.getBackupSettings().host_id; - backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, current_host_id, backup_entry_with_path.first); + backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, backup_entry_with_path.first); backup_entries_collector.addPostTask( [backup_entry = backup_entry_with_path.second, zookeeper_path = zookeeper_path, type, - current_host_id, &backup_entries_collector, backup_coordination] { - for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type, current_host_id)) + for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type)) backup_entries_collector.addBackupEntry(path, backup_entry); }); } diff --git a/src/Backups/BackupCoordinationLocal.cpp b/src/Backups/BackupCoordinationLocal.cpp index 2d0bf978b83..90f64f15d97 100644 --- a/src/Backups/BackupCoordinationLocal.cpp +++ b/src/Backups/BackupCoordinationLocal.cpp @@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo; BackupCoordinationLocal::BackupCoordinationLocal() = default; BackupCoordinationLocal::~BackupCoordinationLocal() = default; -void BackupCoordinationLocal::setStage(const String &, const String &, const String &) +void BackupCoordinationLocal::setStage(const String &, const String &) { } -void BackupCoordinationLocal::setError(const String &, const Exception &) +void BackupCoordinationLocal::setError(const Exception &) { } -Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &) +Strings BackupCoordinationLocal::waitForStage(const String &) { return {}; } -Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds) +Strings BackupCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds) { return {}; } @@ -70,29 +70,29 @@ Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_sha } -void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) +void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) { std::lock_guard lock{mutex}; - replicated_access.addFilePath(access_zk_path, access_entity_type, host_id, file_path); + replicated_access.addFilePath(access_zk_path, access_entity_type, "", file_path); } -Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const +Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const { std::lock_guard lock{mutex}; - return replicated_access.getFilePaths(access_zk_path, access_entity_type, host_id); + return replicated_access.getFilePaths(access_zk_path, access_entity_type, ""); } -void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) +void BackupCoordinationLocal::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) { std::lock_guard lock{mutex}; - replicated_sql_objects.addDirectory(loader_zk_path, object_type, host_id, dir_path); + replicated_sql_objects.addDirectory(loader_zk_path, object_type, "", dir_path); } -Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const +Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const { std::lock_guard lock{mutex}; - return replicated_sql_objects.getDirectories(loader_zk_path, object_type, host_id); + return replicated_sql_objects.getDirectories(loader_zk_path, object_type, ""); } diff --git a/src/Backups/BackupCoordinationLocal.h b/src/Backups/BackupCoordinationLocal.h index edfa6c9973b..21db165be67 100644 --- a/src/Backups/BackupCoordinationLocal.h +++ b/src/Backups/BackupCoordinationLocal.h @@ -21,10 +21,10 @@ public: BackupCoordinationLocal(); ~BackupCoordinationLocal() override; - void setStage(const String & current_host, const String & new_stage, const String & message) override; - void setError(const String & current_host, const Exception & exception) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; + void setStage(const String & new_stage, const String & message) override; + void setError(const Exception & exception) override; + Strings waitForStage(const String & stage_to_wait) override; + Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector & part_names_and_checksums) override; @@ -37,11 +37,11 @@ public: void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override; Strings getReplicatedDataPaths(const String & table_shared_id) const override; - void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override; - Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override; + void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override; + Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override; - void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override; - Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override; + void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override; + Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override; void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override; void updateFileInfo(const FileInfo & file_info) override; diff --git a/src/Backups/BackupCoordinationRemote.cpp b/src/Backups/BackupCoordinationRemote.cpp index e7ea9becd24..5ad95490c95 100644 --- a/src/Backups/BackupCoordinationRemote.cpp +++ b/src/Backups/BackupCoordinationRemote.cpp @@ -166,17 +166,30 @@ namespace } } +size_t BackupCoordinationRemote::findCurrentHostIndex(const Strings & all_hosts, const String & current_host) +{ + auto it = std::find(all_hosts.begin(), all_hosts.end(), current_host); + if (it == all_hosts.end()) + return 0; + return it - all_hosts.begin(); +} + BackupCoordinationRemote::BackupCoordinationRemote( - const BackupKeeperSettings & keeper_settings_, - const String & root_zookeeper_path_, - const String & backup_uuid_, zkutil::GetZooKeeper get_zookeeper_, + const String & root_zookeeper_path_, + const BackupKeeperSettings & keeper_settings_, + const String & backup_uuid_, + const Strings & all_hosts_, + const String & current_host_, bool is_internal_) - : keeper_settings(keeper_settings_) + : get_zookeeper(get_zookeeper_) , root_zookeeper_path(root_zookeeper_path_) , zookeeper_path(root_zookeeper_path_ + "/backup-" + backup_uuid_) + , keeper_settings(keeper_settings_) , backup_uuid(backup_uuid_) - , get_zookeeper(get_zookeeper_) + , all_hosts(all_hosts_) + , current_host(current_host_) + , current_host_index(findCurrentHostIndex(all_hosts, current_host)) , is_internal(is_internal_) { zookeeper_retries_info = ZooKeeperRetriesInfo( @@ -251,22 +264,22 @@ void BackupCoordinationRemote::removeAllNodes() } -void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message) +void BackupCoordinationRemote::setStage(const String & new_stage, const String & message) { stage_sync->set(current_host, new_stage, message); } -void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception) +void BackupCoordinationRemote::setError(const Exception & exception) { stage_sync->setError(current_host, exception); } -Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait) +Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait) { return stage_sync->wait(all_hosts, stage_to_wait); } -Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) +Strings BackupCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) { return stage_sync->waitFor(all_hosts, stage_to_wait, timeout); } @@ -403,7 +416,7 @@ void BackupCoordinationRemote::prepareReplicatedTables() const } -void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) +void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) { { std::lock_guard lock{mutex}; @@ -416,15 +429,15 @@ void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access zk->createIfNotExists(path, ""); path += "/" + AccessEntityTypeInfo::get(access_entity_type).name; zk->createIfNotExists(path, ""); - path += "/" + host_id; + path += "/" + current_host; zk->createIfNotExists(path, file_path); } -Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const +Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const { std::lock_guard lock{mutex}; prepareReplicatedAccess(); - return replicated_access->getFilePaths(access_zk_path, access_entity_type, host_id); + return replicated_access->getFilePaths(access_zk_path, access_entity_type, current_host); } void BackupCoordinationRemote::prepareReplicatedAccess() const @@ -453,7 +466,7 @@ void BackupCoordinationRemote::prepareReplicatedAccess() const } } -void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) +void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) { { std::lock_guard lock{mutex}; @@ -474,15 +487,15 @@ void BackupCoordinationRemote::addReplicatedSQLObjectsDir(const String & loader_ } zk->createIfNotExists(path, ""); - path += "/" + host_id; + path += "/" + current_host; zk->createIfNotExists(path, dir_path); } -Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const +Strings BackupCoordinationRemote::getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const { std::lock_guard lock{mutex}; prepareReplicatedSQLObjects(); - return replicated_sql_objects->getDirectories(loader_zk_path, object_type, host_id); + return replicated_sql_objects->getDirectories(loader_zk_path, object_type, current_host); } void BackupCoordinationRemote::prepareReplicatedSQLObjects() const @@ -827,5 +840,4 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic &) return false; } - } diff --git a/src/Backups/BackupCoordinationRemote.h b/src/Backups/BackupCoordinationRemote.h index 74c6b58341a..268f20b9e39 100644 --- a/src/Backups/BackupCoordinationRemote.h +++ b/src/Backups/BackupCoordinationRemote.h @@ -27,17 +27,20 @@ public: }; BackupCoordinationRemote( - const BackupKeeperSettings & keeper_settings_, - const String & root_zookeeper_path_, - const String & backup_uuid_, zkutil::GetZooKeeper get_zookeeper_, + const String & root_zookeeper_path_, + const BackupKeeperSettings & keeper_settings_, + const String & backup_uuid_, + const Strings & all_hosts_, + const String & current_host_, bool is_internal_); + ~BackupCoordinationRemote() override; - void setStage(const String & current_host, const String & new_stage, const String & message) override; - void setError(const String & current_host, const Exception & exception) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; + void setStage(const String & new_stage, const String & message) override; + void setError(const Exception & exception) override; + Strings waitForStage(const String & stage_to_wait) override; + Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; void addReplicatedPartNames( const String & table_shared_id, @@ -58,11 +61,11 @@ public: void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override; Strings getReplicatedDataPaths(const String & table_shared_id) const override; - void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override; - Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override; + void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override; + Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override; - void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) override; - Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const override; + void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override; + Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override; void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override; void updateFileInfo(const FileInfo & file_info) override; @@ -78,6 +81,8 @@ public: bool hasConcurrentBackups(const std::atomic & num_active_backups) const override; + static size_t findCurrentHostIndex(const Strings & all_hosts, const String & current_host); + private: zkutil::ZooKeeperPtr getZooKeeper() const; zkutil::ZooKeeperPtr getZooKeeperNoLock() const; @@ -91,11 +96,14 @@ private: void prepareReplicatedAccess() const; void prepareReplicatedSQLObjects() const; - const BackupKeeperSettings keeper_settings; + const zkutil::GetZooKeeper get_zookeeper; const String root_zookeeper_path; const String zookeeper_path; + const BackupKeeperSettings keeper_settings; const String backup_uuid; - const zkutil::GetZooKeeper get_zookeeper; + const Strings all_hosts; + const String current_host; + const size_t current_host_index; const bool is_internal; mutable ZooKeeperRetriesInfo zookeeper_retries_info; diff --git a/src/Backups/BackupEntriesCollector.cpp b/src/Backups/BackupEntriesCollector.cpp index 92526f0094e..fc3eab5b8f0 100644 --- a/src/Backups/BackupEntriesCollector.cpp +++ b/src/Backups/BackupEntriesCollector.cpp @@ -133,22 +133,22 @@ Strings BackupEntriesCollector::setStage(const String & new_stage, const String LOG_TRACE(log, fmt::runtime(toUpperFirst(new_stage))); current_stage = new_stage; - backup_coordination->setStage(backup_settings.host_id, new_stage, message); + backup_coordination->setStage(new_stage, message); if (new_stage == Stage::formatGatheringMetadata(1)) { - return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout); + return backup_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout); } else if (new_stage.starts_with(Stage::GATHERING_METADATA)) { auto current_time = std::chrono::steady_clock::now(); auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time); return backup_coordination->waitForStage( - all_hosts, new_stage, std::chrono::duration_cast(end_of_timeout - current_time)); + new_stage, std::chrono::duration_cast(end_of_timeout - current_time)); } else { - return backup_coordination->waitForStage(all_hosts, new_stage); + return backup_coordination->waitForStage(new_stage); } } diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 7699641a974..123a5ca93f1 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -38,14 +38,33 @@ namespace Stage = BackupCoordinationStage; namespace { - std::shared_ptr makeBackupCoordination(std::optional keeper_settings, String & root_zk_path, const String & backup_uuid, const ContextPtr & context, bool is_internal_backup) + std::shared_ptr makeBackupCoordination(const ContextPtr & context, const BackupSettings & backup_settings, bool remote) { - if (!root_zk_path.empty()) + if (remote) { - if (!keeper_settings.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Parameter keeper_settings is empty while root_zk_path is not. This is bug"); + String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); + auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); }; - return std::make_shared(*keeper_settings, root_zk_path, backup_uuid, get_zookeeper, is_internal_backup); + + BackupCoordinationRemote::BackupKeeperSettings keeper_settings + { + .keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries, + .keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms, + .keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms, + .batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread, + }; + + auto all_hosts = BackupSettings::Util::filterHostIDs( + backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num); + + return std::make_shared( + get_zookeeper, + root_zk_path, + keeper_settings, + toString(*backup_settings.backup_uuid), + all_hosts, + backup_settings.host_id, + backup_settings.internal); } else { @@ -53,12 +72,19 @@ namespace } } - std::shared_ptr makeRestoreCoordination(const String & root_zk_path, const String & restore_uuid, const ContextPtr & context, bool is_internal_backup) + std::shared_ptr + makeRestoreCoordination(const ContextPtr & context, const RestoreSettings & restore_settings, bool remote) { - if (!root_zk_path.empty()) + if (remote) { + String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); + auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); }; - return std::make_shared(root_zk_path, restore_uuid, get_zookeeper, is_internal_backup); + + auto all_hosts = BackupSettings::Util::filterHostIDs( + restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num); + + return std::make_shared(get_zookeeper, root_zk_path, toString(*restore_settings.restore_uuid), all_hosts, restore_settings.host_id, restore_settings.internal); } else { @@ -68,12 +94,12 @@ namespace /// Sends information about an exception to IBackupCoordination or IRestoreCoordination. template - void sendExceptionToCoordination(std::shared_ptr coordination, const String & current_host, const Exception & exception) + void sendExceptionToCoordination(std::shared_ptr coordination, const Exception & exception) { try { if (coordination) - coordination->setError(current_host, exception); + coordination->setError(exception); } catch (...) { @@ -82,7 +108,7 @@ namespace /// Sends information about the current exception to IBackupCoordination or IRestoreCoordination. template - void sendCurrentExceptionToCoordination(std::shared_ptr coordination, const String & current_host) + void sendCurrentExceptionToCoordination(std::shared_ptr coordination) { try { @@ -90,12 +116,12 @@ namespace } catch (const Exception & e) { - sendExceptionToCoordination(coordination, current_host, e); + sendExceptionToCoordination(coordination, e); } catch (...) { if (coordination) - coordination->setError(current_host, Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode())); + coordination->setError(Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode())); } } @@ -162,24 +188,13 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context else backup_id = toString(*backup_settings.backup_uuid); - String root_zk_path; - std::shared_ptr backup_coordination; if (backup_settings.internal) { /// The following call of makeBackupCoordination() is not essential because doBackup() will later create a backup coordination /// if it's not created here. However to handle errors better it's better to make a coordination here because this way /// if an exception will be thrown in startMakingBackup() other hosts will know about that. - root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); - - BackupCoordinationRemote::BackupKeeperSettings keeper_settings - { - .keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries, - .keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms, - .keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms, - .batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread, - }; - backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal); + backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ true); } auto backup_info = BackupInfo::fromAST(*backup_query->backup_name); @@ -238,7 +253,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging)); /// Something bad happened, the backup has not built. setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED); - sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id); + sendCurrentExceptionToCoordination(backup_coordination); throw; } } @@ -274,19 +289,9 @@ void BackupsWorker::doBackup( if (!on_cluster) context->checkAccess(required_access); - String root_zk_path; - std::optional keeper_settings; ClusterPtr cluster; if (on_cluster) { - keeper_settings = BackupCoordinationRemote::BackupKeeperSettings - { - .keeper_max_retries = context->getSettingsRef().backup_keeper_max_retries, - .keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_keeper_retry_initial_backoff_ms, - .keeper_retry_max_backoff_ms = context->getSettingsRef().backup_keeper_retry_max_backoff_ms, - .batch_size_for_keeper_multiread = context->getSettingsRef().backup_batch_size_for_keeper_multiread, - }; - root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); backup_query->cluster = context->getMacros()->expand(backup_query->cluster); cluster = context->getCluster(backup_query->cluster); backup_settings.cluster_host_ids = cluster->getHostIDs(); @@ -294,7 +299,7 @@ void BackupsWorker::doBackup( /// Make a backup coordination. if (!backup_coordination) - backup_coordination = makeBackupCoordination(keeper_settings, root_zk_path, toString(*backup_settings.backup_uuid), context, backup_settings.internal); + backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ on_cluster); if (!allow_concurrent_backups && backup_coordination->hasConcurrentBackups(std::ref(num_active_backups))) throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); @@ -330,9 +335,7 @@ void BackupsWorker::doBackup( executeDDLQueryOnCluster(backup_query, mutable_context, params); /// Wait until all the hosts have written their backup entries. - auto all_hosts = BackupSettings::Util::filterHostIDs( - backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num); - backup_coordination->waitForStage(all_hosts, Stage::COMPLETED); + backup_coordination->waitForStage(Stage::COMPLETED); } else { @@ -349,7 +352,7 @@ void BackupsWorker::doBackup( writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool); /// We have written our backup entries, we need to tell other hosts (they could be waiting for it). - backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, ""); + backup_coordination->setStage(Stage::COMPLETED, ""); } size_t num_files = 0; @@ -383,7 +386,7 @@ void BackupsWorker::doBackup( { tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging)); setStatusSafe(backup_id, BackupStatus::BACKUP_FAILED); - sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id); + sendCurrentExceptionToCoordination(backup_coordination); } else { @@ -417,8 +420,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt /// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination /// if it's not created here. However to handle errors better it's better to make a coordination here because this way /// if an exception will be thrown in startRestoring() other hosts will know about that. - auto root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); - restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal); + restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ true); } try @@ -474,7 +476,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt { /// Something bad happened, the backup has not built. setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED); - sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id); + sendCurrentExceptionToCoordination(restore_coordination); throw; } } @@ -509,14 +511,12 @@ void BackupsWorker::doRestore( BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params); String current_database = context->getCurrentDatabase(); - String root_zk_path; /// Checks access rights if this is ON CLUSTER query. /// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.) ClusterPtr cluster; bool on_cluster = !restore_query->cluster.empty(); if (on_cluster) { - root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); restore_query->cluster = context->getMacros()->expand(restore_query->cluster); cluster = context->getCluster(restore_query->cluster); restore_settings.cluster_host_ids = cluster->getHostIDs(); @@ -539,7 +539,7 @@ void BackupsWorker::doRestore( /// Make a restore coordination. if (!restore_coordination) - restore_coordination = makeRestoreCoordination(root_zk_path, toString(*restore_settings.restore_uuid), context, restore_settings.internal); + restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster); if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores))) throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); @@ -561,9 +561,7 @@ void BackupsWorker::doRestore( executeDDLQueryOnCluster(restore_query, context, params); /// Wait until all the hosts have written their backup entries. - auto all_hosts = BackupSettings::Util::filterHostIDs( - restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num); - restore_coordination->waitForStage(all_hosts, Stage::COMPLETED); + restore_coordination->waitForStage(Stage::COMPLETED); } else { @@ -581,7 +579,7 @@ void BackupsWorker::doRestore( restoreTablesData(std::move(data_restore_tasks), restores_thread_pool); /// We have restored everything, we need to tell other hosts (they could be waiting for it). - restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, ""); + restore_coordination->setStage(Stage::COMPLETED, ""); } LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging); @@ -603,7 +601,7 @@ void BackupsWorker::doRestore( { tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging)); setStatusSafe(restore_id, BackupStatus::RESTORE_FAILED); - sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id); + sendCurrentExceptionToCoordination(restore_coordination); } else { diff --git a/src/Backups/IBackupCoordination.h b/src/Backups/IBackupCoordination.h index f87aa8b8f41..26f101f29e5 100644 --- a/src/Backups/IBackupCoordination.h +++ b/src/Backups/IBackupCoordination.h @@ -22,10 +22,10 @@ public: virtual ~IBackupCoordination() = default; /// Sets the current stage and waits for other hosts to come to this stage too. - virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0; - virtual void setError(const String & current_host, const Exception & exception) = 0; - virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0; - virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; + virtual void setStage(const String & new_stage, const String & message) = 0; + virtual void setError(const Exception & exception) = 0; + virtual Strings waitForStage(const String & stage_to_wait) = 0; + virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; struct PartNameAndChecksum { @@ -66,12 +66,12 @@ public: virtual Strings getReplicatedDataPaths(const String & table_shared_id) const = 0; /// Adds a path to access.txt file keeping access entities of a ReplicatedAccessStorage. - virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) = 0; - virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const = 0; + virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) = 0; + virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const = 0; /// Adds a path to a directory with user-defined SQL objects inside the backup. - virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path) = 0; - virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const = 0; + virtual void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) = 0; + virtual Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const = 0; struct FileInfo { diff --git a/src/Backups/IRestoreCoordination.h b/src/Backups/IRestoreCoordination.h index 61aee533dd2..2f9e8d171f6 100644 --- a/src/Backups/IRestoreCoordination.h +++ b/src/Backups/IRestoreCoordination.h @@ -18,10 +18,10 @@ public: virtual ~IRestoreCoordination() = default; /// Sets the current stage and waits for other hosts to come to this stage too. - virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0; - virtual void setError(const String & current_host, const Exception & exception) = 0; - virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0; - virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; + virtual void setStage(const String & new_stage, const String & message) = 0; + virtual void setError(const Exception & exception) = 0; + virtual Strings waitForStage(const String & stage_to_wait) = 0; + virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; static constexpr const char * kErrorStatus = "error"; diff --git a/src/Backups/RestoreCoordinationLocal.cpp b/src/Backups/RestoreCoordinationLocal.cpp index d5334e2c6aa..191cde40aa1 100644 --- a/src/Backups/RestoreCoordinationLocal.cpp +++ b/src/Backups/RestoreCoordinationLocal.cpp @@ -7,20 +7,20 @@ namespace DB RestoreCoordinationLocal::RestoreCoordinationLocal() = default; RestoreCoordinationLocal::~RestoreCoordinationLocal() = default; -void RestoreCoordinationLocal::setStage(const String &, const String &, const String &) +void RestoreCoordinationLocal::setStage(const String &, const String &) { } -void RestoreCoordinationLocal::setError(const String &, const Exception &) +void RestoreCoordinationLocal::setError(const Exception &) { } -Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &) +Strings RestoreCoordinationLocal::waitForStage(const String &) { return {}; } -Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds) +Strings RestoreCoordinationLocal::waitForStage(const String &, std::chrono::milliseconds) { return {}; } diff --git a/src/Backups/RestoreCoordinationLocal.h b/src/Backups/RestoreCoordinationLocal.h index cb3a8c55d22..bbe76cdf5fd 100644 --- a/src/Backups/RestoreCoordinationLocal.h +++ b/src/Backups/RestoreCoordinationLocal.h @@ -19,10 +19,10 @@ public: ~RestoreCoordinationLocal() override; /// Sets the current stage and waits for other hosts to come to this stage too. - void setStage(const String & current_host, const String & new_stage, const String & message) override; - void setError(const String & current_host, const Exception & exception) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; + void setStage(const String & new_stage, const String & message) override; + void setError(const Exception & exception) override; + Strings waitForStage(const String & stage_to_wait) override; + Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; /// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table. bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override; diff --git a/src/Backups/RestoreCoordinationRemote.cpp b/src/Backups/RestoreCoordinationRemote.cpp index 646c2c68d3b..10d085a696a 100644 --- a/src/Backups/RestoreCoordinationRemote.cpp +++ b/src/Backups/RestoreCoordinationRemote.cpp @@ -11,11 +11,19 @@ namespace DB namespace Stage = BackupCoordinationStage; RestoreCoordinationRemote::RestoreCoordinationRemote( - const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_) - : root_zookeeper_path(root_zookeeper_path_) - , zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_) + zkutil::GetZooKeeper get_zookeeper_, + const String & root_zookeeper_path_, + const String & restore_uuid_, + const Strings & all_hosts_, + const String & current_host_, + bool is_internal_) + : get_zookeeper(get_zookeeper_) + , root_zookeeper_path(root_zookeeper_path_) , restore_uuid(restore_uuid_) - , get_zookeeper(get_zookeeper_) + , zookeeper_path(root_zookeeper_path_ + "/restore-" + restore_uuid_) + , all_hosts(all_hosts_) + , current_host(current_host_) + , current_host_index(BackupCoordinationRemote::findCurrentHostIndex(all_hosts, current_host)) , is_internal(is_internal_) { createRootNodes(); @@ -63,22 +71,22 @@ void RestoreCoordinationRemote::createRootNodes() } -void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message) +void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message) { stage_sync->set(current_host, new_stage, message); } -void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception) +void RestoreCoordinationRemote::setError(const Exception & exception) { stage_sync->setError(current_host, exception); } -Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait) +Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait) { return stage_sync->wait(all_hosts, stage_to_wait); } -Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) +Strings RestoreCoordinationRemote::waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) { return stage_sync->waitFor(all_hosts, stage_to_wait, timeout); } diff --git a/src/Backups/RestoreCoordinationRemote.h b/src/Backups/RestoreCoordinationRemote.h index 4ee87796e8e..b78c2e96f9e 100644 --- a/src/Backups/RestoreCoordinationRemote.h +++ b/src/Backups/RestoreCoordinationRemote.h @@ -11,14 +11,21 @@ namespace DB class RestoreCoordinationRemote : public IRestoreCoordination { public: - RestoreCoordinationRemote(const String & root_zookeeper_path_, const String & restore_uuid_, zkutil::GetZooKeeper get_zookeeper_, bool is_internal_); + RestoreCoordinationRemote( + zkutil::GetZooKeeper get_zookeeper_, + const String & root_zookeeper_path_, + const String & restore_uuid_, + const Strings & all_hosts_, + const String & current_host_, + bool is_internal_); + ~RestoreCoordinationRemote() override; /// Sets the current stage and waits for other hosts to come to this stage too. - void setStage(const String & current_host, const String & new_stage, const String & message) override; - void setError(const String & current_host, const Exception & exception) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; - Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; + void setStage(const String & new_stage, const String & message) override; + void setError(const Exception & exception) override; + Strings waitForStage(const String & stage_to_wait) override; + Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; /// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table. bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override; @@ -44,10 +51,13 @@ private: class ReplicatedDatabasesMetadataSync; - const String root_zookeeper_path; - const String zookeeper_path; - const String restore_uuid; const zkutil::GetZooKeeper get_zookeeper; + const String root_zookeeper_path; + const String restore_uuid; + const String zookeeper_path; + const Strings all_hosts; + const String current_host; + const size_t current_host_index; const bool is_internal; std::optional stage_sync; diff --git a/src/Backups/RestorerFromBackup.cpp b/src/Backups/RestorerFromBackup.cpp index 68a68379f79..77f7512e0d1 100644 --- a/src/Backups/RestorerFromBackup.cpp +++ b/src/Backups/RestorerFromBackup.cpp @@ -150,11 +150,11 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa if (restore_coordination) { - restore_coordination->setStage(restore_settings.host_id, new_stage, message); + restore_coordination->setStage(new_stage, message); if (new_stage == Stage::FINDING_TABLES_IN_BACKUP) - restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout); + restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout); else - restore_coordination->waitForStage(all_hosts, new_stage); + restore_coordination->waitForStage(new_stage); } } diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp index 0fc86bf39ba..53d8ebc7b61 100644 --- a/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp +++ b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp @@ -48,10 +48,9 @@ void backupUserDefinedSQLObjects( } String replication_id = loader.getReplicationID(); - String current_host_id = backup_entries_collector.getBackupSettings().host_id; auto backup_coordination = backup_entries_collector.getBackupCoordination(); - backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, current_host_id, data_path_in_backup); + backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup); // On the stage of running post tasks, all directories will already be added to the backup coordination object. // They will only be returned for one of the hosts below, for the rest an empty list. @@ -60,11 +59,10 @@ void backupUserDefinedSQLObjects( [backup_entries = std::move(backup_entries), replication_id = std::move(replication_id), object_type, - current_host_id = std::move(current_host_id), &backup_entries_collector, backup_coordination] { - auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type, current_host_id); + auto dirs = backup_coordination->getReplicatedSQLObjectsDirs(replication_id, object_type); for (const auto & dir : dirs) {