diff --git a/src/Backups/BackupCoordinationLocal.cpp b/src/Backups/BackupCoordinationLocal.cpp index 5b7ee37618b..27e0f173cf3 100644 --- a/src/Backups/BackupCoordinationLocal.cpp +++ b/src/Backups/BackupCoordinationLocal.cpp @@ -19,10 +19,6 @@ void BackupCoordinationLocal::setStage(const String &, const String &) { } -void BackupCoordinationLocal::setStageForCluster(const String &) -{ -} - void BackupCoordinationLocal::setError(const Exception &) { } diff --git a/src/Backups/BackupCoordinationLocal.h b/src/Backups/BackupCoordinationLocal.h index f1ffa8e8517..a7b05fbb83c 100644 --- a/src/Backups/BackupCoordinationLocal.h +++ b/src/Backups/BackupCoordinationLocal.h @@ -22,8 +22,7 @@ public: BackupCoordinationLocal(bool plain_backup_); ~BackupCoordinationLocal() override; - void setStage(const String & new_stage, const String & message) override; - void setStageForCluster(const String & new_stage) override; /// Sets stage for cluster + void setStage(const String & new_stage, const String & message = "") override; void setError(const Exception & exception) override; Strings waitForStage(const String & stage_to_wait) override; Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; diff --git a/src/Backups/BackupCoordinationRemote.cpp b/src/Backups/BackupCoordinationRemote.cpp index af88b15d622..27e7d23ce5f 100644 --- a/src/Backups/BackupCoordinationRemote.cpp +++ b/src/Backups/BackupCoordinationRemote.cpp @@ -254,17 +254,14 @@ void BackupCoordinationRemote::removeAllNodes() void BackupCoordinationRemote::setStage(const String & new_stage, const String & message) { - stage_sync->set(current_host, new_stage, message); -} - -void BackupCoordinationRemote::setStageForCluster(const String & new_stage) -{ - stage_sync->setStageForCluster(new_stage); + if (is_internal) + stage_sync->set(current_host, new_stage, message); + else + stage_sync->set(current_host, new_stage, /* message */ "", /* all_hosts */ true); } void BackupCoordinationRemote::setError(const Exception & exception) { - stage_sync->setStageForCluster(Stage::ERROR); stage_sync->setError(current_host, exception); } diff --git a/src/Backups/BackupCoordinationRemote.h b/src/Backups/BackupCoordinationRemote.h index c659cb0d459..5671079fa27 100644 --- a/src/Backups/BackupCoordinationRemote.h +++ b/src/Backups/BackupCoordinationRemote.h @@ -33,8 +33,7 @@ public: ~BackupCoordinationRemote() override; - void setStage(const String & new_stage, const String & message) override; - void setStageForCluster(const String & new_stage) override; /// Sets stage for cluster + void setStage(const String & new_stage, const String & message = "") override; void setError(const Exception & exception) override; Strings waitForStage(const String & stage_to_wait) override; Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp index 5cbeec0ec76..3d8c283f084 100644 --- a/src/Backups/BackupCoordinationStageSync.cpp +++ b/src/Backups/BackupCoordinationStageSync.cpp @@ -8,11 +8,13 @@ #include #include #include - +#include namespace DB { +namespace Stage = BackupCoordinationStage; + namespace ErrorCodes { extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE; @@ -42,7 +44,7 @@ void BackupCoordinationStageSync::createRootNodes() }); } -void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message) +void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message, const bool & all_hosts) { auto holder = with_retries.createRetriesControlHolder("set"); holder.retries_ctl.retryLoop( @@ -50,29 +52,24 @@ void BackupCoordinationStageSync::set(const String & current_host, const String { with_retries.renewZooKeeper(zookeeper); - /// Make an ephemeral node so the initiator can track if the current host is still working. - String alive_node_path = zookeeper_path + "/alive|" + current_host; - auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral); - if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS) - throw zkutil::KeeperException(code, alive_node_path); - - zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, ""); - zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message); - }); -} - -void BackupCoordinationStageSync::setStageForCluster(const String & new_stage) -{ - auto holder = with_retries.createRetriesControlHolder("setStageForCluster"); - holder.retries_ctl.retryLoop( - [&, &zookeeper = holder.faulty_zookeeper]() + if (all_hosts) { - with_retries.renewZooKeeper(zookeeper); - zookeeper->trySet(zookeeper_path, new_stage); auto code = zookeeper->trySet(zookeeper_path, new_stage); if (code != Coordination::Error::ZOK) throw zkutil::KeeperException(code, zookeeper_path); - }); + } + else + { + /// Make an ephemeral node so the initiator can track if the current host is still working. + String alive_node_path = zookeeper_path + "/alive|" + current_host; + auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral); + if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS) + throw zkutil::KeeperException(code, alive_node_path); + + zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, ""); + zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message); + } + }); } void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception) @@ -87,6 +84,10 @@ void BackupCoordinationStageSync::setError(const String & current_host, const Ex writeStringBinary(current_host, buf); writeException(exception, buf, true); zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str()); + + auto code = zookeeper->trySet(zookeeper_path, Stage::ERROR); + if (code != Coordination::Error::ZOK) + throw zkutil::KeeperException(code, zookeeper_path); }); } diff --git a/src/Backups/BackupCoordinationStageSync.h b/src/Backups/BackupCoordinationStageSync.h index 9dde4e3095f..2efaec46b3a 100644 --- a/src/Backups/BackupCoordinationStageSync.h +++ b/src/Backups/BackupCoordinationStageSync.h @@ -15,8 +15,7 @@ public: Poco::Logger * log_); /// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that. - void set(const String & current_host, const String & new_stage, const String & message); - void setStageForCluster(const String & new_stage); + void set(const String & current_host, const String & new_stage, const String & message, const bool & all_hosts = false); void setError(const String & current_host, const Exception & exception); /// Sets the stage of the current host and waits until all hosts come to the same stage. diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index de05cc2b092..720ca994a40 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -368,7 +368,7 @@ void BackupsWorker::doBackup( /// Wait until all the hosts have written their backup entries. backup_coordination->waitForStage(Stage::COMPLETED); - backup_coordination->setStageForCluster(Stage::COMPLETED); + backup_coordination->setStage(Stage::COMPLETED); } else { @@ -386,7 +386,7 @@ void BackupsWorker::doBackup( writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal); /// We have written our backup entries, we need to tell other hosts (they could be waiting for it). - backup_coordination->setStage(Stage::COMPLETED, ""); + backup_coordination->setStage(Stage::COMPLETED); } size_t num_files = 0; @@ -709,7 +709,7 @@ void BackupsWorker::doRestore( /// Wait until all the hosts have written their backup entries. restore_coordination->waitForStage(Stage::COMPLETED); - restore_coordination->setStageForCluster(Stage::COMPLETED); + restore_coordination->setStage(Stage::COMPLETED); } else { diff --git a/src/Backups/IBackupCoordination.h b/src/Backups/IBackupCoordination.h index 6caae1dd741..68a13ab7846 100644 --- a/src/Backups/IBackupCoordination.h +++ b/src/Backups/IBackupCoordination.h @@ -21,8 +21,7 @@ public: virtual ~IBackupCoordination() = default; /// Sets the current stage and waits for other hosts to come to this stage too. - virtual void setStage(const String & new_stage, const String & message) = 0; - virtual void setStageForCluster(const String & new_stage) = 0; + virtual void setStage(const String & new_stage, const String & message = "") = 0; virtual void setError(const Exception & exception) = 0; virtual Strings waitForStage(const String & stage_to_wait) = 0; virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; diff --git a/src/Backups/IRestoreCoordination.h b/src/Backups/IRestoreCoordination.h index a5c8db84c86..b4df9491c4c 100644 --- a/src/Backups/IRestoreCoordination.h +++ b/src/Backups/IRestoreCoordination.h @@ -18,8 +18,7 @@ public: virtual ~IRestoreCoordination() = default; /// Sets the current stage and waits for other hosts to come to this stage too. - virtual void setStage(const String & new_stage, const String & message) = 0; - virtual void setStageForCluster(const String & new_stage) = 0; /// Sets stage for cluster + virtual void setStage(const String & new_stage, const String & message = "") = 0; virtual void setError(const Exception & exception) = 0; virtual Strings waitForStage(const String & stage_to_wait) = 0; virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0; diff --git a/src/Backups/RestoreCoordinationLocal.cpp b/src/Backups/RestoreCoordinationLocal.cpp index 513204c931c..068c4fe7e52 100644 --- a/src/Backups/RestoreCoordinationLocal.cpp +++ b/src/Backups/RestoreCoordinationLocal.cpp @@ -15,10 +15,6 @@ void RestoreCoordinationLocal::setStage(const String &, const String &) { } -void RestoreCoordinationLocal::setStageForCluster(const String &) -{ -} - void RestoreCoordinationLocal::setError(const Exception &) { } diff --git a/src/Backups/RestoreCoordinationLocal.h b/src/Backups/RestoreCoordinationLocal.h index 0e4f4f01846..2240a25ef3d 100644 --- a/src/Backups/RestoreCoordinationLocal.h +++ b/src/Backups/RestoreCoordinationLocal.h @@ -19,8 +19,7 @@ public: ~RestoreCoordinationLocal() override; /// Sets the current stage and waits for other hosts to come to this stage too. - void setStage(const String & new_stage, const String & message) override; - void setStageForCluster(const String & new_stage) override; + void setStage(const String & new_stage, const String & message = "") override; void setError(const Exception & exception) override; Strings waitForStage(const String & stage_to_wait) override; Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override; diff --git a/src/Backups/RestoreCoordinationRemote.cpp b/src/Backups/RestoreCoordinationRemote.cpp index a3541614f36..c4ecee4aaa6 100644 --- a/src/Backups/RestoreCoordinationRemote.cpp +++ b/src/Backups/RestoreCoordinationRemote.cpp @@ -93,17 +93,14 @@ void RestoreCoordinationRemote::createRootNodes() void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message) { - stage_sync->set(current_host, new_stage, message); -} - -void RestoreCoordinationRemote::setStageForCluster(const String & new_stage) -{ - stage_sync->setStageForCluster(new_stage); + if (is_internal) + stage_sync->set(current_host, new_stage, message); + else + stage_sync->set(current_host, new_stage, /* message */ "", /* all_hosts */ true); } void RestoreCoordinationRemote::setError(const Exception & exception) { - stage_sync->setStageForCluster(Stage::ERROR); stage_sync->setError(current_host, exception); } diff --git a/src/Backups/RestoreCoordinationRemote.h b/src/Backups/RestoreCoordinationRemote.h index 947d08a66e5..989b1c1b727 100644 --- a/src/Backups/RestoreCoordinationRemote.h +++ b/src/Backups/RestoreCoordinationRemote.h @@ -26,8 +26,7 @@ public: ~RestoreCoordinationRemote() override; /// Sets the current stage and waits for other hosts to come to this stage too. - void setStage(const String & new_stage, const String & message) override; - void setStageForCluster(const String & new_stage) override; + void setStage(const String & new_stage, const String & message = "") override; void setError(const Exception & exception) override; Strings waitForStage(const String & stage_to_wait) override; Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;