mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Improve synchronization between hosts in distributed backup.
Use ephemeral zk nodes to check other hosts for termination.
This commit is contained in:
parent
150e058be9
commit
dc392cd4d3
@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo;
|
||||
BackupCoordinationLocal::BackupCoordinationLocal() = default;
|
||||
BackupCoordinationLocal::~BackupCoordinationLocal() = default;
|
||||
|
||||
void BackupCoordinationLocal::setStatus(const String &, const String &, const String &)
|
||||
void BackupCoordinationLocal::setStage(const String &, const String &, const String &)
|
||||
{
|
||||
}
|
||||
|
||||
void BackupCoordinationLocal::setErrorStatus(const String &, const Exception &)
|
||||
void BackupCoordinationLocal::setError(const String &, const Exception &)
|
||||
{
|
||||
}
|
||||
|
||||
Strings BackupCoordinationLocal::waitStatus(const Strings &, const String &)
|
||||
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
Strings BackupCoordinationLocal::waitStatusFor(const Strings &, const String &, UInt64)
|
||||
Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
@ -20,10 +20,10 @@ public:
|
||||
BackupCoordinationLocal();
|
||||
~BackupCoordinationLocal() override;
|
||||
|
||||
void setStatus(const String & current_host, const String & new_status, const String & message) override;
|
||||
void setErrorStatus(const String & current_host, const Exception & exception) override;
|
||||
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
|
||||
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
|
||||
void setStage(const String & current_host, const String & new_stage, const String & message) override;
|
||||
void setError(const String & current_host, const Exception & exception) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
|
||||
|
||||
void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
|
||||
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
|
||||
|
@ -165,15 +165,28 @@ namespace
|
||||
constexpr size_t NUM_ATTEMPTS = 10;
|
||||
}
|
||||
|
||||
BackupCoordinationRemote::BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
|
||||
BackupCoordinationRemote::BackupCoordinationRemote(
|
||||
const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_)
|
||||
: zookeeper_path(zookeeper_path_)
|
||||
, get_zookeeper(get_zookeeper_)
|
||||
, status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("BackupCoordination"))
|
||||
, remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_)
|
||||
, stage_sync(zookeeper_path_ + "/stage", get_zookeeper_, &Poco::Logger::get("BackupCoordination"))
|
||||
{
|
||||
createRootNodes();
|
||||
}
|
||||
|
||||
BackupCoordinationRemote::~BackupCoordinationRemote() = default;
|
||||
BackupCoordinationRemote::~BackupCoordinationRemote()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (remove_zk_nodes_in_destructor)
|
||||
removeAllNodes();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void BackupCoordinationRemote::createRootNodes()
|
||||
{
|
||||
@ -196,24 +209,24 @@ void BackupCoordinationRemote::removeAllNodes()
|
||||
}
|
||||
|
||||
|
||||
void BackupCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
|
||||
void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
|
||||
{
|
||||
status_sync.set(current_host, new_status, message);
|
||||
stage_sync.set(current_host, new_stage, message);
|
||||
}
|
||||
|
||||
void BackupCoordinationRemote::setErrorStatus(const String & current_host, const Exception & exception)
|
||||
void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception)
|
||||
{
|
||||
status_sync.setError(current_host, exception);
|
||||
stage_sync.setError(current_host, exception);
|
||||
}
|
||||
|
||||
Strings BackupCoordinationRemote::waitStatus(const Strings & all_hosts, const String & status_to_wait)
|
||||
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
|
||||
{
|
||||
return status_sync.wait(all_hosts, status_to_wait);
|
||||
return stage_sync.wait(all_hosts, stage_to_wait);
|
||||
}
|
||||
|
||||
Strings BackupCoordinationRemote::waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
|
||||
Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
|
||||
{
|
||||
return status_sync.waitFor(all_hosts, status_to_wait, timeout_ms);
|
||||
return stage_sync.waitFor(all_hosts, stage_to_wait, timeout);
|
||||
}
|
||||
|
||||
|
||||
@ -565,9 +578,4 @@ Strings BackupCoordinationRemote::getAllArchiveSuffixes() const
|
||||
return node_names;
|
||||
}
|
||||
|
||||
void BackupCoordinationRemote::drop()
|
||||
{
|
||||
removeAllNodes();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <Backups/IBackupCoordination.h>
|
||||
#include <Backups/BackupCoordinationReplicatedAccess.h>
|
||||
#include <Backups/BackupCoordinationReplicatedTables.h>
|
||||
#include <Backups/BackupCoordinationStatusSync.h>
|
||||
#include <Backups/BackupCoordinationStageSync.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,13 +13,13 @@ namespace DB
|
||||
class BackupCoordinationRemote : public IBackupCoordination
|
||||
{
|
||||
public:
|
||||
BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
|
||||
BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_);
|
||||
~BackupCoordinationRemote() override;
|
||||
|
||||
void setStatus(const String & current_host, const String & new_status, const String & message) override;
|
||||
void setErrorStatus(const String & current_host, const Exception & exception) override;
|
||||
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
|
||||
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
|
||||
void setStage(const String & current_host, const String & new_stage, const String & message) override;
|
||||
void setError(const String & current_host, const Exception & exception) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
|
||||
|
||||
void addReplicatedPartNames(
|
||||
const String & table_shared_id,
|
||||
@ -56,8 +56,6 @@ public:
|
||||
String getNextArchiveSuffix() override;
|
||||
Strings getAllArchiveSuffixes() const override;
|
||||
|
||||
void drop() override;
|
||||
|
||||
private:
|
||||
void createRootNodes();
|
||||
void removeAllNodes();
|
||||
@ -66,8 +64,9 @@ private:
|
||||
|
||||
const String zookeeper_path;
|
||||
const zkutil::GetZooKeeper get_zookeeper;
|
||||
const bool remove_zk_nodes_in_destructor;
|
||||
|
||||
BackupCoordinationStatusSync status_sync;
|
||||
BackupCoordinationStageSync stage_sync;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables;
|
||||
|
228
src/Backups/BackupCoordinationStageSync.cpp
Normal file
228
src/Backups/BackupCoordinationStageSync.cpp
Normal file
@ -0,0 +1,228 @@
|
||||
#include <Backups/BackupCoordinationStageSync.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <base/chrono_io.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
|
||||
}
|
||||
|
||||
|
||||
BackupCoordinationStageSync::BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
|
||||
: zookeeper_path(zookeeper_path_)
|
||||
, get_zookeeper(get_zookeeper_)
|
||||
, log(log_)
|
||||
{
|
||||
createRootNodes();
|
||||
}
|
||||
|
||||
void BackupCoordinationStageSync::createRootNodes()
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
zookeeper->createAncestors(zookeeper_path);
|
||||
zookeeper->createIfNotExists(zookeeper_path, "");
|
||||
}
|
||||
|
||||
void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message)
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
/// Make an ephemeral node so the initiator can track if the current host is still working.
|
||||
String alive_node_path = zookeeper_path + "/alive|" + current_host;
|
||||
auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
|
||||
throw zkutil::KeeperException(code, alive_node_path);
|
||||
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, "");
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message);
|
||||
}
|
||||
|
||||
void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception)
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
WriteBufferFromOwnString buf;
|
||||
writeStringBinary(current_host, buf);
|
||||
writeException(exception, buf, true);
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str());
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStageSync::wait(const Strings & all_hosts, const String & stage_to_wait)
|
||||
{
|
||||
return waitImpl(all_hosts, stage_to_wait, {});
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStageSync::waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
|
||||
{
|
||||
return waitImpl(all_hosts, stage_to_wait, timeout);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
struct UnreadyHostState
|
||||
{
|
||||
bool started = false;
|
||||
bool alive = false;
|
||||
};
|
||||
}
|
||||
|
||||
struct BackupCoordinationStageSync::State
|
||||
{
|
||||
Strings results;
|
||||
std::map<String, UnreadyHostState> unready_hosts;
|
||||
std::optional<std::pair<String, Exception>> error;
|
||||
std::optional<String> host_terminated;
|
||||
};
|
||||
|
||||
BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState(
|
||||
zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const
|
||||
{
|
||||
std::unordered_set<std::string_view> zk_nodes_set{zk_nodes.begin(), zk_nodes.end()};
|
||||
|
||||
State state;
|
||||
if (zk_nodes_set.contains("error"))
|
||||
{
|
||||
ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")};
|
||||
String host;
|
||||
readStringBinary(host, buf);
|
||||
state.error = std::make_pair(host, readException(buf, fmt::format("Got error from {}", host)));
|
||||
return state;
|
||||
}
|
||||
|
||||
for (const auto & host : all_hosts)
|
||||
{
|
||||
if (!zk_nodes_set.contains("current|" + host + "|" + stage_to_wait))
|
||||
{
|
||||
UnreadyHostState unready_host_state;
|
||||
unready_host_state.started = zk_nodes_set.contains("started|" + host);
|
||||
unready_host_state.alive = zk_nodes_set.contains("alive|" + host);
|
||||
state.unready_hosts.emplace(host, unready_host_state);
|
||||
if (!unready_host_state.alive && unready_host_state.started && !state.host_terminated)
|
||||
state.host_terminated = host;
|
||||
}
|
||||
}
|
||||
|
||||
if (state.host_terminated || !state.unready_hosts.empty())
|
||||
return state;
|
||||
|
||||
state.results.reserve(all_hosts.size());
|
||||
for (const auto & host : all_hosts)
|
||||
state.results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait));
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const
|
||||
{
|
||||
if (all_hosts.empty())
|
||||
return {};
|
||||
|
||||
/// Wait until all hosts are ready or an error happens or time is out.
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
struct Watch
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::condition_variable event;
|
||||
bool zk_nodes_changed = false;
|
||||
bool watch_set = false;
|
||||
};
|
||||
|
||||
/// shared_ptr because `watch_callback` can be called by ZooKeeper after leaving this function's scope.
|
||||
auto watch = std::make_shared<Watch>();
|
||||
|
||||
/// Called by ZooKepper when list of zk nodes have changed.
|
||||
auto watch_callback = [watch](const Coordination::WatchResponse &)
|
||||
{
|
||||
std::lock_guard lock{watch->mutex};
|
||||
watch->zk_nodes_changed = true;
|
||||
watch->watch_set = false; /// When it's triggered ZooKeeper resets the watch so we need to call getChildrenWatch() again.
|
||||
watch->event.notify_all();
|
||||
};
|
||||
|
||||
auto zk_nodes_changed = [watch] { return watch->zk_nodes_changed; };
|
||||
|
||||
bool use_timeout = timeout.has_value();
|
||||
std::chrono::steady_clock::time_point end_of_timeout;
|
||||
if (use_timeout)
|
||||
end_of_timeout = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(*timeout);
|
||||
|
||||
State state;
|
||||
|
||||
String previous_unready_host; /// Used for logging: we don't want to log the same unready host again.
|
||||
|
||||
for (;;)
|
||||
{
|
||||
/// Get zk nodes and subscribe on their changes.
|
||||
{
|
||||
std::lock_guard lock{watch->mutex};
|
||||
watch->watch_set = true;
|
||||
watch->zk_nodes_changed = false;
|
||||
}
|
||||
Strings zk_nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
|
||||
|
||||
/// Read and analyze the current state of zk nodes.
|
||||
state = readCurrentState(zookeeper, zk_nodes, all_hosts, stage_to_wait);
|
||||
if (state.error || state.host_terminated || state.unready_hosts.empty())
|
||||
break; /// Error happened or everything is ready.
|
||||
|
||||
/// Log that we will wait for another host.
|
||||
const auto & unready_host = state.unready_hosts.begin()->first;
|
||||
if (unready_host != previous_unready_host)
|
||||
{
|
||||
LOG_TRACE(log, "Waiting for host {}", unready_host);
|
||||
previous_unready_host = unready_host;
|
||||
}
|
||||
|
||||
/// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed.
|
||||
{
|
||||
std::unique_lock lock{watch->mutex};
|
||||
if (use_timeout)
|
||||
{
|
||||
auto current_time = std::chrono::steady_clock::now();
|
||||
if ((current_time > end_of_timeout) || !watch->event.wait_for(lock, end_of_timeout - current_time, zk_nodes_changed))
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
watch->event.wait(lock, zk_nodes_changed);
|
||||
}
|
||||
assert(watch->zk_nodes_changed);
|
||||
assert(!watch->watch_set);
|
||||
}
|
||||
}
|
||||
|
||||
/// Rethrow an error raised originally on another host.
|
||||
if (state.error)
|
||||
state.error->second.rethrow();
|
||||
|
||||
/// Another host terminated without errors.
|
||||
if (state.host_terminated)
|
||||
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Host {} suddenly stopped working", *state.host_terminated);
|
||||
|
||||
/// Something's unready, timeout is probably not enough.
|
||||
if (!state.unready_hosts.empty())
|
||||
{
|
||||
const auto & [unready_host, unready_host_state] = *state.unready_hosts.begin();
|
||||
throw Exception(
|
||||
ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
|
||||
"Waited for host {} too long (> {}){}",
|
||||
unready_host,
|
||||
to_string(*timeout),
|
||||
unready_host_state.started ? "" : ": Operation didn't start");
|
||||
}
|
||||
|
||||
return state.results;
|
||||
}
|
||||
|
||||
}
|
39
src/Backups/BackupCoordinationStageSync.h
Normal file
39
src/Backups/BackupCoordinationStageSync.h
Normal file
@ -0,0 +1,39 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Used to coordinate hosts so all hosts would come to a specific stage at around the same time.
|
||||
class BackupCoordinationStageSync
|
||||
{
|
||||
public:
|
||||
BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
|
||||
|
||||
/// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that.
|
||||
void set(const String & current_host, const String & new_stage, const String & message);
|
||||
void setError(const String & current_host, const Exception & exception);
|
||||
|
||||
/// Sets the stage of the current host and waits until all hosts come to the same stage.
|
||||
/// The function returns the messages all hosts set when they come to the required stage.
|
||||
Strings wait(const Strings & all_hosts, const String & stage_to_wait);
|
||||
|
||||
/// Almost the same as setAndWait() but this one stops waiting and throws an exception after a specific amount of time.
|
||||
Strings waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout);
|
||||
|
||||
private:
|
||||
void createRootNodes();
|
||||
|
||||
struct State;
|
||||
State readCurrentState(zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const;
|
||||
|
||||
Strings waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional<std::chrono::milliseconds> timeout) const;
|
||||
|
||||
String zookeeper_path;
|
||||
zkutil::GetZooKeeper get_zookeeper;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
@ -1,182 +0,0 @@
|
||||
#include <Backups/BackupCoordinationStatusSync.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <base/chrono_io.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
|
||||
}
|
||||
|
||||
|
||||
BackupCoordinationStatusSync::BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
|
||||
: zookeeper_path(zookeeper_path_)
|
||||
, get_zookeeper(get_zookeeper_)
|
||||
, log(log_)
|
||||
{
|
||||
createRootNodes();
|
||||
}
|
||||
|
||||
void BackupCoordinationStatusSync::createRootNodes()
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
zookeeper->createAncestors(zookeeper_path);
|
||||
zookeeper->createIfNotExists(zookeeper_path, "");
|
||||
}
|
||||
|
||||
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message)
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message);
|
||||
}
|
||||
|
||||
void BackupCoordinationStatusSync::setError(const String & current_host, const Exception & exception)
|
||||
{
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
Exception exception2 = exception;
|
||||
exception2.addMessage("Host {}", current_host);
|
||||
WriteBufferFromOwnString buf;
|
||||
writeException(exception2, buf, true);
|
||||
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str());
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStatusSync::wait(const Strings & all_hosts, const String & status_to_wait)
|
||||
{
|
||||
return waitImpl(all_hosts, status_to_wait, {});
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStatusSync::waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
|
||||
{
|
||||
return waitImpl(all_hosts, status_to_wait, timeout_ms);
|
||||
}
|
||||
|
||||
Strings BackupCoordinationStatusSync::waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional<UInt64> timeout_ms)
|
||||
{
|
||||
if (all_hosts.empty())
|
||||
return {};
|
||||
|
||||
/// Wait for other hosts.
|
||||
|
||||
Strings ready_hosts_results;
|
||||
ready_hosts_results.resize(all_hosts.size());
|
||||
|
||||
std::map<String, std::vector<size_t> /* index in `ready_hosts_results` */> unready_hosts;
|
||||
for (size_t i = 0; i != all_hosts.size(); ++i)
|
||||
unready_hosts[all_hosts[i]].push_back(i);
|
||||
|
||||
std::optional<Exception> error;
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
/// Process ZooKeeper's nodes and set `all_hosts_ready` or `unready_host` or `error_message`.
|
||||
auto process_zk_nodes = [&](const Strings & zk_nodes)
|
||||
{
|
||||
for (const String & zk_node : zk_nodes)
|
||||
{
|
||||
if (zk_node.starts_with("remove_watch-"))
|
||||
continue;
|
||||
|
||||
if (zk_node == "error")
|
||||
{
|
||||
ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")};
|
||||
error = readException(buf, "", true);
|
||||
break;
|
||||
}
|
||||
|
||||
size_t separator_pos = zk_node.find('|');
|
||||
if (separator_pos == String::npos)
|
||||
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node);
|
||||
|
||||
String host = zk_node.substr(0, separator_pos);
|
||||
String status = zk_node.substr(separator_pos + 1);
|
||||
|
||||
auto it = unready_hosts.find(host);
|
||||
if ((it != unready_hosts.end()) && (status == status_to_wait))
|
||||
{
|
||||
String result = zookeeper->get(zookeeper_path + "/" + zk_node);
|
||||
for (size_t i : it->second)
|
||||
ready_hosts_results[i] = result;
|
||||
unready_hosts.erase(it);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Wait until all hosts are ready or an error happens or time is out.
|
||||
std::atomic<bool> watch_set = false;
|
||||
std::condition_variable watch_triggered_event;
|
||||
|
||||
auto watch_callback = [&](const Coordination::WatchResponse &)
|
||||
{
|
||||
watch_set = false; /// After it's triggered it's not set until we call getChildrenWatch() again.
|
||||
watch_triggered_event.notify_all();
|
||||
};
|
||||
|
||||
auto watch_triggered = [&] { return !watch_set; };
|
||||
|
||||
bool use_timeout = timeout_ms.has_value();
|
||||
std::chrono::milliseconds timeout{timeout_ms.value_or(0)};
|
||||
std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
|
||||
std::chrono::steady_clock::duration elapsed;
|
||||
std::mutex dummy_mutex;
|
||||
String previous_unready_host;
|
||||
|
||||
while (!unready_hosts.empty() && !error)
|
||||
{
|
||||
watch_set = true;
|
||||
Strings nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
|
||||
process_zk_nodes(nodes);
|
||||
|
||||
if (!unready_hosts.empty() && !error)
|
||||
{
|
||||
const auto & unready_host = unready_hosts.begin()->first;
|
||||
if (unready_host != previous_unready_host)
|
||||
{
|
||||
LOG_TRACE(log, "Waiting for host {}", unready_host);
|
||||
previous_unready_host = unready_host;
|
||||
}
|
||||
|
||||
std::unique_lock dummy_lock{dummy_mutex};
|
||||
if (use_timeout)
|
||||
{
|
||||
elapsed = std::chrono::steady_clock::now() - start_time;
|
||||
if ((elapsed > timeout) || !watch_triggered_event.wait_for(dummy_lock, timeout - elapsed, watch_triggered))
|
||||
break;
|
||||
}
|
||||
else
|
||||
watch_triggered_event.wait(dummy_lock, watch_triggered);
|
||||
}
|
||||
}
|
||||
|
||||
if (watch_set)
|
||||
{
|
||||
/// Remove watch by triggering it.
|
||||
zookeeper->create(zookeeper_path + "/remove_watch-", "", zkutil::CreateMode::EphemeralSequential);
|
||||
std::unique_lock dummy_lock{dummy_mutex};
|
||||
watch_triggered_event.wait(dummy_lock, watch_triggered);
|
||||
}
|
||||
|
||||
if (error)
|
||||
error->rethrow();
|
||||
|
||||
if (!unready_hosts.empty())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
|
||||
"Waited for host {} too long ({})",
|
||||
unready_hosts.begin()->first,
|
||||
to_string(elapsed));
|
||||
}
|
||||
|
||||
return ready_hosts_results;
|
||||
}
|
||||
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Used to coordinate hosts so all hosts would come to a specific status at around the same time.
|
||||
class BackupCoordinationStatusSync
|
||||
{
|
||||
public:
|
||||
BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
|
||||
|
||||
/// Sets the status of the current host and signal other hosts if there were other hosts waiting for that.
|
||||
void set(const String & current_host, const String & new_status, const String & message);
|
||||
void setError(const String & current_host, const Exception & exception);
|
||||
|
||||
/// Sets the status of the current host and waits until all hosts come to the same status.
|
||||
/// The function returns the messages all hosts set when they come to the required status.
|
||||
Strings wait(const Strings & all_hosts, const String & status_to_wait);
|
||||
|
||||
/// Almost the same as setAndWait() but this one stops waiting and throws an exception after a specific amount of time.
|
||||
Strings waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms);
|
||||
|
||||
static constexpr const char * kErrorStatus = "error";
|
||||
|
||||
private:
|
||||
void createRootNodes();
|
||||
Strings waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional<UInt64> timeout_ms);
|
||||
|
||||
String zookeeper_path;
|
||||
zkutil::GetZooKeeper get_zookeeper;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
@ -34,16 +34,21 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
/// Finding all tables and databases which we're going to put to the backup and collecting their metadata.
|
||||
constexpr const char * kGatheringMetadataStatus = "gathering metadata";
|
||||
constexpr const char * kGatheringMetadataStage = "gathering metadata";
|
||||
|
||||
String formatGatheringMetadataStage(size_t pass)
|
||||
{
|
||||
return fmt::format("{} ({})", kGatheringMetadataStage, pass);
|
||||
}
|
||||
|
||||
/// Making temporary hard links and prepare backup entries.
|
||||
constexpr const char * kExtractingDataFromTablesStatus = "extracting data from tables";
|
||||
constexpr const char * kExtractingDataFromTablesStage = "extracting data from tables";
|
||||
|
||||
/// Running special tasks for replicated tables which can also prepare some backup entries.
|
||||
constexpr const char * kRunningPostTasksStatus = "running post-tasks";
|
||||
constexpr const char * kRunningPostTasksStage = "running post-tasks";
|
||||
|
||||
/// Writing backup entries to the backup and removing temporary hard links.
|
||||
constexpr const char * kWritingBackupStatus = "writing backup";
|
||||
constexpr const char * kWritingBackupStage = "writing backup";
|
||||
|
||||
/// Uppercases the first character of a passed string.
|
||||
String toUpperFirst(const String & str)
|
||||
@ -90,7 +95,8 @@ BackupEntriesCollector::BackupEntriesCollector(
|
||||
, backup_settings(backup_settings_)
|
||||
, backup_coordination(backup_coordination_)
|
||||
, context(context_)
|
||||
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 300000))
|
||||
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
|
||||
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000))
|
||||
, log(&Poco::Logger::get("BackupEntriesCollector"))
|
||||
{
|
||||
}
|
||||
@ -100,7 +106,7 @@ BackupEntriesCollector::~BackupEntriesCollector() = default;
|
||||
BackupEntries BackupEntriesCollector::run()
|
||||
{
|
||||
/// run() can be called onle once.
|
||||
if (!current_status.empty())
|
||||
if (!current_stage.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already making backup entries");
|
||||
|
||||
/// Find other hosts working along with us to execute this ON CLUSTER query.
|
||||
@ -123,36 +129,40 @@ BackupEntries BackupEntriesCollector::run()
|
||||
makeBackupEntriesForTablesDefs();
|
||||
|
||||
/// Make backup entries for the data of the found tables.
|
||||
setStatus(kExtractingDataFromTablesStatus);
|
||||
setStage(kExtractingDataFromTablesStage);
|
||||
makeBackupEntriesForTablesData();
|
||||
|
||||
/// Run all the tasks added with addPostCollectingTask().
|
||||
setStatus(kRunningPostTasksStatus);
|
||||
setStage(kRunningPostTasksStage);
|
||||
runPostTasks();
|
||||
|
||||
/// No more backup entries or tasks are allowed after this point.
|
||||
setStatus(kWritingBackupStatus);
|
||||
setStage(kWritingBackupStage);
|
||||
|
||||
return std::move(backup_entries);
|
||||
}
|
||||
|
||||
Strings BackupEntriesCollector::setStatus(const String & new_status, const String & message)
|
||||
Strings BackupEntriesCollector::setStage(const String & new_stage, const String & message)
|
||||
{
|
||||
LOG_TRACE(log, "{}", toUpperFirst(new_status));
|
||||
current_status = new_status;
|
||||
LOG_TRACE(log, "{}", toUpperFirst(new_stage));
|
||||
current_stage = new_stage;
|
||||
|
||||
backup_coordination->setStatus(backup_settings.host_id, new_status, message);
|
||||
backup_coordination->setStage(backup_settings.host_id, new_stage, message);
|
||||
|
||||
if (new_status.starts_with(kGatheringMetadataStatus))
|
||||
if (new_stage == formatGatheringMetadataStage(1))
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
auto end_of_timeout = std::max(now, consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout);
|
||||
return backup_coordination->waitStatusFor(
|
||||
all_hosts, new_status, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - now).count());
|
||||
return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
|
||||
}
|
||||
else if (new_stage.starts_with(kGatheringMetadataStage))
|
||||
{
|
||||
auto current_time = std::chrono::steady_clock::now();
|
||||
auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time);
|
||||
return backup_coordination->waitForStage(
|
||||
all_hosts, new_stage, std::chrono::duration_cast<std::chrono::milliseconds>(end_of_timeout - current_time));
|
||||
}
|
||||
else
|
||||
{
|
||||
return backup_coordination->waitStatus(all_hosts, new_status);
|
||||
return backup_coordination->waitForStage(all_hosts, new_stage);
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,18 +183,18 @@ void BackupEntriesCollector::calculateRootPathInBackup()
|
||||
/// Finds databases and tables which we will put to the backup.
|
||||
void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
|
||||
{
|
||||
consistent_metadata_snapshot_start_time = std::chrono::steady_clock::now();
|
||||
auto end_of_timeout = consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout;
|
||||
setStatus(fmt::format("{} ({})", kGatheringMetadataStatus, 1));
|
||||
setStage(formatGatheringMetadataStage(1));
|
||||
|
||||
consistent_metadata_snapshot_end_time = std::chrono::steady_clock::now() + consistent_metadata_snapshot_timeout;
|
||||
|
||||
for (size_t pass = 1;; ++pass)
|
||||
{
|
||||
String new_status = fmt::format("{} ({})", kGatheringMetadataStatus, pass + 1);
|
||||
String next_stage = formatGatheringMetadataStage(pass + 1);
|
||||
std::optional<Exception> inconsistency_error;
|
||||
if (tryGatherMetadataAndCompareWithPrevious(inconsistency_error))
|
||||
{
|
||||
/// Gathered metadata and checked consistency, cool! But we have to check that other hosts cope with that too.
|
||||
auto all_hosts_results = setStatus(new_status, "consistent");
|
||||
auto all_hosts_results = setStage(next_stage, "consistent");
|
||||
|
||||
std::optional<String> host_with_inconsistency;
|
||||
std::optional<String> inconsistency_error_on_other_host;
|
||||
@ -210,13 +220,13 @@ void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
|
||||
else
|
||||
{
|
||||
/// Failed to gather metadata or something wasn't consistent. We'll let other hosts know that and try again.
|
||||
setStatus(new_status, inconsistency_error->displayText());
|
||||
setStage(next_stage, inconsistency_error->displayText());
|
||||
}
|
||||
|
||||
/// Two passes is minimum (we need to compare with table names with previous ones to be sure we don't miss anything).
|
||||
if (pass >= 2)
|
||||
{
|
||||
if (std::chrono::steady_clock::now() > end_of_timeout)
|
||||
if (std::chrono::steady_clock::now() > consistent_metadata_snapshot_end_time)
|
||||
inconsistency_error->rethrow();
|
||||
else
|
||||
LOG_WARNING(log, "{}", inconsistency_error->displayText());
|
||||
@ -713,7 +723,7 @@ void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableN
|
||||
|
||||
void BackupEntriesCollector::addBackupEntry(const String & file_name, BackupEntryPtr backup_entry)
|
||||
{
|
||||
if (current_status == kWritingBackupStatus)
|
||||
if (current_stage == kWritingBackupStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding backup entries is not allowed");
|
||||
backup_entries.emplace_back(file_name, backup_entry);
|
||||
}
|
||||
@ -725,21 +735,21 @@ void BackupEntriesCollector::addBackupEntry(const std::pair<String, BackupEntryP
|
||||
|
||||
void BackupEntriesCollector::addBackupEntries(const BackupEntries & backup_entries_)
|
||||
{
|
||||
if (current_status == kWritingBackupStatus)
|
||||
if (current_stage == kWritingBackupStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
|
||||
insertAtEnd(backup_entries, backup_entries_);
|
||||
}
|
||||
|
||||
void BackupEntriesCollector::addBackupEntries(BackupEntries && backup_entries_)
|
||||
{
|
||||
if (current_status == kWritingBackupStatus)
|
||||
if (current_stage == kWritingBackupStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
|
||||
insertAtEnd(backup_entries, std::move(backup_entries_));
|
||||
}
|
||||
|
||||
void BackupEntriesCollector::addPostTask(std::function<void()> task)
|
||||
{
|
||||
if (current_status == kWritingBackupStatus)
|
||||
if (current_stage == kWritingBackupStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of post tasks is not allowed");
|
||||
post_tasks.push(std::move(task));
|
||||
}
|
||||
|
@ -86,12 +86,13 @@ private:
|
||||
|
||||
void runPostTasks();
|
||||
|
||||
Strings setStatus(const String & new_status, const String & message = "");
|
||||
Strings setStage(const String & new_stage, const String & message = "");
|
||||
|
||||
const ASTBackupQuery::Elements backup_query_elements;
|
||||
const BackupSettings backup_settings;
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||
ContextPtr context;
|
||||
std::chrono::milliseconds on_cluster_first_sync_timeout;
|
||||
std::chrono::milliseconds consistent_metadata_snapshot_timeout;
|
||||
Poco::Logger * log;
|
||||
|
||||
@ -129,8 +130,8 @@ private:
|
||||
std::optional<ASTs> partitions;
|
||||
};
|
||||
|
||||
String current_status;
|
||||
std::chrono::steady_clock::time_point consistent_metadata_snapshot_start_time;
|
||||
String current_stage;
|
||||
std::chrono::steady_clock::time_point consistent_metadata_snapshot_end_time;
|
||||
std::unordered_map<String, DatabaseInfo> database_infos;
|
||||
std::unordered_map<QualifiedTableName, TableInfo> table_infos;
|
||||
std::vector<std::pair<String, String>> previous_databases_metadata;
|
||||
|
@ -18,37 +18,86 @@
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
/// Coordination status meaning that a host finished its work.
|
||||
constexpr const char * kCompletedCoordinationStatus = "completed";
|
||||
constexpr const char * kCompletedStage = "completed";
|
||||
|
||||
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
|
||||
template <typename CoordinationType>
|
||||
void sendErrorToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
|
||||
std::shared_ptr<IBackupCoordination> makeBackupCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup)
|
||||
{
|
||||
if (!coordination_zk_path.empty())
|
||||
{
|
||||
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
|
||||
return std::make_shared<BackupCoordinationRemote>(coordination_zk_path, get_zookeeper, !is_internal_backup);
|
||||
}
|
||||
else
|
||||
{
|
||||
return std::make_shared<BackupCoordinationLocal>();
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<IRestoreCoordination> makeRestoreCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup)
|
||||
{
|
||||
if (!coordination_zk_path.empty())
|
||||
{
|
||||
auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
|
||||
return std::make_shared<RestoreCoordinationRemote>(coordination_zk_path, get_zookeeper, !is_internal_backup);
|
||||
}
|
||||
else
|
||||
{
|
||||
return std::make_shared<RestoreCoordinationLocal>();
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends information about an exception to IBackupCoordination or IRestoreCoordination.
|
||||
template <typename CoordinationType>
|
||||
void sendExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host, const Exception & exception)
|
||||
{
|
||||
if (!coordination)
|
||||
return;
|
||||
try
|
||||
{
|
||||
coordination->setErrorStatus(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
|
||||
if (coordination)
|
||||
coordination->setError(current_host, exception);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends information about the current exception to IBackupCoordination or IRestoreCoordination.
|
||||
template <typename CoordinationType>
|
||||
void sendCurrentExceptionToCoordination(std::shared_ptr<CoordinationType> coordination, const String & current_host)
|
||||
{
|
||||
try
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
sendExceptionToCoordination(coordination, current_host, e);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)});
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to change num_active_backups.
|
||||
size_t getNumActiveBackupsChange(BackupStatus status)
|
||||
{
|
||||
return status == BackupStatus::MAKING_BACKUP;
|
||||
}
|
||||
|
||||
/// Used to change num_active_restores.
|
||||
size_t getNumActiveRestoresChange(BackupStatus status)
|
||||
{
|
||||
return status == BackupStatus::RESTORING;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -60,6 +109,7 @@ BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threa
|
||||
/// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now.
|
||||
}
|
||||
|
||||
|
||||
UUID BackupsWorker::start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context)
|
||||
{
|
||||
const ASTBackupQuery & backup_query = typeid_cast<const ASTBackupQuery &>(*backup_or_restore_query);
|
||||
@ -74,308 +124,359 @@ UUID BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & c
|
||||
{
|
||||
auto backup_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
|
||||
auto backup_settings = BackupSettings::fromBackupQuery(*backup_query);
|
||||
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
|
||||
bool on_cluster = !backup_query->cluster.empty();
|
||||
|
||||
if (!backup_settings.backup_uuid)
|
||||
backup_settings.backup_uuid = UUIDHelpers::generateV4();
|
||||
UUID backup_uuid = *backup_settings.backup_uuid;
|
||||
|
||||
/// Prepare context to use.
|
||||
ContextPtr context_in_use = context;
|
||||
ContextMutablePtr mutable_context;
|
||||
if (on_cluster || backup_settings.async)
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||
if (!backup_settings.coordination_zk_path.empty())
|
||||
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
|
||||
|
||||
try
|
||||
{
|
||||
/// For ON CLUSTER queries we will need to change some settings.
|
||||
/// For ASYNC queries we have to clone the context anyway.
|
||||
context_in_use = mutable_context = Context::createCopy(context);
|
||||
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
|
||||
addInfo(backup_uuid, backup_info.toString(), BackupStatus::MAKING_BACKUP, backup_settings.internal);
|
||||
|
||||
/// Prepare context to use.
|
||||
ContextPtr context_in_use = context;
|
||||
ContextMutablePtr mutable_context;
|
||||
bool on_cluster = !backup_query->cluster.empty();
|
||||
if (on_cluster || backup_settings.async)
|
||||
{
|
||||
/// For ON CLUSTER queries we will need to change some settings.
|
||||
/// For ASYNC queries we have to clone the context anyway.
|
||||
context_in_use = mutable_context = Context::createCopy(context);
|
||||
}
|
||||
|
||||
if (backup_settings.async)
|
||||
{
|
||||
backups_thread_pool.scheduleOrThrowOnError(
|
||||
[this, backup_uuid, backup_query, backup_settings, backup_info, backup_coordination, context_in_use, mutable_context] {
|
||||
doBackup(
|
||||
backup_uuid,
|
||||
backup_query,
|
||||
backup_settings,
|
||||
backup_info,
|
||||
backup_coordination,
|
||||
context_in_use,
|
||||
mutable_context,
|
||||
true);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
doBackup(backup_uuid, backup_query, backup_settings, backup_info, backup_coordination, context_in_use, mutable_context, false);
|
||||
}
|
||||
|
||||
return backup_uuid;
|
||||
}
|
||||
|
||||
addInfo(backup_uuid, backup_info.toString(), BackupStatus::MAKING_BACKUP, backup_settings.internal);
|
||||
|
||||
auto job = [this,
|
||||
backup_uuid,
|
||||
backup_query,
|
||||
backup_settings,
|
||||
backup_info,
|
||||
on_cluster,
|
||||
context_in_use,
|
||||
mutable_context](bool async) mutable
|
||||
catch (...)
|
||||
{
|
||||
std::optional<CurrentThread::QueryScope> query_scope;
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||
SCOPE_EXIT_SAFE(if (backup_coordination && !backup_settings.internal) backup_coordination->drop(););
|
||||
/// Something bad happened, the backup has not built.
|
||||
setStatus(backup_uuid, BackupStatus::FAILED_TO_BACKUP);
|
||||
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
|
||||
void BackupsWorker::doBackup(
|
||||
const UUID & backup_uuid,
|
||||
const std::shared_ptr<ASTBackupQuery> & backup_query,
|
||||
BackupSettings backup_settings,
|
||||
const BackupInfo & backup_info,
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination,
|
||||
const ContextPtr & context,
|
||||
ContextMutablePtr mutable_context,
|
||||
bool called_async)
|
||||
{
|
||||
std::optional<CurrentThread::QueryScope> query_scope;
|
||||
try
|
||||
{
|
||||
if (called_async)
|
||||
{
|
||||
if (async)
|
||||
{
|
||||
query_scope.emplace(mutable_context);
|
||||
setThreadName("BackupWorker");
|
||||
}
|
||||
|
||||
/// Checks access rights if this is not ON CLUSTER query.
|
||||
/// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.)
|
||||
auto required_access = getRequiredAccessToBackup(backup_query->elements);
|
||||
if (!on_cluster)
|
||||
context_in_use->checkAccess(required_access);
|
||||
|
||||
ClusterPtr cluster;
|
||||
if (on_cluster)
|
||||
{
|
||||
backup_query->cluster = context_in_use->getMacros()->expand(backup_query->cluster);
|
||||
cluster = context_in_use->getCluster(backup_query->cluster);
|
||||
backup_settings.cluster_host_ids = cluster->getHostIDs();
|
||||
if (backup_settings.coordination_zk_path.empty())
|
||||
{
|
||||
String root_zk_path = context_in_use->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
|
||||
backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid);
|
||||
}
|
||||
}
|
||||
|
||||
/// Make a backup coordination.
|
||||
if (!backup_settings.coordination_zk_path.empty())
|
||||
{
|
||||
backup_coordination = std::make_shared<BackupCoordinationRemote>(
|
||||
backup_settings.coordination_zk_path,
|
||||
[global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); });
|
||||
}
|
||||
else
|
||||
{
|
||||
backup_coordination = std::make_shared<BackupCoordinationLocal>();
|
||||
}
|
||||
|
||||
/// Opens a backup for writing.
|
||||
BackupFactory::CreateParams backup_create_params;
|
||||
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
|
||||
backup_create_params.context = context_in_use;
|
||||
backup_create_params.backup_info = backup_info;
|
||||
backup_create_params.base_backup_info = backup_settings.base_backup_info;
|
||||
backup_create_params.compression_method = backup_settings.compression_method;
|
||||
backup_create_params.compression_level = backup_settings.compression_level;
|
||||
backup_create_params.password = backup_settings.password;
|
||||
backup_create_params.is_internal_backup = backup_settings.internal;
|
||||
backup_create_params.backup_coordination = backup_coordination;
|
||||
backup_create_params.backup_uuid = backup_uuid;
|
||||
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
|
||||
|
||||
/// Write the backup.
|
||||
if (on_cluster)
|
||||
{
|
||||
DDLQueryOnClusterParams params;
|
||||
params.cluster = cluster;
|
||||
params.only_shard_num = backup_settings.shard_num;
|
||||
params.only_replica_num = backup_settings.replica_num;
|
||||
params.access_to_check = required_access;
|
||||
backup_settings.copySettingsToQuery(*backup_query);
|
||||
|
||||
// executeDDLQueryOnCluster() will return without waiting for completion
|
||||
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
|
||||
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
|
||||
executeDDLQueryOnCluster(backup_query, mutable_context, params);
|
||||
|
||||
/// Wait until all the hosts have written their backup entries.
|
||||
auto all_hosts = BackupSettings::Util::filterHostIDs(
|
||||
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
|
||||
backup_coordination->waitStatus(all_hosts, kCompletedCoordinationStatus);
|
||||
}
|
||||
else
|
||||
{
|
||||
backup_query->setCurrentDatabase(context_in_use->getCurrentDatabase());
|
||||
|
||||
/// Prepare backup entries.
|
||||
BackupEntries backup_entries;
|
||||
{
|
||||
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context_in_use};
|
||||
backup_entries = backup_entries_collector.run();
|
||||
}
|
||||
|
||||
/// Write the backup entries to the backup.
|
||||
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
|
||||
|
||||
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
|
||||
backup_coordination->setStatus(backup_settings.host_id, kCompletedCoordinationStatus, "");
|
||||
}
|
||||
|
||||
/// Finalize backup (write its metadata).
|
||||
if (!backup_settings.internal)
|
||||
backup->finalizeWriting();
|
||||
|
||||
/// Close the backup.
|
||||
backup.reset();
|
||||
|
||||
setStatus(backup_uuid, BackupStatus::BACKUP_COMPLETE);
|
||||
query_scope.emplace(mutable_context);
|
||||
setThreadName("BackupWorker");
|
||||
}
|
||||
catch (...)
|
||||
|
||||
bool on_cluster = !backup_query->cluster.empty();
|
||||
assert(mutable_context || (!on_cluster && !called_async));
|
||||
|
||||
/// Checks access rights if this is not ON CLUSTER query.
|
||||
/// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.)
|
||||
auto required_access = getRequiredAccessToBackup(backup_query->elements);
|
||||
if (!on_cluster)
|
||||
context->checkAccess(required_access);
|
||||
|
||||
ClusterPtr cluster;
|
||||
if (on_cluster)
|
||||
{
|
||||
/// Something bad happened, the backup has not built.
|
||||
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
|
||||
cluster = context->getCluster(backup_query->cluster);
|
||||
backup_settings.cluster_host_ids = cluster->getHostIDs();
|
||||
if (backup_settings.coordination_zk_path.empty())
|
||||
{
|
||||
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
|
||||
backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid);
|
||||
}
|
||||
}
|
||||
|
||||
/// Make a backup coordination.
|
||||
if (!backup_coordination)
|
||||
backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal);
|
||||
|
||||
/// Opens a backup for writing.
|
||||
BackupFactory::CreateParams backup_create_params;
|
||||
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
|
||||
backup_create_params.context = context;
|
||||
backup_create_params.backup_info = backup_info;
|
||||
backup_create_params.base_backup_info = backup_settings.base_backup_info;
|
||||
backup_create_params.compression_method = backup_settings.compression_method;
|
||||
backup_create_params.compression_level = backup_settings.compression_level;
|
||||
backup_create_params.password = backup_settings.password;
|
||||
backup_create_params.is_internal_backup = backup_settings.internal;
|
||||
backup_create_params.backup_coordination = backup_coordination;
|
||||
backup_create_params.backup_uuid = backup_uuid;
|
||||
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
|
||||
|
||||
/// Write the backup.
|
||||
if (on_cluster)
|
||||
{
|
||||
DDLQueryOnClusterParams params;
|
||||
params.cluster = cluster;
|
||||
params.only_shard_num = backup_settings.shard_num;
|
||||
params.only_replica_num = backup_settings.replica_num;
|
||||
params.access_to_check = required_access;
|
||||
backup_settings.copySettingsToQuery(*backup_query);
|
||||
|
||||
// executeDDLQueryOnCluster() will return without waiting for completion
|
||||
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
|
||||
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
|
||||
executeDDLQueryOnCluster(backup_query, mutable_context, params);
|
||||
|
||||
/// Wait until all the hosts have written their backup entries.
|
||||
auto all_hosts = BackupSettings::Util::filterHostIDs(
|
||||
backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
|
||||
backup_coordination->waitForStage(all_hosts, kCompletedStage);
|
||||
}
|
||||
else
|
||||
{
|
||||
backup_query->setCurrentDatabase(context->getCurrentDatabase());
|
||||
|
||||
/// Prepare backup entries.
|
||||
BackupEntries backup_entries;
|
||||
{
|
||||
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context};
|
||||
backup_entries = backup_entries_collector.run();
|
||||
}
|
||||
|
||||
/// Write the backup entries to the backup.
|
||||
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool);
|
||||
|
||||
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
|
||||
backup_coordination->setStage(backup_settings.host_id, kCompletedStage, "");
|
||||
}
|
||||
|
||||
/// Finalize backup (write its metadata).
|
||||
if (!backup_settings.internal)
|
||||
backup->finalizeWriting();
|
||||
|
||||
/// Close the backup.
|
||||
backup.reset();
|
||||
|
||||
LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_info.toString());
|
||||
setStatus(backup_uuid, BackupStatus::BACKUP_COMPLETE);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Something bad happened, the backup has not built.
|
||||
if (called_async)
|
||||
{
|
||||
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
|
||||
setStatus(backup_uuid, BackupStatus::FAILED_TO_BACKUP);
|
||||
sendErrorToCoordination(backup_coordination, backup_settings.host_id);
|
||||
if (!async)
|
||||
throw;
|
||||
sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id);
|
||||
}
|
||||
};
|
||||
|
||||
if (backup_settings.async)
|
||||
backups_thread_pool.scheduleOrThrowOnError([job]() mutable { job(true); });
|
||||
else
|
||||
job(false);
|
||||
|
||||
return backup_uuid;
|
||||
else
|
||||
{
|
||||
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startMakingBackup().
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context)
|
||||
{
|
||||
UUID restore_uuid = UUIDHelpers::generateV4();
|
||||
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
|
||||
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
|
||||
auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
|
||||
bool on_cluster = !restore_query->cluster.empty();
|
||||
UUID restore_uuid = UUIDHelpers::generateV4();
|
||||
|
||||
/// Prepare context to use.
|
||||
ContextMutablePtr context_in_use = context;
|
||||
if (restore_settings.async || on_cluster)
|
||||
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
||||
if (!restore_settings.coordination_zk_path.empty())
|
||||
restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
|
||||
|
||||
try
|
||||
{
|
||||
/// For ON CLUSTER queries we will need to change some settings.
|
||||
/// For ASYNC queries we have to clone the context anyway.
|
||||
context_in_use = Context::createCopy(context);
|
||||
auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
|
||||
addInfo(restore_uuid, backup_info.toString(), BackupStatus::RESTORING, restore_settings.internal);
|
||||
|
||||
/// Prepare context to use.
|
||||
ContextMutablePtr context_in_use = context;
|
||||
bool on_cluster = !restore_query->cluster.empty();
|
||||
if (restore_settings.async || on_cluster)
|
||||
{
|
||||
/// For ON CLUSTER queries we will need to change some settings.
|
||||
/// For ASYNC queries we have to clone the context anyway.
|
||||
context_in_use = Context::createCopy(context);
|
||||
}
|
||||
|
||||
if (restore_settings.async)
|
||||
{
|
||||
backups_thread_pool.scheduleOrThrowOnError(
|
||||
[this, restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use]
|
||||
{ doRestore(restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use, true); });
|
||||
}
|
||||
else
|
||||
{
|
||||
doRestore(restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use, false);
|
||||
}
|
||||
|
||||
return restore_uuid;
|
||||
}
|
||||
|
||||
addInfo(restore_uuid, backup_info.toString(), BackupStatus::RESTORING, restore_settings.internal);
|
||||
|
||||
auto job = [this,
|
||||
restore_uuid,
|
||||
restore_query,
|
||||
restore_settings,
|
||||
backup_info,
|
||||
on_cluster,
|
||||
context_in_use](bool async) mutable
|
||||
catch (...)
|
||||
{
|
||||
std::optional<CurrentThread::QueryScope> query_scope;
|
||||
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
||||
SCOPE_EXIT_SAFE(if (restore_coordination && !restore_settings.internal) restore_coordination->drop(););
|
||||
/// Something bad happened, the backup has not built.
|
||||
setStatus(restore_uuid, BackupStatus::FAILED_TO_RESTORE);
|
||||
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
|
||||
void BackupsWorker::doRestore(
|
||||
const UUID & restore_uuid,
|
||||
const std::shared_ptr<ASTBackupQuery> & restore_query,
|
||||
RestoreSettings restore_settings,
|
||||
const BackupInfo & backup_info,
|
||||
std::shared_ptr<IRestoreCoordination> restore_coordination,
|
||||
ContextMutablePtr context,
|
||||
bool called_async)
|
||||
{
|
||||
std::optional<CurrentThread::QueryScope> query_scope;
|
||||
try
|
||||
{
|
||||
if (called_async)
|
||||
{
|
||||
if (async)
|
||||
{
|
||||
query_scope.emplace(context_in_use);
|
||||
setThreadName("RestoreWorker");
|
||||
}
|
||||
|
||||
/// Open the backup for reading.
|
||||
BackupFactory::CreateParams backup_open_params;
|
||||
backup_open_params.open_mode = IBackup::OpenMode::READ;
|
||||
backup_open_params.context = context_in_use;
|
||||
backup_open_params.backup_info = backup_info;
|
||||
backup_open_params.base_backup_info = restore_settings.base_backup_info;
|
||||
backup_open_params.password = restore_settings.password;
|
||||
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
|
||||
|
||||
String current_database = context_in_use->getCurrentDatabase();
|
||||
|
||||
/// Checks access rights if this is ON CLUSTER query.
|
||||
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
|
||||
ClusterPtr cluster;
|
||||
if (on_cluster)
|
||||
{
|
||||
restore_query->cluster = context_in_use->getMacros()->expand(restore_query->cluster);
|
||||
cluster = context_in_use->getCluster(restore_query->cluster);
|
||||
restore_settings.cluster_host_ids = cluster->getHostIDs();
|
||||
|
||||
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
|
||||
/// because different replicas can contain different set of tables and so the required access rights can differ too.
|
||||
/// So the right way is pass through the entire cluster and check access for each host.
|
||||
auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
|
||||
for (const auto * address : addresses)
|
||||
{
|
||||
restore_settings.host_id = address->toString();
|
||||
auto restore_elements = restore_query->elements;
|
||||
String addr_database = address->default_database.empty() ? current_database : address->default_database;
|
||||
for (auto & element : restore_elements)
|
||||
element.setCurrentDatabase(addr_database);
|
||||
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context_in_use};
|
||||
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
|
||||
}
|
||||
}
|
||||
|
||||
/// Make a restore coordination.
|
||||
if (on_cluster && restore_settings.coordination_zk_path.empty())
|
||||
{
|
||||
String root_zk_path = context_in_use->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
|
||||
restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_uuid);
|
||||
}
|
||||
|
||||
if (!restore_settings.coordination_zk_path.empty())
|
||||
{
|
||||
restore_coordination = std::make_shared<RestoreCoordinationRemote>(
|
||||
restore_settings.coordination_zk_path,
|
||||
[global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); });
|
||||
}
|
||||
else
|
||||
{
|
||||
restore_coordination = std::make_shared<RestoreCoordinationLocal>();
|
||||
}
|
||||
|
||||
/// Do RESTORE.
|
||||
if (on_cluster)
|
||||
{
|
||||
|
||||
DDLQueryOnClusterParams params;
|
||||
params.cluster = cluster;
|
||||
params.only_shard_num = restore_settings.shard_num;
|
||||
params.only_replica_num = restore_settings.replica_num;
|
||||
restore_settings.copySettingsToQuery(*restore_query);
|
||||
|
||||
// executeDDLQueryOnCluster() will return without waiting for completion
|
||||
context_in_use->setSetting("distributed_ddl_task_timeout", Field{0});
|
||||
context_in_use->setSetting("distributed_ddl_output_mode", Field{"none"});
|
||||
|
||||
executeDDLQueryOnCluster(restore_query, context_in_use, params);
|
||||
|
||||
/// Wait until all the hosts have written their backup entries.
|
||||
auto all_hosts = BackupSettings::Util::filterHostIDs(
|
||||
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
|
||||
restore_coordination->waitStatus(all_hosts, kCompletedCoordinationStatus);
|
||||
}
|
||||
else
|
||||
{
|
||||
restore_query->setCurrentDatabase(current_database);
|
||||
|
||||
/// Restore metadata and prepare data restoring tasks.
|
||||
DataRestoreTasks data_restore_tasks;
|
||||
{
|
||||
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
|
||||
backup, context_in_use};
|
||||
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
|
||||
}
|
||||
|
||||
/// Execute the data restoring tasks.
|
||||
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
|
||||
|
||||
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
|
||||
restore_coordination->setStatus(restore_settings.host_id, kCompletedCoordinationStatus, "");
|
||||
}
|
||||
|
||||
setStatus(restore_uuid, BackupStatus::RESTORED);
|
||||
query_scope.emplace(context);
|
||||
setThreadName("RestoreWorker");
|
||||
}
|
||||
catch (...)
|
||||
|
||||
/// Open the backup for reading.
|
||||
BackupFactory::CreateParams backup_open_params;
|
||||
backup_open_params.open_mode = IBackup::OpenMode::READ;
|
||||
backup_open_params.context = context;
|
||||
backup_open_params.backup_info = backup_info;
|
||||
backup_open_params.base_backup_info = restore_settings.base_backup_info;
|
||||
backup_open_params.password = restore_settings.password;
|
||||
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
|
||||
|
||||
String current_database = context->getCurrentDatabase();
|
||||
|
||||
/// Checks access rights if this is ON CLUSTER query.
|
||||
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
|
||||
ClusterPtr cluster;
|
||||
bool on_cluster = !restore_query->cluster.empty();
|
||||
if (on_cluster)
|
||||
{
|
||||
/// Something bad happened, the backup has not built.
|
||||
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
|
||||
cluster = context->getCluster(restore_query->cluster);
|
||||
restore_settings.cluster_host_ids = cluster->getHostIDs();
|
||||
|
||||
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
|
||||
/// because different replicas can contain different set of tables and so the required access rights can differ too.
|
||||
/// So the right way is pass through the entire cluster and check access for each host.
|
||||
auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
|
||||
for (const auto * address : addresses)
|
||||
{
|
||||
restore_settings.host_id = address->toString();
|
||||
auto restore_elements = restore_query->elements;
|
||||
String addr_database = address->default_database.empty() ? current_database : address->default_database;
|
||||
for (auto & element : restore_elements)
|
||||
element.setCurrentDatabase(addr_database);
|
||||
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
|
||||
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
|
||||
}
|
||||
}
|
||||
|
||||
/// Make a restore coordination.
|
||||
if (on_cluster && restore_settings.coordination_zk_path.empty())
|
||||
{
|
||||
String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
|
||||
restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_uuid);
|
||||
}
|
||||
|
||||
if (!restore_coordination)
|
||||
restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
|
||||
|
||||
/// Do RESTORE.
|
||||
if (on_cluster)
|
||||
{
|
||||
|
||||
DDLQueryOnClusterParams params;
|
||||
params.cluster = cluster;
|
||||
params.only_shard_num = restore_settings.shard_num;
|
||||
params.only_replica_num = restore_settings.replica_num;
|
||||
restore_settings.copySettingsToQuery(*restore_query);
|
||||
|
||||
// executeDDLQueryOnCluster() will return without waiting for completion
|
||||
context->setSetting("distributed_ddl_task_timeout", Field{0});
|
||||
context->setSetting("distributed_ddl_output_mode", Field{"none"});
|
||||
|
||||
executeDDLQueryOnCluster(restore_query, context, params);
|
||||
|
||||
/// Wait until all the hosts have written their backup entries.
|
||||
auto all_hosts = BackupSettings::Util::filterHostIDs(
|
||||
restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
|
||||
restore_coordination->waitForStage(all_hosts, kCompletedStage);
|
||||
}
|
||||
else
|
||||
{
|
||||
restore_query->setCurrentDatabase(current_database);
|
||||
|
||||
/// Restore metadata and prepare data restoring tasks.
|
||||
DataRestoreTasks data_restore_tasks;
|
||||
{
|
||||
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
|
||||
backup, context};
|
||||
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
|
||||
}
|
||||
|
||||
/// Execute the data restoring tasks.
|
||||
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
|
||||
|
||||
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
|
||||
restore_coordination->setStage(restore_settings.host_id, kCompletedStage, "");
|
||||
}
|
||||
|
||||
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString());
|
||||
setStatus(restore_uuid, BackupStatus::RESTORED);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Something bad happened, the backup has not built.
|
||||
if (called_async)
|
||||
{
|
||||
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
|
||||
setStatus(restore_uuid, BackupStatus::FAILED_TO_RESTORE);
|
||||
sendErrorToCoordination(restore_coordination, restore_settings.host_id);
|
||||
if (!async)
|
||||
throw;
|
||||
sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
|
||||
}
|
||||
};
|
||||
|
||||
if (restore_settings.async)
|
||||
backups_thread_pool.scheduleOrThrowOnError([job]() mutable { job(true); });
|
||||
else
|
||||
job(false);
|
||||
|
||||
return restore_uuid;
|
||||
else
|
||||
{
|
||||
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startRestoring().
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -387,37 +488,28 @@ void BackupsWorker::addInfo(const UUID & uuid, const String & backup_name, Backu
|
||||
info.status = status;
|
||||
info.status_changed_time = time(nullptr);
|
||||
info.internal = internal;
|
||||
|
||||
std::lock_guard lock{infos_mutex};
|
||||
infos[uuid] = std::move(info);
|
||||
|
||||
num_active_backups += getNumActiveBackupsChange(status);
|
||||
num_active_restores += getNumActiveRestoresChange(status);
|
||||
}
|
||||
|
||||
|
||||
void BackupsWorker::setStatus(const UUID & uuid, BackupStatus status)
|
||||
{
|
||||
std::lock_guard lock{infos_mutex};
|
||||
auto & info = infos.at(uuid);
|
||||
auto it = infos.find(uuid);
|
||||
if (it == infos.end())
|
||||
return;
|
||||
|
||||
auto & info = it->second;
|
||||
auto old_status = info.status;
|
||||
info.status = status;
|
||||
info.status_changed_time = time(nullptr);
|
||||
|
||||
if (status == BackupStatus::BACKUP_COMPLETE)
|
||||
{
|
||||
LOG_INFO(log, "{} {} was created successfully", (info.internal ? "Internal backup" : "Backup"), info.backup_name);
|
||||
}
|
||||
else if (status == BackupStatus::RESTORED)
|
||||
{
|
||||
LOG_INFO(log, "Restored from {} {} successfully", (info.internal ? "internal backup" : "backup"), info.backup_name);
|
||||
}
|
||||
else if ((status == BackupStatus::FAILED_TO_BACKUP) || (status == BackupStatus::FAILED_TO_RESTORE))
|
||||
{
|
||||
String start_of_message;
|
||||
if (status == BackupStatus::FAILED_TO_BACKUP)
|
||||
start_of_message = fmt::format("Failed to create {} {}", (info.internal ? "internal backup" : "backup"), info.backup_name);
|
||||
else
|
||||
start_of_message = fmt::format("Failed to restore from {} {}", (info.internal ? "internal backup" : "backup"), info.backup_name);
|
||||
tryLogCurrentException(log, start_of_message);
|
||||
|
||||
info.error_message = getCurrentExceptionMessage(false);
|
||||
info.exception = std::current_exception();
|
||||
}
|
||||
num_active_backups += getNumActiveBackupsChange(status) - getNumActiveBackupsChange(old_status);
|
||||
num_active_restores += getNumActiveRestoresChange(status) - getNumActiveRestoresChange(old_status);
|
||||
}
|
||||
|
||||
|
||||
@ -428,7 +520,7 @@ void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool rethrow_excep
|
||||
{
|
||||
auto it = infos.find(backup_or_restore_uuid);
|
||||
if (it == infos.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "BackupsWorker: Unknown UUID {}", toString(backup_or_restore_uuid));
|
||||
return true;
|
||||
const auto & info = it->second;
|
||||
auto current_status = info.status;
|
||||
if (rethrow_exception && ((current_status == BackupStatus::FAILED_TO_BACKUP) || (current_status == BackupStatus::FAILED_TO_RESTORE)))
|
||||
@ -437,12 +529,12 @@ void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool rethrow_excep
|
||||
});
|
||||
}
|
||||
|
||||
BackupsWorker::Info BackupsWorker::getInfo(const UUID & backup_or_restore_uuid) const
|
||||
std::optional<BackupsWorker::Info> BackupsWorker::tryGetInfo(const UUID & backup_or_restore_uuid) const
|
||||
{
|
||||
std::lock_guard lock{infos_mutex};
|
||||
auto it = infos.find(backup_or_restore_uuid);
|
||||
if (it == infos.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "BackupsWorker: Unknown UUID {}", toString(backup_or_restore_uuid));
|
||||
return std::nullopt;
|
||||
return it->second;
|
||||
}
|
||||
|
||||
@ -457,14 +549,15 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllInfos() const
|
||||
|
||||
void BackupsWorker::shutdown()
|
||||
{
|
||||
size_t num_active_backups = backups_thread_pool.active();
|
||||
size_t num_active_restores = restores_thread_pool.active();
|
||||
if (!num_active_backups && !num_active_restores)
|
||||
return;
|
||||
LOG_INFO(log, "Waiting for {} backup and {} restore tasks to be finished", num_active_backups, num_active_restores);
|
||||
bool has_active_backups_or_restores = (num_active_backups || num_active_restores);
|
||||
if (has_active_backups_or_restores)
|
||||
LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);
|
||||
|
||||
backups_thread_pool.wait();
|
||||
restores_thread_pool.wait();
|
||||
LOG_INFO(log, "All backup and restore tasks have finished");
|
||||
|
||||
if (has_active_backups_or_restores)
|
||||
LOG_INFO(log, "All backup and restore tasks have finished");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,6 +11,13 @@ namespace Poco::Util { class AbstractConfiguration; }
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ASTBackupQuery;
|
||||
struct BackupSettings;
|
||||
struct RestoreSettings;
|
||||
struct BackupInfo;
|
||||
class IBackupCoordination;
|
||||
class IRestoreCoordination;
|
||||
|
||||
/// Manager of backups and restores: executes backups and restores' threads in the background.
|
||||
/// Keeps information about backups and restores started in this session.
|
||||
class BackupsWorker
|
||||
@ -47,12 +54,21 @@ public:
|
||||
bool internal = false;
|
||||
};
|
||||
|
||||
Info getInfo(const UUID & backup_or_restore_uuid) const;
|
||||
std::optional<Info> tryGetInfo(const UUID & backup_or_restore_uuid) const;
|
||||
std::vector<Info> getAllInfos() const;
|
||||
|
||||
private:
|
||||
UUID startMakingBackup(const ASTPtr & query, const ContextPtr & context);
|
||||
|
||||
void doBackup(const UUID & backup_uuid, const std::shared_ptr<ASTBackupQuery> & backup_query, BackupSettings backup_settings,
|
||||
const BackupInfo & backup_info, std::shared_ptr<IBackupCoordination> backup_coordination, const ContextPtr & context,
|
||||
ContextMutablePtr mutable_context, bool called_async);
|
||||
|
||||
UUID startRestoring(const ASTPtr & query, ContextMutablePtr context);
|
||||
|
||||
void doRestore(const UUID & restore_uuid, const std::shared_ptr<ASTBackupQuery> & restore_query, RestoreSettings restore_settings,
|
||||
const BackupInfo & backup_info, std::shared_ptr<IRestoreCoordination> restore_coordination, ContextMutablePtr context,
|
||||
bool called_async);
|
||||
|
||||
void addInfo(const UUID & uuid, const String & backup_name, BackupStatus status, bool internal);
|
||||
void setStatus(const UUID & uuid, BackupStatus status);
|
||||
@ -62,6 +78,8 @@ private:
|
||||
|
||||
std::unordered_map<UUID, Info> infos;
|
||||
std::condition_variable status_changed;
|
||||
std::atomic<size_t> num_active_backups = 0;
|
||||
std::atomic<size_t> num_active_restores = 0;
|
||||
mutable std::mutex infos_mutex;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
@ -18,11 +18,11 @@ class IBackupCoordination
|
||||
public:
|
||||
virtual ~IBackupCoordination() = default;
|
||||
|
||||
/// Sets the current status and waits for other hosts to come to this status too.
|
||||
virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
|
||||
virtual void setErrorStatus(const String & current_host, const Exception & exception) = 0;
|
||||
virtual Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) = 0;
|
||||
virtual Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) = 0;
|
||||
/// Sets the current stage and waits for other hosts to come to this stage too.
|
||||
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
|
||||
virtual void setError(const String & current_host, const Exception & exception) = 0;
|
||||
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
|
||||
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
|
||||
|
||||
struct PartNameAndChecksum
|
||||
{
|
||||
@ -115,9 +115,6 @@ public:
|
||||
|
||||
/// Returns the list of all the archive suffixes which were generated.
|
||||
virtual Strings getAllArchiveSuffixes() const = 0;
|
||||
|
||||
/// Removes remotely stored information.
|
||||
virtual void drop() {}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -16,11 +16,11 @@ class IRestoreCoordination
|
||||
public:
|
||||
virtual ~IRestoreCoordination() = default;
|
||||
|
||||
/// Sets the current status and waits for other hosts to come to this status too.
|
||||
virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
|
||||
virtual void setErrorStatus(const String & current_host, const Exception & exception) = 0;
|
||||
virtual Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) = 0;
|
||||
virtual Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) = 0;
|
||||
/// Sets the current stage and waits for other hosts to come to this stage too.
|
||||
virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
|
||||
virtual void setError(const String & current_host, const Exception & exception) = 0;
|
||||
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
|
||||
virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
|
||||
|
||||
static constexpr const char * kErrorStatus = "error";
|
||||
|
||||
@ -34,9 +34,6 @@ public:
|
||||
/// Sets that this replica is going to restore a ReplicatedAccessStorage.
|
||||
/// The function returns false if this access storage is being already restored by another replica.
|
||||
virtual bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) = 0;
|
||||
|
||||
/// Removes remotely stored information.
|
||||
virtual void drop() {}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -7,20 +7,20 @@ namespace DB
|
||||
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
|
||||
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
|
||||
|
||||
void RestoreCoordinationLocal::setStatus(const String &, const String &, const String &)
|
||||
void RestoreCoordinationLocal::setStage(const String &, const String &, const String &)
|
||||
{
|
||||
}
|
||||
|
||||
void RestoreCoordinationLocal::setErrorStatus(const String &, const Exception &)
|
||||
void RestoreCoordinationLocal::setError(const String &, const Exception &)
|
||||
{
|
||||
}
|
||||
|
||||
Strings RestoreCoordinationLocal::waitStatus(const Strings &, const String &)
|
||||
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
Strings RestoreCoordinationLocal::waitStatusFor(const Strings &, const String &, UInt64)
|
||||
Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
@ -18,11 +18,11 @@ public:
|
||||
RestoreCoordinationLocal();
|
||||
~RestoreCoordinationLocal() override;
|
||||
|
||||
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
|
||||
void setStatus(const String & current_host, const String & new_status, const String & message) override;
|
||||
void setErrorStatus(const String & current_host, const Exception & exception) override;
|
||||
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
|
||||
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
|
||||
/// Sets the current stage and waits for other hosts to come to this stage too.
|
||||
void setStage(const String & current_host, const String & new_stage, const String & message) override;
|
||||
void setError(const String & current_host, const Exception & exception) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
|
||||
|
||||
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
|
||||
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
|
||||
|
@ -6,15 +6,27 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
RestoreCoordinationRemote::RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
|
||||
RestoreCoordinationRemote::RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_)
|
||||
: zookeeper_path(zookeeper_path_)
|
||||
, get_zookeeper(get_zookeeper_)
|
||||
, status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("RestoreCoordination"))
|
||||
, remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_)
|
||||
, stage_sync(zookeeper_path_ + "/stage", get_zookeeper_, &Poco::Logger::get("RestoreCoordination"))
|
||||
{
|
||||
createRootNodes();
|
||||
}
|
||||
|
||||
RestoreCoordinationRemote::~RestoreCoordinationRemote() = default;
|
||||
RestoreCoordinationRemote::~RestoreCoordinationRemote()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (remove_zk_nodes_in_destructor)
|
||||
removeAllNodes();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void RestoreCoordinationRemote::createRootNodes()
|
||||
{
|
||||
@ -27,24 +39,24 @@ void RestoreCoordinationRemote::createRootNodes()
|
||||
}
|
||||
|
||||
|
||||
void RestoreCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
|
||||
void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
|
||||
{
|
||||
status_sync.set(current_host, new_status, message);
|
||||
stage_sync.set(current_host, new_stage, message);
|
||||
}
|
||||
|
||||
void RestoreCoordinationRemote::setErrorStatus(const String & current_host, const Exception & exception)
|
||||
void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception)
|
||||
{
|
||||
status_sync.setError(current_host, exception);
|
||||
stage_sync.setError(current_host, exception);
|
||||
}
|
||||
|
||||
Strings RestoreCoordinationRemote::waitStatus(const Strings & all_hosts, const String & status_to_wait)
|
||||
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
|
||||
{
|
||||
return status_sync.wait(all_hosts, status_to_wait);
|
||||
return stage_sync.wait(all_hosts, stage_to_wait);
|
||||
}
|
||||
|
||||
Strings RestoreCoordinationRemote::waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
|
||||
Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
|
||||
{
|
||||
return status_sync.waitFor(all_hosts, status_to_wait, timeout_ms);
|
||||
return stage_sync.waitFor(all_hosts, stage_to_wait, timeout);
|
||||
}
|
||||
|
||||
|
||||
@ -93,9 +105,4 @@ void RestoreCoordinationRemote::removeAllNodes()
|
||||
zookeeper->removeRecursive(zookeeper_path);
|
||||
}
|
||||
|
||||
void RestoreCoordinationRemote::drop()
|
||||
{
|
||||
removeAllNodes();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Backups/IRestoreCoordination.h>
|
||||
#include <Backups/BackupCoordinationStatusSync.h>
|
||||
#include <Backups/BackupCoordinationStageSync.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,14 +11,14 @@ namespace DB
|
||||
class RestoreCoordinationRemote : public IRestoreCoordination
|
||||
{
|
||||
public:
|
||||
RestoreCoordinationRemote(const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper);
|
||||
RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_);
|
||||
~RestoreCoordinationRemote() override;
|
||||
|
||||
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
|
||||
void setStatus(const String & current_host, const String & new_status, const String & message) override;
|
||||
void setErrorStatus(const String & current_host, const Exception & exception) override;
|
||||
Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
|
||||
Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
|
||||
/// Sets the current stage and waits for other hosts to come to this stage too.
|
||||
void setStage(const String & current_host, const String & new_stage, const String & message) override;
|
||||
void setError(const String & current_host, const Exception & exception) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
|
||||
Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
|
||||
|
||||
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
|
||||
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
|
||||
@ -31,9 +31,6 @@ public:
|
||||
/// The function returns false if this access storage is being already restored by another replica.
|
||||
bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) override;
|
||||
|
||||
/// Removes remotely stored information.
|
||||
void drop() override;
|
||||
|
||||
private:
|
||||
void createRootNodes();
|
||||
void removeAllNodes();
|
||||
@ -42,7 +39,9 @@ private:
|
||||
|
||||
const String zookeeper_path;
|
||||
const zkutil::GetZooKeeper get_zookeeper;
|
||||
BackupCoordinationStatusSync status_sync;
|
||||
const bool remove_zk_nodes_in_destructor;
|
||||
|
||||
BackupCoordinationStageSync stage_sync;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -41,16 +41,16 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
/// Finding databases and tables in the backup which we're going to restore.
|
||||
constexpr const char * kFindingTablesInBackupStatus = "finding tables in backup";
|
||||
constexpr const char * kFindingTablesInBackupStage = "finding tables in backup";
|
||||
|
||||
/// Creating databases or finding them and checking their definitions.
|
||||
constexpr const char * kCreatingDatabasesStatus = "creating databases";
|
||||
constexpr const char * kCreatingDatabasesStage = "creating databases";
|
||||
|
||||
/// Creating tables or finding them and checking their definition.
|
||||
constexpr const char * kCreatingTablesStatus = "creating tables";
|
||||
constexpr const char * kCreatingTablesStage = "creating tables";
|
||||
|
||||
/// Inserting restored data to tables.
|
||||
constexpr const char * kInsertingDataToTablesStatus = "inserting data to tables";
|
||||
constexpr const char * kInsertingDataToTablesStage = "inserting data to tables";
|
||||
|
||||
/// Uppercases the first character of a passed string.
|
||||
String toUpperFirst(const String & str)
|
||||
@ -102,6 +102,7 @@ RestorerFromBackup::RestorerFromBackup(
|
||||
, restore_coordination(restore_coordination_)
|
||||
, backup(backup_)
|
||||
, context(context_)
|
||||
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
|
||||
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
|
||||
, log(&Poco::Logger::get("RestorerFromBackup"))
|
||||
{
|
||||
@ -112,7 +113,7 @@ RestorerFromBackup::~RestorerFromBackup() = default;
|
||||
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
|
||||
{
|
||||
/// run() can be called onle once.
|
||||
if (!current_status.empty())
|
||||
if (!current_stage.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Already restoring");
|
||||
|
||||
/// Find other hosts working along with us to execute this ON CLUSTER query.
|
||||
@ -126,7 +127,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
|
||||
findRootPathsInBackup();
|
||||
|
||||
/// Find all the databases and tables which we will read from the backup.
|
||||
setStatus(kFindingTablesInBackupStatus);
|
||||
setStage(kFindingTablesInBackupStage);
|
||||
findDatabasesAndTablesInBackup();
|
||||
|
||||
/// Check access rights.
|
||||
@ -136,27 +137,31 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
|
||||
return {};
|
||||
|
||||
/// Create databases using the create queries read from the backup.
|
||||
setStatus(kCreatingDatabasesStatus);
|
||||
setStage(kCreatingDatabasesStage);
|
||||
createDatabases();
|
||||
|
||||
/// Create tables using the create queries read from the backup.
|
||||
setStatus(kCreatingTablesStatus);
|
||||
setStage(kCreatingTablesStage);
|
||||
createTables();
|
||||
|
||||
/// All what's left is to insert data to tables.
|
||||
/// No more data restoring tasks are allowed after this point.
|
||||
setStatus(kInsertingDataToTablesStatus);
|
||||
setStage(kInsertingDataToTablesStage);
|
||||
return getDataRestoreTasks();
|
||||
}
|
||||
|
||||
void RestorerFromBackup::setStatus(const String & new_status, const String & message)
|
||||
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
|
||||
{
|
||||
LOG_TRACE(log, "{}", toUpperFirst(new_status));
|
||||
current_status = new_status;
|
||||
LOG_TRACE(log, "{}", toUpperFirst(new_stage));
|
||||
current_stage = new_stage;
|
||||
|
||||
if (restore_coordination)
|
||||
{
|
||||
restore_coordination->setStatus(restore_settings.host_id, new_status, message);
|
||||
restore_coordination->waitStatus(all_hosts, new_status);
|
||||
restore_coordination->setStage(restore_settings.host_id, new_stage, message);
|
||||
if (new_stage == kFindingTablesInBackupStage)
|
||||
restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
|
||||
else
|
||||
restore_coordination->waitForStage(all_hosts, new_stage);
|
||||
}
|
||||
}
|
||||
|
||||
@ -814,14 +819,14 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
|
||||
|
||||
void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
|
||||
{
|
||||
if (current_status == kInsertingDataToTablesStatus)
|
||||
if (current_stage == kInsertingDataToTablesStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
|
||||
data_restore_tasks.push_back(std::move(new_task));
|
||||
}
|
||||
|
||||
void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
|
||||
{
|
||||
if (current_status == kInsertingDataToTablesStatus)
|
||||
if (current_stage == kInsertingDataToTablesStage)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
|
||||
insertAtEnd(data_restore_tasks, std::move(new_tasks));
|
||||
}
|
||||
|
@ -73,6 +73,7 @@ private:
|
||||
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
||||
BackupPtr backup;
|
||||
ContextMutablePtr context;
|
||||
std::chrono::milliseconds on_cluster_first_sync_timeout;
|
||||
std::chrono::milliseconds create_table_timeout;
|
||||
Poco::Logger * log;
|
||||
|
||||
@ -100,7 +101,7 @@ private:
|
||||
|
||||
DataRestoreTasks getDataRestoreTasks();
|
||||
|
||||
void setStatus(const String & new_status, const String & message = "");
|
||||
void setStage(const String & new_stage, const String & message = "");
|
||||
|
||||
struct DatabaseInfo
|
||||
{
|
||||
@ -124,7 +125,7 @@ private:
|
||||
|
||||
std::vector<QualifiedTableName> findTablesWithoutDependencies() const;
|
||||
|
||||
String current_status;
|
||||
String current_stage;
|
||||
std::unordered_map<String, DatabaseInfo> database_infos;
|
||||
std::map<QualifiedTableName, TableInfo> table_infos;
|
||||
std::vector<DataRestoreTask> data_restore_tasks;
|
||||
|
@ -17,20 +17,22 @@ namespace DB
|
||||
|
||||
namespace
|
||||
{
|
||||
Block getResultRow(const BackupsWorker::Info & info)
|
||||
Block getResultRow(const std::optional<BackupsWorker::Info> & info)
|
||||
{
|
||||
Block res_columns;
|
||||
|
||||
auto column_uuid = ColumnUUID::create();
|
||||
column_uuid->insert(info.uuid);
|
||||
res_columns.insert(0, {std::move(column_uuid), std::make_shared<DataTypeUUID>(), "uuid"});
|
||||
|
||||
auto column_backup_name = ColumnString::create();
|
||||
column_backup_name->insert(info.backup_name);
|
||||
res_columns.insert(1, {std::move(column_backup_name), std::make_shared<DataTypeString>(), "backup_name"});
|
||||
|
||||
auto column_status = ColumnInt8::create();
|
||||
column_status->insert(static_cast<Int8>(info.status));
|
||||
|
||||
if (info)
|
||||
{
|
||||
column_uuid->insert(info->uuid);
|
||||
column_backup_name->insert(info->backup_name);
|
||||
column_status->insert(static_cast<Int8>(info->status));
|
||||
}
|
||||
|
||||
Block res_columns;
|
||||
res_columns.insert(0, {std::move(column_uuid), std::make_shared<DataTypeUUID>(), "uuid"});
|
||||
res_columns.insert(1, {std::move(column_backup_name), std::make_shared<DataTypeString>(), "backup_name"});
|
||||
res_columns.insert(2, {std::move(column_status), std::make_shared<DataTypeEnum8>(getBackupStatusEnumValues()), "status"});
|
||||
|
||||
return res_columns;
|
||||
@ -42,7 +44,7 @@ BlockIO InterpreterBackupQuery::execute()
|
||||
auto & backups_worker = context->getBackupsWorker();
|
||||
UUID uuid = backups_worker.start(query_ptr, context);
|
||||
BlockIO res_io;
|
||||
res_io.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(getResultRow(backups_worker.getInfo(uuid))));
|
||||
res_io.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(getResultRow(backups_worker.tryGetInfo(uuid))));
|
||||
return res_io;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user