Corrections after reworking synchronization of ON CLUSTER BACKUPs/RESTOREs #2.

This commit is contained in:
Vitaly Baranov 2024-11-14 12:18:56 +01:00
parent 19bcc5550b
commit 380aea0fc3
2 changed files with 20 additions and 14 deletions

View File

@ -121,8 +121,7 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
try
{
concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_);
createStartAndAliveNodes();
createStartAndAliveNodesAndCheckConcurrency(concurrency_counters_);
startWatchingThread();
}
catch (...)
@ -221,7 +220,7 @@ void BackupCoordinationStageSync::createRootNodes()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected path in ZooKeeper specified: {}", zookeeper_path);
}
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createRootNodes", WithRetries::kInitialization);
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createRootNodes", WithRetries::kInitialization);
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
{
@ -232,18 +231,22 @@ void BackupCoordinationStageSync::createRootNodes()
}
void BackupCoordinationStageSync::createStartAndAliveNodes()
void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_)
{
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createStartAndAliveNodes", WithRetries::kInitialization);
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createStartAndAliveNodes", WithRetries::kInitialization);
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
createStartAndAliveNodes(zookeeper);
createStartAndAliveNodesAndCheckConcurrency(zookeeper);
});
/// The local concurrency check should be done here after BackupCoordinationStageSync::checkConcurrency() checked that
/// there are no 'alive' nodes corresponding to other backups or restores.
local_concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_);
}
void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
{
/// The "num_hosts" node keeps the number of hosts which started (created the "started" node)
/// but not yet finished (not created the "finished" node).
@ -464,7 +467,7 @@ void BackupCoordinationStageSync::watchingThread()
try
{
/// Recreate the 'alive' node if necessary and read a new state from ZooKeeper.
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::watchingThread");
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::watchingThread");
auto & zookeeper = holder.faulty_zookeeper;
with_retries.renewZooKeeper(zookeeper);
@ -496,6 +499,9 @@ void BackupCoordinationStageSync::watchingThread()
tryLogCurrentException(log, "Caught exception while watching");
}
if (should_stop())
return;
zk_nodes_changed->tryWait(sync_period_ms.count());
}
}
@ -769,7 +775,7 @@ void BackupCoordinationStageSync::setStage(const String & stage, const String &
stopWatchingThread();
}
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setStage");
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setStage");
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
@ -938,7 +944,7 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K
try
{
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::finish", retries_kind);
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::finish", retries_kind);
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
@ -1309,7 +1315,7 @@ bool BackupCoordinationStageSync::setError(const Exception & exception, bool thr
}
}
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setError", WithRetries::kErrorHandling);
auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setError", WithRetries::kErrorHandling);
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);

View File

@ -72,8 +72,8 @@ private:
void createRootNodes();
/// Atomically creates both 'start' and 'alive' nodes and also checks that there is no concurrent backup or restore if `allow_concurrency` is false.
void createStartAndAliveNodes();
void createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
void createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_);
void createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Deserialize the version of a node stored in the 'start' node.
int parseStartNode(const String & start_node_contents, const String & host) const;
@ -171,7 +171,7 @@ private:
const String alive_node_path;
const String alive_tracker_node_path;
std::optional<BackupConcurrencyCheck> concurrency_check;
std::optional<BackupConcurrencyCheck> local_concurrency_check;
std::shared_ptr<Poco::Event> zk_nodes_changed;