Removed setStageForCluster and added option all_hosts to set stage for cluster

This commit is contained in:
Smita Kulkarni 2023-05-08 14:51:04 +02:00
parent f20901d9d3
commit 49ecba63af
13 changed files with 40 additions and 60 deletions

View File

@ -19,10 +19,6 @@ void BackupCoordinationLocal::setStage(const String &, const String &)
{
}
void BackupCoordinationLocal::setStageForCluster(const String &)
{
}
void BackupCoordinationLocal::setError(const Exception &)
{
}

View File

@ -22,8 +22,7 @@ public:
BackupCoordinationLocal(bool plain_backup_);
~BackupCoordinationLocal() override;
void setStage(const String & new_stage, const String & message) override;
void setStageForCluster(const String & new_stage) override; /// Sets stage for cluster
void setStage(const String & new_stage, const String & message = "") override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

View File

@ -254,17 +254,14 @@ void BackupCoordinationRemote::removeAllNodes()
void BackupCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void BackupCoordinationRemote::setStageForCluster(const String & new_stage)
{
stage_sync->setStageForCluster(new_stage);
if (is_internal)
stage_sync->set(current_host, new_stage, message);
else
stage_sync->set(current_host, new_stage, /* message */ "", /* all_hosts */ true);
}
void BackupCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setStageForCluster(Stage::ERROR);
stage_sync->setError(current_host, exception);
}

View File

@ -33,8 +33,7 @@ public:
~BackupCoordinationRemote() override;
void setStage(const String & new_stage, const String & message) override;
void setStageForCluster(const String & new_stage) override; /// Sets stage for cluster
void setStage(const String & new_stage, const String & message = "") override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

View File

@ -8,11 +8,13 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Backups/BackupCoordinationStage.h>
namespace DB
{
namespace Stage = BackupCoordinationStage;
namespace ErrorCodes
{
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
@ -42,7 +44,7 @@ void BackupCoordinationStageSync::createRootNodes()
});
}
void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message)
void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message, const bool & all_hosts)
{
auto holder = with_retries.createRetriesControlHolder("set");
holder.retries_ctl.retryLoop(
@ -50,29 +52,24 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
{
with_retries.renewZooKeeper(zookeeper);
/// Make an ephemeral node so the initiator can track if the current host is still working.
String alive_node_path = zookeeper_path + "/alive|" + current_host;
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
throw zkutil::KeeperException(code, alive_node_path);
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
});
}
void BackupCoordinationStageSync::setStageForCluster(const String & new_stage)
{
auto holder = with_retries.createRetriesControlHolder("setStageForCluster");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
if (all_hosts)
{
with_retries.renewZooKeeper(zookeeper);
zookeeper->trySet(zookeeper_path, new_stage);
auto code = zookeeper->trySet(zookeeper_path, new_stage);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, zookeeper_path);
});
}
else
{
/// Make an ephemeral node so the initiator can track if the current host is still working.
String alive_node_path = zookeeper_path + "/alive|" + current_host;
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
throw zkutil::KeeperException(code, alive_node_path);
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
}
});
}
void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception)
@ -87,6 +84,10 @@ void BackupCoordinationStageSync::setError(const String & current_host, const Ex
writeStringBinary(current_host, buf);
writeException(exception, buf, true);
zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str());
auto code = zookeeper->trySet(zookeeper_path, Stage::ERROR);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, zookeeper_path);
});
}

View File

@ -15,8 +15,7 @@ public:
Poco::Logger * log_);
/// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that.
void set(const String & current_host, const String & new_stage, const String & message);
void setStageForCluster(const String & new_stage);
void set(const String & current_host, const String & new_stage, const String & message, const bool & all_hosts = false);
void setError(const String & current_host, const Exception & exception);
/// Sets the stage of the current host and waits until all hosts come to the same stage.

View File

@ -368,7 +368,7 @@ void BackupsWorker::doBackup(
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForStage(Stage::COMPLETED);
backup_coordination->setStageForCluster(Stage::COMPLETED);
backup_coordination->setStage(Stage::COMPLETED);
}
else
{
@ -386,7 +386,7 @@ void BackupsWorker::doBackup(
writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(Stage::COMPLETED, "");
backup_coordination->setStage(Stage::COMPLETED);
}
size_t num_files = 0;
@ -709,7 +709,7 @@ void BackupsWorker::doRestore(
/// Wait until all the hosts have written their backup entries.
restore_coordination->waitForStage(Stage::COMPLETED);
restore_coordination->setStageForCluster(Stage::COMPLETED);
restore_coordination->setStage(Stage::COMPLETED);
}
else
{

View File

@ -21,8 +21,7 @@ public:
virtual ~IBackupCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setStageForCluster(const String & new_stage) = 0;
virtual void setStage(const String & new_stage, const String & message = "") = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;

View File

@ -18,8 +18,7 @@ public:
virtual ~IRestoreCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setStageForCluster(const String & new_stage) = 0; /// Sets stage for cluster
virtual void setStage(const String & new_stage, const String & message = "") = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;

View File

@ -15,10 +15,6 @@ void RestoreCoordinationLocal::setStage(const String &, const String &)
{
}
void RestoreCoordinationLocal::setStageForCluster(const String &)
{
}
void RestoreCoordinationLocal::setError(const Exception &)
{
}

View File

@ -19,8 +19,7 @@ public:
~RestoreCoordinationLocal() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & new_stage, const String & message) override;
void setStageForCluster(const String & new_stage) override;
void setStage(const String & new_stage, const String & message = "") override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

View File

@ -93,17 +93,14 @@ void RestoreCoordinationRemote::createRootNodes()
void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message)
{
stage_sync->set(current_host, new_stage, message);
}
void RestoreCoordinationRemote::setStageForCluster(const String & new_stage)
{
stage_sync->setStageForCluster(new_stage);
if (is_internal)
stage_sync->set(current_host, new_stage, message);
else
stage_sync->set(current_host, new_stage, /* message */ "", /* all_hosts */ true);
}
void RestoreCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setStageForCluster(Stage::ERROR);
stage_sync->setError(current_host, exception);
}

View File

@ -26,8 +26,7 @@ public:
~RestoreCoordinationRemote() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & new_stage, const String & message) override;
void setStageForCluster(const String & new_stage) override;
void setStage(const String & new_stage, const String & message = "") override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;