diff --git a/src/Backups/BackupCoordinationLocal.cpp b/src/Backups/BackupCoordinationLocal.cpp index 417b84c6b5f..d4064902a40 100644 --- a/src/Backups/BackupCoordinationLocal.cpp +++ b/src/Backups/BackupCoordinationLocal.cpp @@ -13,20 +13,20 @@ using FileInfo = IBackupCoordination::FileInfo; BackupCoordinationLocal::BackupCoordinationLocal() = default; BackupCoordinationLocal::~BackupCoordinationLocal() = default; -void BackupCoordinationLocal::setStatus(const String &, const String &, const String &) +void BackupCoordinationLocal::setStage(const String &, const String &, const String &) { } -void BackupCoordinationLocal::setErrorStatus(const String &, const Exception &) +void BackupCoordinationLocal::setError(const String &, const Exception &) { } -Strings BackupCoordinationLocal::waitStatus(const Strings &, const String &) +Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &) { return {}; } -Strings BackupCoordinationLocal::waitStatusFor(const Strings &, const String &, UInt64) +Strings BackupCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds) { return {}; } diff --git a/src/Backups/BackupCoordinationLocal.h b/src/Backups/BackupCoordinationLocal.h index 8cf5fba5c5c..aca7f71545b 100644 --- a/src/Backups/BackupCoordinationLocal.h +++ b/src/Backups/BackupCoordinationLocal.h @@ -20,10 +20,10 @@ public: BackupCoordinationLocal(); ~BackupCoordinationLocal() override; - void setStatus(const String & current_host, const String & new_status, const String & message) override; - void setErrorStatus(const String & current_host, const Exception & exception) override; - Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override; - Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override; + void setStage(const String & current_host, const String & new_stage, const String & message) override; + void setError(const String & current_host, const Exception & exception) override; + Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; + Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector & part_names_and_checksums) override; diff --git a/src/Backups/BackupCoordinationRemote.cpp b/src/Backups/BackupCoordinationRemote.cpp index a180358e088..bac99b0da2d 100644 --- a/src/Backups/BackupCoordinationRemote.cpp +++ b/src/Backups/BackupCoordinationRemote.cpp @@ -165,15 +165,28 @@ namespace constexpr size_t NUM_ATTEMPTS = 10; } -BackupCoordinationRemote::BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_) +BackupCoordinationRemote::BackupCoordinationRemote( + const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_) : zookeeper_path(zookeeper_path_) , get_zookeeper(get_zookeeper_) - , status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("BackupCoordination")) + , remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_) + , stage_sync(zookeeper_path_ + "/stage", get_zookeeper_, &Poco::Logger::get("BackupCoordination")) { createRootNodes(); } -BackupCoordinationRemote::~BackupCoordinationRemote() = default; +BackupCoordinationRemote::~BackupCoordinationRemote() +{ + try + { + if (remove_zk_nodes_in_destructor) + 
removeAllNodes(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} void BackupCoordinationRemote::createRootNodes() { @@ -196,24 +209,24 @@ void BackupCoordinationRemote::removeAllNodes() } -void BackupCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message) +void BackupCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message) { - status_sync.set(current_host, new_status, message); + stage_sync.set(current_host, new_stage, message); } -void BackupCoordinationRemote::setErrorStatus(const String & current_host, const Exception & exception) +void BackupCoordinationRemote::setError(const String & current_host, const Exception & exception) { - status_sync.setError(current_host, exception); + stage_sync.setError(current_host, exception); } -Strings BackupCoordinationRemote::waitStatus(const Strings & all_hosts, const String & status_to_wait) +Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait) { - return status_sync.wait(all_hosts, status_to_wait); + return stage_sync.wait(all_hosts, stage_to_wait); } -Strings BackupCoordinationRemote::waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) +Strings BackupCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) { - return status_sync.waitFor(all_hosts, status_to_wait, timeout_ms); + return stage_sync.waitFor(all_hosts, stage_to_wait, timeout); } @@ -565,9 +578,4 @@ Strings BackupCoordinationRemote::getAllArchiveSuffixes() const return node_names; } -void BackupCoordinationRemote::drop() -{ - removeAllNodes(); -} - } diff --git a/src/Backups/BackupCoordinationRemote.h b/src/Backups/BackupCoordinationRemote.h index 349d04c7d87..d1d206683fa 100644 --- a/src/Backups/BackupCoordinationRemote.h +++ b/src/Backups/BackupCoordinationRemote.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace DB @@ -13,13 +13,13 @@ namespace DB class BackupCoordinationRemote : public IBackupCoordination { public: - BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_); + BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_); ~BackupCoordinationRemote() override; - void setStatus(const String & current_host, const String & new_status, const String & message) override; - void setErrorStatus(const String & current_host, const Exception & exception) override; - Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override; - Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override; + void setStage(const String & current_host, const String & new_stage, const String & message) override; + void setError(const String & current_host, const Exception & exception) override; + Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override; + Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override; void addReplicatedPartNames( const String & table_shared_id, @@ -56,8 +56,6 @@ public: String getNextArchiveSuffix() override; Strings getAllArchiveSuffixes() const override; - void drop() override; - private: void createRootNodes(); void removeAllNodes(); @@ -66,8 +64,9 @@ private: const String zookeeper_path; 
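The explicit drop() method is gone: the coordination's ZooKeeper nodes are now removed in the destructor, and only when remove_zk_nodes_in_destructor is set (the callers below pass !is_internal_backup, so only the initiating host cleans up). A small standalone sketch of the same best-effort RAII cleanup shape; the class and names here are illustrative, not ClickHouse API:

#include <functional>
#include <iostream>
#include <utility>

/// Owns a piece of shared state and removes it on destruction if asked to,
/// swallowing errors the way ~BackupCoordinationRemote() does.
class ScopedCoordination
{
public:
    ScopedCoordination(std::function<void()> remove_nodes_, bool remove_in_destructor_)
        : remove_nodes(std::move(remove_nodes_)), remove_in_destructor(remove_in_destructor_) {}

    ~ScopedCoordination()
    {
        try
        {
            if (remove_in_destructor)
                remove_nodes();
        }
        catch (...)
        {
            /// Destructors must not throw; the real code logs the exception instead.
        }
    }

private:
    std::function<void()> remove_nodes;
    bool remove_in_destructor;
};

int main()
{
    /// The initiator (non-internal backup) passes true; worker hosts pass false,
    /// so the znodes survive until the initiator's coordination object is destroyed.
    ScopedCoordination coordination([] { std::cout << "znodes removed\n"; }, /*remove_in_destructor_=*/true);
}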
const zkutil::GetZooKeeper get_zookeeper; + const bool remove_zk_nodes_in_destructor; - BackupCoordinationStatusSync status_sync; + BackupCoordinationStageSync stage_sync; mutable std::mutex mutex; mutable std::optional replicated_tables; diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp new file mode 100644 index 00000000000..5524029bbf2 --- /dev/null +++ b/src/Backups/BackupCoordinationStageSync.cpp @@ -0,0 +1,228 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE; +} + + +BackupCoordinationStageSync::BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_) + : zookeeper_path(zookeeper_path_) + , get_zookeeper(get_zookeeper_) + , log(log_) +{ + createRootNodes(); +} + +void BackupCoordinationStageSync::createRootNodes() +{ + auto zookeeper = get_zookeeper(); + zookeeper->createAncestors(zookeeper_path); + zookeeper->createIfNotExists(zookeeper_path, ""); +} + +void BackupCoordinationStageSync::set(const String & current_host, const String & new_stage, const String & message) +{ + auto zookeeper = get_zookeeper(); + + /// Make an ephemeral node so the initiator can track if the current host is still working. + String alive_node_path = zookeeper_path + "/alive|" + current_host; + auto code = zookeeper->tryCreate(alive_node_path, "", zkutil::CreateMode::Ephemeral); + if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS) + throw zkutil::KeeperException(code, alive_node_path); + + zookeeper->createIfNotExists(zookeeper_path + "/started|" + current_host, ""); + zookeeper->createIfNotExists(zookeeper_path + "/current|" + current_host + "|" + new_stage, message); +} + +void BackupCoordinationStageSync::setError(const String & current_host, const Exception & exception) +{ + auto zookeeper = get_zookeeper(); + WriteBufferFromOwnString buf; + writeStringBinary(current_host, buf); + writeException(exception, buf, true); + zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str()); +} + +Strings BackupCoordinationStageSync::wait(const Strings & all_hosts, const String & stage_to_wait) +{ + return waitImpl(all_hosts, stage_to_wait, {}); +} + +Strings BackupCoordinationStageSync::waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) +{ + return waitImpl(all_hosts, stage_to_wait, timeout); +} + +namespace +{ + struct UnreadyHostState + { + bool started = false; + bool alive = false; + }; +} + +struct BackupCoordinationStageSync::State +{ + Strings results; + std::map unready_hosts; + std::optional> error; + std::optional host_terminated; +}; + +BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState( + zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const +{ + std::unordered_set zk_nodes_set{zk_nodes.begin(), zk_nodes.end()}; + + State state; + if (zk_nodes_set.contains("error")) + { + ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")}; + String host; + readStringBinary(host, buf); + state.error = std::make_pair(host, readException(buf, fmt::format("Got error from {}", host))); + return state; + } + + for (const auto & host : all_hosts) + { + if (!zk_nodes_set.contains("current|" + host + "|" + stage_to_wait)) + { + UnreadyHostState unready_host_state; + 
unready_host_state.started = zk_nodes_set.contains("started|" + host); + unready_host_state.alive = zk_nodes_set.contains("alive|" + host); + state.unready_hosts.emplace(host, unready_host_state); + if (!unready_host_state.alive && unready_host_state.started && !state.host_terminated) + state.host_terminated = host; + } + } + + if (state.host_terminated || !state.unready_hosts.empty()) + return state; + + state.results.reserve(all_hosts.size()); + for (const auto & host : all_hosts) + state.results.emplace_back(zookeeper->get(zookeeper_path + "/current|" + host + "|" + stage_to_wait)); + + return state; +} + +Strings BackupCoordinationStageSync::waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional timeout) const +{ + if (all_hosts.empty()) + return {}; + + /// Wait until all hosts are ready or an error happens or time is out. + + auto zookeeper = get_zookeeper(); + + struct Watch + { + std::mutex mutex; + std::condition_variable event; + bool zk_nodes_changed = false; + bool watch_set = false; + }; + + /// shared_ptr because `watch_callback` can be called by ZooKeeper after leaving this function's scope. + auto watch = std::make_shared(); + + /// Called by ZooKepper when list of zk nodes have changed. + auto watch_callback = [watch](const Coordination::WatchResponse &) + { + std::lock_guard lock{watch->mutex}; + watch->zk_nodes_changed = true; + watch->watch_set = false; /// When it's triggered ZooKeeper resets the watch so we need to call getChildrenWatch() again. + watch->event.notify_all(); + }; + + auto zk_nodes_changed = [watch] { return watch->zk_nodes_changed; }; + + bool use_timeout = timeout.has_value(); + std::chrono::steady_clock::time_point end_of_timeout; + if (use_timeout) + end_of_timeout = std::chrono::steady_clock::now() + std::chrono::duration_cast(*timeout); + + State state; + + String previous_unready_host; /// Used for logging: we don't want to log the same unready host again. + + for (;;) + { + /// Get zk nodes and subscribe on their changes. + { + std::lock_guard lock{watch->mutex}; + watch->watch_set = true; + watch->zk_nodes_changed = false; + } + Strings zk_nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback); + + /// Read and analyze the current state of zk nodes. + state = readCurrentState(zookeeper, zk_nodes, all_hosts, stage_to_wait); + if (state.error || state.host_terminated || state.unready_hosts.empty()) + break; /// Error happened or everything is ready. + + /// Log that we will wait for another host. + const auto & unready_host = state.unready_hosts.begin()->first; + if (unready_host != previous_unready_host) + { + LOG_TRACE(log, "Waiting for host {}", unready_host); + previous_unready_host = unready_host; + } + + /// Wait until `watch_callback` is called by ZooKeeper meaning that zk nodes have changed. + { + std::unique_lock lock{watch->mutex}; + if (use_timeout) + { + auto current_time = std::chrono::steady_clock::now(); + if ((current_time > end_of_timeout) || !watch->event.wait_for(lock, end_of_timeout - current_time, zk_nodes_changed)) + break; + } + else + { + watch->event.wait(lock, zk_nodes_changed); + } + assert(watch->zk_nodes_changed); + assert(!watch->watch_set); + } + } + + /// Rethrow an error raised originally on another host. + if (state.error) + state.error->second.rethrow(); + + /// Another host terminated without errors. 
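The loop above re-subscribes the watch on every iteration and keeps all of its state behind a shared_ptr because ZooKeeper may still invoke watch_callback after waitImpl() has returned. The notification half of that pattern is plain standard C++; below is a minimal runnable sketch in which a std::thread stands in for the ZooKeeper watch delivery:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

struct Watch
{
    std::mutex mutex;
    std::condition_variable event;
    bool zk_nodes_changed = false;
};

int main()
{
    /// shared_ptr: the callback may outlive the waiting scope, just like a late ZooKeeper watch.
    auto watch = std::make_shared<Watch>();

    auto watch_callback = [watch]
    {
        std::lock_guard lock{watch->mutex};
        watch->zk_nodes_changed = true;
        watch->event.notify_all();
    };

    /// Stand-in for the ZooKeeper server triggering the watch some time later.
    std::thread trigger([watch_callback]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        watch_callback();
    });

    {
        std::unique_lock lock{watch->mutex};
        bool changed = watch->event.wait_for(lock, std::chrono::seconds(5), [&] { return watch->zk_nodes_changed; });
        std::cout << (changed ? "nodes changed" : "timed out") << '\n';
    }

    trigger.join();
}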
+ if (state.host_terminated) + throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Host {} suddenly stopped working", *state.host_terminated); + + /// Something's unready, timeout is probably not enough. + if (!state.unready_hosts.empty()) + { + const auto & [unready_host, unready_host_state] = *state.unready_hosts.begin(); + throw Exception( + ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, + "Waited for host {} too long (> {}){}", + unready_host, + to_string(*timeout), + unready_host_state.started ? "" : ": Operation didn't start"); + } + + return state.results; +} + +} diff --git a/src/Backups/BackupCoordinationStageSync.h b/src/Backups/BackupCoordinationStageSync.h new file mode 100644 index 00000000000..623b58fd9fa --- /dev/null +++ b/src/Backups/BackupCoordinationStageSync.h @@ -0,0 +1,39 @@ +#pragma once + +#include + + +namespace DB +{ + +/// Used to coordinate hosts so all hosts would come to a specific stage at around the same time. +class BackupCoordinationStageSync +{ +public: + BackupCoordinationStageSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_); + + /// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that. + void set(const String & current_host, const String & new_stage, const String & message); + void setError(const String & current_host, const Exception & exception); + + /// Sets the stage of the current host and waits until all hosts come to the same stage. + /// The function returns the messages all hosts set when they come to the required stage. + Strings wait(const Strings & all_hosts, const String & stage_to_wait); + + /// Almost the same as setAndWait() but this one stops waiting and throws an exception after a specific amount of time. 
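To make the intended call pattern concrete, here is a hedged sketch of a caller of this class; the ZooKeeper path, getter, host id and stage name are illustrative assumptions, not part of this patch:

#include <Backups/BackupCoordinationStageSync.h>
#include <Poco/Logger.h>
#include <chrono>

using namespace DB;

/// Sketch only: assumes a running server context that can hand out a ZooKeeper client.
Strings syncOnStage(zkutil::GetZooKeeper get_zookeeper, const String & current_host, const Strings & all_hosts)
{
    BackupCoordinationStageSync stage_sync(
        "/clickhouse/backups/backup-1234/stage",      /// illustrative path
        get_zookeeper,
        &Poco::Logger::get("BackupCoordination"));

    /// Announce that this host reached the stage...
    stage_sync.set(current_host, "example stage", "ready");

    /// ...and block until every host reports it, giving up after three minutes.
    return stage_sync.waitFor(all_hosts, "example stage", std::chrono::minutes(3));
}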
+ Strings waitFor(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout); + +private: + void createRootNodes(); + + struct State; + State readCurrentState(zkutil::ZooKeeperPtr zookeeper, const Strings & zk_nodes, const Strings & all_hosts, const String & stage_to_wait) const; + + Strings waitImpl(const Strings & all_hosts, const String & stage_to_wait, std::optional timeout) const; + + String zookeeper_path; + zkutil::GetZooKeeper get_zookeeper; + Poco::Logger * log; +}; + +} diff --git a/src/Backups/BackupCoordinationStatusSync.cpp b/src/Backups/BackupCoordinationStatusSync.cpp deleted file mode 100644 index c0ecfdcaebe..00000000000 --- a/src/Backups/BackupCoordinationStatusSync.cpp +++ /dev/null @@ -1,182 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE; -} - - -BackupCoordinationStatusSync::BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_) - : zookeeper_path(zookeeper_path_) - , get_zookeeper(get_zookeeper_) - , log(log_) -{ - createRootNodes(); -} - -void BackupCoordinationStatusSync::createRootNodes() -{ - auto zookeeper = get_zookeeper(); - zookeeper->createAncestors(zookeeper_path); - zookeeper->createIfNotExists(zookeeper_path, ""); -} - -void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message) -{ - auto zookeeper = get_zookeeper(); - zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message); -} - -void BackupCoordinationStatusSync::setError(const String & current_host, const Exception & exception) -{ - auto zookeeper = get_zookeeper(); - - Exception exception2 = exception; - exception2.addMessage("Host {}", current_host); - WriteBufferFromOwnString buf; - writeException(exception2, buf, true); - - zookeeper->createIfNotExists(zookeeper_path + "/error", buf.str()); -} - -Strings BackupCoordinationStatusSync::wait(const Strings & all_hosts, const String & status_to_wait) -{ - return waitImpl(all_hosts, status_to_wait, {}); -} - -Strings BackupCoordinationStatusSync::waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) -{ - return waitImpl(all_hosts, status_to_wait, timeout_ms); -} - -Strings BackupCoordinationStatusSync::waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional timeout_ms) -{ - if (all_hosts.empty()) - return {}; - - /// Wait for other hosts. - - Strings ready_hosts_results; - ready_hosts_results.resize(all_hosts.size()); - - std::map /* index in `ready_hosts_results` */> unready_hosts; - for (size_t i = 0; i != all_hosts.size(); ++i) - unready_hosts[all_hosts[i]].push_back(i); - - std::optional error; - - auto zookeeper = get_zookeeper(); - - /// Process ZooKeeper's nodes and set `all_hosts_ready` or `unready_host` or `error_message`. 
- auto process_zk_nodes = [&](const Strings & zk_nodes) - { - for (const String & zk_node : zk_nodes) - { - if (zk_node.starts_with("remove_watch-")) - continue; - - if (zk_node == "error") - { - ReadBufferFromOwnString buf{zookeeper->get(zookeeper_path + "/error")}; - error = readException(buf, "", true); - break; - } - - size_t separator_pos = zk_node.find('|'); - if (separator_pos == String::npos) - throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node); - - String host = zk_node.substr(0, separator_pos); - String status = zk_node.substr(separator_pos + 1); - - auto it = unready_hosts.find(host); - if ((it != unready_hosts.end()) && (status == status_to_wait)) - { - String result = zookeeper->get(zookeeper_path + "/" + zk_node); - for (size_t i : it->second) - ready_hosts_results[i] = result; - unready_hosts.erase(it); - } - } - }; - - /// Wait until all hosts are ready or an error happens or time is out. - std::atomic watch_set = false; - std::condition_variable watch_triggered_event; - - auto watch_callback = [&](const Coordination::WatchResponse &) - { - watch_set = false; /// After it's triggered it's not set until we call getChildrenWatch() again. - watch_triggered_event.notify_all(); - }; - - auto watch_triggered = [&] { return !watch_set; }; - - bool use_timeout = timeout_ms.has_value(); - std::chrono::milliseconds timeout{timeout_ms.value_or(0)}; - std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); - std::chrono::steady_clock::duration elapsed; - std::mutex dummy_mutex; - String previous_unready_host; - - while (!unready_hosts.empty() && !error) - { - watch_set = true; - Strings nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback); - process_zk_nodes(nodes); - - if (!unready_hosts.empty() && !error) - { - const auto & unready_host = unready_hosts.begin()->first; - if (unready_host != previous_unready_host) - { - LOG_TRACE(log, "Waiting for host {}", unready_host); - previous_unready_host = unready_host; - } - - std::unique_lock dummy_lock{dummy_mutex}; - if (use_timeout) - { - elapsed = std::chrono::steady_clock::now() - start_time; - if ((elapsed > timeout) || !watch_triggered_event.wait_for(dummy_lock, timeout - elapsed, watch_triggered)) - break; - } - else - watch_triggered_event.wait(dummy_lock, watch_triggered); - } - } - - if (watch_set) - { - /// Remove watch by triggering it. - zookeeper->create(zookeeper_path + "/remove_watch-", "", zkutil::CreateMode::EphemeralSequential); - std::unique_lock dummy_lock{dummy_mutex}; - watch_triggered_event.wait(dummy_lock, watch_triggered); - } - - if (error) - error->rethrow(); - - if (!unready_hosts.empty()) - { - throw Exception( - ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, - "Waited for host {} too long ({})", - unready_hosts.begin()->first, - to_string(elapsed)); - } - - return ready_hosts_results; -} - -} diff --git a/src/Backups/BackupCoordinationStatusSync.h b/src/Backups/BackupCoordinationStatusSync.h deleted file mode 100644 index fc03e8ec81c..00000000000 --- a/src/Backups/BackupCoordinationStatusSync.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include - - -namespace DB -{ - -/// Used to coordinate hosts so all hosts would come to a specific status at around the same time. 
-class BackupCoordinationStatusSync -{ -public: - BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_); - - /// Sets the status of the current host and signal other hosts if there were other hosts waiting for that. - void set(const String & current_host, const String & new_status, const String & message); - void setError(const String & current_host, const Exception & exception); - - /// Sets the status of the current host and waits until all hosts come to the same status. - /// The function returns the messages all hosts set when they come to the required status. - Strings wait(const Strings & all_hosts, const String & status_to_wait); - - /// Almost the same as setAndWait() but this one stops waiting and throws an exception after a specific amount of time. - Strings waitFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms); - - static constexpr const char * kErrorStatus = "error"; - -private: - void createRootNodes(); - Strings waitImpl(const Strings & all_hosts, const String & status_to_wait, std::optional timeout_ms); - - String zookeeper_path; - zkutil::GetZooKeeper get_zookeeper; - Poco::Logger * log; -}; - -} diff --git a/src/Backups/BackupEntriesCollector.cpp b/src/Backups/BackupEntriesCollector.cpp index d2e4b1f8c4b..3cd9649de61 100644 --- a/src/Backups/BackupEntriesCollector.cpp +++ b/src/Backups/BackupEntriesCollector.cpp @@ -34,16 +34,21 @@ namespace ErrorCodes namespace { /// Finding all tables and databases which we're going to put to the backup and collecting their metadata. - constexpr const char * kGatheringMetadataStatus = "gathering metadata"; + constexpr const char * kGatheringMetadataStage = "gathering metadata"; + + String formatGatheringMetadataStage(size_t pass) + { + return fmt::format("{} ({})", kGatheringMetadataStage, pass); + } /// Making temporary hard links and prepare backup entries. - constexpr const char * kExtractingDataFromTablesStatus = "extracting data from tables"; + constexpr const char * kExtractingDataFromTablesStage = "extracting data from tables"; /// Running special tasks for replicated tables which can also prepare some backup entries. - constexpr const char * kRunningPostTasksStatus = "running post-tasks"; + constexpr const char * kRunningPostTasksStage = "running post-tasks"; /// Writing backup entries to the backup and removing temporary hard links. - constexpr const char * kWritingBackupStatus = "writing backup"; + constexpr const char * kWritingBackupStage = "writing backup"; /// Uppercases the first character of a passed string. String toUpperFirst(const String & str) @@ -90,7 +95,8 @@ BackupEntriesCollector::BackupEntriesCollector( , backup_settings(backup_settings_) , backup_coordination(backup_coordination_) , context(context_) - , consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 300000)) + , on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000)) + , consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)) , log(&Poco::Logger::get("BackupEntriesCollector")) { } @@ -100,7 +106,7 @@ BackupEntriesCollector::~BackupEntriesCollector() = default; BackupEntries BackupEntriesCollector::run() { /// run() can be called onle once. 
- if (!current_status.empty()) + if (!current_stage.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Already making backup entries"); /// Find other hosts working along with us to execute this ON CLUSTER query. @@ -123,36 +129,40 @@ BackupEntries BackupEntriesCollector::run() makeBackupEntriesForTablesDefs(); /// Make backup entries for the data of the found tables. - setStatus(kExtractingDataFromTablesStatus); + setStage(kExtractingDataFromTablesStage); makeBackupEntriesForTablesData(); /// Run all the tasks added with addPostCollectingTask(). - setStatus(kRunningPostTasksStatus); + setStage(kRunningPostTasksStage); runPostTasks(); /// No more backup entries or tasks are allowed after this point. - setStatus(kWritingBackupStatus); + setStage(kWritingBackupStage); return std::move(backup_entries); } -Strings BackupEntriesCollector::setStatus(const String & new_status, const String & message) +Strings BackupEntriesCollector::setStage(const String & new_stage, const String & message) { - LOG_TRACE(log, "{}", toUpperFirst(new_status)); - current_status = new_status; + LOG_TRACE(log, "{}", toUpperFirst(new_stage)); + current_stage = new_stage; - backup_coordination->setStatus(backup_settings.host_id, new_status, message); + backup_coordination->setStage(backup_settings.host_id, new_stage, message); - if (new_status.starts_with(kGatheringMetadataStatus)) + if (new_stage == formatGatheringMetadataStage(1)) { - auto now = std::chrono::steady_clock::now(); - auto end_of_timeout = std::max(now, consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout); - return backup_coordination->waitStatusFor( - all_hosts, new_status, std::chrono::duration_cast(end_of_timeout - now).count()); + return backup_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout); + } + else if (new_stage.starts_with(kGatheringMetadataStage)) + { + auto current_time = std::chrono::steady_clock::now(); + auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time); + return backup_coordination->waitForStage( + all_hosts, new_stage, std::chrono::duration_cast(end_of_timeout - current_time)); } else { - return backup_coordination->waitStatus(all_hosts, new_status); + return backup_coordination->waitForStage(all_hosts, new_stage); } } @@ -173,18 +183,18 @@ void BackupEntriesCollector::calculateRootPathInBackup() /// Finds databases and tables which we will put to the backup. void BackupEntriesCollector::gatherMetadataAndCheckConsistency() { - consistent_metadata_snapshot_start_time = std::chrono::steady_clock::now(); - auto end_of_timeout = consistent_metadata_snapshot_start_time + consistent_metadata_snapshot_timeout; - setStatus(fmt::format("{} ({})", kGatheringMetadataStatus, 1)); + setStage(formatGatheringMetadataStage(1)); + + consistent_metadata_snapshot_end_time = std::chrono::steady_clock::now() + consistent_metadata_snapshot_timeout; for (size_t pass = 1;; ++pass) { - String new_status = fmt::format("{} ({})", kGatheringMetadataStatus, pass + 1); + String next_stage = formatGatheringMetadataStage(pass + 1); std::optional inconsistency_error; if (tryGatherMetadataAndCompareWithPrevious(inconsistency_error)) { /// Gathered metadata and checked consistency, cool! But we have to check that other hosts cope with that too. 
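Note how the waiting budget is split: the first "gathering metadata (1)" synchronization waits up to the new backups.on_cluster_first_sync_timeout (180000 ms by default), while every later pass only gets whatever time remains until consistent_metadata_snapshot_end_time, fixed once as now() plus backups.consistent_metadata_snapshot_timeout (600000 ms by default). A standalone illustration of that remaining-budget computation:

#include <algorithm>
#include <chrono>
#include <iostream>

int main()
{
    using namespace std::chrono;

    const milliseconds consistent_metadata_snapshot_timeout{600000};  /// backups.consistent_metadata_snapshot_timeout
    const auto consistent_metadata_snapshot_end_time = steady_clock::now() + consistent_metadata_snapshot_timeout;

    /// ... time passes while pass 1 gathers metadata ...

    /// Passes 2 and later wait only for the time remaining until the deadline (never negative).
    const auto current_time = steady_clock::now();
    const auto end_of_timeout = std::max(current_time, consistent_metadata_snapshot_end_time);
    const auto remaining = duration_cast<milliseconds>(end_of_timeout - current_time);

    std::cout << "remaining budget: " << remaining.count() << " ms\n";
}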
- auto all_hosts_results = setStatus(new_status, "consistent"); + auto all_hosts_results = setStage(next_stage, "consistent"); std::optional host_with_inconsistency; std::optional inconsistency_error_on_other_host; @@ -210,13 +220,13 @@ void BackupEntriesCollector::gatherMetadataAndCheckConsistency() else { /// Failed to gather metadata or something wasn't consistent. We'll let other hosts know that and try again. - setStatus(new_status, inconsistency_error->displayText()); + setStage(next_stage, inconsistency_error->displayText()); } /// Two passes is minimum (we need to compare with table names with previous ones to be sure we don't miss anything). if (pass >= 2) { - if (std::chrono::steady_clock::now() > end_of_timeout) + if (std::chrono::steady_clock::now() > consistent_metadata_snapshot_end_time) inconsistency_error->rethrow(); else LOG_WARNING(log, "{}", inconsistency_error->displayText()); @@ -713,7 +723,7 @@ void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableN void BackupEntriesCollector::addBackupEntry(const String & file_name, BackupEntryPtr backup_entry) { - if (current_status == kWritingBackupStatus) + if (current_stage == kWritingBackupStage) throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding backup entries is not allowed"); backup_entries.emplace_back(file_name, backup_entry); } @@ -725,21 +735,21 @@ void BackupEntriesCollector::addBackupEntry(const std::pair task) { - if (current_status == kWritingBackupStatus) + if (current_stage == kWritingBackupStage) throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of post tasks is not allowed"); post_tasks.push(std::move(task)); } diff --git a/src/Backups/BackupEntriesCollector.h b/src/Backups/BackupEntriesCollector.h index 03710605654..c42b5aedad4 100644 --- a/src/Backups/BackupEntriesCollector.h +++ b/src/Backups/BackupEntriesCollector.h @@ -86,12 +86,13 @@ private: void runPostTasks(); - Strings setStatus(const String & new_status, const String & message = ""); + Strings setStage(const String & new_stage, const String & message = ""); const ASTBackupQuery::Elements backup_query_elements; const BackupSettings backup_settings; std::shared_ptr backup_coordination; ContextPtr context; + std::chrono::milliseconds on_cluster_first_sync_timeout; std::chrono::milliseconds consistent_metadata_snapshot_timeout; Poco::Logger * log; @@ -129,8 +130,8 @@ private: std::optional partitions; }; - String current_status; - std::chrono::steady_clock::time_point consistent_metadata_snapshot_start_time; + String current_stage; + std::chrono::steady_clock::time_point consistent_metadata_snapshot_end_time; std::unordered_map database_infos; std::unordered_map table_infos; std::vector> previous_databases_metadata; diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 09614886f06..47e1bac3200 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -18,37 +18,86 @@ #include #include #include -#include #include namespace DB { -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - namespace { /// Coordination status meaning that a host finished its work. - constexpr const char * kCompletedCoordinationStatus = "completed"; + constexpr const char * kCompletedStage = "completed"; - /// Sends information about the current exception to IBackupCoordination or IRestoreCoordination. 
- template - void sendErrorToCoordination(std::shared_ptr coordination, const String & current_host) + std::shared_ptr makeBackupCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup) + { + if (!coordination_zk_path.empty()) + { + auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); }; + return std::make_shared(coordination_zk_path, get_zookeeper, !is_internal_backup); + } + else + { + return std::make_shared(); + } + } + + std::shared_ptr makeRestoreCoordination(const String & coordination_zk_path, const ContextPtr & context, bool is_internal_backup) + { + if (!coordination_zk_path.empty()) + { + auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); }; + return std::make_shared(coordination_zk_path, get_zookeeper, !is_internal_backup); + } + else + { + return std::make_shared(); + } + } + + /// Sends information about an exception to IBackupCoordination or IRestoreCoordination. + template + void sendExceptionToCoordination(std::shared_ptr coordination, const String & current_host, const Exception & exception) { - if (!coordination) - return; try { - coordination->setErrorStatus(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)}); + if (coordination) + coordination->setError(current_host, exception); } catch (...) { } } + + /// Sends information about the current exception to IBackupCoordination or IRestoreCoordination. + template + void sendCurrentExceptionToCoordination(std::shared_ptr coordination, const String & current_host) + { + try + { + throw; + } + catch (const Exception & e) + { + sendExceptionToCoordination(coordination, current_host, e); + } + catch (...) + { + coordination->setError(current_host, Exception{getCurrentExceptionCode(), getCurrentExceptionMessage(true, true)}); + } + } + + /// Used to change num_active_backups. + size_t getNumActiveBackupsChange(BackupStatus status) + { + return status == BackupStatus::MAKING_BACKUP; + } + + /// Used to change num_active_restores. + size_t getNumActiveRestoresChange(BackupStatus status) + { + return status == BackupStatus::RESTORING; + } } @@ -60,6 +109,7 @@ BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threa /// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now. } + UUID BackupsWorker::start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context) { const ASTBackupQuery & backup_query = typeid_cast(*backup_or_restore_query); @@ -74,308 +124,359 @@ UUID BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & c { auto backup_query = std::static_pointer_cast(query->clone()); auto backup_settings = BackupSettings::fromBackupQuery(*backup_query); - auto backup_info = BackupInfo::fromAST(*backup_query->backup_name); - bool on_cluster = !backup_query->cluster.empty(); if (!backup_settings.backup_uuid) backup_settings.backup_uuid = UUIDHelpers::generateV4(); UUID backup_uuid = *backup_settings.backup_uuid; - /// Prepare context to use. 
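sendCurrentExceptionToCoordination() uses the "rethrow inside a catch handler" idiom: throw; re-raises the exception currently being handled so it can be re-caught with the most specific type (a DB::Exception is forwarded as-is, anything else is wrapped from the current exception message). A self-contained illustration of the idiom with standard exceptions:

#include <iostream>
#include <stdexcept>

/// Dispatch on the type of the exception currently being handled by rethrowing it,
/// the same shape sendCurrentExceptionToCoordination() uses above.
void reportCurrentException()
{
    try
    {
        throw;  /// Re-raise whatever is in flight; only valid inside a catch handler.
    }
    catch (const std::runtime_error & e)
    {
        std::cout << "typed error: " << e.what() << '\n';
    }
    catch (...)
    {
        std::cout << "unknown error\n";
    }
}

int main()
{
    try
    {
        throw std::runtime_error("backup failed");
    }
    catch (...)
    {
        reportCurrentException();
    }
}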
- ContextPtr context_in_use = context; - ContextMutablePtr mutable_context; - if (on_cluster || backup_settings.async) + std::shared_ptr backup_coordination; + if (!backup_settings.coordination_zk_path.empty()) + backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal); + + try { - /// For ON CLUSTER queries we will need to change some settings. - /// For ASYNC queries we have to clone the context anyway. - context_in_use = mutable_context = Context::createCopy(context); + auto backup_info = BackupInfo::fromAST(*backup_query->backup_name); + addInfo(backup_uuid, backup_info.toString(), BackupStatus::MAKING_BACKUP, backup_settings.internal); + + /// Prepare context to use. + ContextPtr context_in_use = context; + ContextMutablePtr mutable_context; + bool on_cluster = !backup_query->cluster.empty(); + if (on_cluster || backup_settings.async) + { + /// For ON CLUSTER queries we will need to change some settings. + /// For ASYNC queries we have to clone the context anyway. + context_in_use = mutable_context = Context::createCopy(context); + } + + if (backup_settings.async) + { + backups_thread_pool.scheduleOrThrowOnError( + [this, backup_uuid, backup_query, backup_settings, backup_info, backup_coordination, context_in_use, mutable_context] { + doBackup( + backup_uuid, + backup_query, + backup_settings, + backup_info, + backup_coordination, + context_in_use, + mutable_context, + true); + }); + } + else + { + doBackup(backup_uuid, backup_query, backup_settings, backup_info, backup_coordination, context_in_use, mutable_context, false); + } + + return backup_uuid; } - - addInfo(backup_uuid, backup_info.toString(), BackupStatus::MAKING_BACKUP, backup_settings.internal); - - auto job = [this, - backup_uuid, - backup_query, - backup_settings, - backup_info, - on_cluster, - context_in_use, - mutable_context](bool async) mutable + catch (...) { - std::optional query_scope; - std::shared_ptr backup_coordination; - SCOPE_EXIT_SAFE(if (backup_coordination && !backup_settings.internal) backup_coordination->drop();); + /// Something bad happened, the backup has not built. + setStatus(backup_uuid, BackupStatus::FAILED_TO_BACKUP); + sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id); + throw; + } +} - try + +void BackupsWorker::doBackup( + const UUID & backup_uuid, + const std::shared_ptr & backup_query, + BackupSettings backup_settings, + const BackupInfo & backup_info, + std::shared_ptr backup_coordination, + const ContextPtr & context, + ContextMutablePtr mutable_context, + bool called_async) +{ + std::optional query_scope; + try + { + if (called_async) { - if (async) - { - query_scope.emplace(mutable_context); - setThreadName("BackupWorker"); - } - - /// Checks access rights if this is not ON CLUSTER query. - /// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.) 
- auto required_access = getRequiredAccessToBackup(backup_query->elements); - if (!on_cluster) - context_in_use->checkAccess(required_access); - - ClusterPtr cluster; - if (on_cluster) - { - backup_query->cluster = context_in_use->getMacros()->expand(backup_query->cluster); - cluster = context_in_use->getCluster(backup_query->cluster); - backup_settings.cluster_host_ids = cluster->getHostIDs(); - if (backup_settings.coordination_zk_path.empty()) - { - String root_zk_path = context_in_use->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); - backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid); - } - } - - /// Make a backup coordination. - if (!backup_settings.coordination_zk_path.empty()) - { - backup_coordination = std::make_shared( - backup_settings.coordination_zk_path, - [global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); }); - } - else - { - backup_coordination = std::make_shared(); - } - - /// Opens a backup for writing. - BackupFactory::CreateParams backup_create_params; - backup_create_params.open_mode = IBackup::OpenMode::WRITE; - backup_create_params.context = context_in_use; - backup_create_params.backup_info = backup_info; - backup_create_params.base_backup_info = backup_settings.base_backup_info; - backup_create_params.compression_method = backup_settings.compression_method; - backup_create_params.compression_level = backup_settings.compression_level; - backup_create_params.password = backup_settings.password; - backup_create_params.is_internal_backup = backup_settings.internal; - backup_create_params.backup_coordination = backup_coordination; - backup_create_params.backup_uuid = backup_uuid; - BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params); - - /// Write the backup. - if (on_cluster) - { - DDLQueryOnClusterParams params; - params.cluster = cluster; - params.only_shard_num = backup_settings.shard_num; - params.only_replica_num = backup_settings.replica_num; - params.access_to_check = required_access; - backup_settings.copySettingsToQuery(*backup_query); - - // executeDDLQueryOnCluster() will return without waiting for completion - mutable_context->setSetting("distributed_ddl_task_timeout", Field{0}); - mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"}); - executeDDLQueryOnCluster(backup_query, mutable_context, params); - - /// Wait until all the hosts have written their backup entries. - auto all_hosts = BackupSettings::Util::filterHostIDs( - backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num); - backup_coordination->waitStatus(all_hosts, kCompletedCoordinationStatus); - } - else - { - backup_query->setCurrentDatabase(context_in_use->getCurrentDatabase()); - - /// Prepare backup entries. - BackupEntries backup_entries; - { - BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context_in_use}; - backup_entries = backup_entries_collector.run(); - } - - /// Write the backup entries to the backup. - writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool); - - /// We have written our backup entries, we need to tell other hosts (they could be waiting for it). - backup_coordination->setStatus(backup_settings.host_id, kCompletedCoordinationStatus, ""); - } - - /// Finalize backup (write its metadata). - if (!backup_settings.internal) - backup->finalizeWriting(); - - /// Close the backup. 
- backup.reset(); - - setStatus(backup_uuid, BackupStatus::BACKUP_COMPLETE); + query_scope.emplace(mutable_context); + setThreadName("BackupWorker"); } - catch (...) + + bool on_cluster = !backup_query->cluster.empty(); + assert(mutable_context || (!on_cluster && !called_async)); + + /// Checks access rights if this is not ON CLUSTER query. + /// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.) + auto required_access = getRequiredAccessToBackup(backup_query->elements); + if (!on_cluster) + context->checkAccess(required_access); + + ClusterPtr cluster; + if (on_cluster) { - /// Something bad happened, the backup has not built. + backup_query->cluster = context->getMacros()->expand(backup_query->cluster); + cluster = context->getCluster(backup_query->cluster); + backup_settings.cluster_host_ids = cluster->getHostIDs(); + if (backup_settings.coordination_zk_path.empty()) + { + String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); + backup_settings.coordination_zk_path = root_zk_path + "/backup-" + toString(backup_uuid); + } + } + + /// Make a backup coordination. + if (!backup_coordination) + backup_coordination = makeBackupCoordination(backup_settings.coordination_zk_path, context, backup_settings.internal); + + /// Opens a backup for writing. + BackupFactory::CreateParams backup_create_params; + backup_create_params.open_mode = IBackup::OpenMode::WRITE; + backup_create_params.context = context; + backup_create_params.backup_info = backup_info; + backup_create_params.base_backup_info = backup_settings.base_backup_info; + backup_create_params.compression_method = backup_settings.compression_method; + backup_create_params.compression_level = backup_settings.compression_level; + backup_create_params.password = backup_settings.password; + backup_create_params.is_internal_backup = backup_settings.internal; + backup_create_params.backup_coordination = backup_coordination; + backup_create_params.backup_uuid = backup_uuid; + BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params); + + /// Write the backup. + if (on_cluster) + { + DDLQueryOnClusterParams params; + params.cluster = cluster; + params.only_shard_num = backup_settings.shard_num; + params.only_replica_num = backup_settings.replica_num; + params.access_to_check = required_access; + backup_settings.copySettingsToQuery(*backup_query); + + // executeDDLQueryOnCluster() will return without waiting for completion + mutable_context->setSetting("distributed_ddl_task_timeout", Field{0}); + mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"}); + executeDDLQueryOnCluster(backup_query, mutable_context, params); + + /// Wait until all the hosts have written their backup entries. + auto all_hosts = BackupSettings::Util::filterHostIDs( + backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num); + backup_coordination->waitForStage(all_hosts, kCompletedStage); + } + else + { + backup_query->setCurrentDatabase(context->getCurrentDatabase()); + + /// Prepare backup entries. + BackupEntries backup_entries; + { + BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context}; + backup_entries = backup_entries_collector.run(); + } + + /// Write the backup entries to the backup. 
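On-cluster backups are now a two-sided handshake: the initiator dispatches the DDL with distributed_ddl_task_timeout = 0, so executeDDLQueryOnCluster() returns immediately, and then blocks in waitForStage(all_hosts, kCompletedStage), while each worker announces kCompletedStage once its entries are written. A hedged sketch of both halves, assuming only the IBackupCoordination interface from this patch:

#include <Backups/IBackupCoordination.h>
#include <memory>

using namespace DB;

static constexpr const char * kCompletedStage = "completed";

/// Initiator side: the DDL was fired without waiting, so this is the real synchronization point.
void waitForWorkers(const std::shared_ptr<IBackupCoordination> & coordination, const Strings & all_hosts)
{
    coordination->waitForStage(all_hosts, kCompletedStage);
}

/// Worker side: called after writeBackupEntries() finishes on this host.
void announceCompleted(const std::shared_ptr<IBackupCoordination> & coordination, const String & host_id)
{
    coordination->setStage(host_id, kCompletedStage, "");
}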
+ writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool); + + /// We have written our backup entries, we need to tell other hosts (they could be waiting for it). + backup_coordination->setStage(backup_settings.host_id, kCompletedStage, ""); + } + + /// Finalize backup (write its metadata). + if (!backup_settings.internal) + backup->finalizeWriting(); + + /// Close the backup. + backup.reset(); + + LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_info.toString()); + setStatus(backup_uuid, BackupStatus::BACKUP_COMPLETE); + } + catch (...) + { + /// Something bad happened, the backup has not built. + if (called_async) + { + tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_info.toString())); setStatus(backup_uuid, BackupStatus::FAILED_TO_BACKUP); - sendErrorToCoordination(backup_coordination, backup_settings.host_id); - if (!async) - throw; + sendCurrentExceptionToCoordination(backup_coordination, backup_settings.host_id); } - }; - - if (backup_settings.async) - backups_thread_pool.scheduleOrThrowOnError([job]() mutable { job(true); }); - else - job(false); - - return backup_uuid; + else + { + /// setStatus() and sendCurrentExceptionToCoordination() will be called by startMakingBackup(). + throw; + } + } } UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context) { - UUID restore_uuid = UUIDHelpers::generateV4(); auto restore_query = std::static_pointer_cast(query->clone()); auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query); - auto backup_info = BackupInfo::fromAST(*restore_query->backup_name); - bool on_cluster = !restore_query->cluster.empty(); + UUID restore_uuid = UUIDHelpers::generateV4(); - /// Prepare context to use. - ContextMutablePtr context_in_use = context; - if (restore_settings.async || on_cluster) + std::shared_ptr restore_coordination; + if (!restore_settings.coordination_zk_path.empty()) + restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal); + + try { - /// For ON CLUSTER queries we will need to change some settings. - /// For ASYNC queries we have to clone the context anyway. - context_in_use = Context::createCopy(context); + auto backup_info = BackupInfo::fromAST(*restore_query->backup_name); + addInfo(restore_uuid, backup_info.toString(), BackupStatus::RESTORING, restore_settings.internal); + + /// Prepare context to use. + ContextMutablePtr context_in_use = context; + bool on_cluster = !restore_query->cluster.empty(); + if (restore_settings.async || on_cluster) + { + /// For ON CLUSTER queries we will need to change some settings. + /// For ASYNC queries we have to clone the context anyway. 
+ context_in_use = Context::createCopy(context); + } + + if (restore_settings.async) + { + backups_thread_pool.scheduleOrThrowOnError( + [this, restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use] + { doRestore(restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use, true); }); + } + else + { + doRestore(restore_uuid, restore_query, restore_settings, backup_info, restore_coordination, context_in_use, false); + } + + return restore_uuid; } - - addInfo(restore_uuid, backup_info.toString(), BackupStatus::RESTORING, restore_settings.internal); - - auto job = [this, - restore_uuid, - restore_query, - restore_settings, - backup_info, - on_cluster, - context_in_use](bool async) mutable + catch (...) { - std::optional query_scope; - std::shared_ptr restore_coordination; - SCOPE_EXIT_SAFE(if (restore_coordination && !restore_settings.internal) restore_coordination->drop();); + /// Something bad happened, the backup has not built. + setStatus(restore_uuid, BackupStatus::FAILED_TO_RESTORE); + sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id); + throw; + } +} - try + +void BackupsWorker::doRestore( + const UUID & restore_uuid, + const std::shared_ptr & restore_query, + RestoreSettings restore_settings, + const BackupInfo & backup_info, + std::shared_ptr restore_coordination, + ContextMutablePtr context, + bool called_async) +{ + std::optional query_scope; + try + { + if (called_async) { - if (async) - { - query_scope.emplace(context_in_use); - setThreadName("RestoreWorker"); - } - - /// Open the backup for reading. - BackupFactory::CreateParams backup_open_params; - backup_open_params.open_mode = IBackup::OpenMode::READ; - backup_open_params.context = context_in_use; - backup_open_params.backup_info = backup_info; - backup_open_params.base_backup_info = restore_settings.base_backup_info; - backup_open_params.password = restore_settings.password; - BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params); - - String current_database = context_in_use->getCurrentDatabase(); - - /// Checks access rights if this is ON CLUSTER query. - /// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.) - ClusterPtr cluster; - if (on_cluster) - { - restore_query->cluster = context_in_use->getMacros()->expand(restore_query->cluster); - cluster = context_in_use->getCluster(restore_query->cluster); - restore_settings.cluster_host_ids = cluster->getHostIDs(); - - /// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect - /// because different replicas can contain different set of tables and so the required access rights can differ too. - /// So the right way is pass through the entire cluster and check access for each host. - auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num); - for (const auto * address : addresses) - { - restore_settings.host_id = address->toString(); - auto restore_elements = restore_query->elements; - String addr_database = address->default_database.empty() ? current_database : address->default_database; - for (auto & element : restore_elements) - element.setCurrentDatabase(addr_database); - RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context_in_use}; - dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY); - } - } - - /// Make a restore coordination. 
- if (on_cluster && restore_settings.coordination_zk_path.empty()) - { - String root_zk_path = context_in_use->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups"); - restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_uuid); - } - - if (!restore_settings.coordination_zk_path.empty()) - { - restore_coordination = std::make_shared( - restore_settings.coordination_zk_path, - [global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); }); - } - else - { - restore_coordination = std::make_shared(); - } - - /// Do RESTORE. - if (on_cluster) - { - - DDLQueryOnClusterParams params; - params.cluster = cluster; - params.only_shard_num = restore_settings.shard_num; - params.only_replica_num = restore_settings.replica_num; - restore_settings.copySettingsToQuery(*restore_query); - - // executeDDLQueryOnCluster() will return without waiting for completion - context_in_use->setSetting("distributed_ddl_task_timeout", Field{0}); - context_in_use->setSetting("distributed_ddl_output_mode", Field{"none"}); - - executeDDLQueryOnCluster(restore_query, context_in_use, params); - - /// Wait until all the hosts have written their backup entries. - auto all_hosts = BackupSettings::Util::filterHostIDs( - restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num); - restore_coordination->waitStatus(all_hosts, kCompletedCoordinationStatus); - } - else - { - restore_query->setCurrentDatabase(current_database); - - /// Restore metadata and prepare data restoring tasks. - DataRestoreTasks data_restore_tasks; - { - RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination, - backup, context_in_use}; - data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE); - } - - /// Execute the data restoring tasks. - restoreTablesData(std::move(data_restore_tasks), restores_thread_pool); - - /// We have restored everything, we need to tell other hosts (they could be waiting for it). - restore_coordination->setStatus(restore_settings.host_id, kCompletedCoordinationStatus, ""); - } - - setStatus(restore_uuid, BackupStatus::RESTORED); + query_scope.emplace(context); + setThreadName("RestoreWorker"); } - catch (...) + + /// Open the backup for reading. + BackupFactory::CreateParams backup_open_params; + backup_open_params.open_mode = IBackup::OpenMode::READ; + backup_open_params.context = context; + backup_open_params.backup_info = backup_info; + backup_open_params.base_backup_info = restore_settings.base_backup_info; + backup_open_params.password = restore_settings.password; + BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params); + + String current_database = context->getCurrentDatabase(); + + /// Checks access rights if this is ON CLUSTER query. + /// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.) + ClusterPtr cluster; + bool on_cluster = !restore_query->cluster.empty(); + if (on_cluster) { - /// Something bad happened, the backup has not built. + restore_query->cluster = context->getMacros()->expand(restore_query->cluster); + cluster = context->getCluster(restore_query->cluster); + restore_settings.cluster_host_ids = cluster->getHostIDs(); + + /// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect + /// because different replicas can contain different set of tables and so the required access rights can differ too. 
+            /// So the right way is to go through the entire cluster and check access for each host.
+            auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
+            for (const auto * address : addresses)
+            {
+                restore_settings.host_id = address->toString();
+                auto restore_elements = restore_query->elements;
+                String addr_database = address->default_database.empty() ? current_database : address->default_database;
+                for (auto & element : restore_elements)
+                    element.setCurrentDatabase(addr_database);
+                RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
+                dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
+            }
+        }
+
+        /// Make a restore coordination.
+        if (on_cluster && restore_settings.coordination_zk_path.empty())
+        {
+            String root_zk_path = context->getConfigRef().getString("backups.zookeeper_path", "/clickhouse/backups");
+            restore_settings.coordination_zk_path = root_zk_path + "/restore-" + toString(restore_uuid);
+        }
+
+        if (!restore_coordination)
+            restore_coordination = makeRestoreCoordination(restore_settings.coordination_zk_path, context, restore_settings.internal);
+
+        /// Do RESTORE.
+        if (on_cluster)
+        {
+
+            DDLQueryOnClusterParams params;
+            params.cluster = cluster;
+            params.only_shard_num = restore_settings.shard_num;
+            params.only_replica_num = restore_settings.replica_num;
+            restore_settings.copySettingsToQuery(*restore_query);
+
+            // executeDDLQueryOnCluster() will return without waiting for completion
+            context->setSetting("distributed_ddl_task_timeout", Field{0});
+            context->setSetting("distributed_ddl_output_mode", Field{"none"});
+
+            executeDDLQueryOnCluster(restore_query, context, params);
+
+            /// Wait until all the hosts have written their backup entries.
+            auto all_hosts = BackupSettings::Util::filterHostIDs(
+                restore_settings.cluster_host_ids, restore_settings.shard_num, restore_settings.replica_num);
+            restore_coordination->waitForStage(all_hosts, kCompletedStage);
+        }
+        else
+        {
+            restore_query->setCurrentDatabase(current_database);
+
+            /// Restore metadata and prepare data restoring tasks.
+            DataRestoreTasks data_restore_tasks;
+            {
+                RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
+                                            backup, context};
+                data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
+            }
+
+            /// Execute the data restoring tasks.
+            restoreTablesData(std::move(data_restore_tasks), restores_thread_pool);
+
+            /// We have restored everything, we need to tell other hosts (they could be waiting for it).
+            restore_coordination->setStage(restore_settings.host_id, kCompletedStage, "");
+        }
+
+        LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString());
+        setStatus(restore_uuid, BackupStatus::RESTORED);
+    }
+    catch (...)
+    {
+        /// Something bad happened, the restore has not finished.
+        if (called_async)
+        {
+            tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_info.toString()));
             setStatus(restore_uuid, BackupStatus::FAILED_TO_RESTORE);
-            sendErrorToCoordination(restore_coordination, restore_settings.host_id);
-            if (!async)
-                throw;
+            sendCurrentExceptionToCoordination(restore_coordination, restore_settings.host_id);
         }
-    };
-
-    if (restore_settings.async)
-        backups_thread_pool.scheduleOrThrowOnError([job]() mutable { job(true); });
-    else
-        job(false);
-
-    return restore_uuid;
+        else
+        {
+            /// setStatus() and sendCurrentExceptionToCoordination() will be called by startRestoring().
+            throw;
+        }
+    }
 }


@@ -387,37 +488,28 @@ void BackupsWorker::addInfo(const UUID & uuid, const String & backup_name, Backu
     info.status = status;
     info.status_changed_time = time(nullptr);
     info.internal = internal;
+
     std::lock_guard lock{infos_mutex};
     infos[uuid] = std::move(info);
+
+    num_active_backups += getNumActiveBackupsChange(status);
+    num_active_restores += getNumActiveRestoresChange(status);
 }
 
+
 void BackupsWorker::setStatus(const UUID & uuid, BackupStatus status)
 {
     std::lock_guard lock{infos_mutex};
-    auto & info = infos.at(uuid);
+    auto it = infos.find(uuid);
+    if (it == infos.end())
+        return;
+
+    auto & info = it->second;
+    auto old_status = info.status;
     info.status = status;
     info.status_changed_time = time(nullptr);
-
-    if (status == BackupStatus::BACKUP_COMPLETE)
-    {
-        LOG_INFO(log, "{} {} was created successfully", (info.internal ? "Internal backup" : "Backup"), info.backup_name);
-    }
-    else if (status == BackupStatus::RESTORED)
-    {
-        LOG_INFO(log, "Restored from {} {} successfully", (info.internal ? "internal backup" : "backup"), info.backup_name);
-    }
-    else if ((status == BackupStatus::FAILED_TO_BACKUP) || (status == BackupStatus::FAILED_TO_RESTORE))
-    {
-        String start_of_message;
-        if (status == BackupStatus::FAILED_TO_BACKUP)
-            start_of_message = fmt::format("Failed to create {} {}", (info.internal ? "internal backup" : "backup"), info.backup_name);
-        else
-            start_of_message = fmt::format("Failed to restore from {} {}", (info.internal ? "internal backup" : "backup"), info.backup_name);
-        tryLogCurrentException(log, start_of_message);
-
-        info.error_message = getCurrentExceptionMessage(false);
-        info.exception = std::current_exception();
-    }
+    num_active_backups += getNumActiveBackupsChange(status) - getNumActiveBackupsChange(old_status);
+    num_active_restores += getNumActiveRestoresChange(status) - getNumActiveRestoresChange(old_status);
 }


@@ -428,7 +520,7 @@ void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool rethrow_excep
     {
         auto it = infos.find(backup_or_restore_uuid);
         if (it == infos.end())
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "BackupsWorker: Unknown UUID {}", toString(backup_or_restore_uuid));
+            return true;
         const auto & info = it->second;
         auto current_status = info.status;
         if (rethrow_exception && ((current_status == BackupStatus::FAILED_TO_BACKUP) || (current_status == BackupStatus::FAILED_TO_RESTORE)))
@@ -437,12 +529,12 @@ void BackupsWorker::wait(const UUID & backup_or_restore_uuid, bool rethrow_excep
     });
 }
 
-BackupsWorker::Info BackupsWorker::getInfo(const UUID & backup_or_restore_uuid) const
+std::optional<BackupsWorker::Info> BackupsWorker::tryGetInfo(const UUID & backup_or_restore_uuid) const
 {
     std::lock_guard lock{infos_mutex};
     auto it = infos.find(backup_or_restore_uuid);
     if (it == infos.end())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "BackupsWorker: Unknown UUID {}", toString(backup_or_restore_uuid));
+        return std::nullopt;
     return it->second;
 }
 
@@ -457,14 +549,15 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllInfos() const
 
 void BackupsWorker::shutdown()
 {
-    size_t num_active_backups = backups_thread_pool.active();
-    size_t num_active_restores = restores_thread_pool.active();
-    if (!num_active_backups && !num_active_restores)
-        return;
-    LOG_INFO(log, "Waiting for {} backup and {} restore tasks to be finished", num_active_backups, num_active_restores);
+    bool has_active_backups_or_restores = (num_active_backups || num_active_restores);
+    if (has_active_backups_or_restores)
+        LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);
+
     backups_thread_pool.wait();
     restores_thread_pool.wait();
-    LOG_INFO(log, "All backup and restore tasks have finished");
+
+    if (has_active_backups_or_restores)
+        LOG_INFO(log, "All backup and restore tasks have finished");
 }
 
 }
diff --git a/src/Backups/BackupsWorker.h b/src/Backups/BackupsWorker.h
index f546fa2497d..8db9c1367a9 100644
--- a/src/Backups/BackupsWorker.h
+++ b/src/Backups/BackupsWorker.h
@@ -11,6 +11,13 @@ namespace Poco::Util { class AbstractConfiguration; }
 
 namespace DB
 {
 
+class ASTBackupQuery;
+struct BackupSettings;
+struct RestoreSettings;
+struct BackupInfo;
+class IBackupCoordination;
+class IRestoreCoordination;
+
 /// Manager of backups and restores: executes backups and restores' threads in the background.
 /// Keeps information about backups and restores started in this session.
 class BackupsWorker
@@ -47,12 +54,21 @@ public:
         bool internal = false;
     };
 
-    Info getInfo(const UUID & backup_or_restore_uuid) const;
+    std::optional<Info> tryGetInfo(const UUID & backup_or_restore_uuid) const;
     std::vector<Info> getAllInfos() const;
 
 private:
     UUID startMakingBackup(const ASTPtr & query, const ContextPtr & context);
+
+    void doBackup(const UUID & backup_uuid, const std::shared_ptr<ASTBackupQuery> & backup_query, BackupSettings backup_settings,
+                  const BackupInfo & backup_info, std::shared_ptr<IBackupCoordination> backup_coordination, const ContextPtr & context,
+                  ContextMutablePtr mutable_context, bool called_async);
+
     UUID startRestoring(const ASTPtr & query, ContextMutablePtr context);
+
+    void doRestore(const UUID & restore_uuid, const std::shared_ptr<ASTBackupQuery> & restore_query, RestoreSettings restore_settings,
+                   const BackupInfo & backup_info, std::shared_ptr<IRestoreCoordination> restore_coordination, ContextMutablePtr context,
+                   bool called_async);
 
     void addInfo(const UUID & uuid, const String & backup_name, BackupStatus status, bool internal);
     void setStatus(const UUID & uuid, BackupStatus status);
@@ -62,6 +78,8 @@ private:
     std::unordered_map<UUID, Info> infos;
     std::condition_variable status_changed;
+    std::atomic<size_t> num_active_backups = 0;
+    std::atomic<size_t> num_active_restores = 0;
     mutable std::mutex infos_mutex;
     Poco::Logger * log;
 };
diff --git a/src/Backups/IBackupCoordination.h b/src/Backups/IBackupCoordination.h
index 7cf43efea74..5e120218544 100644
--- a/src/Backups/IBackupCoordination.h
+++ b/src/Backups/IBackupCoordination.h
@@ -18,11 +18,11 @@ class IBackupCoordination
 public:
     virtual ~IBackupCoordination() = default;
 
-    /// Sets the current status and waits for other hosts to come to this status too.
-    virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
-    virtual void setErrorStatus(const String & current_host, const Exception & exception) = 0;
-    virtual Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) = 0;
-    virtual Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) = 0;
+    /// Sets the current stage and waits for other hosts to come to this stage too.
+    virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
+    virtual void setError(const String & current_host, const Exception & exception) = 0;
+    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
+    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
 
     struct PartNameAndChecksum
     {
@@ -115,9 +115,6 @@ public:
 
     /// Returns the list of all the archive suffixes which were generated.
     virtual Strings getAllArchiveSuffixes() const = 0;
-
-    /// Removes remotely stored information.
-    virtual void drop() {}
 };
 
 }
diff --git a/src/Backups/IRestoreCoordination.h b/src/Backups/IRestoreCoordination.h
index e852fa3c2d4..692054ae267 100644
--- a/src/Backups/IRestoreCoordination.h
+++ b/src/Backups/IRestoreCoordination.h
@@ -16,11 +16,11 @@ class IRestoreCoordination
 public:
     virtual ~IRestoreCoordination() = default;
 
-    /// Sets the current status and waits for other hosts to come to this status too.
-    virtual void setStatus(const String & current_host, const String & new_status, const String & message) = 0;
-    virtual void setErrorStatus(const String & current_host, const Exception & exception) = 0;
-    virtual Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) = 0;
-    virtual Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) = 0;
+    /// Sets the current stage and waits for other hosts to come to this stage too.
+    virtual void setStage(const String & current_host, const String & new_stage, const String & message) = 0;
+    virtual void setError(const String & current_host, const Exception & exception) = 0;
+    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) = 0;
+    virtual Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) = 0;
 
     static constexpr const char * kErrorStatus = "error";
 
@@ -34,9 +34,6 @@ public:
     /// Sets that this replica is going to restore a ReplicatedAccessStorage.
     /// The function returns false if this access storage is being already restored by another replica.
     virtual bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) = 0;
-
-    /// Removes remotely stored information.
-    virtual void drop() {}
 };
 
 }
diff --git a/src/Backups/RestoreCoordinationLocal.cpp b/src/Backups/RestoreCoordinationLocal.cpp
index deab75dc7de..b2a9849c38d 100644
--- a/src/Backups/RestoreCoordinationLocal.cpp
+++ b/src/Backups/RestoreCoordinationLocal.cpp
@@ -7,20 +7,20 @@ namespace DB
 RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
 RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
 
-void RestoreCoordinationLocal::setStatus(const String &, const String &, const String &)
+void RestoreCoordinationLocal::setStage(const String &, const String &, const String &)
 {
 }
 
-void RestoreCoordinationLocal::setErrorStatus(const String &, const Exception &)
+void RestoreCoordinationLocal::setError(const String &, const Exception &)
 {
 }
 
-Strings RestoreCoordinationLocal::waitStatus(const Strings &, const String &)
+Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &)
 {
     return {};
 }
 
-Strings RestoreCoordinationLocal::waitStatusFor(const Strings &, const String &, UInt64)
+Strings RestoreCoordinationLocal::waitForStage(const Strings &, const String &, std::chrono::milliseconds)
 {
     return {};
 }
diff --git a/src/Backups/RestoreCoordinationLocal.h b/src/Backups/RestoreCoordinationLocal.h
index d8b0052cbd2..b4e70d83b72 100644
--- a/src/Backups/RestoreCoordinationLocal.h
+++ b/src/Backups/RestoreCoordinationLocal.h
@@ -18,11 +18,11 @@ public:
     RestoreCoordinationLocal();
     ~RestoreCoordinationLocal() override;
 
-    /// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
-    void setStatus(const String & current_host, const String & new_status, const String & message) override;
-    void setErrorStatus(const String & current_host, const Exception & exception) override;
-    Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
-    Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
+    /// Sets the current stage and waits for other hosts to come to this stage too.
+    void setStage(const String & current_host, const String & new_stage, const String & message) override;
+    void setError(const String & current_host, const Exception & exception) override;
+    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
+    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
 
     /// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
     bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
diff --git a/src/Backups/RestoreCoordinationRemote.cpp b/src/Backups/RestoreCoordinationRemote.cpp
index 86c8ca6b509..fcc6a2a24b3 100644
--- a/src/Backups/RestoreCoordinationRemote.cpp
+++ b/src/Backups/RestoreCoordinationRemote.cpp
@@ -6,15 +6,27 @@
 namespace DB
 {
 
-RestoreCoordinationRemote::RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
+RestoreCoordinationRemote::RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_)
     : zookeeper_path(zookeeper_path_)
     , get_zookeeper(get_zookeeper_)
-    , status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("RestoreCoordination"))
+    , remove_zk_nodes_in_destructor(remove_zk_nodes_in_destructor_)
+    , stage_sync(zookeeper_path_ + "/stage", get_zookeeper_, &Poco::Logger::get("RestoreCoordination"))
 {
     createRootNodes();
 }
 
-RestoreCoordinationRemote::~RestoreCoordinationRemote() = default;
+RestoreCoordinationRemote::~RestoreCoordinationRemote()
+{
+    try
+    {
+        if (remove_zk_nodes_in_destructor)
+            removeAllNodes();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+}
 
 void RestoreCoordinationRemote::createRootNodes()
 {
@@ -27,24 +39,24 @@ void RestoreCoordinationRemote::createRootNodes()
 }
 
-void RestoreCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
+void RestoreCoordinationRemote::setStage(const String & current_host, const String & new_stage, const String & message)
 {
-    status_sync.set(current_host, new_status, message);
+    stage_sync.set(current_host, new_stage, message);
 }
 
-void RestoreCoordinationRemote::setErrorStatus(const String & current_host, const Exception & exception)
+void RestoreCoordinationRemote::setError(const String & current_host, const Exception & exception)
 {
-    status_sync.setError(current_host, exception);
+    stage_sync.setError(current_host, exception);
 }
 
-Strings RestoreCoordinationRemote::waitStatus(const Strings & all_hosts, const String & status_to_wait)
+Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait)
 {
-    return status_sync.wait(all_hosts, status_to_wait);
+    return stage_sync.wait(all_hosts, stage_to_wait);
 }
 
-Strings RestoreCoordinationRemote::waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms)
+Strings RestoreCoordinationRemote::waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout)
 {
-    return status_sync.waitFor(all_hosts, status_to_wait, timeout_ms);
+    return stage_sync.waitFor(all_hosts, stage_to_wait, timeout);
 }
 
@@ -93,9 +105,4 @@ void RestoreCoordinationRemote::removeAllNodes()
     zookeeper->removeRecursive(zookeeper_path);
 }
 
-void RestoreCoordinationRemote::drop()
-{
-    removeAllNodes();
-}
-
 }
diff --git a/src/Backups/RestoreCoordinationRemote.h b/src/Backups/RestoreCoordinationRemote.h
index 883ea953efc..0cbbb6622ad 100644
--- a/src/Backups/RestoreCoordinationRemote.h
+++ b/src/Backups/RestoreCoordinationRemote.h
@@ -1,7 +1,7 @@
 #pragma once
 
 #include <Backups/IRestoreCoordination.h>
-#include <Backups/BackupCoordinationStatusSync.h>
+#include <Backups/BackupCoordinationStageSync.h>
 
 
 namespace DB
@@ -11,14 +11,14 @@ namespace DB
 class RestoreCoordinationRemote : public IRestoreCoordination
 {
 public:
-    RestoreCoordinationRemote(const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper);
+    RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, bool remove_zk_nodes_in_destructor_);
     ~RestoreCoordinationRemote() override;
 
-    /// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
-    void setStatus(const String & current_host, const String & new_status, const String & message) override;
-    void setErrorStatus(const String & current_host, const Exception & exception) override;
-    Strings waitStatus(const Strings & all_hosts, const String & status_to_wait) override;
-    Strings waitStatusFor(const Strings & all_hosts, const String & status_to_wait, UInt64 timeout_ms) override;
+    /// Sets the current stage and waits for other hosts to come to this stage too.
+    void setStage(const String & current_host, const String & new_stage, const String & message) override;
+    void setError(const String & current_host, const Exception & exception) override;
+    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait) override;
+    Strings waitForStage(const Strings & all_hosts, const String & stage_to_wait, std::chrono::milliseconds timeout) override;
 
     /// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
     bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@@ -31,9 +31,6 @@ public:
     /// The function returns false if this access storage is being already restored by another replica.
     bool acquireReplicatedAccessStorage(const String & access_storage_zk_path) override;
 
-    /// Removes remotely stored information.
-    void drop() override;
-
 private:
     void createRootNodes();
     void removeAllNodes();
@@ -42,7 +39,9 @@ private:
     const String zookeeper_path;
     const zkutil::GetZooKeeper get_zookeeper;
 
-    BackupCoordinationStatusSync status_sync;
+    const bool remove_zk_nodes_in_destructor;
+
+    BackupCoordinationStageSync stage_sync;
 };
 
 }
diff --git a/src/Backups/RestorerFromBackup.cpp b/src/Backups/RestorerFromBackup.cpp
index b67cdf9c4dd..5e43d59ae56 100644
--- a/src/Backups/RestorerFromBackup.cpp
+++ b/src/Backups/RestorerFromBackup.cpp
@@ -41,16 +41,16 @@ namespace ErrorCodes
 namespace
 {
     /// Finding databases and tables in the backup which we're going to restore.
-    constexpr const char * kFindingTablesInBackupStatus = "finding tables in backup";
+    constexpr const char * kFindingTablesInBackupStage = "finding tables in backup";
 
     /// Creating databases or finding them and checking their definitions.
-    constexpr const char * kCreatingDatabasesStatus = "creating databases";
+    constexpr const char * kCreatingDatabasesStage = "creating databases";
 
     /// Creating tables or finding them and checking their definition.
-    constexpr const char * kCreatingTablesStatus = "creating tables";
+    constexpr const char * kCreatingTablesStage = "creating tables";
 
     /// Inserting restored data to tables.
-    constexpr const char * kInsertingDataToTablesStatus = "inserting data to tables";
+    constexpr const char * kInsertingDataToTablesStage = "inserting data to tables";
 
     /// Uppercases the first character of a passed string.
     String toUpperFirst(const String & str)
@@ -102,6 +102,7 @@ RestorerFromBackup::RestorerFromBackup(
     , restore_coordination(restore_coordination_)
     , backup(backup_)
     , context(context_)
+    , on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
     , create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
     , log(&Poco::Logger::get("RestorerFromBackup"))
 {
@@ -112,7 +113,7 @@ RestorerFromBackup::~RestorerFromBackup() = default;
 
 RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
 {
     /// run() can be called only once.
-    if (!current_status.empty())
+    if (!current_stage.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Already restoring");
 
     /// Find other hosts working along with us to execute this ON CLUSTER query.
@@ -126,7 +127,7 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
     findRootPathsInBackup();
 
     /// Find all the databases and tables which we will read from the backup.
-    setStatus(kFindingTablesInBackupStatus);
+    setStage(kFindingTablesInBackupStage);
     findDatabasesAndTablesInBackup();
 
     /// Check access rights.
@@ -136,27 +137,31 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
         return {};
 
     /// Create databases using the create queries read from the backup.
-    setStatus(kCreatingDatabasesStatus);
+    setStage(kCreatingDatabasesStage);
     createDatabases();
 
     /// Create tables using the create queries read from the backup.
-    setStatus(kCreatingTablesStatus);
+    setStage(kCreatingTablesStage);
     createTables();
 
     /// All that's left is to insert data to tables.
     /// No more data restoring tasks are allowed after this point.
-    setStatus(kInsertingDataToTablesStatus);
+    setStage(kInsertingDataToTablesStage);
     return getDataRestoreTasks();
 }
 
-void RestorerFromBackup::setStatus(const String & new_status, const String & message)
+void RestorerFromBackup::setStage(const String & new_stage, const String & message)
 {
-    LOG_TRACE(log, "{}", toUpperFirst(new_status));
-    current_status = new_status;
+    LOG_TRACE(log, "{}", toUpperFirst(new_stage));
+    current_stage = new_stage;
+
     if (restore_coordination)
     {
-        restore_coordination->setStatus(restore_settings.host_id, new_status, message);
-        restore_coordination->waitStatus(all_hosts, new_status);
+        restore_coordination->setStage(restore_settings.host_id, new_stage, message);
+        if (new_stage == kFindingTablesInBackupStage)
+            restore_coordination->waitForStage(all_hosts, new_stage, on_cluster_first_sync_timeout);
+        else
+            restore_coordination->waitForStage(all_hosts, new_stage);
     }
 }
 
@@ -814,14 +819,14 @@ std::vector RestorerFromBackup::findTablesWithoutDependencie
 
 void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
 {
-    if (current_status == kInsertingDataToTablesStatus)
+    if (current_stage == kInsertingDataToTablesStage)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
     data_restore_tasks.push_back(std::move(new_task));
 }
 
 void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
 {
-    if (current_status == kInsertingDataToTablesStatus)
+    if (current_stage == kInsertingDataToTablesStage)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
     insertAtEnd(data_restore_tasks, std::move(new_tasks));
 }
diff --git a/src/Backups/RestorerFromBackup.h b/src/Backups/RestorerFromBackup.h
index a53477f6e6d..b081e16e2ce 100644
--- a/src/Backups/RestorerFromBackup.h
+++ b/src/Backups/RestorerFromBackup.h
@@ -73,6 +73,7 @@ private:
     std::shared_ptr<IRestoreCoordination> restore_coordination;
     BackupPtr backup;
     ContextMutablePtr context;
+    std::chrono::milliseconds on_cluster_first_sync_timeout;
     std::chrono::milliseconds create_table_timeout;
     Poco::Logger * log;
 
@@ -100,7 +101,7 @@ private:
 
     DataRestoreTasks getDataRestoreTasks();
 
-    void setStatus(const String & new_status, const String & message = "");
+    void setStage(const String & new_stage, const String & message = "");
 
     struct DatabaseInfo
     {
@@ -124,7 +125,7 @@ private:
 
     std::vector findTablesWithoutDependencies() const;
 
-    String current_status;
+    String current_stage;
 
     std::unordered_map database_infos;
     std::map table_infos;
     std::vector data_restore_tasks;
diff --git a/src/Interpreters/InterpreterBackupQuery.cpp b/src/Interpreters/InterpreterBackupQuery.cpp
index 246d4ba24e9..af3c8df8eef 100644
--- a/src/Interpreters/InterpreterBackupQuery.cpp
+++ b/src/Interpreters/InterpreterBackupQuery.cpp
@@ -17,20 +17,22 @@ namespace DB
 
 namespace
 {
-    Block getResultRow(const BackupsWorker::Info & info)
+    Block getResultRow(const std::optional<BackupsWorker::Info> & info)
     {
-        Block res_columns;
-        auto column_uuid = ColumnUUID::create();
-        column_uuid->insert(info.uuid);
-        res_columns.insert(0, {std::move(column_uuid), std::make_shared<DataTypeUUID>(), "uuid"});
-        auto column_backup_name = ColumnString::create();
-        column_backup_name->insert(info.backup_name);
-        res_columns.insert(1, {std::move(column_backup_name), std::make_shared<DataTypeString>(), "backup_name"});
-        auto column_status = ColumnInt8::create();
-        column_status->insert(static_cast<Int8>(info.status));
+
+        if (info)
+        {
+            column_uuid->insert(info->uuid);
+            column_backup_name->insert(info->backup_name);
+            column_status->insert(static_cast<Int8>(info->status));
+        }
+
+        Block res_columns;
+        res_columns.insert(0, {std::move(column_uuid), std::make_shared<DataTypeUUID>(), "uuid"});
+        res_columns.insert(1, {std::move(column_backup_name), std::make_shared<DataTypeString>(), "backup_name"});
         res_columns.insert(2, {std::move(column_status), std::make_shared<DataTypeEnum8>(getBackupStatusEnumValues()), "status"});
 
         return res_columns;
@@ -42,7 +44,7 @@ BlockIO InterpreterBackupQuery::execute()
     auto & backups_worker = context->getBackupsWorker();
     UUID uuid = backups_worker.start(query_ptr, context);
     BlockIO res_io;
-    res_io.pipeline = QueryPipeline(std::make_shared(getResultRow(backups_worker.getInfo(uuid))));
+    res_io.pipeline = QueryPipeline(std::make_shared(getResultRow(backups_worker.tryGetInfo(uuid))));
     return res_io;
 }
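
Note on the renamed coordination API (an illustrative sketch, not part of the diff): after this change every host publishes its own stage with setStage() and then blocks in waitForStage() until all hosts reach that stage; only the very first synchronization ("finding tables in backup") is bounded by the new backups.on_cluster_first_sync_timeout setting (180000 ms by default, as read in the RestorerFromBackup constructor above). A minimal sketch of how a caller might drive the interface; the helper name and its arguments are hypothetical:

    /// Hypothetical helper: "coordination", "all_hosts" and "my_host_id" come from the caller.
    void syncStage(IRestoreCoordination & coordination, const Strings & all_hosts,
                   const String & my_host_id, const String & stage, bool is_first_sync)
    {
        coordination.setStage(my_host_id, stage, /* message = */ "");
        if (is_first_sync)
            coordination.waitForStage(all_hosts, stage, std::chrono::milliseconds{180000});  /// bounded first sync
        else
            coordination.waitForStage(all_hosts, stage);  /// wait without a timeout
    }

If a host reports a failure via setError() instead, the other hosts stop waiting, mirroring what the old comments describe for the "error" status.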
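Note on the new activity counters (again a sketch, not the literal helpers from the diff): addInfo() and setStatus() keep num_active_backups / num_active_restores in step by applying the difference between the old and the new status, so shutdown() can log and wait without querying the thread pools. The idea, with isActive() as a hypothetical predicate standing in for getNumActiveBackupsChange()/getNumActiveRestoresChange():

    /// Hypothetical predicate: the terminal statuses seen in this diff do not count as active.
    bool isActive(BackupStatus status)
    {
        return status != BackupStatus::BACKUP_COMPLETE && status != BackupStatus::RESTORED
            && status != BackupStatus::FAILED_TO_BACKUP && status != BackupStatus::FAILED_TO_RESTORE;
    }

    /// Mirrors the "new minus old" bookkeeping that setStatus() performs above.
    void applyStatusChange(std::atomic<size_t> & num_active, BackupStatus old_status, BackupStatus new_status)
    {
        if (isActive(new_status) && !isActive(old_status))
            ++num_active;
        else if (!isActive(new_status) && isActive(old_status))
            --num_active;
    }

addInfo() only performs the increment side (a new entry has no previous status), which matches the single getNumActiveBackupsChange(status) term it adds.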