diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp index fcf09d7c315..df5f08091ba 100644 --- a/src/Backups/BackupCoordinationStageSync.cpp +++ b/src/Backups/BackupCoordinationStageSync.cpp @@ -121,8 +121,7 @@ BackupCoordinationStageSync::BackupCoordinationStageSync( try { - concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_); - createStartAndAliveNodes(); + createStartAndAliveNodesAndCheckConcurrency(concurrency_counters_); startWatchingThread(); } catch (...) @@ -221,7 +220,7 @@ void BackupCoordinationStageSync::createRootNodes() throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected path in ZooKeeper specified: {}", zookeeper_path); } - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createRootNodes", WithRetries::kInitialization); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createRootNodes", WithRetries::kInitialization); holder.retries_ctl.retryLoop( [&, &zookeeper = holder.faulty_zookeeper]() { @@ -232,18 +231,22 @@ void BackupCoordinationStageSync::createRootNodes() } -void BackupCoordinationStageSync::createStartAndAliveNodes() +void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_) { - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createStartAndAliveNodes", WithRetries::kInitialization); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createStartAndAliveNodes", WithRetries::kInitialization); holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]() { with_retries.renewZooKeeper(zookeeper); - createStartAndAliveNodes(zookeeper); + createStartAndAliveNodesAndCheckConcurrency(zookeeper); }); + + /// The local concurrency check should be done here after BackupCoordinationStageSync::checkConcurrency() checked that + /// there are no 'alive' nodes corresponding to other backups or restores. + local_concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_); } -void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper) +void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper) { /// The "num_hosts" node keeps the number of hosts which started (created the "started" node) /// but not yet finished (not created the "finished" node). @@ -464,7 +467,7 @@ void BackupCoordinationStageSync::watchingThread() try { /// Recreate the 'alive' node if necessary and read a new state from ZooKeeper. - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::watchingThread"); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::watchingThread"); auto & zookeeper = holder.faulty_zookeeper; with_retries.renewZooKeeper(zookeeper); @@ -496,6 +499,9 @@ void BackupCoordinationStageSync::watchingThread() tryLogCurrentException(log, "Caught exception while watching"); } + if (should_stop()) + return; + zk_nodes_changed->tryWait(sync_period_ms.count()); } } @@ -769,7 +775,7 @@ void BackupCoordinationStageSync::setStage(const String & stage, const String & stopWatchingThread(); } - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setStage"); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setStage"); holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]() { with_retries.renewZooKeeper(zookeeper); @@ -938,7 +944,7 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K try { - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::finish", retries_kind); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::finish", retries_kind); holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]() { with_retries.renewZooKeeper(zookeeper); @@ -1309,7 +1315,7 @@ bool BackupCoordinationStageSync::setError(const Exception & exception, bool thr } } - auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setError", WithRetries::kErrorHandling); + auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setError", WithRetries::kErrorHandling); holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]() { with_retries.renewZooKeeper(zookeeper); diff --git a/src/Backups/BackupCoordinationStageSync.h b/src/Backups/BackupCoordinationStageSync.h index 11d3d1cf6f4..879b2422b84 100644 --- a/src/Backups/BackupCoordinationStageSync.h +++ b/src/Backups/BackupCoordinationStageSync.h @@ -72,8 +72,8 @@ private: void createRootNodes(); /// Atomically creates both 'start' and 'alive' nodes and also checks that there is no concurrent backup or restore if `allow_concurrency` is false. - void createStartAndAliveNodes(); - void createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper); + void createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_); + void createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper); /// Deserialize the version of a node stored in the 'start' node. int parseStartNode(const String & start_node_contents, const String & host) const; @@ -171,7 +171,7 @@ private: const String alive_node_path; const String alive_tracker_node_path; - std::optional concurrency_check; + std::optional local_concurrency_check; std::shared_ptr zk_nodes_changed;