Merge pull request #57764 from vitlibar/fix-retries-for-disconnected-nodes-for-backup-on-cluster

Fix retries for disconnected nodes for BACKUP/RESTORE ON CLUSTER
Vitaly Baranov 2023-12-13 13:06:32 +01:00 committed by GitHub
commit 96bbff3af0
4 changed files with 74 additions and 91 deletions

src/Backups/BackupCoordinationRemote.cpp

@@ -184,12 +184,12 @@ BackupCoordinationRemote::BackupCoordinationRemote(
             if (my_is_internal)
             {
                 String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
 
-                auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
-                if (code == Coordination::Error::ZNODEEXISTS)
-                    zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
-                else if (code != Coordination::Error::ZOK)
-                    throw zkutil::KeeperException::fromPath(code, alive_node_path);
+                /// Delete the ephemeral node from the previous connection so we don't have to wait for keeper to do it automatically.
+                zk->tryRemove(alive_node_path);
+
+                zk->createAncestors(alive_node_path);
+                zk->create(alive_node_path, "", zkutil::CreateMode::Ephemeral);
             }
         })
     {

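The change above no longer tolerates a stale "alive" node: on every (re)connection the callback deletes whatever the previous session left behind and recreates the ephemeral node, so a reconnected host becomes visible again immediately. A minimal sketch of that idea, assuming a hypothetical KeeperClient interface (not ClickHouse's actual zkutil API):

#include <string>

/// Hypothetical minimal client; a stand-in for the zkutil wrapper used above.
struct KeeperClient
{
    virtual void tryRemove(const std::string & path) = 0;        /// assumed to be a no-op if the node is absent
    virtual void createAncestors(const std::string & path) = 0;  /// assumed to create missing parent nodes
    virtual void createEphemeral(const std::string & path) = 0;  /// assumed to throw if the node already exists
    virtual ~KeeperClient() = default;
};

/// Recreate the "alive" marker after (re)connecting. An ephemeral node surviving
/// from the previous session would otherwise block creation until Keeper expires
/// that session, so it is removed proactively first.
void recreateAliveNode(KeeperClient & zk, const std::string & alive_node_path)
{
    zk.tryRemove(alive_node_path);
    zk.createAncestors(alive_node_path);
    zk.createEphemeral(alive_node_path);
}

The same callback change is applied to RestoreCoordinationRemote in the last file of this commit.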
src/Backups/BackupCoordinationStageSync.cpp

@@ -60,12 +60,6 @@ void BackupCoordinationStageSync::set(const String & current_host, const String
     }
     else
     {
-        /// Make an ephemeral node so the initiator can track if the current host is still working.
-        String alive_node_path = zookeeper_path + "/alive|" + current_host;
-        auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
-        if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
-            throw zkutil::KeeperException::fromPath(code, alive_node_path);
-
         zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
         zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
     }
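The ephemeral "alive" node is now created only by the reconnect callback shown in the previous file, so set() no longer needs its own copy of that logic. The rest of this file then routes readCurrentState() through the caller's retry control instead of opening nested retry scopes. For orientation, a purely illustrative sketch of the retry-loop shape involved (this is not ClickHouse's WithRetries implementation):

#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

/// Illustrative stand-in for a retries controller: runs the body until it
/// succeeds, sleeping with exponential backoff, and rethrows only on the
/// last attempt.
class RetriesControl
{
public:
    explicit RetriesControl(size_t max_retries_) : max_retries(max_retries_) {}

    bool isLastRetry() const { return attempt + 1 >= max_retries; }

    void retryLoop(const std::function<void()> & body)
    {
        for (attempt = 0; ; ++attempt)
        {
            try
            {
                body();
                return;
            }
            catch (...)
            {
                if (isLastRetry())
                    throw;
                std::this_thread::sleep_for(backoff);
                backoff *= 2; /// exponential backoff between attempts
            }
        }
    }

private:
    size_t max_retries;
    size_t attempt = 0;
    std::chrono::milliseconds backoff{100};
};

With this shape, a missing "alive" node inside readCurrentState() just fails the current attempt (via setUserError below) and the whole read runs again on the next attempt.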
@@ -106,39 +100,36 @@ Strings BackupCoordinationStageSync::waitFor(const Strings & all_hosts, const St
 
 namespace
 {
-    struct UnreadyHostState
+    struct UnreadyHost
     {
+        String host;
         bool started = false;
-        bool alive = false;
     };
 }
 
 struct BackupCoordinationStageSync::State
 {
-    Strings results;
-    std::map<String, UnreadyHostState> unready_hosts;
+    std::optional<Strings> results;
     std::optional<std::pair<String, Exception>> error;
-    std::optional<String> host_terminated;
+    std::optional<String> disconnected_host;
+    std::optional<UnreadyHost> unready_host;
 };
 
 BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState(
-    const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const
+    WithRetries::RetriesControlHolder & retries_control_holder,
+    const Strings & zk_nodes,
+    const Strings & all_hosts,
+    const String & stage_to_wait) const
 {
+    auto zookeeper = retries_control_holder.faulty_zookeeper;
+    auto & retries_ctl = retries_control_holder.retries_ctl;
+
     std::unordered_set<std::string_view> zk_nodes_set{zk_nodes.begin(), zk_nodes.end()};
 
     State state;
     if (zk_nodes_set.contains("error"))
     {
-        String errors;
-        {
-            auto holder = with_retries.createRetriesControlHolder("readCurrentState");
-            holder.retries_ctl.retryLoop(
-                [&, &zookeeper = holder.faulty_zookeeper]()
-                {
-                    with_retries.renewZooKeeper(zookeeper);
-                    errors = zookeeper->get(zookeeper_path + "/error");
-                });
-        }
+        String errors = zookeeper->get(zookeeper_path + "/error");
 
         ReadBufferFromOwnString buf{errors};
         String host;
         readStringBinary(host, buf);
@@ -146,64 +137,50 @@ BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState
         return state;
     }
 
+    std::optional<UnreadyHost> unready_host;
+
     for (const auto & host : all_hosts)
     {
         if (!zk_nodes_set.contains("current|" + host + "|" + stage_to_wait))
         {
-            UnreadyHostState unready_host_state;
             const String started_node_name = "started|" + host;
             const String alive_node_name = "alive|" + host;
-            const String alive_node_path = zookeeper_path + "/" + alive_node_name;
-            unready_host_state.started = zk_nodes_set.contains(started_node_name);
-
-            /// Because we do retries everywhere we can't fully rely on ephemeral nodes anymore.
-            /// Though we recreate "alive" node when reconnecting it might be not enough and race condition is possible.
-            /// And everything we can do here - just retry.
-            /// In worst case when we won't manage to see the alive node for a long time we will just abort the backup.
-            unready_host_state.alive = zk_nodes_set.contains(alive_node_name);
-            if (!unready_host_state.alive)
+
+            bool started = zk_nodes_set.contains(started_node_name);
+            bool alive = zk_nodes_set.contains(alive_node_name);
+
+            if (!alive)
             {
-                LOG_TRACE(log, "Seems like host ({}) is dead. Will retry the check to confirm", host);
-                auto holder = with_retries.createRetriesControlHolder("readCurrentState::checkAliveNode");
-                holder.retries_ctl.retryLoop(
-                    [&, &zookeeper = holder.faulty_zookeeper]()
-                    {
-                        with_retries.renewZooKeeper(zookeeper);
-                        if (zookeeper->existsNoFailureInjection(alive_node_path))
-                        {
-                            unready_host_state.alive = true;
-                            return;
-                        }
-
-                        // Retry with backoff. We also check whether it is last retry or no, because we won't to rethrow an exception.
-                        if (!holder.retries_ctl.isLastRetry())
-                            holder.retries_ctl.setKeeperError(Coordination::Error::ZNONODE, "There is no alive node for host {}. Will retry", host);
-                    });
+                /// If the "alive" node doesn't exist then we don't have connection to the corresponding host.
+                /// This node is ephemeral so probably it will be recreated soon. We use zookeeper retries to wait.
+                /// In worst case when we won't manage to see the alive node for a long time we will just abort the backup.
+                String message;
+                if (started)
+                    message = fmt::format("Lost connection to host {}", host);
+                else
+                    message = fmt::format("No connection to host {} yet", host);
+                if (!retries_ctl.isLastRetry())
+                    message += ", will retry";
+                retries_ctl.setUserError(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, message);
+                state.disconnected_host = host;
+                return state;
             }
-            LOG_TRACE(log, "Host ({}) appeared to be {}", host, unready_host_state.alive ? "alive" : "dead");
 
-            state.unready_hosts.emplace(host, unready_host_state);
-            if (!unready_host_state.alive && unready_host_state.started && !state.host_terminated)
-                state.host_terminated = host;
+            if (!unready_host)
+                unready_host.emplace(UnreadyHost{.host = host, .started = started});
         }
     }
 
-    if (state.host_terminated || !state.unready_hosts.empty())
-        return state;
-
-    auto holder = with_retries.createRetriesControlHolder("waitImpl::collectStagesToWait");
-    holder.retries_ctl.retryLoop(
-        [&, &zookeeper = holder.faulty_zookeeper]()
+    if (unready_host)
     {
-        with_retries.renewZooKeeper(zookeeper);
-        Strings results;
+        state.unready_host = std::move(unready_host);
+        return state;
+    }
 
-        for (const auto & host : all_hosts)
-            results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
-        state.results = std::move(results);
-    });
+    Strings results;
+    for (const auto & host : all_hosts)
+        results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
+    state.results = std::move(results);
 
     return state;
 }
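After this rewrite readCurrentState() reports exactly one of four mutually exclusive outcomes through optional fields, which is what the chassert() in waitImpl() below relies on. A simplified sketch of that contract, with stand-in types rather than the actual ClickHouse definitions:

#include <optional>
#include <string>
#include <vector>

struct UnreadyHost
{
    std::string host;
    bool started = false;
};

struct State
{
    std::optional<std::vector<std::string>> results; /// every host reached the awaited stage
    std::optional<std::string> error;                /// some host reported an error
    std::optional<std::string> disconnected_host;    /// "alive" node still missing after retries
    std::optional<UnreadyHost> unready_host;         /// still waiting for this host
};

/// Mirrors the chassert() in waitImpl(): a returned state always carries an outcome.
bool hasOutcome(const State & state)
{
    return state.results || state.error || state.disconnected_host || state.unready_host;
}

Tracking only the first unready host is enough here because the old code also reported only unready_hosts.begin() when logging and when timing out.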
@@ -229,7 +206,7 @@ Strings BackupCoordinationStageSync::waitImpl(
         auto watch = std::make_shared<Poco::Event>();
         Strings zk_nodes;
         {
-            auto holder = with_retries.createRetriesControlHolder("waitImpl::getChildren");
+            auto holder = with_retries.createRetriesControlHolder("waitImpl");
             holder.retries_ctl.retryLoop(
                 [&, &zookeeper = holder.faulty_zookeeper]()
                 {
@@ -237,17 +214,23 @@ Strings BackupCoordinationStageSync::waitImpl(
                     watch->reset();
                     /// Get zk nodes and subscribe on their changes.
                     zk_nodes = zookeeper->getChildren(zookeeper_path, nullptr, watch);
+
+                    /// Read the current state of zk nodes.
+                    state = readCurrentState(holder, zk_nodes, all_hosts, stage_to_wait);
                 });
         }
 
-        /// Read and analyze the current state of zk nodes.
-        state = readCurrentState(zk_nodes, all_hosts, stage_to_wait);
-        if (state.error || state.host_terminated || state.unready_hosts.empty())
-            break; /// Error happened or everything is ready.
+        /// Analyze the current state of zk nodes.
+        chassert(state.results || state.error || state.disconnected_host || state.unready_host);
 
-        /// Log that we will wait
-        const auto & unready_host = state.unready_hosts.begin()->first;
-        LOG_INFO(log, "Waiting on ZooKeeper watch for any node to be changed (currently waiting for host {})", unready_host);
+        if (state.results || state.error || state.disconnected_host)
+            break; /// Everything is ready or error happened.
+
+        /// Log what we will wait.
+        const auto & unready_host = *state.unready_host;
+        LOG_INFO(log, "Waiting on ZooKeeper watch for any node to be changed (currently waiting for host {}{})",
+            unready_host.host,
+            (!unready_host.started ? " which didn't start the operation yet" : ""));
 
         /// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed.
         {
@@ -270,23 +253,23 @@ Strings BackupCoordinationStageSync::waitImpl(
         state.error->second.rethrow();
 
     /// Another host terminated without errors.
-    if (state.host_terminated)
-        throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Host {} suddenly stopped working", *state.host_terminated);
+    if (state.disconnected_host)
+        throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "No connection to host {}", *state.disconnected_host);
 
     /// Something's unready, timeout is probably not enough.
-    if (!state.unready_hosts.empty())
+    if (state.unready_host)
     {
-        const auto & [unready_host, unready_host_state] = *state.unready_hosts.begin();
+        const auto & unready_host = *state.unready_host;
         throw Exception(
             ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
             "Waited for host {} too long (> {}){}",
-            unready_host,
+            unready_host.host,
             to_string(*timeout),
-            unready_host_state.started ? "" : ": Operation didn't start");
+            unready_host.started ? "" : ": Operation didn't start");
     }
 
     LOG_TRACE(log, "Everything is Ok. All hosts achieved stage {}", stage_to_wait);
-    return state.results;
+    return std::move(*state.results);
 }
 
 }
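Taken together, the loop in waitImpl() becomes: re-arm the watch, read the state inside the retry loop, stop as soon as a final outcome is known, and otherwise sleep until the watch fires or the deadline passes. A condensed, self-contained sketch of that control flow, with std::condition_variable standing in for Poco::Event and a trivially stubbed readState():

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

/// Minimal Poco::Event-like primitive.
class Event
{
public:
    void set()   { { std::lock_guard lk(m); fired = true; } cv.notify_all(); }
    void reset() { std::lock_guard lk(m); fired = false; }

    /// Returns false if the timeout expired before set() was called.
    bool tryWait(std::chrono::milliseconds t)
    {
        std::unique_lock lk(m);
        return cv.wait_for(lk, t, [this] { return fired; });
    }

private:
    std::mutex m;
    std::condition_variable cv;
    bool fired = false;
};

struct Outcome { bool ready = false; };        /// stand-in for the State above
Outcome readState(Event &) { return {true}; }  /// hypothetical stub; the real read re-subscribes the watch and queries Keeper

void waitForStage(std::chrono::milliseconds timeout)
{
    Event watch;
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true)
    {
        watch.reset();                /// re-arm before reading, like watch->reset() above
        if (readState(watch).ready)
            return;
        auto left = std::chrono::duration_cast<std::chrono::milliseconds>(
            deadline - std::chrono::steady_clock::now());
        if (left.count() <= 0 || !watch.tryWait(left))
            throw std::runtime_error("Waited for the stage too long");
    }
}

The error paths after the real loop stay explicit: state.error is rethrown, a disconnected host raises FAILED_TO_SYNC_BACKUP_OR_RESTORE, and an unready host turns into the timeout exception shown above.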

src/Backups/BackupCoordinationStageSync.h

@@ -29,7 +29,7 @@ private:
     void createRootNodes();
 
     struct State;
-    State readCurrentState(const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
+    State readCurrentState(WithRetries::RetriesControlHolder & retries_control_holder, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
 
     Strings waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const;
 

src/Backups/RestoreCoordinationRemote.cpp

@@ -43,12 +43,12 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
             if (my_is_internal)
             {
                 String alive_node_path = my_zookeeper_path + "/stage/alive|" + my_current_host;
 
-                auto code = zk->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
-                if (code == Coordination::Error::ZNODEEXISTS)
-                    zk->handleEphemeralNodeExistenceNoFailureInjection(alive_node_path, "");
-                else if (code != Coordination::Error::ZOK)
-                    throw zkutil::KeeperException::fromPath(code, alive_node_path);
+                /// Delete the ephemeral node from the previous connection so we don't have to wait for keeper to do it automatically.
+                zk->tryRemove(alive_node_path);
+
+                zk->createAncestors(alive_node_path);
+                zk->create(alive_node_path, "", zkutil::CreateMode::Ephemeral);
             }
         })
     {