Update backup/restore to store a cluster-wide error or completed status in ZooKeeper, so that a previously failed backup/restore is not misinterpreted as a concurrent one when ZooKeeper is unable to remove its nodes

Smita Kulkarni 2023-04-12 20:26:57 +02:00
parent 17aecb797c
commit 49c95a535a
14 changed files with 59 additions and 25 deletions
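To make the intent concrete before the per-file diffs, here is a minimal standalone C++ sketch of the idea; the names and the plain map standing in for ZooKeeper are placeholders, not the actual ClickHouse code. Once a failed or finished backup/restore leaves a terminal stage value ("error" or "completed") behind, the concurrency check only treats entries still in their initial stage as running, so stale nodes from a failed run no longer look like a concurrent operation.

```cpp
// Toy model: uuid -> stage value, standing in for the per-operation ZooKeeper nodes.
#include <iostream>
#include <map>
#include <string>

namespace stage
{
    const std::string scheduled = "scheduled";   // stand-in for SCHEDULED_TO_START
    const std::string completed = "completed";   // stand-in for COMPLETED
    const std::string error     = "error";       // stand-in for the new ERROR stage
}

using CoordinationStore = std::map<std::string, std::string>;

/// After this commit, only an operation still in its initial stage counts as concurrent;
/// leftovers of failed runs carry the "error" stage and are ignored.
bool looksConcurrent(const CoordinationStore & store, const std::string & own_uuid)
{
    for (const auto & [uuid, current_stage] : store)
        if (uuid != own_uuid && current_stage == stage::scheduled)
            return true;
    return false;
}

int main()
{
    CoordinationStore store;
    store["old-backup"] = stage::error;       // failed earlier, its nodes could not be removed
    store["new-backup"] = stage::scheduled;   // the backup we are about to run

    // The stale "old-backup" entry is no longer reported as a concurrent backup.
    std::cout << std::boolalpha << looksConcurrent(store, "new-backup") << '\n';   // prints: false
}
```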

src/Backups/BackupCoordinationLocal.cpp View File

@@ -15,7 +15,7 @@ BackupCoordinationLocal::BackupCoordinationLocal(bool plain_backup_)
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
void BackupCoordinationLocal::setStage(const String &, const String &)
void BackupCoordinationLocal::setStage(const String &, const String &, const bool &)
{
}

src/Backups/BackupCoordinationLocal.h View File

@@ -22,7 +22,7 @@ public:
BackupCoordinationLocal(bool plain_backup_);
~BackupCoordinationLocal() override;
void setStage(const String & new_stage, const String & message) override;
void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

src/Backups/BackupCoordinationRemote.cpp View File

@@ -252,13 +252,17 @@ void BackupCoordinationRemote::removeAllNodes()
}
void BackupCoordinationRemote::setStage(const String & new_stage, const String & message)
void BackupCoordinationRemote::setStage(const String & new_stage, const String & message, const bool & for_cluster)
{
stage_sync->set(current_host, new_stage, message);
if (for_cluster)
stage_sync->setStageForCluster(new_stage);
else
stage_sync->set(current_host, new_stage, message);
}
void BackupCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setStageForCluster(Stage::ERROR);
stage_sync->setError(current_host, exception);
}
@@ -777,8 +781,8 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
String status;
if (zk->tryGet(root_zookeeper_path + "/" + existing_backup_path + "/stage", status))
{
/// If status is not COMPLETED it could be because the backup failed, check if 'error' exists
if (status != Stage::COMPLETED && !zk->exists(root_zookeeper_path + "/" + existing_backup_path + "/error"))
/// Check if some other backup is in progress
if (status == Stage::SCHEDULED_TO_START)
{
LOG_WARNING(log, "Found a concurrent backup: {}, current backup: {}", existing_backup_uuid, toString(backup_uuid));
result = true;

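As a rough illustration of the write side of this change, the sketch below uses placeholder names and a fake store, not the real `BackupCoordinationRemote`/`BackupCoordinationStageSync` interfaces: with `for_cluster = true` the stage is written once at the operation root, the default path keeps the per-host stage and message, and `setError` now marks the whole cluster as failed before recording the host-specific details.

```cpp
#include <map>
#include <string>
#include <utility>

/// Toy stand-in for the per-operation ZooKeeper subtree (path -> value).
struct FakeStageStore
{
    std::map<std::string, std::string> nodes;
    void set(const std::string & path, const std::string & value) { nodes[path] = value; }
};

/// Simplified shape of the new dispatch; all names here are placeholders.
class ToyCoordination
{
public:
    ToyCoordination(FakeStageStore & store_, std::string root_, std::string host_)
        : store(store_), root(std::move(root_)), host(std::move(host_)) {}

    void setStage(const std::string & new_stage, const std::string & message, bool for_cluster = false)
    {
        if (for_cluster)
            store.set(root + "/stage", new_stage);                       // one value for the whole cluster
        else
        {
            store.set(root + "/stage/" + host, new_stage);               // per-host stage ...
            store.set(root + "/stage/" + host + "/message", message);    // ... plus its message
        }
    }

    void setError(const std::string & error_text)
    {
        setStage("error", "", /* for_cluster = */ true);                 // terminal status visible to every host
        store.set(root + "/error/" + host, error_text);                  // host-specific details, as before
    }

private:
    FakeStageStore & store;
    std::string root;
    std::string host;
};

int main()
{
    FakeStageStore store;
    ToyCoordination coordination(store, "/backups/uuid", "host-1");
    coordination.setStage("scheduled", "starting");                      // per-host write
    coordination.setError("Code: 999. Something failed");                // cluster-wide "error" + host details
    coordination.setStage("completed", "", /* for_cluster = */ true);    // cluster-wide terminal status
}
```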
src/Backups/BackupCoordinationRemote.h View File

@@ -33,7 +33,7 @@ public:
~BackupCoordinationRemote() override;
void setStage(const String & new_stage, const String & message) override;
void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

src/Backups/BackupCoordinationStage.h View File

@@ -43,6 +43,10 @@ namespace BackupCoordinationStage
/// Coordination stage meaning that a host finished its work.
constexpr const char * COMPLETED = "completed";
/// Coordination stage meaning that backup/restore has failed due to an error
/// Check '/error' for the error message
constexpr const char * ERROR = "error";
}
}

src/Backups/BackupCoordinationStageSync.cpp View File

@@ -61,6 +61,20 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
});
}
void BackupCoordinationStageSync::setStageForCluster(const String & new_stage)
{
auto holder = with_retries.createRetriesControlHolder("setStageForCluster");
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
auto code = zookeeper->trySet(zookeeper_path, new_stage);
if (code != Coordination::Error::ZOK)
throw zkutil::KeeperException(code, zookeeper_path);
});
}
void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception)
{
auto holder = with_retries.createRetriesControlHolder("setError");

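The new `setStageForCluster` follows the same pattern as the other stage-sync writes: renew the session inside the retry loop, attempt the write, and turn a non-OK code into an exception so the retry machinery can decide whether to try again. Below is a minimal generic sketch of that check-and-throw-inside-retry pattern; the toy retry loop and fake keeper client are placeholders for `WithRetries` and ZooKeeper, not their actual APIs.

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>

/// Toy retry loop: run the operation, retry a bounded number of times on failure.
void retryLoop(size_t max_attempts, const std::function<void()> & operation)
{
    for (size_t attempt = 1;; ++attempt)
    {
        try
        {
            operation();
            return;
        }
        catch (const std::exception & e)
        {
            if (attempt >= max_attempts)
                throw;
            std::cerr << "Attempt " << attempt << " failed: " << e.what() << ", retrying\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
}

/// Stand-in for a coordination client whose writes can fail with an error code.
struct FakeKeeper
{
    int trySet(const std::string & /*path*/, const std::string & /*value*/)
    {
        static int calls = 0;
        return ++calls < 2 ? -1 : 0;   // fail once, then succeed, to exercise the retry
    }
};

int main()
{
    FakeKeeper keeper;
    retryLoop(/* max_attempts = */ 5, [&]
    {
        // Mirrors the shape of setStageForCluster: check the result of the write
        // and throw so the surrounding retry loop can react to transient failures.
        int code = keeper.trySet("/backups/uuid/stage", "completed");
        if (code != 0)
            throw std::runtime_error("cannot set stage node");
    });
    std::cout << "stage set for the whole cluster\n";
}
```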
src/Backups/BackupCoordinationStageSync.h View File

@@ -16,6 +16,7 @@ public:
/// Sets the stage of the current host and signals other hosts if they were waiting for that.
void set(const String & current_host, const String & new_stage, const String & message);
void setStageForCluster(const String & new_stage);
void setError(const String & current_host, const Exception & exception);
/// Sets the stage of the current host and waits until all hosts come to the same stage.

src/Backups/BackupsWorker.cpp View File

@@ -368,6 +368,7 @@ void BackupsWorker::doBackup(
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForStage(Stage::COMPLETED);
backup_coordination->setStage(Stage::COMPLETED, /* message */ "", /* for_cluster */ true);
}
else
{
@@ -654,12 +655,26 @@ void BackupsWorker::doRestore(
/// (If this isn't an ON CLUSTER query, RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
}
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(
ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED,
"Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
if (on_cluster)
{
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
/// because different replicas can contain different sets of tables, so the required access rights can differ too.
/// So the right way is to go through the entire cluster and check access for each host.
@@ -676,15 +691,6 @@ void BackupsWorker::doRestore(
}
}
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(
ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED,
"Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
/// Do RESTORE.
if (on_cluster)
{
@@ -703,6 +709,7 @@ void BackupsWorker::doRestore(
/// Wait until all the hosts have finished their restore work.
restore_coordination->waitForStage(Stage::COMPLETED);
restore_coordination->setStage(Stage::COMPLETED, /* message */ "", /* for_cluster */ true);
}
else
{

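The net effect of the `doRestore` changes above is a reordering plus a final cluster-wide status write: the restore coordination is created and the concurrency check runs before the per-host access checks, and once every host reaches COMPLETED the stage is also recorded once for the whole cluster. A compressed, runnable outline of that flow follows; the types and helper names are placeholders, not the real `BackupsWorker` members.

```cpp
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

/// Placeholder coordination object; only the calls relevant to the new flow are modeled.
struct ToyRestoreCoordination
{
    bool hasConcurrentRestores() const { return false; }
    void waitForStage(const std::string & stage) { std::cout << "all hosts reached " << stage << '\n'; }
    void setStage(const std::string & stage, const std::string & /*message*/, bool for_cluster = false)
    {
        std::cout << "stage '" << stage << "' set " << (for_cluster ? "for the whole cluster" : "for this host") << '\n';
    }
};

/// Rough outline of the reordered ON CLUSTER restore flow after this commit.
void doRestoreOutline(bool on_cluster, bool allow_concurrent_restores)
{
    // The coordination is now created (and the concurrency check performed)
    // *before* the per-host access checks, so a conflicting restore is rejected early.
    auto coordination = std::make_shared<ToyRestoreCoordination>();
    if (!allow_concurrent_restores && coordination->hasConcurrentRestores())
        throw std::runtime_error("Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");

    if (on_cluster)
        std::cout << "checking access on every host (replicas may hold different tables)\n";

    std::cout << "running restore\n";

    if (on_cluster)
    {
        coordination->waitForStage("completed");
        // New: record the terminal status once for the whole cluster, so later
        // concurrency checks do not mistake these nodes for an active restore.
        coordination->setStage("completed", /* message = */ "", /* for_cluster = */ true);
    }
}

int main() { doRestoreOutline(/* on_cluster = */ true, /* allow_concurrent_restores = */ false); }
```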
src/Backups/IBackupCoordination.h View File

@@ -21,7 +21,7 @@ public:
virtual ~IBackupCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;

src/Backups/IRestoreCoordination.h View File

@@ -18,7 +18,7 @@ public:
virtual ~IRestoreCoordination() = default;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual void setStage(const String & new_stage, const String & message) = 0;
virtual void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) = 0;
virtual void setError(const Exception & exception) = 0;
virtual Strings waitForStage(const String & stage_to_wait) = 0;
virtual Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;

src/Backups/RestoreCoordinationLocal.cpp View File

@@ -11,7 +11,7 @@ RestoreCoordinationLocal::RestoreCoordinationLocal() : log(&Poco::Logger::get("R
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStage(const String &, const String &)
void RestoreCoordinationLocal::setStage(const String &, const String &, const bool &)
{
}

src/Backups/RestoreCoordinationLocal.h View File

@@ -19,7 +19,7 @@ public:
~RestoreCoordinationLocal() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & new_stage, const String & message) override;
void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;

src/Backups/RestoreCoordinationRemote.cpp View File

@@ -90,13 +90,17 @@ void RestoreCoordinationRemote::createRootNodes()
});
}
void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message)
void RestoreCoordinationRemote::setStage(const String & new_stage, const String & message, const bool & for_cluster)
{
stage_sync->set(current_host, new_stage, message);
if (for_cluster)
stage_sync->setStageForCluster(new_stage);
else
stage_sync->set(current_host, new_stage, message);
}
void RestoreCoordinationRemote::setError(const Exception & exception)
{
stage_sync->setStageForCluster(Stage::ERROR);
stage_sync->setError(current_host, exception);
}
@@ -282,8 +286,8 @@ bool RestoreCoordinationRemote::hasConcurrentRestores(const std::atomic<size_t>
String status;
if (zk->tryGet(root_zookeeper_path + "/" + existing_restore_path + "/stage", status))
{
/// If status is not COMPLETED it could be because the restore failed, check if 'error' exists
if (status != Stage::COMPLETED && !zk->exists(root_zookeeper_path + "/" + existing_restore_path + "/error"))
/// Check if some other restore is in progress
if (status == Stage::SCHEDULED_TO_START)
{
LOG_WARNING(log, "Found a concurrent restore: {}, current restore: {}", existing_restore_uuid, toString(restore_uuid));
result = true;

src/Backups/RestoreCoordinationRemote.h View File

@@ -26,7 +26,7 @@ public:
~RestoreCoordinationRemote() override;
/// Sets the current stage and waits for other hosts to come to this stage too.
void setStage(const String & new_stage, const String & message) override;
void setStage(const String & new_stage, const String & message, const bool & for_cluster = false) override;
void setError(const Exception & exception) override;
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;