Merge pull request #38861 from vitlibar/backup-improvements-9

Backup Improvements 9
This commit is contained in:
Vitaly Baranov 2022-07-07 02:24:47 +02:00 committed by GitHub
commit ed27987646
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1791 additions and 1018 deletions

View File

@ -321,7 +321,7 @@ AccessRestorerFromBackup::AccessRestorerFromBackup(
AccessRestorerFromBackup::~AccessRestorerFromBackup() = default;
void AccessRestorerFromBackup::addDataPath(const String & data_path, const QualifiedTableName & table_name_for_logs)
void AccessRestorerFromBackup::addDataPath(const String & data_path)
{
if (!data_paths.emplace(data_path).second)
return;
@ -334,8 +334,8 @@ void AccessRestorerFromBackup::addDataPath(const String & data_path, const Quali
for (const String & filename : filenames)
{
if (!filename.starts_with("access") || !filename.ends_with(".txt"))
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File name {} doesn't match the wildcard \"access*.txt\"",
table_name_for_logs.getFullName(), String{data_path_in_backup_fs / filename});
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File name {} doesn't match the wildcard \"access*.txt\"",
String{data_path_in_backup_fs / filename});
}
::sort(filenames.begin(), filenames.end());

View File

@ -17,7 +17,6 @@ using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
struct RestoreSettings;
struct QualifiedTableName;
/// Makes a backup of access entities of a specified type.
@ -36,7 +35,7 @@ public:
~AccessRestorerFromBackup();
/// Adds a data path to load access entities from.
void addDataPath(const String & data_path, const QualifiedTableName & table_name_for_logs);
void addDataPath(const String & data_path);
/// Checks that the current user can do restoring.
AccessRightsElements getRequiredAccess() const;

View File

@ -531,6 +531,9 @@ void IAccessStorage::backup(BackupEntriesCollector & backup_entries_collector, c
auto entities = readAllWithIDs(type);
boost::range::remove_erase_if(entities, [](const std::pair<UUID, AccessEntityPtr> & x) { return !x.second->isBackupAllowed(); });
if (entities.empty())
return;
auto backup_entry = makeBackupEntryForAccess(
entities,
data_path_in_backup,

View File

@ -627,6 +627,9 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
auto entities = readAllWithIDs(type);
boost::range::remove_erase_if(entities, [](const std::pair<UUID, AccessEntityPtr> & x) { return !x.second->isBackupAllowed(); });
if (entities.empty())
return;
auto backup_entry_with_path = makeBackupEntryForAccess(
entities,
data_path_in_backup,
@ -634,21 +637,18 @@ void ReplicatedAccessStorage::backup(BackupEntriesCollector & backup_entries_col
backup_entries_collector.getContext()->getAccessControl());
auto backup_coordination = backup_entries_collector.getBackupCoordination();
backup_coordination->addReplicatedAccessPath(zookeeper_path, backup_entry_with_path.first);
String current_host_id = backup_entries_collector.getBackupSettings().host_id;
backup_coordination->setReplicatedAccessHost(zookeeper_path, current_host_id);
backup_coordination->addReplicatedAccessFilePath(zookeeper_path, type, current_host_id, backup_entry_with_path.first);
backup_entries_collector.addPostTask(
[backup_entry = backup_entry_with_path.second,
zookeeper_path = zookeeper_path,
type,
current_host_id,
&backup_entries_collector,
backup_coordination]
{
if (current_host_id != backup_coordination->getReplicatedAccessHost(zookeeper_path))
return;
for (const String & path : backup_coordination->getReplicatedAccessPaths(zookeeper_path))
for (const String & path : backup_coordination->getReplicatedAccessFilePaths(zookeeper_path, type, current_host_id))
backup_entries_collector.addBackupEntry(path, backup_entry);
});
}

View File

@ -1,392 +0,0 @@
#include <Backups/BackupCoordinationHelpers.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Common/Exception.h>
#include <Common/escapeForFileName.h>
#include <IO/ReadHelpers.h>
#include <base/chrono_io.h>
#include <boost/range/adaptor/map.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_BACKUP_TABLE;
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Orders shared replica names by the strings they point to, not by the pointer values.
/// Both pointers are dereferenced unconditionally, so neither may be null.
struct LessReplicaName
{
    /// Declared `const` so the comparator can also be invoked through a const object or a const reference.
    bool operator()(const std::shared_ptr<const String> & left, const std::shared_ptr<const String> & right) const
    {
        return *left < *right;
    }
};
}
/// Keeps, for every partition, the added parts which are not contained in another added part.
/// Inside a partition the kept parts are stored in a map ordered by their `max_block`.
/// Adding a part which intersects a kept part without one of them containing the other throws CANNOT_BACKUP_TABLE.
class BackupCoordinationReplicatedPartNames::CoveredPartsFinder
{
public:
/// `table_name_for_logs_` is used only in the text of the exception thrown by addPartName().
explicit CoveredPartsFinder(const String & table_name_for_logs_) : table_name_for_logs(table_name_for_logs_) {}
/// Parses `new_part_name` into a MergeTreePartInfo and forwards to the overload below.
void addPartName(const String & new_part_name, const std::shared_ptr<const String> & replica_name)
{
addPartName(MergeTreePartInfo::fromPartName(new_part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING), replica_name);
}
/// Registers a part found on the replica `replica_name`.
/// The part is ignored if a kept part already contains it; otherwise it is stored and every kept part it contains is dropped.
/// Throws CANNOT_BACKUP_TABLE if the new part intersects a kept part it does not contain.
void addPartName(MergeTreePartInfo && new_part_info, const std::shared_ptr<const String> & replica_name)
{
/// The block numbers are copied first because `new_part_info` is moved from below.
auto new_min_block = new_part_info.min_block;
auto new_max_block = new_part_info.max_block;
auto & parts = partitions[new_part_info.partition_id];
/// Find the first kept part with max_block >= `new_part_info.min_block`.
auto first_it = parts.lower_bound(new_min_block);
if (first_it == parts.end())
{
/// All max_blocks < new_part_info.min_block, so we can safely add `new_part_info` to the list of parts.
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
return;
}
{
/// new_part_info.min_block <= current_info.max_block
const auto & part = first_it->second;
if (new_max_block < part.info.min_block)
{
/// (prev_info.max_block < new_part_info.min_block) AND (new_part_info.max_block < current_info.min_block),
/// so we can safely add `new_part_info` to the list of parts.
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
return;
}
/// (new_part_info.min_block <= current_info.max_block) AND (new_part_info.max_block >= current_info.min_block), parts intersect.
if (part.info.contains(new_part_info))
{
/// `new_part_info` is already contained in another part.
return;
}
}
/// `new_part_info` probably replaces multiple parts, find the range of parts to replace.
auto last_it = first_it;
while (last_it != parts.end())
{
const auto & part = last_it->second;
if (part.info.min_block > new_max_block)
break;
/// Every kept part intersecting the new part must be fully contained in it.
if (!new_part_info.contains(part.info))
{
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Intersected parts detected in the table {}: {} on replica {} and {} on replica {}. It should be investigated",
table_name_for_logs,
part.info.getPartName(),
*part.replica_name,
new_part_info.getPartName(),
*replica_name);
}
++last_it;
}
/// `new_part_info` replaces the parts in [first_it..last_it)
parts.erase(first_it, last_it);
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
}
/// Parses `part_name` into a MergeTreePartInfo and forwards to the overload below.
bool isCoveredByAnotherPart(const String & part_name) const
{
return isCoveredByAnotherPart(MergeTreePartInfo::fromPartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING));
}
/// Returns true if a kept part other than `part_info` itself contains `part_info`.
bool isCoveredByAnotherPart(const MergeTreePartInfo & part_info) const
{
auto partition_it = partitions.find(part_info.partition_id);
if (partition_it == partitions.end())
return false;
const auto & parts = partition_it->second;
/// Find the first kept part with max_block >= `part_info.min_block`.
auto it_part = parts.lower_bound(part_info.min_block);
if (it_part == parts.end())
{
/// All max_blocks < part_info.min_block, so there are no parts covering `part_info`.
return false;
}
/// part_info.min_block <= current_info.max_block
const auto & existing_part = it_part->second;
if (part_info.max_block < existing_part.info.min_block)
{
/// (prev_info.max_block < part_info.min_block) AND (part_info.max_block < current_info.min_block),
/// so there are no parts covering `part_info`.
return false;
}
/// (part_info.min_block <= current_info.max_block) AND (part_info.max_block >= current_info.min_block), parts intersect.
if (existing_part.info == part_info)
{
/// It's the same part: it kind of covers itself, but this function checks whether a part is covered by ANOTHER part.
return false;
}
/// Check if `part_info` is covered by `existing_part`.
return existing_part.info.contains(part_info);
}
private:
/// A kept part together with the name of the replica which added it (the replica name is used only in error messages).
struct PartInfo
{
MergeTreePartInfo info;
std::shared_ptr<const String> replica_name;
};
using Parts = std::map<Int64 /* max_block */, PartInfo>;
/// Kept parts grouped by partition ID.
std::unordered_map<String, Parts> partitions;
const String table_name_for_logs;
};
/// NOTE(review): both are defaulted here instead of in the header, presumably because `CoveredPartsFinder`
/// is only forward-declared there while `TableInfo` holds it in a std::unique_ptr — confirm.
BackupCoordinationReplicatedPartNames::BackupCoordinationReplicatedPartNames() = default;
BackupCoordinationReplicatedPartNames::~BackupCoordinationReplicatedPartNames() = default;
void BackupCoordinationReplicatedPartNames::addPartNames(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums)
{
if (part_names_prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addPartNames() must not be called after getPartNames()");
auto & table_info = table_infos[table_shared_id];
if (!table_info.covered_parts_finder)
table_info.covered_parts_finder = std::make_unique<CoveredPartsFinder>(table_name_for_logs);
auto replica_name_ptr = std::make_shared<String>(replica_name);
for (const auto & part_name_and_checksum : part_names_and_checksums)
{
const auto & part_name = part_name_and_checksum.part_name;
const auto & checksum = part_name_and_checksum.checksum;
auto it = table_info.parts_replicas.find(part_name);
if (it == table_info.parts_replicas.end())
{
it = table_info.parts_replicas.emplace(part_name, PartReplicas{}).first;
it->second.checksum = checksum;
}
else
{
const auto & other = it->second;
if (other.checksum != checksum)
{
const String & other_replica_name = **other.replica_names.begin();
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Table {} on replica {} has part {} which is different from the part on replica {}. Must be the same",
table_name_for_logs,
replica_name,
part_name,
other_replica_name);
}
}
auto & replica_names = it->second.replica_names;
/// `replica_names` should be ordered because we need this vector to be in the same order on every replica.
replica_names.insert(
std::upper_bound(replica_names.begin(), replica_names.end(), replica_name_ptr, LessReplicaName{}), replica_name_ptr);
table_info.covered_parts_finder->addPartName(part_name, replica_name_ptr);
}
}
/// Returns the part names assigned to the replica `replica_name` of the table `table_shared_id`,
/// or an empty list if the table or the replica is unknown. The assignment is prepared on the first call.
Strings BackupCoordinationReplicatedPartNames::getPartNames(const String & table_shared_id, const String & replica_name) const
{
    preparePartNames();

    if (auto table_it = table_infos.find(table_shared_id); table_it != table_infos.end())
    {
        const auto & parts_by_replica = table_it->second.replicas_parts;
        if (auto replica_it = parts_by_replica.find(replica_name); replica_it != parts_by_replica.end())
            return replica_it->second;
    }

    return {};
}
void BackupCoordinationReplicatedPartNames::preparePartNames() const
{
if (part_names_prepared)
return;
size_t counter = 0;
for (const auto & table_info : table_infos | boost::adaptors::map_values)
{
for (const auto & [part_name, part_replicas] : table_info.parts_replicas)
{
if (table_info.covered_parts_finder->isCoveredByAnotherPart(part_name))
continue;
size_t chosen_index = (counter++) % part_replicas.replica_names.size();
const auto & chosen_replica_name = *part_replicas.replica_names[chosen_index];
table_info.replicas_parts[chosen_replica_name].push_back(part_name);
}
}
part_names_prepared = true;
}
/// Helps to wait until all hosts come to a specified stage.
BackupCoordinationStatusSync::BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, log(log_)
{
createRootNodes();
}
void BackupCoordinationStatusSync::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
}
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message)
{
setImpl(current_host, new_status, message, {}, {});
}
/// Publishes the status of `current_host` and waits, without a time limit, until every host of `all_hosts` has the same status.
Strings BackupCoordinationStatusSync::setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
    return setImpl(current_host, new_status, message, all_hosts, /* timeout_ms = */ std::nullopt);
}
/// Same as setAndWait(), but gives up after `timeout_ms` milliseconds.
Strings BackupCoordinationStatusSync::setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
    const std::optional<UInt64> timeout{timeout_ms};
    return setImpl(current_host, new_status, message, all_hosts, timeout);
}
/// Publishes the status `new_status` of `current_host` as the node "<host>|<status>" holding `message`,
/// then optionally waits until every host of `all_hosts` has published the same status.
///
/// Returns immediately with an empty list if `all_hosts` is empty or `new_status` is the error status,
/// and with `{message}` if `all_hosts` consists of `current_host` only.
/// Otherwise returns the messages published by the hosts, in the order of `all_hosts`.
///
/// Throws FAILED_TO_SYNC_BACKUP_OR_RESTORE if an unexpected node is found, if any host has published the error status,
/// or if `timeout_ms` is set and not all hosts became ready in time.
Strings BackupCoordinationStatusSync::setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms)
{
    /// Put new status to ZooKeeper.
    auto zookeeper = get_zookeeper();
    zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message);

    if (all_hosts.empty() || (new_status == kErrorStatus))
        return {};

    if ((all_hosts.size() == 1) && (all_hosts.front() == current_host))
        return {message};

    /// Wait for other hosts.
    Strings ready_hosts_results;
    ready_hosts_results.resize(all_hosts.size());

    /// A host may be listed more than once, so each host maps to all its positions.
    std::map<String, std::vector<size_t> /* index in `ready_hosts_results` */> unready_hosts;
    for (size_t i = 0; i != all_hosts.size(); ++i)
        unready_hosts[all_hosts[i]].push_back(i);

    std::optional<String> host_with_error;
    std::optional<String> error_message;

    /// Process ZooKeeper's nodes and update `unready_hosts`, `ready_hosts_results`, `host_with_error` and `error_message`.
    auto process_zk_nodes = [&](const Strings & zk_nodes)
    {
        for (const String & zk_node : zk_nodes)
        {
            /// Such nodes are created below only to trigger the watch.
            if (zk_node.starts_with("remove_watch-"))
                continue;

            size_t separator_pos = zk_node.find('|');
            if (separator_pos == String::npos)
                throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node);

            String host = zk_node.substr(0, separator_pos);
            String status = zk_node.substr(separator_pos + 1);
            if (status == kErrorStatus)
            {
                host_with_error = host;
                error_message = zookeeper->get(zookeeper_path + "/" + zk_node);
                return;
            }

            auto it = unready_hosts.find(host);
            if ((it != unready_hosts.end()) && (status == new_status))
            {
                String result = zookeeper->get(zookeeper_path + "/" + zk_node);
                for (size_t i : it->second)
                    ready_hosts_results[i] = result;
                unready_hosts.erase(it);
            }
        }
    };

    /// State shared with the watch callback. It is owned jointly by this function and by the callback,
    /// so the callback stays valid even if it fires after this function was left because of an exception.
    struct WatchState
    {
        std::mutex mutex;
        std::condition_variable triggered_event;
        bool is_set = false; /// Guarded by `mutex`.
    };
    auto watch_state = std::make_shared<WatchState>();

    auto watch_callback = [watch_state](const Coordination::WatchResponse &)
    {
        {
            /// The flag must be changed under the mutex the waiter uses, otherwise the notification could arrive
            /// between the waiter's check of the flag and its blocking, and would be lost.
            std::lock_guard lock{watch_state->mutex};
            watch_state->is_set = false; /// After it's triggered it's not set until we call getChildrenWatch() again.
        }
        watch_state->triggered_event.notify_all();
    };

    /// Must be called with `watch_state->mutex` locked.
    auto watch_triggered = [&watch_state] { return !watch_state->is_set; };

    /// Wait until all hosts are ready or an error happens or time is out.
    bool use_timeout = timeout_ms.has_value();
    std::chrono::milliseconds timeout{timeout_ms.value_or(0)};
    std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
    std::chrono::steady_clock::duration elapsed{};

    while (!unready_hosts.empty() && !error_message)
    {
        {
            std::lock_guard lock{watch_state->mutex};
            watch_state->is_set = true;
        }
        Strings nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
        process_zk_nodes(nodes);

        if (!unready_hosts.empty() && !error_message)
        {
            LOG_TRACE(log, "Waiting for host {}", unready_hosts.begin()->first);
            std::unique_lock lock{watch_state->mutex};
            if (use_timeout)
            {
                elapsed = std::chrono::steady_clock::now() - start_time;
                if ((elapsed > timeout) || !watch_state->triggered_event.wait_for(lock, timeout - elapsed, watch_triggered))
                    break;
            }
            else
                watch_state->triggered_event.wait(lock, watch_triggered);
        }
    }

    bool watch_still_set = false;
    {
        std::lock_guard lock{watch_state->mutex};
        watch_still_set = watch_state->is_set;
    }

    if (watch_still_set)
    {
        /// Remove watch by triggering it.
        zookeeper->create(zookeeper_path + "/remove_watch-", "", zkutil::CreateMode::EphemeralSequential);
        std::unique_lock lock{watch_state->mutex};
        watch_state->triggered_event.wait(lock, watch_triggered);
    }

    if (error_message)
        throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Error occurred on host {}: {}", *host_with_error, *error_message);

    if (!unready_hosts.empty())
    {
        throw Exception(
            ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
            "Waited for host {} too long ({})",
            unready_hosts.begin()->first,
            to_string(elapsed));
    }

    return ready_hosts_results;
}
}

View File

@ -1,81 +0,0 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <Backups/IRestoreCoordination.h>
#include <Common/ZooKeeper/Common.h>
#include <map>
#include <unordered_map>
namespace DB
{
/// Helper designed to be used in an implementation of the IBackupCoordination interface in the part related to replicated tables.
/// It collects the part names reported by the replicas and decides which replica puts which part to the backup.
class BackupCoordinationReplicatedPartNames
{
public:
BackupCoordinationReplicatedPartNames();
~BackupCoordinationReplicatedPartNames();
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
/// Adds part names which a specified replica of a replicated table is going to put to the backup.
/// Multiple replicas of the replicated table call this function and then the added part names can be returned by calling the function
/// getPartNames().
/// Checksums are used only to check that parts with the same name on different replicas are the same.
/// `table_name_for_logs` is used only in error messages.
void addPartNames(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums);
/// Returns the names of the parts which a specified replica of a replicated table should put to the backup.
/// This is the same list as was added by calls of the function addPartNames() but without duplicates and without
/// parts covered by other parts.
Strings getPartNames(const String & table_shared_id, const String & replica_name) const;
private:
/// Fills `replicas_parts` of every table on the first call; does nothing afterwards.
void preparePartNames() const;
class CoveredPartsFinder;
/// The replicas which have added a part, and the checksum of that part.
struct PartReplicas
{
std::vector<std::shared_ptr<const String>> replica_names;
UInt128 checksum;
};
struct TableInfo
{
std::map<String /* part_name */, PartReplicas> parts_replicas; /// Should be ordered because we need this map to be in the same order on every replica.
/// The result of preparePartNames(): the parts assigned to each replica.
mutable std::unordered_map<String /* replica_name */, Strings> replicas_parts;
std::unique_ptr<CoveredPartsFinder> covered_parts_finder;
};
std::map<String /* table_shared_id */, TableInfo> table_infos; /// Should be ordered because we need this map to be in the same order on every replica.
/// Set by preparePartNames(); addPartNames() must not be called once it is true.
mutable bool part_names_prepared = false;
};
/// Helps to wait until all hosts come to a specified stage.
/// Every host publishes its status as a child node of `zookeeper_path` named "<host>|<status>".
class BackupCoordinationStatusSync
{
public:
/// Creates the node `zookeeper_path_` (with its ancestors) if it doesn't exist yet.
BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
/// Publishes the status of `current_host` with the attached `message` and returns without waiting.
void set(const String & current_host, const String & new_status, const String & message);
/// Publishes the status of `current_host` and waits until all the hosts of `all_hosts` have published the same status.
/// Returns the messages published by the hosts, in the order of `all_hosts`.
/// Throws an exception if any host has published the status `kErrorStatus`.
Strings setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts);
/// Same as setAndWait(), but throws an exception if the hosts are not ready after `timeout_ms` milliseconds.
Strings setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms);
/// Publishing this status never waits for other hosts; a host waiting for another status throws once it sees this one.
static constexpr const char * kErrorStatus = "error";
private:
void createRootNodes();
/// Common implementation of set(), setAndWait() and setAndWaitFor(); an empty `timeout_ms` means waiting without a time limit.
Strings setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms);
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
Poco::Logger * log;
};
}

View File

@ -30,60 +30,52 @@ Strings BackupCoordinationLocal::setStatusAndWaitFor(const String &, const Strin
void BackupCoordinationLocal::addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<PartNameAndChecksum> & part_names_and_checksums)
{
std::lock_guard lock{mutex};
replicated_part_names.addPartNames(table_shared_id, table_name_for_logs, replica_name, part_names_and_checksums);
replicated_tables.addPartNames(table_shared_id, table_name_for_logs, replica_name, part_names_and_checksums);
}
Strings BackupCoordinationLocal::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
{
std::lock_guard lock{mutex};
return replicated_part_names.getPartNames(table_shared_id, replica_name);
return replicated_tables.getPartNames(table_shared_id, replica_name);
}
/// Passes the mutations of a replica to `replicated_tables` while holding `mutex`.
void BackupCoordinationLocal::addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations)
{
    std::lock_guard<std::mutex> guard(mutex);
    replicated_tables.addMutations(table_shared_id, table_name_for_logs, replica_name, mutations);
}
/// Asks `replicated_tables`, while holding `mutex`, for the mutations of the specified replica of the specified table.
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationLocal::getReplicatedMutations(const String & table_shared_id, const String & replica_name) const
{
    std::lock_guard<std::mutex> guard(mutex);
    return replicated_tables.getMutations(table_shared_id, replica_name);
}
void BackupCoordinationLocal::addReplicatedDataPath(const String & table_shared_id, const String & data_path)
{
std::lock_guard lock{mutex};
replicated_data_paths[table_shared_id].push_back(data_path);
replicated_tables.addDataPath(table_shared_id, data_path);
}
Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_shared_id) const
{
std::lock_guard lock{mutex};
auto it = replicated_data_paths.find(table_shared_id);
if (it == replicated_data_paths.end())
return {};
return it->second;
return replicated_tables.getDataPaths(table_shared_id);
}
void BackupCoordinationLocal::addReplicatedAccessPath(const String & access_zk_path, const String & file_path)
void BackupCoordinationLocal::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
{
std::lock_guard lock{mutex};
replicated_access_paths[access_zk_path].push_back(file_path);
replicated_access.addFilePath(access_zk_path, access_entity_type, host_id, file_path);
}
Strings BackupCoordinationLocal::getReplicatedAccessPaths(const String & access_zk_path) const
Strings BackupCoordinationLocal::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
{
std::lock_guard lock{mutex};
auto it = replicated_access_paths.find(access_zk_path);
if (it == replicated_access_paths.end())
return {};
return it->second;
}
void BackupCoordinationLocal::setReplicatedAccessHost(const String & access_zk_path, const String & host_id)
{
std::lock_guard lock{mutex};
replicated_access_hosts[access_zk_path] = host_id;
}
String BackupCoordinationLocal::getReplicatedAccessHost(const String & access_zk_path) const
{
std::lock_guard lock{mutex};
auto it = replicated_access_hosts.find(access_zk_path);
if (it == replicated_access_hosts.end())
return {};
return it->second;
return replicated_access.getFilePaths(access_zk_path, access_entity_type, host_id);
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <base/defines.h>
#include <map>
#include <mutex>
@ -12,7 +13,7 @@ namespace Poco { class Logger; }
namespace DB
{
/// Stores backup contents information in memory.
/// Implementation of the IBackupCoordination interface performing coordination in memory.
class BackupCoordinationLocal : public IBackupCoordination
{
public:
@ -27,14 +28,15 @@ public:
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const override;
void addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
const std::vector<MutationInfo> & mutations) override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const override;
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessPath(const String & access_zk_path, const String & file_path) override;
Strings getReplicatedAccessPaths(const String & access_zk_path) const override;
void setReplicatedAccessHost(const String & access_zk_path, const String & host_id) override;
String getReplicatedAccessHost(const String & access_zk_path) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
@ -52,15 +54,12 @@ public:
private:
mutable std::mutex mutex;
BackupCoordinationReplicatedPartNames replicated_part_names TSA_GUARDED_BY(mutex);
std::unordered_map<String, Strings> replicated_data_paths TSA_GUARDED_BY(mutex);
std::unordered_map<String, Strings> replicated_access_paths TSA_GUARDED_BY(mutex);
std::unordered_map<String, String> replicated_access_hosts TSA_GUARDED_BY(mutex);
BackupCoordinationReplicatedTables replicated_tables TSA_GUARDED_BY(mutex);
BackupCoordinationReplicatedAccess replicated_access TSA_GUARDED_BY(mutex);
std::map<String /* file_name */, SizeAndChecksum> file_names TSA_GUARDED_BY(mutex); /// Should be ordered alphabetically, see listFiles(). For empty files we assume checksum = 0.
std::map<SizeAndChecksum, FileInfo> file_infos TSA_GUARDED_BY(mutex); /// Information about files. Without empty files.
Strings archive_suffixes TSA_GUARDED_BY(mutex);
size_t current_archive_suffix TSA_GUARDED_BY(mutex) = 0;
};
}

View File

@ -1,4 +1,5 @@
#include <Backups/BackupCoordinationDistributed.h>
#include <Backups/BackupCoordinationRemote.h>
#include <Access/Common/AccessEntityType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
@ -27,6 +28,7 @@ namespace
using SizeAndChecksum = IBackupCoordination::SizeAndChecksum;
using FileInfo = IBackupCoordination::FileInfo;
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
using MutationInfo = IBackupCoordination::MutationInfo;
struct ReplicatedPartNames
{
@ -63,6 +65,41 @@ namespace
}
};
struct ReplicatedMutations
{
std::vector<MutationInfo> mutations;
String table_name_for_logs;
static String serialize(const std::vector<MutationInfo> & mutations_, const String & table_name_for_logs_)
{
WriteBufferFromOwnString out;
writeBinary(mutations_.size(), out);
for (const auto & mutation : mutations_)
{
writeBinary(mutation.id, out);
writeBinary(mutation.entry, out);
}
writeBinary(table_name_for_logs_, out);
return out.str();
}
static ReplicatedMutations deserialize(const String & str)
{
ReadBufferFromString in{str};
ReplicatedMutations res;
size_t num;
readBinary(num, in);
res.mutations.resize(num);
for (size_t i = 0; i != num; ++i)
{
readBinary(res.mutations[i].id, in);
readBinary(res.mutations[i].entry, in);
}
readBinary(res.table_name_for_logs, in);
return res;
}
};
String serializeFileInfo(const FileInfo & info)
{
WriteBufferFromOwnString out;
@ -128,7 +165,7 @@ namespace
constexpr size_t NUM_ATTEMPTS = 10;
}
BackupCoordinationDistributed::BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
BackupCoordinationRemote::BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("BackupCoordination"))
@ -136,46 +173,46 @@ BackupCoordinationDistributed::BackupCoordinationDistributed(const String & zook
createRootNodes();
}
BackupCoordinationDistributed::~BackupCoordinationDistributed() = default;
BackupCoordinationRemote::~BackupCoordinationRemote() = default;
void BackupCoordinationDistributed::createRootNodes()
void BackupCoordinationRemote::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_part_names", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_mutations", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_data_paths", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_access_host", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_access_paths", "");
zookeeper->createIfNotExists(zookeeper_path + "/repl_access", "");
zookeeper->createIfNotExists(zookeeper_path + "/file_names", "");
zookeeper->createIfNotExists(zookeeper_path + "/file_infos", "");
zookeeper->createIfNotExists(zookeeper_path + "/archive_suffixes", "");
}
void BackupCoordinationDistributed::removeAllNodes()
void BackupCoordinationRemote::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
}
void BackupCoordinationDistributed::setStatus(const String & current_host, const String & new_status, const String & message)
void BackupCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
{
status_sync.set(current_host, new_status, message);
}
Strings BackupCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
Strings BackupCoordinationRemote::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
return status_sync.setAndWait(current_host, new_status, message, all_hosts);
}
Strings BackupCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
Strings BackupCoordinationRemote::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
return status_sync.setAndWaitFor(current_host, new_status, message, all_hosts, timeout_ms);
}
void BackupCoordinationDistributed::addReplicatedPartNames(
void BackupCoordinationRemote::addReplicatedPartNames(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
@ -183,8 +220,8 @@ void BackupCoordinationDistributed::addReplicatedPartNames(
{
{
std::lock_guard lock{mutex};
if (replicated_part_names)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addPartNames() must not be called after getPartNames()");
if (replicated_tables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedPartNames() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
@ -194,17 +231,49 @@ void BackupCoordinationDistributed::addReplicatedPartNames(
zookeeper->create(path, ReplicatedPartNames::serialize(part_names_and_checksums, table_name_for_logs), zkutil::CreateMode::Persistent);
}
Strings BackupCoordinationDistributed::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
Strings BackupCoordinationRemote::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
{
std::lock_guard lock{mutex};
prepareReplicatedPartNames();
return replicated_part_names->getPartNames(table_shared_id, replica_name);
prepareReplicatedTables();
return replicated_tables->getPartNames(table_shared_id, replica_name);
}
void BackupCoordinationRemote::addReplicatedMutations(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations)
{
{
std::lock_guard lock{mutex};
if (replicated_tables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedMutations() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_mutations/" + escapeForFileName(table_shared_id);
zookeeper->createIfNotExists(path, "");
path += "/" + escapeForFileName(replica_name);
zookeeper->create(path, ReplicatedMutations::serialize(mutations, table_name_for_logs), zkutil::CreateMode::Persistent);
}
/// Builds the `replicated_tables` snapshot if it has not been built yet (under the mutex),
/// then returns what it reports for the given table and replica.
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationRemote::getReplicatedMutations(const String & table_shared_id, const String & replica_name) const
{
    std::lock_guard guard{mutex};
    prepareReplicatedTables();
    auto mutations_for_replica = replicated_tables->getMutations(table_shared_id, replica_name);
    return mutations_for_replica;
}
void BackupCoordinationDistributed::addReplicatedDataPath(
void BackupCoordinationRemote::addReplicatedDataPath(
const String & table_shared_id, const String & data_path)
{
{
std::lock_guard lock{mutex};
if (replicated_tables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedDataPath() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_data_paths/" + escapeForFileName(table_shared_id);
zookeeper->createIfNotExists(path, "");
@ -212,83 +281,120 @@ void BackupCoordinationDistributed::addReplicatedDataPath(
zookeeper->createIfNotExists(path, "");
}
Strings BackupCoordinationDistributed::getReplicatedDataPaths(const String & table_shared_id) const
Strings BackupCoordinationRemote::getReplicatedDataPaths(const String & table_shared_id) const
{
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_data_paths/" + escapeForFileName(table_shared_id);
Strings children = zookeeper->getChildren(path);
Strings data_paths;
data_paths.reserve(children.size());
for (const String & child : children)
data_paths.push_back(unescapeForFileName(child));
return data_paths;
std::lock_guard lock{mutex};
prepareReplicatedTables();
return replicated_tables->getDataPaths(table_shared_id);
}
void BackupCoordinationDistributed::prepareReplicatedPartNames() const
void BackupCoordinationRemote::prepareReplicatedTables() const
{
if (replicated_part_names)
if (replicated_tables)
return;
replicated_part_names.emplace();
replicated_tables.emplace();
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_part_names";
for (const String & escaped_table_zk_path : zookeeper->getChildren(path))
{
String table_zk_path = unescapeForFileName(escaped_table_zk_path);
String path2 = path + "/" + escaped_table_zk_path;
for (const String & escaped_replica_name : zookeeper->getChildren(path2))
String path = zookeeper_path + "/repl_part_names";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto part_names = ReplicatedPartNames::deserialize(zookeeper->get(path2 + "/" + escaped_replica_name));
replicated_part_names->addPartNames(table_zk_path, part_names.table_name_for_logs, replica_name, part_names.part_names_and_checksums);
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_replica_name : zookeeper->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto part_names = ReplicatedPartNames::deserialize(zookeeper->get(path2 + "/" + escaped_replica_name));
replicated_tables->addPartNames(table_shared_id, part_names.table_name_for_logs, replica_name, part_names.part_names_and_checksums);
}
}
}
{
String path = zookeeper_path + "/repl_mutations";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_replica_name : zookeeper->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto mutations = ReplicatedMutations::deserialize(zookeeper->get(path2 + "/" + escaped_replica_name));
replicated_tables->addMutations(table_shared_id, mutations.table_name_for_logs, replica_name, mutations.mutations);
}
}
}
{
String path = zookeeper_path + "/repl_data_paths";
for (const String & escaped_table_shared_id : zookeeper->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
for (const String & escaped_data_path : zookeeper->getChildren(path2))
{
String data_path = unescapeForFileName(escaped_data_path);
replicated_tables->addDataPath(table_shared_id, data_path);
}
}
}
}
void BackupCoordinationDistributed::addReplicatedAccessPath(const String & access_zk_path, const String & file_path)
void BackupCoordinationRemote::addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
{
{
std::lock_guard lock{mutex};
if (replicated_access)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addReplicatedAccessFilePath() must not be called after preparing");
}
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_access_paths/" + escapeForFileName(access_zk_path);
String path = zookeeper_path + "/repl_access/" + escapeForFileName(access_zk_path);
zookeeper->createIfNotExists(path, "");
path += "/" + escapeForFileName(file_path);
path += "/" + AccessEntityTypeInfo::get(access_entity_type).name;
zookeeper->createIfNotExists(path, "");
path += "/" + host_id;
zookeeper->createIfNotExists(path, file_path);
}
Strings BackupCoordinationDistributed::getReplicatedAccessPaths(const String & access_zk_path) const
Strings BackupCoordinationRemote::getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
{
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_access_paths/" + escapeForFileName(access_zk_path);
Strings children = zookeeper->getChildren(path);
Strings file_paths;
file_paths.reserve(children.size());
for (const String & child : children)
file_paths.push_back(unescapeForFileName(child));
return file_paths;
std::lock_guard lock{mutex};
prepareReplicatedAccess();
return replicated_access->getFilePaths(access_zk_path, access_entity_type, host_id);
}
void BackupCoordinationDistributed::setReplicatedAccessHost(const String & access_zk_path, const String & host_id)
void BackupCoordinationRemote::prepareReplicatedAccess() const
{
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_access_host/" + escapeForFileName(access_zk_path);
auto code = zookeeper->tryCreate(path, host_id, zkutil::CreateMode::Persistent);
if ((code != Coordination::Error::ZOK) && (code != Coordination::Error::ZNODEEXISTS))
throw zkutil::KeeperException(code, path);
if (replicated_access)
return;
if (code == Coordination::Error::ZNODEEXISTS)
zookeeper->set(path, host_id);
}
String BackupCoordinationDistributed::getReplicatedAccessHost(const String & access_zk_path) const
{
replicated_access.emplace();
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/repl_access_host/" + escapeForFileName(access_zk_path);
return zookeeper->get(path);
String path = zookeeper_path + "/repl_access";
for (const String & escaped_access_zk_path : zookeeper->getChildren(path))
{
String access_zk_path = unescapeForFileName(escaped_access_zk_path);
String path2 = path + "/" + escaped_access_zk_path;
for (const String & type_str : zookeeper->getChildren(path2))
{
AccessEntityType type = AccessEntityTypeInfo::parseType(type_str);
String path3 = path2 + "/" + type_str;
for (const String & host_id : zookeeper->getChildren(path3))
{
String file_path = zookeeper->get(path3 + "/" + host_id);
replicated_access->addFilePath(access_zk_path, type, host_id, file_path);
}
}
}
}
void BackupCoordinationDistributed::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
void BackupCoordinationRemote::addFileInfo(const FileInfo & file_info, bool & is_data_file_required)
{
auto zookeeper = get_zookeeper();
@ -310,7 +416,7 @@ void BackupCoordinationDistributed::addFileInfo(const FileInfo & file_info, bool
is_data_file_required = (code == Coordination::Error::ZOK) && (file_info.size > file_info.base_size);
}
void BackupCoordinationDistributed::updateFileInfo(const FileInfo & file_info)
void BackupCoordinationRemote::updateFileInfo(const FileInfo & file_info)
{
if (!file_info.size)
return; /// we don't keep FileInfos for empty files, nothing to update
@ -332,7 +438,7 @@ void BackupCoordinationDistributed::updateFileInfo(const FileInfo & file_info)
}
}
std::vector<FileInfo> BackupCoordinationDistributed::getAllFileInfos() const
std::vector<FileInfo> BackupCoordinationRemote::getAllFileInfos() const
{
auto zookeeper = get_zookeeper();
std::vector<FileInfo> file_infos;
@ -350,7 +456,7 @@ std::vector<FileInfo> BackupCoordinationDistributed::getAllFileInfos() const
return file_infos;
}
Strings BackupCoordinationDistributed::listFiles(const String & directory, bool recursive) const
Strings BackupCoordinationRemote::listFiles(const String & directory, bool recursive) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
@ -383,7 +489,7 @@ Strings BackupCoordinationDistributed::listFiles(const String & directory, bool
return elements;
}
bool BackupCoordinationDistributed::hasFiles(const String & directory) const
bool BackupCoordinationRemote::hasFiles(const String & directory) const
{
auto zookeeper = get_zookeeper();
Strings escaped_names = zookeeper->getChildren(zookeeper_path + "/file_names");
@ -402,7 +508,7 @@ bool BackupCoordinationDistributed::hasFiles(const String & directory) const
return false;
}
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const String & file_name) const
std::optional<FileInfo> BackupCoordinationRemote::getFileInfo(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
@ -416,7 +522,7 @@ std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const String
return file_info;
}
std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const SizeAndChecksum & size_and_checksum) const
std::optional<FileInfo> BackupCoordinationRemote::getFileInfo(const SizeAndChecksum & size_and_checksum) const
{
auto zookeeper = get_zookeeper();
String file_info_str;
@ -425,7 +531,7 @@ std::optional<FileInfo> BackupCoordinationDistributed::getFileInfo(const SizeAnd
return deserializeFileInfo(file_info_str);
}
std::optional<SizeAndChecksum> BackupCoordinationDistributed::getFileSizeAndChecksum(const String & file_name) const
std::optional<SizeAndChecksum> BackupCoordinationRemote::getFileSizeAndChecksum(const String & file_name) const
{
auto zookeeper = get_zookeeper();
String size_and_checksum;
@ -434,7 +540,7 @@ std::optional<SizeAndChecksum> BackupCoordinationDistributed::getFileSizeAndChec
return deserializeSizeAndChecksum(size_and_checksum);
}
String BackupCoordinationDistributed::getNextArchiveSuffix()
String BackupCoordinationRemote::getNextArchiveSuffix()
{
auto zookeeper = get_zookeeper();
String path = zookeeper_path + "/archive_suffixes/a";
@ -445,7 +551,7 @@ String BackupCoordinationDistributed::getNextArchiveSuffix()
return formatArchiveSuffix(extractCounterFromSequentialNodeName(path_created));
}
Strings BackupCoordinationDistributed::getAllArchiveSuffixes() const
Strings BackupCoordinationRemote::getAllArchiveSuffixes() const
{
auto zookeeper = get_zookeeper();
Strings node_names = zookeeper->getChildren(zookeeper_path + "/archive_suffixes");
@ -454,7 +560,7 @@ Strings BackupCoordinationDistributed::getAllArchiveSuffixes() const
return node_names;
}
void BackupCoordinationDistributed::drop()
/// Drops the coordination state by delegating to removeAllNodes().
void BackupCoordinationRemote::drop()
{
removeAllNodes();
}

View File

@ -1,18 +1,20 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Backups/BackupCoordinationStatusSync.h>
namespace DB
{
/// Stores backup temporary information in Zookeeper, used to perform BACKUP ON CLUSTER.
class BackupCoordinationDistributed : public IBackupCoordination
/// Implementation of the IBackupCoordination interface performing coordination via ZooKeeper. It's necessary for "BACKUP ON CLUSTER".
class BackupCoordinationRemote : public IBackupCoordination
{
public:
BackupCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~BackupCoordinationDistributed() override;
BackupCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_);
~BackupCoordinationRemote() override;
void setStatus(const String & current_host, const String & new_status, const String & message) override;
Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts) override;
@ -26,14 +28,19 @@ public:
Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const override;
void addReplicatedMutations(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations) override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const override;
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedAccessPath(const String & access_zk_path, const String & file_path) override;
Strings getReplicatedAccessPaths(const String & access_zk_path) const override;
void setReplicatedAccessHost(const String & access_zk_path, const String & host_id) override;
String getReplicatedAccessHost(const String & access_zk_path) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const override;
void addFileInfo(const FileInfo & file_info, bool & is_data_file_required) override;
void updateFileInfo(const FileInfo & file_info) override;
@ -53,7 +60,8 @@ public:
private:
void createRootNodes();
void removeAllNodes();
void prepareReplicatedPartNames() const;
void prepareReplicatedTables() const;
void prepareReplicatedAccess() const;
const String zookeeper_path;
const zkutil::GetZooKeeper get_zookeeper;
@ -61,7 +69,8 @@ private:
BackupCoordinationStatusSync status_sync;
mutable std::mutex mutex;
mutable std::optional<BackupCoordinationReplicatedPartNames> replicated_part_names;
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables;
mutable std::optional<BackupCoordinationReplicatedAccess> replicated_access;
};
}

View File

@ -0,0 +1,33 @@
#include <Backups/BackupCoordinationReplicatedAccess.h>
namespace DB
{
/// Both special members are compiler-generated; they are only defined out of line here.
BackupCoordinationReplicatedAccess::BackupCoordinationReplicatedAccess() = default;
BackupCoordinationReplicatedAccess::~BackupCoordinationReplicatedAccess() = default;
void BackupCoordinationReplicatedAccess::addFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path)
{
auto & ref = file_paths_by_zk_path[std::make_pair(access_zk_path, access_entity_type)];
ref.file_paths.emplace(file_path);
/// std::max() because the calculation must give the same result being repeated on a different replica.
ref.host_to_store_access = std::max(ref.host_to_store_access, host_id);
}
/// Returns every file path recorded for (access_zk_path, access_entity_type), but only when
/// `host_id` is the host chosen by addFilePath(); any other host (or an unknown pair) gets an empty list.
Strings BackupCoordinationReplicatedAccess::getFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const
{
    const auto found = file_paths_by_zk_path.find(ZkPathAndEntityType{access_zk_path, access_entity_type});

    const bool is_chosen_host = (found != file_paths_by_zk_path.end()) && (found->second.host_to_store_access == host_id);
    if (!is_chosen_host)
        return {};

    const auto & paths = found->second.file_paths;
    return Strings(paths.begin(), paths.end());
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <Core/Types.h>
#include <map>
#include <unordered_set>
namespace DB
{
enum class AccessEntityType;
/// This class is used by hosts to coordinate the access entities of ReplicatedAccessStorage they're writing to a backup.
/// It's designed to make all hosts save the same access entities to the backup even in case the ReplicatedAccessStorage changes
/// while the backup is being produced. This is important to make RESTORE more predictable.
///
/// For example, let's consider three replicas having a ReplicatedAccessStorage on them.
/// This class ensures that the following files in the backup are the same:
/// /shards/1/replicas/1/data/system/users/access01.txt
/// /shards/1/replicas/2/data/system/users/access01.txt
/// /shards/1/replicas/3/data/system/users/access01.txt
///
/// To implement that this class chooses one host to write access entities for all the hosts so in fact all those files
/// in the example above are written by the same host.
class BackupCoordinationReplicatedAccess
{
public:
BackupCoordinationReplicatedAccess();
~BackupCoordinationReplicatedAccess();
/// Adds a path to access*.txt file keeping access entities of a ReplicatedAccessStorage.
/// `host_id` also takes part in choosing the host which stores access: the greatest host id
/// passed for the same (access_zk_path, access_entity_type) pair wins.
void addFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path);
/// Returns all paths added by addFilePath() if `host_id` is a host chosen to store access.
/// Returns an empty list for any other host and for a pair nothing was added for.
/// The paths are kept in an unordered set, so the order of the returned list is unspecified.
Strings getFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const;
private:
/// Key of `file_paths_by_zk_path`.
using ZkPathAndEntityType = std::pair<String, AccessEntityType>;
struct FilePathsAndHost
{
/// All file paths added for the key (duplicates collapse).
std::unordered_set<String> file_paths;
/// The greatest host id passed to addFilePath() for the key.
String host_to_store_access;
};
std::map<ZkPathAndEntityType, FilePathsAndHost> file_paths_by_zk_path;
};
}

View File

@ -0,0 +1,335 @@
#include <Backups/BackupCoordinationReplicatedTables.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Common/Exception.h>
#include <boost/range/adaptor/map.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_BACKUP_TABLE;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Orders shared replica names by the pointed-to string value (not by pointer address).
/// Both pointers are dereferenced unconditionally, so they must be non-null.
struct LessReplicaName
{
    /// `const` so that the comparator can also be invoked through a const object or reference.
    bool operator()(const std::shared_ptr<const String> & left, const std::shared_ptr<const String> & right) const
    {
        return *left < *right;
    }
};
}
using MutationInfo = IBackupCoordination::MutationInfo;
/// Keeps, for each partition, a set of parts ordered by their max_block in which no two parts intersect:
/// a newly added part replaces the stored parts it contains, and is dropped if a stored part contains it.
/// After all parts are added, isCoveredByAnotherPart() tells whether a part is covered by a different stored part.
class BackupCoordinationReplicatedTables::CoveredPartsFinder
{
public:
explicit CoveredPartsFinder(const String & table_name_for_logs_) : table_name_for_logs(table_name_for_logs_) {}
/// Registers a part reported by `replica_name`.
/// Throws CANNOT_BACKUP_TABLE if the new part intersects a stored part which it doesn't contain
/// (and which doesn't contain it).
void addPartInfo(MergeTreePartInfo && new_part_info, const std::shared_ptr<const String> & replica_name)
{
/// Copied before `new_part_info` is moved from below.
auto new_min_block = new_part_info.min_block;
auto new_max_block = new_part_info.max_block;
auto & parts = partitions[new_part_info.partition_id];
/// Find the first part with max_block >= `part_info.min_block`.
auto first_it = parts.lower_bound(new_min_block);
if (first_it == parts.end())
{
/// All max_blocks < part_info.min_block, so we can safely add the `part_info` to the list of parts.
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
return;
}
{
/// part_info.min_block <= current_info.max_block
const auto & part = first_it->second;
if (new_max_block < part.info.min_block)
{
/// (prev_info.max_block < part_info.min_block) AND (part_info.max_block < current_info.min_block),
/// so we can safely add the `part_info` to the list of parts.
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
return;
}
/// (part_info.min_block <= current_info.max_block) AND (part_info.max_block >= current_info.min_block), parts intersect.
if (part.info.contains(new_part_info))
{
/// `part_info` is already contained in another part.
return;
}
}
/// Probably `part_info` is going to replace multiple parts, find the range of parts to replace.
auto last_it = first_it;
while (last_it != parts.end())
{
const auto & part = last_it->second;
/// The map is ordered by max_block, so the scan stops at the first part starting after the new part.
if (part.info.min_block > new_max_block)
break;
if (!new_part_info.contains(part.info))
{
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Intersected parts detected: {} on replica {} and {} on replica {}",
part.info.getPartName(),
*part.replica_name,
new_part_info.getPartName(),
*replica_name);
}
++last_it;
}
/// `part_info` will replace multiple parts [first_it..last_it)
parts.erase(first_it, last_it);
parts.emplace(new_max_block, PartInfo{std::move(new_part_info), replica_name});
}
/// Parses `part_name` and forwards to the overload taking MergeTreePartInfo.
bool isCoveredByAnotherPart(const String & part_name) const
{
return isCoveredByAnotherPart(MergeTreePartInfo::fromPartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING));
}
/// Returns true if a stored part different from `part_info` contains it.
/// Only the first stored part with max_block >= part_info.min_block is examined.
bool isCoveredByAnotherPart(const MergeTreePartInfo & part_info) const
{
auto partition_it = partitions.find(part_info.partition_id);
if (partition_it == partitions.end())
return false;
const auto & parts = partition_it->second;
/// Find the first part with max_block >= `part_info.min_block`.
auto it_part = parts.lower_bound(part_info.min_block);
if (it_part == parts.end())
{
/// All max_blocks < part_info.min_block, so there is no parts covering `part_info`.
return false;
}
/// part_info.min_block <= current_info.max_block
const auto & existing_part = it_part->second;
if (part_info.max_block < existing_part.info.min_block)
{
/// (prev_info.max_block < part_info.min_block) AND (part_info.max_block < current_info.min_block),
/// so there is no parts covering `part_info`.
return false;
}
/// (part_info.min_block <= current_info.max_block) AND (part_info.max_block >= current_info.min_block), parts intersect.
if (existing_part.info == part_info)
{
/// It's the same part, it's kind of covers itself, but we check in this function whether a part is covered by another part.
return false;
}
/// Check if `part_info` is covered by `current_info`.
return existing_part.info.contains(part_info);
}
private:
/// A stored part together with the name of the replica it was registered for.
struct PartInfo
{
MergeTreePartInfo info;
std::shared_ptr<const String> replica_name;
};
using Parts = std::map<Int64 /* max_block */, PartInfo>;
/// Stored parts grouped by partition id.
std::unordered_map<String, Parts> partitions;
/// Stored by the constructor; not read by any member function of this class.
const String table_name_for_logs;
};
/// Defaulted here, after CoveredPartsFinder is fully defined in this file: the header only forward-declares
/// that class while TableInfo holds it in a std::unique_ptr.
BackupCoordinationReplicatedTables::BackupCoordinationReplicatedTables() = default;
BackupCoordinationReplicatedTables::~BackupCoordinationReplicatedTables() = default;
void BackupCoordinationReplicatedTables::addPartNames(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums)
{
if (prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addPartNames() must not be called after preparing");
auto & table_info = table_infos[table_shared_id];
table_info.table_name_for_logs = table_name_for_logs;
if (!table_info.covered_parts_finder)
table_info.covered_parts_finder = std::make_unique<CoveredPartsFinder>(table_name_for_logs);
auto replica_name_ptr = std::make_shared<String>(replica_name);
for (const auto & part_name_and_checksum : part_names_and_checksums)
{
const auto & part_name = part_name_and_checksum.part_name;
const auto & checksum = part_name_and_checksum.checksum;
auto it = table_info.replicas_by_part_name.find(part_name);
if (it == table_info.replicas_by_part_name.end())
{
it = table_info.replicas_by_part_name.emplace(part_name, PartReplicas{}).first;
it->second.checksum = checksum;
}
else
{
const auto & other = it->second;
if (other.checksum != checksum)
{
const String & other_replica_name = **other.replica_names.begin();
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Table {} on replica {} has part {} which is different from the part on replica {}. Must be the same",
table_name_for_logs,
replica_name,
part_name,
other_replica_name);
}
}
auto & replica_names = it->second.replica_names;
/// `replica_names` should be ordered because we need this vector to be in the same order on every replica.
replica_names.insert(
std::upper_bound(replica_names.begin(), replica_names.end(), replica_name_ptr, LessReplicaName{}), replica_name_ptr);
}
}
/// Runs prepare() and returns the part names assigned to `replica_name` for the table `table_shared_id`.
/// Returns an empty list if the table or the replica is unknown.
Strings BackupCoordinationReplicatedTables::getPartNames(const String & table_shared_id, const String & replica_name) const
{
    prepare();

    if (const auto table_it = table_infos.find(table_shared_id); table_it != table_infos.end())
    {
        const auto & names_by_replica = table_it->second.part_names_by_replica_name;
        if (const auto replica_it = names_by_replica.find(replica_name); replica_it != names_by_replica.end())
            return replica_it->second;
    }

    return {};
}
void BackupCoordinationReplicatedTables::addMutations(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations)
{
if (prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addMutations() must not be called after preparing");
auto & table_info = table_infos[table_shared_id];
table_info.table_name_for_logs = table_name_for_logs;
for (const auto & [mutation_id, mutation_entry] : mutations)
table_info.mutations.emplace(mutation_id, mutation_entry);
/// std::max() because the calculation must give the same result being repeated on a different replica.
table_info.replica_name_to_store_mutations = std::max(table_info.replica_name_to_store_mutations, replica_name);
}
std::vector<MutationInfo>
BackupCoordinationReplicatedTables::getMutations(const String & table_shared_id, const String & replica_name) const
{
prepare();
auto it = table_infos.find(table_shared_id);
if (it == table_infos.end())
return {};
const auto & table_info = it->second;
if (table_info.replica_name_to_store_mutations != replica_name)
return {};
std::vector<MutationInfo> res;
for (const auto & [mutation_id, mutation_entry] : table_info.mutations)
res.emplace_back(MutationInfo{mutation_id, mutation_entry});
return res;
}
void BackupCoordinationReplicatedTables::addDataPath(const String & table_shared_id, const String & data_path)
{
auto & table_info = table_infos[table_shared_id];
table_info.data_paths.emplace(data_path);
}
/// Returns all data paths added for the table `table_shared_id`, or an empty list for an unknown table.
/// Does not call prepare().
Strings BackupCoordinationReplicatedTables::getDataPaths(const String & table_shared_id) const
{
    const auto table_it = table_infos.find(table_shared_id);
    if (table_it != table_infos.end())
    {
        const auto & paths = table_it->second.data_paths;
        return Strings(paths.begin(), paths.end());
    }
    return {};
}
/// Computes, once, the data returned by getPartNames() and getMutations():
///  - drops parts covered by other parts and assigns every remaining part to one of the replicas having it;
///  - drops mutations which have no block number above the minimum data version of a known partition.
/// Does nothing if it has already completed. Exceptions get the table name appended and are rethrown;
/// in that case `prepared` stays false.
void BackupCoordinationReplicatedTables::prepare() const
{
    if (prepared)
        return;

    /// Shared by all tables, so parts are spread over replicas round-robin across the whole backup.
    size_t counter = 0;

    for (const auto & table_info : table_infos | boost::adaptors::map_values)
    {
        try
        {
            /// Remove parts covered by other parts.
            auto & min_data_versions_by_partition = table_info.min_data_versions_by_partition;
            for (const auto & [part_name, part_replicas] : table_info.replicas_by_part_name)
            {
                auto part_info = MergeTreePartInfo::fromPartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);

                /// Track the minimum data version per partition with a single map lookup.
                const auto data_version = part_info.getDataVersion();
                auto [version_it, inserted] = min_data_versions_by_partition.try_emplace(part_info.partition_id, data_version);
                if (!inserted)
                    version_it->second = std::min(version_it->second, data_version);

                /// `part_info` must not be used after this line.
                table_info.covered_parts_finder->addPartInfo(std::move(part_info), part_replicas.replica_names[0]);
            }

            for (const auto & [part_name, part_replicas] : table_info.replicas_by_part_name)
            {
                if (table_info.covered_parts_finder->isCoveredByAnotherPart(part_name))
                    continue;

                size_t chosen_index = (counter++) % part_replicas.replica_names.size();
                const auto & chosen_replica_name = *part_replicas.replica_names[chosen_index];
                table_info.part_names_by_replica_name[chosen_replica_name].push_back(part_name);
            }

            /// Remove finished or unrelated mutations.
            std::unordered_map<String, String> unfinished_mutations;
            for (const auto & [mutation_id, mutation_entry_str] : table_info.mutations)
            {
                auto mutation_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_entry_str, mutation_id);

                /// Keep only the partitions which are known and still have data older than the mutation.
                std::map<String, Int64> new_block_numbers;
                for (const auto & [partition_id, block_number] : mutation_entry.block_numbers)
                {
                    auto it = table_info.min_data_versions_by_partition.find(partition_id);
                    if ((it != table_info.min_data_versions_by_partition.end()) && (it->second < block_number))
                        new_block_numbers[partition_id] = block_number;
                }
                mutation_entry.block_numbers = std::move(new_block_numbers);

                if (!mutation_entry.block_numbers.empty())
                    unfinished_mutations[mutation_id] = mutation_entry.toString();
            }

            /// The local map is not needed anymore, so move it instead of copying every entry.
            table_info.mutations = std::move(unfinished_mutations);
        }
        catch (Exception & e)
        {
            e.addMessage("While checking data of table {}", table_info.table_name_for_logs);
            throw;
        }
    }

    prepared = true;
}
}

View File

@ -0,0 +1,103 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <map>
#include <memory>
#include <unordered_map>
#include <unordered_set>
namespace DB
{
/// Replicas use this class to coordinate how they write replicated tables to a backup.
/// "BACKUP ON CLUSTER" can be executed on multiple hosts and parts of replicated tables on those hosts could be slightly different
/// at any specific moment. This class is designed so that inside the backup all replicas would contain all the parts
/// no matter if the replication queues of those tables are fast or slow.
/// This is important to make RESTORE more correct and not dependent on random things such as how fast the replicas doing RESTORE
/// are compared to each other, or how many replicas there will be when RESTORE is executed.
///
/// Example 1: Let's consider two replicas of a table, and let the first replica contain part all_1_1_0 and the second replica contain
/// all_2_2_0. The files in the backup will look like this:
/// /shards/1/replicas/1/data/mydb/mytable/all_1_1_0
/// /shards/1/replicas/1/data/mydb/mytable/all_2_2_0
/// /shards/1/replicas/2/data/mydb/mytable/all_1_1_0
/// /shards/1/replicas/2/data/mydb/mytable/all_2_2_0
///
/// Example 2: Let's consider two replicas again, and let the first replica contain parts all_1_1_0 and all_2_2_0 and
/// the second replica contain part all_1_2_1 (i.e. the second replica has those parts merged).
/// In this case the files in the backup will look like this:
/// /shards/1/replicas/1/data/mydb/mytable/all_1_2_1
/// /shards/1/replicas/2/data/mydb/mytable/all_1_2_1
class BackupCoordinationReplicatedTables
{
public:
BackupCoordinationReplicatedTables();
~BackupCoordinationReplicatedTables();
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
/// Adds part names which a specified replica of a replicated table is going to put to the backup.
/// Multiple replicas of the replicated table call this function and then the added part names can be returned by call of the function
/// getPartNames().
/// Checksums are used only to control that parts under the same names on different replicas are the same.
/// Must not be called after the data has been prepared (i.e. after a successful getPartNames() or getMutations()).
void addPartNames(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums);
/// Returns the names of the parts which a specified replica of a replicated table should put to the backup.
/// This is the same list as it was added by call of the function addPartNames() but without duplications and without
/// parts covered by another parts.
Strings getPartNames(const String & table_shared_id, const String & replica_name) const;
using MutationInfo = IBackupCoordination::MutationInfo;
/// Adds information about mutations of a replicated table.
/// Must not be called after the data has been prepared (i.e. after a successful getPartNames() or getMutations()).
void addMutations(
const String & table_shared_id,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations);
/// Returns all mutations of a replicated table which are not finished for some data parts added by addPartNames().
/// Only one replica (the one with the greatest name among those which called addMutations()) gets the mutations,
/// the function returns an empty list for other replicas.
std::vector<MutationInfo> getMutations(const String & table_shared_id, const String & replica_name) const;
/// Adds a data path in backup for a replicated table.
/// Multiple replicas of the replicated table call this function and then all the added paths can be returned by call of the function
/// getDataPaths().
void addDataPath(const String & table_shared_id, const String & data_path);
/// Returns all the data paths in backup added for a replicated table (see also addDataPath()).
Strings getDataPaths(const String & table_shared_id) const;
private:
/// Calculates the results of getPartNames() and getMutations(); does nothing if it has already completed.
void prepare() const;
class CoveredPartsFinder;
struct PartReplicas
{
/// Names of the replicas having the part, kept sorted by name.
std::vector<std::shared_ptr<const String>> replica_names;
UInt128 checksum;
};
/// The members marked `mutable` are filled or rewritten by prepare(), which is a const function.
struct TableInfo
{
String table_name_for_logs;
std::map<String /* part_name */, PartReplicas> replicas_by_part_name; /// Should be ordered because we need this map to be in the same order on every replica.
mutable std::unordered_map<String /* replica_name */, Strings> part_names_by_replica_name;
std::unique_ptr<CoveredPartsFinder> covered_parts_finder;
mutable std::unordered_map<String, Int64> min_data_versions_by_partition;
mutable std::unordered_map<String, String> mutations;
String replica_name_to_store_mutations;
std::unordered_set<String> data_paths;
};
std::map<String /* table_shared_id */, TableInfo> table_infos; /// Should be ordered because we need this map to be in the same order on every replica.
mutable bool prepared = false;
};
}

View File

@ -0,0 +1,161 @@
#include <Backups/BackupCoordinationStatusSync.h>
#include <Common/Exception.h>
#include <base/chrono_io.h>
#include <condition_variable>
#include <memory>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int FAILED_TO_SYNC_BACKUP_OR_RESTORE;
}
BackupCoordinationStatusSync::BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, log(log_)
{
createRootNodes();
}
void BackupCoordinationStatusSync::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
}
void BackupCoordinationStatusSync::set(const String & current_host, const String & new_status, const String & message)
{
setImpl(current_host, new_status, message, {}, {});
}
/// Publishes the status of the current host and waits for `all_hosts` without any time limit.
Strings BackupCoordinationStatusSync::setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
    const std::optional<UInt64> no_timeout;
    return setImpl(current_host, new_status, message, all_hosts, no_timeout);
}
/// Publishes the status of the current host and waits for `all_hosts` no longer than `timeout_ms` milliseconds.
Strings BackupCoordinationStatusSync::setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
    const std::optional<UInt64> limited_timeout{timeout_ms};
    return setImpl(current_host, new_status, message, all_hosts, limited_timeout);
}
/// Publishes `new_status` of `current_host` as the ZooKeeper node "<zookeeper_path>/<current_host>|<new_status>"
/// (with `message` as its value) and then, if `all_hosts` is not empty, waits until every host from `all_hosts`
/// has published the same status.
///
/// Returns the messages published by the hosts, in the same order as `all_hosts`.
/// Returns an empty list without waiting if `all_hosts` is empty or if `new_status` is the error status.
///
/// Throws FAILED_TO_SYNC_BACKUP_OR_RESTORE if
///  - some host has published the error status;
///  - a node with an unexpected name was found under `zookeeper_path`;
///  - `timeout_ms` is set and the other hosts didn't come to `new_status` in time.
Strings BackupCoordinationStatusSync::setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms)
{
    /// Put new status to ZooKeeper.
    auto zookeeper = get_zookeeper();
    zookeeper->createIfNotExists(zookeeper_path + "/" + current_host + "|" + new_status, message);

    if (all_hosts.empty() || (new_status == kErrorStatus))
        return {};

    if ((all_hosts.size() == 1) && (all_hosts.front() == current_host))
        return {message};

    /// Wait for other hosts.
    Strings ready_hosts_results;
    ready_hosts_results.resize(all_hosts.size());

    /// A host can be listed in `all_hosts` more than once, that's why a host is mapped to multiple indices.
    std::map<String, std::vector<size_t> /* index in `ready_hosts_results` */> unready_hosts;
    for (size_t i = 0; i != all_hosts.size(); ++i)
        unready_hosts[all_hosts[i]].push_back(i);

    std::optional<String> host_with_error;
    std::optional<String> error_message;

    /// Process ZooKeeper's nodes: move hosts which have come to `new_status` from `unready_hosts` to `ready_hosts_results`,
    /// or set `host_with_error` and `error_message` if some host has published the error status.
    auto process_zk_nodes = [&](const Strings & zk_nodes)
    {
        for (const String & zk_node : zk_nodes)
        {
            /// Such nodes are created below only to trigger the watch, they don't carry any status.
            if (zk_node.starts_with("remove_watch-"))
                continue;

            size_t separator_pos = zk_node.find('|');
            if (separator_pos == String::npos)
                throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Unexpected zk node {}", zookeeper_path + "/" + zk_node);
            String host = zk_node.substr(0, separator_pos);
            String status = zk_node.substr(separator_pos + 1);
            if (status == kErrorStatus)
            {
                host_with_error = host;
                error_message = zookeeper->get(zookeeper_path + "/" + zk_node);
                return;
            }
            auto it = unready_hosts.find(host);
            if ((it != unready_hosts.end()) && (status == new_status))
            {
                String result = zookeeper->get(zookeeper_path + "/" + zk_node);
                for (size_t i : it->second)
                    ready_hosts_results[i] = result;
                unready_hosts.erase(it);
            }
        }
    };

    /// The state shared with the watch callback.
    /// It's owned by a shared pointer (and captured by value in the callback) because the callback is called from
    /// another thread and can outlive this function, for example if an exception is thrown while a watch is still set.
    /// The flag is changed and the condition variable is notified only under the mutex, otherwise a notification
    /// could slip between checking the flag and starting to wait, and the waiting without timeout would never end.
    struct WatchState
    {
        std::mutex mutex;
        std::condition_variable triggered_event;
        bool is_set = false; /// Guarded by `mutex`.
    };
    auto watch_state = std::make_shared<WatchState>();

    auto watch_callback = [watch_state](const Coordination::WatchResponse &)
    {
        std::lock_guard lock{watch_state->mutex};
        watch_state->is_set = false; /// After it's triggered it's not set until we call getChildrenWatch() again.
        watch_state->triggered_event.notify_all();
    };

    /// Must be called only while `watch_state->mutex` is locked.
    auto watch_triggered = [&watch_state] { return !watch_state->is_set; };

    bool use_timeout = timeout_ms.has_value();
    std::chrono::milliseconds timeout{timeout_ms.value_or(0)};
    std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
    std::chrono::steady_clock::duration elapsed{};

    /// Wait until all hosts are ready or an error happens or time is out.
    while (!unready_hosts.empty() && !error_message)
    {
        {
            std::lock_guard lock{watch_state->mutex};
            watch_state->is_set = true;
        }

        Strings nodes = zookeeper->getChildrenWatch(zookeeper_path, nullptr, watch_callback);
        process_zk_nodes(nodes);

        if (!unready_hosts.empty() && !error_message)
        {
            LOG_TRACE(log, "Waiting for host {}", unready_hosts.begin()->first);
            std::unique_lock lock{watch_state->mutex};
            if (use_timeout)
            {
                elapsed = std::chrono::steady_clock::now() - start_time;
                bool triggered = (elapsed <= timeout) && watch_state->triggered_event.wait_for(lock, timeout - elapsed, watch_triggered);
                if (!triggered)
                {
                    /// Measure again so the error message below shows how long we have really waited.
                    elapsed = std::chrono::steady_clock::now() - start_time;
                    break;
                }
            }
            else
                watch_state->triggered_event.wait(lock, watch_triggered);
        }
    }

    bool watch_is_still_set;
    {
        std::lock_guard lock{watch_state->mutex};
        watch_is_still_set = watch_state->is_set;
    }

    if (watch_is_still_set)
    {
        /// Remove watch by triggering it.
        zookeeper->create(zookeeper_path + "/remove_watch-", "", zkutil::CreateMode::EphemeralSequential);
        std::unique_lock lock{watch_state->mutex};
        watch_state->triggered_event.wait(lock, watch_triggered);
    }

    if (error_message)
        throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, "Error occurred on host {}: {}", *host_with_error, *error_message);

    if (!unready_hosts.empty())
    {
        throw Exception(
            ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
            "Waited for host {} too long ({})",
            unready_hosts.begin()->first,
            to_string(elapsed));
    }

    return ready_hosts_results;
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Common/ZooKeeper/Common.h>
namespace DB
{
/// Used to coordinate hosts so all hosts would come to a specific status at around the same time.
class BackupCoordinationStatusSync
{
public:
/// `zookeeper_path_` is the node under which the statuses of hosts are stored; it's created if it doesn't exist.
BackupCoordinationStatusSync(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_, Poco::Logger * log_);
/// Sets the status of the current host and signals other hosts if there were other hosts waiting for that.
void set(const String & current_host, const String & new_status, const String & message);
/// Sets the status of the current host and waits until all hosts come to the same status.
/// The function returns the messages all hosts set when they come to the required status.
/// If `new_status` is kErrorStatus or `all_hosts` is empty the function doesn't wait and returns an empty list.
/// Throws an exception if some host has set the status kErrorStatus.
Strings setAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts);
/// Almost the same as setAndWait() but this one stops waiting and throws an exception after a specific amount of time.
/// `timeout_ms` is that amount of time in milliseconds.
Strings setAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms);
/// A special status: a host sets it to report a failure, and then other hosts stop waiting for that host.
static constexpr const char * kErrorStatus = "error";
private:
void createRootNodes();
/// The common implementation of set(), setAndWait() and setAndWaitFor(); no `timeout_ms` means waiting without a time limit.
Strings setImpl(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, const std::optional<UInt64> & timeout_ms);
String zookeeper_path;
zkutil::GetZooKeeper get_zookeeper;
Poco::Logger * log;
};
}

View File

@ -1,7 +1,6 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupUtils.h>
#include <Backups/DDLAdjustingForBackupVisitor.h>
#include <Databases/IDatabase.h>
@ -47,7 +46,7 @@ namespace
constexpr const char * kWritingBackupStatus = "writing backup";
/// Error status.
constexpr const char * kErrorStatus = BackupCoordinationStatusSync::kErrorStatus;
constexpr const char * kErrorStatus = IBackupCoordination::kErrorStatus;
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
@ -436,46 +435,7 @@ void BackupEntriesCollector::gatherTablesMetadata()
table_infos.clear();
for (const auto & [database_name, database_info] : database_infos)
{
const auto & database = database_info.database;
bool is_temporary_database = (database_name == DatabaseCatalog::TEMPORARY_DATABASE);
auto filter_by_table_name = [database_info = &database_info](const String & table_name)
{
/// We skip inner tables of materialized views.
if (table_name.starts_with(".inner_id."))
return false;
if (database_info->tables.contains(table_name))
return true;
if (database_info->all_tables)
return !database_info->except_table_names.contains(table_name);
return false;
};
auto db_tables = database->getTablesForBackup(filter_by_table_name, context);
std::unordered_set<String> found_table_names;
for (const auto & db_table : db_tables)
{
const auto & create_table_query = db_table.first;
const auto & create = create_table_query->as<const ASTCreateQuery &>();
found_table_names.emplace(create.getTable());
if (is_temporary_database && !create.temporary)
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Got a non-temporary create query for {}", tableNameWithTypeToString(database_name, create.getTable(), false));
if (!is_temporary_database && (create.getDatabase() != database_name))
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Got a create query with unexpected database name {} for {}", backQuoteIfNeed(create.getDatabase()), tableNameWithTypeToString(database_name, create.getTable(), false));
}
/// Check that all tables were found.
for (const auto & [table_name, table_info] : database_info.tables)
{
if (table_info.throw_if_table_not_found && !found_table_names.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "{} not found", tableNameWithTypeToString(database_name, table_name, true));
}
std::vector<std::pair<ASTPtr, StoragePtr>> db_tables = findTablesInDatabase(database_name);
for (const auto & db_table : db_tables)
{
@ -501,7 +461,7 @@ void BackupEntriesCollector::gatherTablesMetadata()
/// Add information to `table_infos`.
auto & res_table_info = table_infos[QualifiedTableName{database_name, table_name}];
res_table_info.database = database;
res_table_info.database = database_info.database;
res_table_info.storage = storage;
res_table_info.create_table_query = create_table_query;
res_table_info.metadata_path_in_backup = metadata_path_in_backup;
@ -528,6 +488,67 @@ void BackupEntriesCollector::gatherTablesMetadata()
}
}
std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInDatabase(const String & database_name) const
{
const auto & database_info = database_infos.at(database_name);
const auto & database = database_info.database;
auto filter_by_table_name = [database_info = &database_info](const String & table_name)
{
/// We skip inner tables of materialized views.
if (table_name.starts_with(".inner_id."))
return false;
if (database_info->tables.contains(table_name))
return true;
if (database_info->all_tables)
return !database_info->except_table_names.contains(table_name);
return false;
};
std::vector<std::pair<ASTPtr, StoragePtr>> db_tables;
try
{
db_tables = database->getTablesForBackup(filter_by_table_name, context);
}
catch (Exception & e)
{
e.addMessage("While collecting tables for backup in database {}", backQuoteIfNeed(database_name));
throw;
}
std::unordered_set<String> found_table_names;
for (const auto & db_table : db_tables)
{
const auto & create_table_query = db_table.first;
const auto & create = create_table_query->as<const ASTCreateQuery &>();
found_table_names.emplace(create.getTable());
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
{
if (!create.temporary)
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Got a non-temporary create query for {}", tableNameWithTypeToString(database_name, create.getTable(), false));
}
else
{
if (create.getDatabase() != database_name)
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "Got a create query with unexpected database name {} for {}", backQuoteIfNeed(create.getDatabase()), tableNameWithTypeToString(database_name, create.getTable(), false));
}
}
/// Check that all tables were found.
for (const auto & [table_name, table_info] : database_info.tables)
{
if (table_info.throw_if_table_not_found && !found_table_names.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "{} was not found", tableNameWithTypeToString(database_name, table_name, true));
}
return db_tables;
}
void BackupEntriesCollector::lockTablesForReading()
{
for (auto & [table_name, table_info] : table_infos)
@ -544,7 +565,7 @@ void BackupEntriesCollector::lockTablesForReading()
{
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} is dropped", tableNameWithTypeToString(table_name.database, table_name.table, true));
throw Exception(ErrorCodes::INCONSISTENT_METADATA_FOR_BACKUP, "{} was dropped during scanning", tableNameWithTypeToString(table_name.database, table_name.table, true));
}
}
}
@ -648,7 +669,7 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
if (!database_info.create_database_query)
continue; /// We store CREATE DATABASE queries only if there was BACKUP DATABASE specified.
LOG_TRACE(log, "Adding definition of database {}", backQuoteIfNeed(database_name));
LOG_TRACE(log, "Adding the definition of database {} to backup", backQuoteIfNeed(database_name));
ASTPtr new_create_query = database_info.create_database_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), nullptr);
@ -664,7 +685,7 @@ void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
{
for (auto & [table_name, table_info] : table_infos)
{
LOG_TRACE(log, "Adding definition of {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
LOG_TRACE(log, "Adding the definition of {} to backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
ASTPtr new_create_query = table_info.create_table_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), &table_info.replicated_table_shared_id);
@ -680,24 +701,40 @@ void BackupEntriesCollector::makeBackupEntriesForTablesData()
if (backup_settings.structure_only)
return;
for (const auto & [table_name, table_info] : table_infos)
for (const auto & table_name : table_infos | boost::adaptors::map_keys)
makeBackupEntriesForTableData(table_name);
}
void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableName & table_name)
{
if (backup_settings.structure_only)
return;
const auto & table_info = table_infos.at(table_name);
const auto & storage = table_info.storage;
const auto & data_path_in_backup = table_info.data_path_in_backup;
if (!storage)
{
const auto & storage = table_info.storage;
const auto & data_path_in_backup = table_info.data_path_in_backup;
if (storage)
{
LOG_TRACE(log, "Adding data of {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
storage->backupData(*this, data_path_in_backup, table_info.partitions);
}
else
{
/// Storage == null means this storage exists on other replicas but it has not been created on this replica yet.
/// If this table is replicated in this case we call IBackupCoordination::addReplicatedDataPath() which will cause
/// other replicas to fill the storage's data in the backup.
/// If this table is not replicated we'll do nothing leaving the storage's data empty in the backup.
if (table_info.replicated_table_shared_id)
backup_coordination->addReplicatedDataPath(*table_info.replicated_table_shared_id, data_path_in_backup);
}
/// If storage == null that means this storage exists on other replicas but it has not been created on this replica yet.
/// If this table is replicated in this case we call IBackupCoordination::addReplicatedDataPath() which will cause
/// other replicas to fill the storage's data in the backup.
/// If this table is not replicated we'll do nothing leaving the storage's data empty in the backup.
if (table_info.replicated_table_shared_id)
backup_coordination->addReplicatedDataPath(*table_info.replicated_table_shared_id, data_path_in_backup);
return;
}
LOG_TRACE(log, "Collecting data of {} for backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
try
{
storage->backupData(*this, data_path_in_backup, table_info.partitions);
}
catch (Exception & e)
{
e.addMessage("While collecting data of {} for backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
throw;
}
}
@ -716,21 +753,21 @@ void BackupEntriesCollector::addBackupEntry(const std::pair<String, BackupEntryP
void BackupEntriesCollector::addBackupEntries(const BackupEntries & backup_entries_)
{
if (current_status == kWritingBackupStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding backup entries is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
insertAtEnd(backup_entries, backup_entries_);
}
void BackupEntriesCollector::addBackupEntries(BackupEntries && backup_entries_)
{
if (current_status == kWritingBackupStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding backup entries is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of backup entries is not allowed");
insertAtEnd(backup_entries, std::move(backup_entries_));
}
void BackupEntriesCollector::addPostTask(std::function<void()> task)
{
if (current_status == kWritingBackupStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding post tasks is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of post tasks is not allowed");
post_tasks.push(std::move(task));
}

View File

@ -75,12 +75,15 @@ private:
const std::set<DatabaseAndTableName> & except_table_names);
void gatherTablesMetadata();
std::vector<std::pair<ASTPtr, StoragePtr>> findTablesInDatabase(const String & database_name) const;
void lockTablesForReading();
bool compareWithPrevious(std::optional<Exception> & inconsistency_error);
void makeBackupEntriesForDatabasesDefs();
void makeBackupEntriesForTablesDefs();
void makeBackupEntriesForTablesData();
void makeBackupEntriesForTableData(const QualifiedTableName & table_name);
void runPostTasks();
Strings setStatus(const String & new_status, const String & message = "");

View File

@ -4,7 +4,7 @@
#include <Backups/BackupIO.h>
#include <Backups/IBackupEntry.h>
#include <Backups/BackupCoordinationLocal.h>
#include <Backups/BackupCoordinationDistributed.h>
#include <Backups/BackupCoordinationRemote.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <Common/quoteString.h>

View File

@ -5,9 +5,9 @@
#include <Backups/BackupUtils.h>
#include <Backups/IBackupEntry.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupCoordinationDistributed.h>
#include <Backups/BackupCoordinationRemote.h>
#include <Backups/BackupCoordinationLocal.h>
#include <Backups/RestoreCoordinationDistributed.h>
#include <Backups/RestoreCoordinationRemote.h>
#include <Backups/RestoreCoordinationLocal.h>
#include <Backups/RestoreSettings.h>
#include <Backups/RestorerFromBackup.h>
@ -120,7 +120,7 @@ UUID BackupsWorker::startMakingBackup(const ASTPtr & query, const ContextPtr & c
if (!backup_settings.coordination_zk_path.empty())
{
backup_coordination = std::make_shared<BackupCoordinationDistributed>(
backup_coordination = std::make_shared<BackupCoordinationRemote>(
backup_settings.coordination_zk_path,
[global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); });
}
@ -291,7 +291,7 @@ UUID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr conte
if (!restore_settings.coordination_zk_path.empty())
{
restore_coordination = std::make_shared<RestoreCoordinationDistributed>(
restore_coordination = std::make_shared<RestoreCoordinationRemote>(
restore_settings.coordination_zk_path,
[global_context = context_in_use->getGlobalContext()] { return global_context->getZooKeeper(); });
}

View File

@ -6,8 +6,12 @@
namespace DB
{
enum class AccessEntityType;
/// Keeps information about files contained in a backup.
/// Replicas use this class to coordinate what they're writing to a backup while executing BACKUP ON CLUSTER.
/// There are two implementations of this interface: BackupCoordinationLocal and BackupCoordinationRemote.
/// BackupCoordinationLocal is used while executing BACKUP without ON CLUSTER and performs coordination in memory.
/// BackupCoordinationRemote is used while executing BACKUP with ON CLUSTER and performs coordination via ZooKeeper.
class IBackupCoordination
{
public:
@ -18,6 +22,8 @@ public:
virtual Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts) = 0;
virtual Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts, UInt64 timeout_ms) = 0;
static constexpr const char * kErrorStatus = "error";
struct PartNameAndChecksum
{
String part_name;
@ -36,6 +42,18 @@ public:
/// parts covered by another parts.
virtual Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const = 0;
struct MutationInfo
{
String id;
String entry;
};
/// Adds information about mutations of a replicated table.
virtual void addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations) = 0;
/// Returns all mutations of a replicated table which are not finished for some data parts added by addReplicatedPartNames().
virtual std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const = 0;
/// Adds a data path in backup for a replicated table.
/// Multiple replicas of the replicated table call this function and then all the added paths can be returned by call of the function
/// getReplicatedDataPaths().
@ -45,12 +63,8 @@ public:
virtual Strings getReplicatedDataPaths(const String & table_shared_id) const = 0;
/// Adds a path to access.txt file keeping access entities of a ReplicatedAccessStorage.
virtual void addReplicatedAccessPath(const String & access_zk_path, const String & file_path) = 0;
virtual Strings getReplicatedAccessPaths(const String & access_zk_path) const = 0;
/// Sets the host id of a host storing access entities of a ReplicatedAccessStorage to backup.
virtual void setReplicatedAccessHost(const String & access_zk_path, const String & host) = 0;
virtual String getReplicatedAccessHost(const String & access_zk_path) const = 0;
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id, const String & file_path) = 0;
virtual Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type, const String & host_id) const = 0;
struct FileInfo
{

View File

@ -1,37 +0,0 @@
#include <Backups/IBackupEntriesBatch.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
/// A backup entry which is just a pair (batch, index):
/// every request is forwarded to the batch together with the index of this entry.
class IBackupEntriesBatch::BackupEntryFromBatch : public IBackupEntry
{
public:
    BackupEntryFromBatch(const std::shared_ptr<IBackupEntriesBatch> & generator_, size_t index_)
        : batch(generator_)
        , index(index_)
    {
        assert(batch);
    }

    UInt64 getSize() const override
    {
        return batch->getSize(index);
    }

    std::optional<UInt128> getChecksum() const override
    {
        return batch->getChecksum(index);
    }

    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
    {
        return batch->getReadBuffer(index);
    }

private:
    /// The batch is kept alive as long as at least one of its entries is alive.
    const std::shared_ptr<IBackupEntriesBatch> batch;
    const size_t index;
};
/// Makes one backup entry per each name from `entry_names`;
/// each of the entries shares the ownership of this batch.
BackupEntries IBackupEntriesBatch::getBackupEntries()
{
    const size_t num_entries = entry_names.size();

    BackupEntries result;
    result.reserve(num_entries);

    size_t pos = 0;
    for (const auto & entry_name : entry_names)
    {
        result.emplace_back(entry_name, std::make_unique<BackupEntryFromBatch>(shared_from_this(), pos));
        ++pos;
    }

    return result;
}
}

View File

@ -1,29 +0,0 @@
#pragma once
#include <Backups/IBackupEntry.h>
#include <mutex>
namespace DB
{
/// Helper class designed to generate multiple backup entries from one source.
class IBackupEntriesBatch : public std::enable_shared_from_this<IBackupEntriesBatch>
{
public:
/// Returns one backup entry per each entry name passed to the constructor.
/// The returned entries share the ownership of this batch (shared_from_this() is used),
/// so the batch must be owned by a std::shared_ptr when this function is called.
BackupEntries getBackupEntries();
virtual ~IBackupEntriesBatch() = default;
protected:
/// `entry_names_` are the names of the backup entries this batch generates; they're copied.
IBackupEntriesBatch(const Strings & entry_names_) : entry_names(entry_names_) {}
/// The following functions are called by the generated backup entries, `index` is a position in `entry_names`.
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer(size_t index) = 0;
virtual UInt64 getSize(size_t index) = 0;
/// By default no checksum is provided.
virtual std::optional<UInt128> getChecksum(size_t) { return {}; }
private:
class BackupEntryFromBatch;
const Strings entry_names;
};
}

View File

@ -0,0 +1,77 @@
#include <Backups/IBackupEntriesLazyBatch.h>
#include <Common/Exception.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// A placeholder for a backup entry which will be generated by the batch later, on the first access.
/// After that all requests are forwarded to the generated backup entry.
class IBackupEntriesLazyBatch::BackupEntryFromBatch : public IBackupEntry
{
public:
    BackupEntryFromBatch(const std::shared_ptr<IBackupEntriesLazyBatch> & batch_, size_t index_)
        : batch(batch_)
        , index(index_)
    {
    }

    UInt64 getSize() const override
    {
        return getInternalBackupEntry()->getSize();
    }

    std::optional<UInt128> getChecksum() const override
    {
        return getInternalBackupEntry()->getChecksum();
    }

    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
    {
        return getInternalBackupEntry()->getReadBuffer();
    }

private:
    /// Returns the generated backup entry, asking the batch to generate its entries if that hasn't been done yet.
    BackupEntryPtr getInternalBackupEntry() const
    {
        std::lock_guard lock{mutex};

        if (entry)
            return entry;

        batch->generateIfNecessary();
        entry = batch->entries[index].second;
        return entry;
    }

    const std::shared_ptr<IBackupEntriesLazyBatch> batch;
    const size_t index;

    /// Protects `entry`.
    mutable std::mutex mutex;
    mutable BackupEntryPtr entry;
};
/// Makes getSize() placeholders named getName(0), getName(1), ...; nothing is generated at this point.
/// Each of the placeholders shares the ownership of this batch.
BackupEntries IBackupEntriesLazyBatch::getBackupEntries()
{
    const size_t num_entries = getSize();

    BackupEntries result;
    result.reserve(num_entries);

    for (size_t pos = 0; pos < num_entries; ++pos)
        result.emplace_back(getName(pos), std::make_unique<BackupEntryFromBatch>(shared_from_this(), pos));

    return result;
}
/// Calls generate() on the first call and checks that it has returned exactly what getSize() and getName() promised.
/// Subsequent calls do nothing. If the check fails the batch stays not generated and LOGICAL_ERROR is thrown.
void IBackupEntriesLazyBatch::generateIfNecessary()
{
    std::lock_guard lock{mutex};

    if (!generated)
    {
        entries = generate();

        /// The number of generated entries must be the same as it was announced.
        const size_t expected_size = getSize();
        if (entries.size() != expected_size)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup entries were generated incorrectly");

        /// And the names must be the same too, in the same order.
        size_t pos = 0;
        for (const auto & generated_entry : entries)
        {
            if (generated_entry.first != getName(pos))
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup entries were generated incorrectly");
            ++pos;
        }

        generated = true;
    }
}
/// The destructor is declared in the header and defaulted here, out of line.
IBackupEntriesLazyBatch::~IBackupEntriesLazyBatch() = default;
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Backups/IBackupEntry.h>
#include <mutex>
namespace DB
{
/// Helper class designed to generate multiple backup entries from one source.
class IBackupEntriesLazyBatch : public std::enable_shared_from_this<IBackupEntriesLazyBatch>
{
public:
/// Returns getSize() backup entries named getName(0), getName(1), and so on.
/// The real backup entries are generated later, when one of the returned entries is accessed for the first time.
/// The returned entries share the ownership of this batch (shared_from_this() is used),
/// so the batch must be owned by a std::shared_ptr when this function is called.
BackupEntries getBackupEntries();
virtual ~IBackupEntriesLazyBatch();
protected:
/// The number of backup entries this batch will generate.
virtual size_t getSize() const = 0;
/// The name of the i-th backup entry this batch will generate.
virtual const String & getName(size_t i) const = 0;
/// Generates all the backup entries. It must return exactly getSize() entries and the name of
/// the i-th entry must be equal to getName(i), otherwise an exception with LOGICAL_ERROR is thrown.
virtual BackupEntries generate() = 0;
private:
/// Calls generate() if it hasn't been called successfully yet.
void generateIfNecessary();
class BackupEntryFromBatch;
/// Protects `entries` and `generated` while the entries are being generated.
std::mutex mutex;
BackupEntries entries;
bool generated = false;
};
}

View File

@ -7,7 +7,10 @@ namespace DB
{
using DatabaseAndTableName = std::pair<String, String>;
/// Keeps information about files contained in a backup.
/// Replicas use this class to coordinate what they're reading from a backup while executing RESTORE ON CLUSTER.
/// There are two implementations of this interface: RestoreCoordinationLocal and RestoreCoordinationRemote.
/// RestoreCoordinationLocal is used while executing RESTORE without ON CLUSTER and performs coordination in memory.
/// RestoreCoordinationRemote is used while executing RESTORE with ON CLUSTER and performs coordination via ZooKeeper.
class IRestoreCoordination
{
public:
@ -18,6 +21,8 @@ public:
virtual Strings setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts) = 0;
virtual Strings setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & other_hosts, UInt64 timeout_ms) = 0;
static constexpr const char * kErrorStatus = "error";
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
virtual bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) = 0;

View File

@ -11,6 +11,7 @@ namespace Poco { class Logger; }
namespace DB
{
/// Implementation of the IRestoreCoordination interface performing coordination in memory.
class RestoreCoordinationLocal : public IRestoreCoordination
{
public:

View File

@ -1,4 +1,4 @@
#include <Backups/RestoreCoordinationDistributed.h>
#include <Backups/RestoreCoordinationRemote.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/escapeForFileName.h>
@ -6,7 +6,7 @@
namespace DB
{
RestoreCoordinationDistributed::RestoreCoordinationDistributed(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
RestoreCoordinationRemote::RestoreCoordinationRemote(const String & zookeeper_path_, zkutil::GetZooKeeper get_zookeeper_)
: zookeeper_path(zookeeper_path_)
, get_zookeeper(get_zookeeper_)
, status_sync(zookeeper_path_ + "/status", get_zookeeper_, &Poco::Logger::get("RestoreCoordination"))
@ -14,9 +14,9 @@ RestoreCoordinationDistributed::RestoreCoordinationDistributed(const String & zo
createRootNodes();
}
RestoreCoordinationDistributed::~RestoreCoordinationDistributed() = default;
RestoreCoordinationRemote::~RestoreCoordinationRemote() = default;
void RestoreCoordinationDistributed::createRootNodes()
void RestoreCoordinationRemote::createRootNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->createAncestors(zookeeper_path);
@ -26,22 +26,22 @@ void RestoreCoordinationDistributed::createRootNodes()
zookeeper->createIfNotExists(zookeeper_path + "/repl_access_storages_acquired", "");
}
void RestoreCoordinationDistributed::setStatus(const String & current_host, const String & new_status, const String & message)
void RestoreCoordinationRemote::setStatus(const String & current_host, const String & new_status, const String & message)
{
status_sync.set(current_host, new_status, message);
}
Strings RestoreCoordinationDistributed::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
Strings RestoreCoordinationRemote::setStatusAndWait(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts)
{
return status_sync.setAndWait(current_host, new_status, message, all_hosts);
}
Strings RestoreCoordinationDistributed::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
Strings RestoreCoordinationRemote::setStatusAndWaitFor(const String & current_host, const String & new_status, const String & message, const Strings & all_hosts, UInt64 timeout_ms)
{
return status_sync.setAndWaitFor(current_host, new_status, message, all_hosts, timeout_ms);
}
bool RestoreCoordinationDistributed::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)
bool RestoreCoordinationRemote::acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name)
{
auto zookeeper = get_zookeeper();
@ -56,7 +56,7 @@ bool RestoreCoordinationDistributed::acquireCreatingTableInReplicatedDatabase(co
return (code == Coordination::Error::ZOK);
}
bool RestoreCoordinationDistributed::acquireInsertingDataIntoReplicatedTable(const String & table_zk_path)
bool RestoreCoordinationRemote::acquireInsertingDataIntoReplicatedTable(const String & table_zk_path)
{
auto zookeeper = get_zookeeper();
@ -68,7 +68,7 @@ bool RestoreCoordinationDistributed::acquireInsertingDataIntoReplicatedTable(con
return (code == Coordination::Error::ZOK);
}
bool RestoreCoordinationDistributed::acquireReplicatedAccessStorage(const String & access_storage_zk_path)
bool RestoreCoordinationRemote::acquireReplicatedAccessStorage(const String & access_storage_zk_path)
{
auto zookeeper = get_zookeeper();
@ -80,13 +80,13 @@ bool RestoreCoordinationDistributed::acquireReplicatedAccessStorage(const String
return (code == Coordination::Error::ZOK);
}
void RestoreCoordinationDistributed::removeAllNodes()
void RestoreCoordinationRemote::removeAllNodes()
{
auto zookeeper = get_zookeeper();
zookeeper->removeRecursive(zookeeper_path);
}
void RestoreCoordinationDistributed::drop()
void RestoreCoordinationRemote::drop()
{
removeAllNodes();
}

View File

@ -1,18 +1,18 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupCoordinationStatusSync.h>
namespace DB
{
/// Stores restore temporary information in Zookeeper, used to perform RESTORE ON CLUSTER.
class RestoreCoordinationDistributed : public IRestoreCoordination
/// Implementation of the IRestoreCoordination interface performing coordination via ZooKeeper. It's necessary for "RESTORE ON CLUSTER".
class RestoreCoordinationRemote : public IRestoreCoordination
{
public:
RestoreCoordinationDistributed(const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper);
~RestoreCoordinationDistributed() override;
RestoreCoordinationRemote(const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper);
~RestoreCoordinationRemote() override;
/// Sets the current status and waits for other hosts to come to this status too. If status starts with "error:" it'll stop waiting on all the hosts.
void setStatus(const String & current_host, const String & new_status, const String & message) override;

View File

@ -1,6 +1,5 @@
#include <Backups/RestorerFromBackup.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <Backups/BackupSettings.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
@ -54,7 +53,7 @@ namespace
constexpr const char * kInsertingDataToTablesStatus = "inserting data to tables";
/// Error status.
constexpr const char * kErrorStatus = BackupCoordinationStatusSync::kErrorStatus;
constexpr const char * kErrorStatus = IRestoreCoordination::kErrorStatus;
/// Uppercases the first character of a passed string.
String toUpperFirst(const String & str)
@ -381,11 +380,23 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
insertAtEnd(*res_table_info.partitions, *partitions);
}
/// Special handling for ACL-related system tables.
if (!restore_settings.structure_only && isSystemAccessTableName(table_name))
{
if (!access_restorer)
access_restorer = std::make_unique<AccessRestorerFromBackup>(backup, restore_settings);
access_restorer->addDataPath(data_path_in_backup, table_name);
try
{
/// addDataPath() will parse access*.txt files and extract access entities from them.
/// We need to do that early because we need those access entities to check access.
access_restorer->addDataPath(data_path_in_backup);
}
catch (Exception & e)
{
e.addMessage("While parsing data of {} from backup", tableNameWithTypeToString(table_name.database, table_name.table, false));
throw;
}
}
}
@ -563,33 +574,57 @@ void RestorerFromBackup::checkAccessForObjectsFoundInBackup() const
required_access = AccessRights{required_access}.getElements();
context->checkAccess(required_access);
}
void RestorerFromBackup::createDatabases()
{
for (const auto & [database_name, database_info] : database_infos)
for (const auto & database_name : database_infos | boost::adaptors::map_keys)
{
bool need_create_database = (restore_settings.create_database != RestoreDatabaseCreationMode::kMustExist);
if (database_info.is_predefined_database)
need_create_database = false; /// Predefined databases always exist.
createDatabase(database_name);
checkDatabase(database_name);
}
}
if (need_create_database)
{
/// Execute CREATE DATABASE query.
auto create_database_query = database_info.create_database_query;
if (restore_settings.create_table == RestoreTableCreationMode::kCreateIfNotExists)
{
create_database_query = create_database_query->clone();
create_database_query->as<ASTCreateQuery &>().if_not_exists = true;
}
LOG_TRACE(log, "Creating database {}: {}", backQuoteIfNeed(database_name), serializeAST(*create_database_query));
InterpreterCreateQuery interpreter{create_database_query, context};
interpreter.setInternal(true);
interpreter.execute();
}
/// Executes the CREATE DATABASE query stored for `database_name` in `database_infos`.
/// Does nothing if the restore settings require the database to exist already or if the database is predefined.
/// Throws std::out_of_range-like error from `database_infos.at()` if the database is unknown; any DB::Exception
/// raised while executing the query gets the database name appended to its message before being rethrown.
void RestorerFromBackup::createDatabase(const String & database_name) const
{
    if (restore_settings.create_database == RestoreDatabaseCreationMode::kMustExist)
        return;

    /// Predefined databases always exist.
    const auto & database_info = database_infos.at(database_name);
    if (database_info.is_predefined_database)
        return;

    auto create_database_query = database_info.create_database_query;

    /// Whether to add IF NOT EXISTS is governed by the `create_database` setting (the same one consulted above),
    /// not by `create_table`, which only controls how tables are created.
    if (restore_settings.create_database == RestoreDatabaseCreationMode::kCreateIfNotExists)
    {
        /// Clone before modifying so that the AST kept in `database_infos` stays untouched.
        create_database_query = create_database_query->clone();
        create_database_query->as<ASTCreateQuery &>().if_not_exists = true;
    }

    LOG_TRACE(log, "Creating database {}: {}", backQuoteIfNeed(database_name), serializeAST(*create_database_query));

    try
    {
        /// Execute CREATE DATABASE query.
        InterpreterCreateQuery interpreter{create_database_query, context};
        interpreter.setInternal(true);
        interpreter.execute();
    }
    catch (Exception & e)
    {
        e.addMessage("While creating database {}", backQuoteIfNeed(database_name));
        throw;
    }
}
void RestorerFromBackup::checkDatabase(const String & database_name)
{
auto & database_info = database_infos.at(database_name);
try
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
database_info.database = database;
if (!restore_settings.allow_different_database_def && !database_info.is_predefined_database)
{
@ -601,14 +636,18 @@ void RestorerFromBackup::createDatabases()
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_DATABASE,
"The database {} has a different definition: {} "
"The database has a different definition: {} "
"comparing to its definition in the backup: {}",
backQuoteIfNeed(database_name),
serializeAST(*create_database_query),
serializeAST(*expected_create_query));
}
}
}
catch (Exception & e)
{
e.addMessage("While checking database {}", backQuoteIfNeed(database_name));
throw;
}
}
void RestorerFromBackup::createTables()
@ -622,82 +661,123 @@ void RestorerFromBackup::createTables()
for (const auto & table_name : tables_to_create)
{
auto & table_info = table_infos.at(table_name);
createTable(table_name);
checkTable(table_name);
insertDataToTable(table_name);
}
}
}
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_name.database);
void RestorerFromBackup::createTable(const QualifiedTableName & table_name)
{
if (restore_settings.create_table == RestoreTableCreationMode::kMustExist)
return;
bool need_create_table = (restore_settings.create_table != RestoreTableCreationMode::kMustExist);
if (table_info.is_predefined_table)
need_create_table = false; /// Predefined tables always exist.
/// Predefined tables always exist.
auto & table_info = table_infos.at(table_name);
if (table_info.is_predefined_table)
return;
if (need_create_table)
auto create_table_query = table_info.create_table_query;
if (restore_settings.create_table == RestoreTableCreationMode::kCreateIfNotExists)
{
create_table_query = create_table_query->clone();
create_table_query->as<ASTCreateQuery &>().if_not_exists = true;
}
LOG_TRACE(
log, "Creating {}: {}", tableNameWithTypeToString(table_name.database, table_name.table, false), serializeAST(*create_table_query));
try
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_name.database);
table_info.database = database;
/// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some
/// database-specific things).
database->createTableRestoredFromBackup(
create_table_query,
context,
restore_coordination,
std::chrono::duration_cast<std::chrono::milliseconds>(create_table_timeout).count());
}
catch (Exception & e)
{
e.addMessage("While creating {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
throw;
}
}
void RestorerFromBackup::checkTable(const QualifiedTableName & table_name)
{
auto & table_info = table_infos.at(table_name);
auto database = table_info.database;
try
{
if (!database)
{
database = DatabaseCatalog::instance().getDatabase(table_name.database);
table_info.database = database;
}
auto resolved_id = (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE)
? context->resolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->resolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
StoragePtr storage = database->getTable(resolved_id.table_name, context);
table_info.storage = storage;
table_info.table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
if (!restore_settings.allow_different_table_def && !table_info.is_predefined_table)
{
ASTPtr create_table_query = database->getCreateTableQuery(resolved_id.table_name, context);
adjustCreateQueryForBackup(create_table_query, context->getGlobalContext(), nullptr);
ASTPtr expected_create_query = table_info.create_table_query;
if (serializeAST(*create_table_query) != serializeAST(*expected_create_query))
{
auto create_table_query = table_info.create_table_query;
if (restore_settings.create_table == RestoreTableCreationMode::kCreateIfNotExists)
{
create_table_query = create_table_query->clone();
create_table_query->as<ASTCreateQuery &>().if_not_exists = true;
}
LOG_TRACE(
log,
"Creating {}: {}",
tableNameWithTypeToString(table_name.database, table_name.table, false),
serializeAST(*create_table_query));
/// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some
/// database-specific things).
database->createTableRestoredFromBackup(
create_table_query,
context,
restore_coordination,
std::chrono::duration_cast<std::chrono::milliseconds>(create_table_timeout).count());
}
table_info.created = true;
auto resolved_id = (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE)
? context->resolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->resolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
auto storage = database->getTable(resolved_id.table_name, context);
table_info.storage = storage;
table_info.table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
if (!restore_settings.allow_different_table_def && !table_info.is_predefined_table)
{
ASTPtr create_table_query = database->getCreateTableQuery(resolved_id.table_name, context);
adjustCreateQueryForBackup(create_table_query, context->getGlobalContext(), nullptr);
ASTPtr expected_create_query = table_info.create_table_query;
if (serializeAST(*create_table_query) != serializeAST(*expected_create_query))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"{} has a different definition: {} "
"comparing to its definition in the backup: {}",
tableNameWithTypeToString(table_name.database, table_name.table, true),
serializeAST(*create_table_query),
serializeAST(*expected_create_query));
}
}
if (!restore_settings.structure_only)
{
const auto & data_path_in_backup = table_info.data_path_in_backup;
const auto & partitions = table_info.partitions;
if (partitions && !storage->supportsBackupPartition())
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Table engine {} doesn't support partitions, cannot restore {}",
storage->getName(),
tableNameWithTypeToString(table_name.database, table_name.table, false));
}
storage->restoreDataFromBackup(*this, data_path_in_backup, partitions);
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"The table has a different definition: {} "
"comparing to its definition in the backup: {}",
serializeAST(*create_table_query),
serializeAST(*expected_create_query));
}
}
}
catch (Exception & e)
{
e.addMessage("While checking {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
throw;
}
}
void RestorerFromBackup::insertDataToTable(const QualifiedTableName & table_name)
{
if (restore_settings.structure_only)
return;
auto & table_info = table_infos.at(table_name);
auto storage = table_info.storage;
try
{
const auto & data_path_in_backup = table_info.data_path_in_backup;
const auto & partitions = table_info.partitions;
if (partitions && !storage->supportsBackupPartition())
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Table engine {} doesn't support partitions",
storage->getName());
}
storage->restoreDataFromBackup(*this, data_path_in_backup, partitions);
}
catch (Exception & e)
{
e.addMessage("While restoring data of {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
throw;
}
}
/// Returns the list of tables without dependencies or those which dependencies have been created before.
@ -708,7 +788,7 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
for (const auto & [key, table_info] : table_infos)
{
if (table_info.created)
if (table_info.storage)
continue;
/// Found a table which is not created yet.
@ -719,7 +799,7 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
for (const auto & dependency : table_info.dependencies)
{
auto it = table_infos.find(dependency);
if ((it != table_infos.end()) && !it->second.created)
if ((it != table_infos.end()) && !it->second.storage)
{
all_dependencies_met = false;
break;
@ -740,7 +820,7 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
std::vector<QualifiedTableName> tables_with_cyclic_dependencies;
for (const auto & [key, table_info] : table_infos)
{
if (!table_info.created)
if (!table_info.storage)
tables_with_cyclic_dependencies.push_back(key);
}
@ -759,14 +839,14 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
{
if (current_status == kInsertingDataToTablesStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding data-restoring tasks is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
data_restore_tasks.push_back(std::move(new_task));
}
void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
{
if (current_status == kInsertingDataToTablesStatus)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding data-restoring tasks is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
insertAtEnd(data_restore_tasks, std::move(new_tasks));
}

View File

@ -15,10 +15,13 @@ class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
class IRestoreCoordination;
struct StorageID;
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
class AccessRestorerFromBackup;
struct IAccessEntity;
using AccessEntityPtr = std::shared_ptr<const IAccessEntity>;
/// Restores the definition of databases and tables and prepares tasks to restore the data of the tables.
class RestorerFromBackup : private boost::noncopyable
{
@ -87,7 +90,13 @@ private:
void checkAccessForObjectsFoundInBackup() const;
void createDatabases();
void createDatabase(const String & database_name) const;
void checkDatabase(const String & database_name);
void createTables();
void createTable(const QualifiedTableName & table_name);
void checkTable(const QualifiedTableName & table_name);
void insertDataToTable(const QualifiedTableName & table_name);
DataRestoreTasks getDataRestoreTasks();
@ -97,6 +106,7 @@ private:
{
ASTPtr create_database_query;
bool is_predefined_database = false;
DatabasePtr database;
};
struct TableInfo
@ -107,7 +117,7 @@ private:
bool has_data = false;
std::filesystem::path data_path_in_backup;
std::optional<ASTs> partitions;
bool created = false;
DatabasePtr database;
StoragePtr storage;
TableLockHolder table_lock;
};

View File

@ -14,19 +14,21 @@ using DatabaseAndTableName = std::pair<String, String>;
* DICTIONARY [db.]dictionary_name [AS [db.]dictionary_name_in_backup] |
* DATABASE database_name [AS database_name_in_backup] [EXCEPT TABLES ...] |
* TEMPORARY TABLE table_name [AS table_name_in_backup] |
* ALL DATABASES [EXCEPT ...] } [,...]
* ALL [EXCEPT {TABLES|DATABASES}...] } [,...]
* [ON CLUSTER 'cluster_name']
* TO { File('path/') |
* Disk('disk_name', 'path/')
* [SETTINGS base_backup = {File(...) | Disk(...)}]
* Disk('disk_name', 'path/') }
* [SETTINGS ...]
*
* RESTORE { TABLE [db.]table_name_in_backup [AS [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [AS [db.]dictionary_name] |
* DATABASE database_name_in_backup [AS database_name] [EXCEPT TABLES ...] |
* TEMPORARY TABLE table_name_in_backup [AS table_name] |
* ALL DATABASES [EXCEPT ...] } [,...]
* DICTIONARY [db.]dictionary_name_in_backup [AS [db.]dictionary_name] |
* DATABASE database_name_in_backup [AS database_name] [EXCEPT TABLES ...] |
* TEMPORARY TABLE table_name_in_backup [AS table_name] |
* ALL [EXCEPT {TABLES|DATABASES} ...] } [,...]
* [ON CLUSTER 'cluster_name']
* FROM {File(...) | Disk(...)}
* FROM { File('path/') |
* Disk('disk_name', 'path/') }
* [SETTINGS ...]
*
* Notes:
* RESTORE doesn't drop any data, it either creates a table or appends an existing table with restored data.

View File

@ -13,8 +13,8 @@ namespace DB
* ALL [EXCEPT {TABLES|DATABASES}...] } [,...]
* [ON CLUSTER 'cluster_name']
* TO { File('path/') |
* Disk('disk_name', 'path/')
* [SETTINGS base_backup = {File(...) | Disk(...)}]
* Disk('disk_name', 'path/') }
* [SETTINGS ...]
*
* RESTORE { TABLE [db.]table_name_in_backup [AS [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [AS [db.]dictionary_name] |
@ -22,7 +22,9 @@ namespace DB
* TEMPORARY TABLE table_name_in_backup [AS table_name] |
* ALL [EXCEPT {TABLES|DATABASES} ...] } [,...]
* [ON CLUSTER 'cluster_name']
* FROM {File(...) | Disk(...)}
* FROM { File('path/') |
* Disk('disk_name', 'path/') }
* [SETTINGS ...]
*/
class ParserBackupQuery : public IParserBase
{

View File

@ -628,7 +628,7 @@ void DataPartStorageOnDisk::backup(
auto disk = volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup/")).first;
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path();
@ -637,11 +637,11 @@ void DataPartStorageOnDisk::backup(
/// For example,
/// part_path_in_backup = /data/test/table/0_1_1_0
/// part_path_on_disk = store/f57/f5728353-44bb-4575-85e8-28deb893657a/0_1_1_0
/// tmp_part_dir = tmp/backup/1aaaaaa/data/test/table/0_1_1_0
/// tmp_part_dir = tmp/1aaaaaa/data/test/table/0_1_1_0
/// Or, for projections:
/// part_path_in_backup = /data/test/table/0_1_1_0/prjmax.proj
/// part_path_on_disk = store/f57/f5728353-44bb-4575-85e8-28deb893657a/0_1_1_0/prjmax.proj
/// tmp_part_dir = tmp/backup/1aaaaaa/data/test/table/0_1_1_0/prjmax.proj
/// tmp_part_dir = tmp/1aaaaaa/data/test/table/0_1_1_0/prjmax.proj
for (const auto & [filepath, checksum] : checksums.files)
{

View File

@ -72,6 +72,7 @@
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -3989,17 +3990,19 @@ Pipe MergeTreeData::alterPartition(
void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
backup_entries_collector.addBackupEntries(backupParts(backup_entries_collector.getContext(), data_path_in_backup, partitions));
}
auto local_context = backup_entries_collector.getContext();
BackupEntries MergeTreeData::backupParts(const ContextPtr & local_context, const String & data_path_in_backup, const std::optional<ASTs> & partitions) const
{
DataPartsVector data_parts;
if (partitions)
data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context));
else
data_parts = getVisibleDataPartsVector(local_context);
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
}
BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup)
{
BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
@ -4026,6 +4029,9 @@ BackupEntries MergeTreeData::backupParts(const ContextPtr & local_context, const
void MergeTreeData::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
auto backup = restorer.getBackup();
if (!backup->hasFiles(data_path_in_backup))
return;
if (!restorer.isNonEmptyTableAllowed() && getTotalActiveSizeInBytes() && backup->hasFiles(data_path_in_backup))
restorer.throwTableIsNotEmpty(getStorageID());
@ -4049,14 +4055,22 @@ public:
attachIfAllPartsRestored();
}
void addPart(MutableDataPartPtr part, std::shared_ptr<TemporaryFileOnDisk> temp_part_dir_owner)
void addPart(MutableDataPartPtr part)
{
std::lock_guard lock{mutex};
parts.emplace_back(part);
temp_part_dir_owners.emplace_back(temp_part_dir_owner);
attachIfAllPartsRestored();
}
/// Returns the path of the temporary-file object kept for `disk` in `temp_dirs`,
/// creating that object (with the "tmp/" prefix) on first request. Guarded by `mutex`.
String getTemporaryDirectory(const DiskPtr & disk)
{
    std::lock_guard lock{mutex};

    /// Reuse the entry if this disk has been seen already.
    if (auto found = temp_dirs.find(disk); found != temp_dirs.end())
        return found->second->getPath();

    /// First request for this disk: construct the holder only now, so nothing is created on a hit.
    auto inserted = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
    return inserted->second->getPath();
}
private:
void attachIfAllPartsRestored()
{
@ -4071,7 +4085,7 @@ private:
storage->attachRestoredParts(std::move(parts));
parts.clear();
temp_part_dir_owners.clear();
temp_dirs.clear();
num_parts = 0;
}
@ -4079,7 +4093,7 @@ private:
BackupPtr backup;
size_t num_parts = 0;
MutableDataPartsVector parts;
std::vector<std::shared_ptr<TemporaryFileOnDisk>> temp_part_dir_owners;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
mutable std::mutex mutex;
};
@ -4091,6 +4105,8 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
auto backup = restorer.getBackup();
Strings part_names = backup->listFiles(data_path_in_backup);
boost::remove_erase(part_names, "mutations");
auto restored_parts_holder
= std::make_shared<RestoredPartsHolder>(std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup, part_names.size());
@ -4102,8 +4118,8 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_info)
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File name {} doesn't look like the name of a part",
getStorageID().getFullTableName(), String{data_path_in_backup_fs / part_name});
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File name {} is not a part's name",
String{data_path_in_backup_fs / part_name});
}
if (partition_ids && !partition_ids->contains(part_info->partition_id))
@ -4123,8 +4139,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
restored_parts_holder->setNumParts(num_parts);
}
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup)
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const
{
String part_name = part_info.getPartName();
auto backup = restored_parts_holder->getBackup();
UInt64 total_size_of_part = 0;
@ -4136,14 +4153,18 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
std::shared_ptr<IReservation> reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part);
auto disk = reservation->getDisk();
String part_name = part_info.getPartName();
auto temp_part_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, fs::path{relative_data_path} / ("restoring_" + part_name + "_"));
fs::path temp_part_dir = temp_part_dir_owner->getPath();
fs::path temp_dir = restored_parts_holder->getTemporaryDirectory(disk);
fs::path temp_part_dir = temp_dir / part_path_in_backup_fs.relative_path();
disk->createDirectories(temp_part_dir);
std::unordered_set<String> subdirs;
/// temp_part_name = "restoring_<part_name>_<random_chars>", for example "restoring_0_1_1_0_1baaaaa"
String temp_part_name = temp_part_dir.filename();
/// For example:
/// part_name = 0_1_1_0
/// part_path_in_backup = /data/test/table/0_1_1_0
/// tmp_dir = tmp/1aaaaaa
/// tmp_part_dir = tmp/1aaaaaa/data/test/table/0_1_1_0
/// Subdirectories in the part's directory. It's used to restore projections.
std::unordered_set<String> subdirs;
for (const String & filename : filenames)
{
@ -4168,12 +4189,12 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, temp_part_name);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, temp_part_dir.parent_path(), part_name);
auto part = createPart(part_name, part_info, data_part_storage);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(false, true);
restored_parts_holder->addPart(part, temp_part_dir_owner);
restored_parts_holder->addPart(part);
}

View File

@ -1232,13 +1232,13 @@ protected:
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
/// Makes backup entries to backup the parts of this table.
BackupEntries backupParts(const ContextPtr & local_context, const String & data_path_in_backup, const std::optional<ASTs> & partitions) const;
static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup);
class RestoredPartsHolder;
/// Restores the parts of this table from backup.
void restorePartsFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions);
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup);
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const;
/// Attaches restored parts to the storage.
virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;

View File

@ -5,6 +5,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/TransactionLog.h>
#include <Backups/BackupEntryFromMemory.h>
#include <utility>
@ -167,4 +168,16 @@ MergeTreeMutationEntry::~MergeTreeMutationEntry()
}
}
std::shared_ptr<const IBackupEntry> MergeTreeMutationEntry::backup() const
{
WriteBufferFromOwnString out;
out << "block number: " << block_number << "\n";
out << "commands: ";
commands.writeText(out);
out << "\n";
return std::make_shared<BackupEntryFromMemory>(out.str());
}
}

View File

@ -9,6 +9,7 @@
namespace DB
{
class IBackupEntry;
/// A mutation entry for non-replicated MergeTree storage engines.
/// Stores information about mutation in file mutation_*.txt.
@ -48,6 +49,8 @@ struct MergeTreeMutationEntry
void writeCSN(CSN csn_);
std::shared_ptr<const IBackupEntry> backup() const;
static String versionToFileName(UInt64 block_number_);
static UInt64 tryParseFileName(const String & file_name_);
static UInt64 parseFileName(const String & file_name_);

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Backups/BackupEntryFromMemory.h>
namespace DB
@ -78,4 +79,24 @@ ReplicatedMergeTreeMutationEntry ReplicatedMergeTreeMutationEntry::parse(const S
return res;
}
std::shared_ptr<const IBackupEntry> ReplicatedMergeTreeMutationEntry::backup() const
{
WriteBufferFromOwnString out;
out << "block numbers count: " << block_numbers.size() << "\n";
for (const auto & kv : block_numbers)
{
const String & partition_id = kv.first;
Int64 number = kv.second;
out << partition_id << "\t" << number << "\n";
}
out << "commands: ";
commands.writeText(out);
out << "\n";
return std::make_shared<BackupEntryFromMemory>(out.str());
}
}

View File

@ -12,6 +12,7 @@ namespace DB
class ReadBuffer;
class WriteBuffer;
class IBackupEntry;
/// Mutation entry in /mutations path in zookeeper. This record contains information about blocks
/// in partitions. We will mutate all parts with left number less than these numbers.
@ -48,6 +49,8 @@ struct ReplicatedMergeTreeMutationEntry
int alter_version = -1;
bool isAlterMutation() const { return alter_version != -1; }
std::shared_ptr<const IBackupEntry> backup() const;
};
using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;

View File

@ -935,7 +935,7 @@ void StorageLog::backupData(BackupEntriesCollector & backup_entries_collector, c
return;
fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_");
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath();
disk->createDirectories(temp_dir);
@ -1023,10 +1023,8 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p
{
String file_path_in_backup = data_path_in_backup_fs / fileName(data_file.path);
if (!backup->fileExists(file_path_in_backup))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), file_path_in_backup);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
@ -1039,10 +1037,8 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p
size_t num_extra_marks = 0;
String file_path_in_backup = data_path_in_backup_fs / fileName(marks_file_path);
if (!backup->fileExists(file_path_in_backup))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), file_path_in_backup);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", file_path_in_backup);
size_t file_size = backup->getFileSize(file_path_in_backup);
if (file_size % (num_data_files * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);

View File

@ -24,12 +24,14 @@
#include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntriesBatch.h>
#include <Backups/IBackupEntriesLazyBatch.h>
#include <Backups/RestorerFromBackup.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/copyData.h>
#include <Poco/TemporaryFile.h>
namespace DB
@ -382,109 +384,125 @@ void StorageMemory::truncate(
namespace
{
/// NOTE(review): this span interleaves the removed `MemoryBackupEntriesBatch` implementation with the
/// added `MemoryBackup` implementation; the diff markers are not visible here, so read adjacent
/// near-duplicate lines as "old, then new".
class MemoryBackupEntriesBatch : public IBackupEntriesBatch, boost::noncopyable
class MemoryBackup : public IBackupEntriesLazyBatch, boost::noncopyable
{
public:
MemoryBackupEntriesBatch(
MemoryBackup(
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<const Blocks> blocks_,
const String & data_path_in_backup,
const DiskPtr & temp_disk_,
UInt64 max_compress_block_size_)
: IBackupEntriesBatch(
{fs::path{data_path_in_backup} / "data.bin",
fs::path{data_path_in_backup} / "index.mrk",
fs::path{data_path_in_backup} / "sizes.json"})
, metadata_snapshot(metadata_snapshot_)
: metadata_snapshot(metadata_snapshot_)
, blocks(blocks_)
, temp_disk(temp_disk_)
, max_compress_block_size(max_compress_block_size_)
{
/// Register the path of every file in the backup and remember its index in `file_paths`.
fs::path data_path_in_backup_fs = data_path_in_backup;
data_bin_pos = file_paths.size();
file_paths.emplace_back(data_path_in_backup_fs / "data.bin");
index_mrk_pos= file_paths.size();
file_paths.emplace_back(data_path_in_backup_fs / "index.mrk");
columns_txt_pos = file_paths.size();
file_paths.emplace_back(data_path_in_backup_fs / "columns.txt");
count_txt_pos = file_paths.size();
file_paths.emplace_back(data_path_in_backup_fs / "count.txt");
sizes_json_pos = file_paths.size();
file_paths.emplace_back(data_path_in_backup_fs / "sizes.json");
}
private:
static constexpr const size_t kDataBinPos = 0;
static constexpr const size_t kIndexMrkPos = 1;
static constexpr const size_t kSizesJsonPos = 2;
static constexpr const size_t kSize = 3;
void initialize()
/// Returns the number of registered file paths.
size_t getSize() const override
{
std::call_once(initialized_flag, [this]()
return file_paths.size();
}
/// Returns the path in the backup of the i-th file.
const String & getName(size_t i) const override
{
return file_paths[i];
}
/// Builds one backup entry per path in `file_paths`: data.bin, index.mrk and sizes.json are written
/// into a temporary directory on `temp_disk`, columns.txt and count.txt are created as in-memory entries.
BackupEntries generate() override
{
BackupEntries backup_entries;
backup_entries.resize(file_paths.size());
temp_dir_owner.emplace(temp_disk);
fs::path temp_dir = temp_dir_owner->getPath();
temp_disk->createDirectories(temp_dir);
/// Writing data.bin
IndexForNativeFormat index;
{
temp_dir_owner.emplace();
fs::path temp_dir = temp_dir_owner->path();
fs::create_directories(temp_dir);
auto data_file_path = temp_dir / fs::path{file_paths[data_bin_pos]}.filename();
auto data_out_compressed = temp_disk->writeFile(data_file_path);
CompressedWriteBuffer data_out{*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size};
NativeWriter block_out{data_out, 0, metadata_snapshot->getSampleBlock(), false, &index};
for (const auto & block : *blocks)
block_out.write(block);
backup_entries[data_bin_pos] = {file_paths[data_bin_pos], std::make_shared<BackupEntryFromImmutableFile>(temp_disk, data_file_path)};
}
/// Writing data.bin
constexpr char data_file_name[] = "data.bin";
auto data_file_path = temp_dir / data_file_name;
IndexForNativeFormat index;
/// Writing index.mrk
{
auto index_mrk_path = temp_dir / fs::path{file_paths[index_mrk_pos]}.filename();
auto index_mrk_out_compressed = temp_disk->writeFile(index_mrk_path);
CompressedWriteBuffer index_mrk_out{*index_mrk_out_compressed};
index.write(index_mrk_out);
backup_entries[index_mrk_pos] = {file_paths[index_mrk_pos], std::make_shared<BackupEntryFromImmutableFile>(temp_disk, index_mrk_path)};
}
/// Writing columns.txt
{
auto columns_desc = metadata_snapshot->getColumns().getAllPhysical().toString();
backup_entries[columns_txt_pos] = {file_paths[columns_txt_pos], std::make_shared<BackupEntryFromMemory>(columns_desc)};
}
/// Writing count.txt
{
size_t num_rows = 0;
for (const auto & block : *blocks)
num_rows += block.rows();
backup_entries[count_txt_pos] = {file_paths[count_txt_pos], std::make_shared<BackupEntryFromMemory>(toString(num_rows))};
}
/// Writing sizes.json
{
auto sizes_json_path = temp_dir / fs::path{file_paths[sizes_json_pos]}.filename();
FileChecker file_checker{temp_disk, sizes_json_path};
/// Every file except sizes.json itself is passed to the file checker.
/// NOTE(review): this includes columns.txt and count.txt, which are not written to `temp_dir` above - confirm this is intended.
for (size_t i = 0; i != file_paths.size(); ++i)
{
auto data_out_compressed = std::make_unique<WriteBufferFromFile>(data_file_path);
CompressedWriteBuffer data_out{*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size};
NativeWriter block_out{data_out, 0, metadata_snapshot->getSampleBlock(), false, &index};
for (const auto & block : *blocks)
block_out.write(block);
if (i == sizes_json_pos)
continue;
file_checker.update(temp_dir / fs::path{file_paths[i]}.filename());
}
/// Writing index.mrk
constexpr char index_file_name[] = "index.mrk";
auto index_file_path = temp_dir / index_file_name;
{
auto index_out_compressed = std::make_unique<WriteBufferFromFile>(index_file_path);
CompressedWriteBuffer index_out{*index_out_compressed};
index.write(index_out);
}
/// Writing sizes.json
constexpr char sizes_file_name[] = "sizes.json";
auto sizes_file_path = temp_dir / sizes_file_name;
FileChecker file_checker{sizes_file_path};
file_checker.update(data_file_path);
file_checker.update(index_file_path);
file_checker.save();
backup_entries[sizes_json_pos] = {file_paths[sizes_json_pos], std::make_shared<BackupEntryFromSmallFile>(temp_disk, sizes_json_path)};
}
file_paths[kDataBinPos] = data_file_path;
file_sizes[kDataBinPos] = file_checker.getFileSize(data_file_path);
/// We don't need to keep `blocks` any longer.
blocks.reset();
metadata_snapshot.reset();
file_paths[kIndexMrkPos] = index_file_path;
file_sizes[kIndexMrkPos] = file_checker.getFileSize(index_file_path);
file_paths[kSizesJsonPos] = sizes_file_path;
file_sizes[kSizesJsonPos] = fs::file_size(sizes_file_path);
/// We don't need to keep `blocks` any longer.
blocks.reset();
metadata_snapshot.reset();
});
}
std::unique_ptr<SeekableReadBuffer> getReadBuffer(size_t index) override
{
initialize();
return createReadBufferFromFileBase(file_paths[index], {});
}
UInt64 getSize(size_t index) override
{
initialize();
return file_sizes[index];
return backup_entries;
}
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<const Blocks> blocks;
DiskPtr temp_disk;
/// Holder of the temporary directory used by generate(); filled in when generate() runs.
std::optional<TemporaryFileOnDisk> temp_dir_owner;
UInt64 max_compress_block_size;
std::once_flag initialized_flag;
std::optional<Poco::TemporaryFile> temp_dir_owner;
std::array<String, kSize> file_paths;
std::array<UInt64, kSize> file_sizes;
/// Paths of the files in the backup, in registration order.
Strings file_paths;
/// Indices into `file_paths` (and into the entries returned by generate()) for each file.
size_t data_bin_pos, index_mrk_pos, columns_txt_pos, count_txt_pos, sizes_json_pos;
};
}
/// Adds the backup entries of this Memory table to the collector.
/// The temporary disk and `max_compress_block_size` are taken from the collector's context.
/// NOTE(review): the two `std::make_shared` lines below are the removed and the added variant
/// of the same argument (diff markers are not visible here).
void StorageMemory::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & /* partitions */)
{
auto temp_disk = backup_entries_collector.getContext()->getTemporaryVolume()->getDisk(0);
auto max_compress_block_size = backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size;
backup_entries_collector.addBackupEntries(
std::make_shared<MemoryBackupEntriesBatch>(getInMemoryMetadataPtr(), data.get(), data_path_in_backup, max_compress_block_size)
std::make_shared<MemoryBackup>(getInMemoryMetadataPtr(), data.get(), data_path_in_backup, temp_disk, max_compress_block_size)
->getBackupEntries());
}
@ -497,12 +515,14 @@ void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const S
if (!restorer.isNonEmptyTableAllowed() && total_size_bytes)
RestorerFromBackup::throwTableIsNotEmpty(getStorageID());
auto temp_disk = restorer.getContext()->getTemporaryVolume()->getDisk(0);
restorer.addDataRestoreTask(
[storage = std::static_pointer_cast<StorageMemory>(shared_from_this()), backup, data_path_in_backup]
{ storage->restoreDataImpl(backup, data_path_in_backup); });
[storage = std::static_pointer_cast<StorageMemory>(shared_from_this()), backup, data_path_in_backup, temp_disk]
{ storage->restoreDataImpl(backup, data_path_in_backup, temp_disk); });
}
void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup)
void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup, const DiskPtr & temporary_disk)
{
/// Our data are in the StripeLog format.
@ -513,10 +533,8 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
{
String index_file_path = data_path_in_backup_fs / "index.mrk";
if (!backup->fileExists(index_file_path))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), index_file_path);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", index_file_path);
auto backup_entry = backup->readFile(index_file_path);
auto in = backup_entry->getReadBuffer();
CompressedReadBuffer compressed_in{*in};
@ -530,20 +548,18 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
{
String data_file_path = data_path_in_backup_fs / "data.bin";
if (!backup->fileExists(data_file_path))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), data_file_path);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", data_file_path);
auto backup_entry = backup->readFile(data_file_path);
std::unique_ptr<ReadBuffer> in = backup_entry->getReadBuffer();
std::optional<Poco::TemporaryFile> temp_data_copy;
std::optional<TemporaryFileOnDisk> temp_data_file;
if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
{
temp_data_copy.emplace();
auto temp_data_copy_out = std::make_unique<WriteBufferFromFile>(temp_data_copy->path());
copyData(*in, *temp_data_copy_out);
temp_data_copy_out.reset();
in = createReadBufferFromFileBase(temp_data_copy->path(), {});
temp_data_file.emplace(temporary_disk);
auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getPath());
copyData(*in, *out);
out.reset();
in = createReadBufferFromFileBase(temp_data_file->getPath(), {});
}
std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())};
CompressedReadBufferFromFile compressed_in{std::move(in_from_file)};

View File

@ -116,7 +116,7 @@ public:
private:
/// Restores the data of this table from backup.
void restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup);
void restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup, const DiskPtr & temporary_disk);
/// MultiVersion data storage, so that we can copy the vector of blocks to readers.

View File

@ -3,6 +3,7 @@
#include <optional>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
@ -1801,6 +1802,35 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
}
/// Makes backup entries for the data of this table.
/// Backs up the visible data parts (restricted to `partitions` when given) and then the mutations
/// whose version is greater than the smallest data version among those parts.
void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
    auto local_context = backup_entries_collector.getContext();

    DataPartsVector data_parts;
    if (partitions)
        data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context));
    else
        data_parts = getVisibleDataPartsVector(local_context);

    /// First mutation version to back up: one past the smallest data version of the selected parts.
    /// The `+ 1` is applied per part on purpose: with no parts the value stays at max(),
    /// whereas computing `max() + 1` after the loop would be a signed integer overflow.
    Int64 min_mutation_version = std::numeric_limits<Int64>::max();
    for (const auto & data_part : data_parts)
        min_mutation_version = std::min(min_mutation_version, data_part->info.getDataVersion() + 1);

    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
    backup_entries_collector.addBackupEntries(backupMutations(min_mutation_version, data_path_in_backup));
}
/// Makes backup entries for all current mutations whose version is not less than `version`.
/// Each entry is stored as "<data_path_in_backup>/mutations/<version padded to 10 digits>.txt".
BackupEntries StorageMergeTree::backupMutations(UInt64 version, const String & data_path_in_backup) const
{
    const fs::path mutations_dir = fs::path{data_path_in_backup} / "mutations";

    BackupEntries result;
    auto it = current_mutations_by_version.lower_bound(version);
    const auto last = current_mutations_by_version.end();
    while (it != last)
    {
        const auto & [mutation_version, mutation_entry] = *it;
        result.emplace_back(mutations_dir / fmt::format("{:010}.txt", mutation_version), mutation_entry.backup());
        ++it;
    }
    return result;
}
void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{

View File

@ -92,6 +92,9 @@ public:
CancellationCode killMutation(const String & mutation_id) override;
/// Makes backup entries to backup the data of the storage.
void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
@ -255,6 +258,8 @@ private:
void startBackgroundMovesIfNeeded() override;
BackupEntries backupMutations(UInt64 version, const String & data_path_in_backup) const;
/// Attaches restored parts to the storage.
void attachRestoredParts(MutableDataPartsVector && parts) override;

View File

@ -8208,7 +8208,15 @@ void StorageReplicatedMergeTree::backupData(
/// First we generate backup entries in the same way as an ordinary MergeTree does.
/// But then we don't add them to the BackupEntriesCollector right away,
/// because we need to coordinate them with other replicas (other replicas can have better parts).
auto backup_entries = backupParts(backup_entries_collector.getContext(), "", partitions);
auto local_context = backup_entries_collector.getContext();
DataPartsVector data_parts;
if (partitions)
data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(*partitions, local_context));
else
data_parts = getVisibleDataPartsVector(local_context);
auto backup_entries = backupParts(data_parts, "");
auto coordination = backup_entries_collector.getBackupCoordination();
String shared_id = getTableSharedID();
@ -8252,6 +8260,20 @@ void StorageReplicatedMergeTree::backupData(
/// Send our list of part names to the coordination (to compare with other replicas).
coordination->addReplicatedPartNames(shared_id, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
{
std::vector<IBackupCoordination::MutationInfo> mutation_infos;
auto zookeeper = getZooKeeper();
Strings mutation_ids = zookeeper->getChildren(fs::path(zookeeper_path) / "mutations");
mutation_infos.reserve(mutation_ids.size());
for (const auto & mutation_id : mutation_ids)
{
mutation_infos.emplace_back(
IBackupCoordination::MutationInfo{mutation_id, zookeeper->get(fs::path(zookeeper_path) / "mutations" / mutation_id)});
}
coordination->addReplicatedMutations(shared_id, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
}
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
/// give us the final list of parts to add to the BackupEntriesCollector.
auto post_collecting_task = [shared_id,
@ -8278,7 +8300,16 @@ void StorageReplicatedMergeTree::backupData(
for (const auto & data_path : data_paths_fs)
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
}
auto mutation_infos = coordination->getReplicatedMutations(shared_id, replica_name);
for (const auto & mutation_info : mutation_infos)
{
auto backup_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_info.entry, mutation_info.id).backup();
for (const auto & data_path : data_paths_fs)
backup_entries_collector.addBackupEntry(data_path / "mutations" / (mutation_info.id + ".txt"), backup_entry);
}
};
backup_entries_collector.addPostTask(post_collecting_task);
}

View File

@ -535,7 +535,7 @@ void StorageStripeLog::backupData(BackupEntriesCollector & backup_entries_collec
return;
fs::path data_path_in_backup_fs = data_path_in_backup;
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_");
auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/");
fs::path temp_dir = temp_dir_owner->getPath();
disk->createDirectories(temp_dir);
@ -617,10 +617,8 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &
{
String file_path_in_backup = data_path_in_backup_fs / fileName(data_file_path);
if (!backup->fileExists(file_path_in_backup))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), file_path_in_backup);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
@ -632,10 +630,8 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &
String index_path_in_backup = data_path_in_backup_fs / fileName(index_file_path);
IndexForNativeFormat extra_indices;
if (!backup->fileExists(index_path_in_backup))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File {} in backup is required",
getStorageID().getFullTableName(), index_path_in_backup);
}
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", index_path_in_backup);
auto backup_entry = backup->readFile(index_path_in_backup);
auto index_in = backup_entry->getReadBuffer();
CompressedReadBuffer index_compressed_in{*index_in};

View File

@ -69,6 +69,15 @@ def new_session_id():
return "Session #" + str(session_id_counter)
def has_mutation_in_backup(mutation_id, backup_name, database, table):
    """Return True if the backup contains the file of mutation `mutation_id` for `database`.`table`."""
    backup_root = get_path_to_backup(backup_name)
    mutation_file = os.path.join(
        backup_root,
        f"data/{database}/{table}/mutations/{mutation_id}.txt",
    )
    return os.path.exists(mutation_file)
@pytest.mark.parametrize(
"engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"]
)
@ -829,3 +838,31 @@ def test_restore_partition():
assert instance.query("SELECT * FROM test.table ORDER BY x") == TSV(
[[2, "2"], [3, "3"], [12, "12"], [13, "13"], [22, "22"], [23, "23"]]
)
def test_mutation():
    """Back up a MergeTree table after issuing three ALTER ... UPDATE queries.

    Checks that mutation files 0000000005 and 0000000006 are in the backup while
    0000000004 and 0000000007 are not, then drops the table and restores it.
    """
    create_and_fill_table(engine="MergeTree ORDER BY tuple()", n=5)

    insert_queries = (
        "INSERT INTO test.table SELECT number, toString(number) FROM numbers(5, 5)",
        "INSERT INTO test.table SELECT number, toString(number) FROM numbers(10, 5)",
    )
    for insert_query in insert_queries:
        instance.query(insert_query)

    update_queries = (
        "ALTER TABLE test.table UPDATE x=x+1 WHERE 1",
        "ALTER TABLE test.table UPDATE x=x+1+sleep(1) WHERE 1",
        "ALTER TABLE test.table UPDATE x=x+1+sleep(2) WHERE 1",
    )
    for update_query in update_queries:
        instance.query(update_query)

    backup = new_backup_name()
    instance.query(f"BACKUP TABLE test.table TO {backup}")

    # (mutation id, whether its file is expected in the backup)
    expected_presence = (
        ("0000000004", False),
        ("0000000005", True),
        ("0000000006", True),
        ("0000000007", False),
    )
    for mutation_id, expected in expected_presence:
        found = has_mutation_in_backup(mutation_id, backup, "test", "table")
        assert found == expected

    instance.query("DROP TABLE test.table")
    instance.query(f"RESTORE TABLE test.table FROM {backup}")

View File

@ -79,7 +79,7 @@ def new_backup_name():
# Builds the path of a backup under "<instances_dir>/backups/".
# The name is the second comma-separated element of `backup_name`,
# with any of the characters "'", ")", "/" and " " stripped from both ends.
def get_path_to_backup(backup_name):
name = backup_name.split(",")[1].strip("')/ ")
# NOTE(review): the two returns below are the removed (`instance`) and the added (`node1`)
# variant of the same line; diff markers are not visible here.
return os.path.join(instance.cluster.instances_dir, "backups", name)
return os.path.join(node1.cluster.instances_dir, "backups", name)
def test_replicated_table():
@ -652,3 +652,57 @@ def test_table_in_replicated_database_with_not_synced_def():
assert node2.query(
"SELECT name, type FROM system.columns WHERE database='mydb' AND table='tbl'"
) == TSV([["x", "String"], ["y", "String"]])
def has_mutation_in_backup(mutation_id, backup_name, database, table):
    """Return True if the mutation file exists under replica 1, 2 or 3 of shard 1 in the backup."""
    backup_root = get_path_to_backup(backup_name)
    # Replicas are probed in order 1, 2, 3 and the search stops at the first hit.
    return any(
        os.path.exists(
            os.path.join(
                backup_root,
                f"shards/1/replicas/{replica}/data/{database}/{table}/mutations/{mutation_id}.txt",
            )
        )
        for replica in (1, 2, 3)
    )
def test_mutation():
node1.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x UInt8, y String"
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
"ORDER BY tuple()"
)
node1.query("INSERT INTO tbl SELECT number, toString(number) FROM numbers(5)")
node2.query("INSERT INTO tbl SELECT number, toString(number) FROM numbers(5, 5)")
node1.query("INSERT INTO tbl SELECT number, toString(number) FROM numbers(10, 5)")
node1.query("ALTER TABLE tbl UPDATE x=x+1 WHERE 1")
node1.query("ALTER TABLE tbl UPDATE x=x+1+sleep(1) WHERE 1")
node1.query("ALTER TABLE tbl UPDATE x=x+1+sleep(2) WHERE 1")
backup_name = new_backup_name()
node1.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}")
assert not has_mutation_in_backup("0000000000", backup_name, "default", "tbl")
assert has_mutation_in_backup("0000000001", backup_name, "default", "tbl")
assert has_mutation_in_backup("0000000002", backup_name, "default", "tbl")
assert not has_mutation_in_backup("0000000003", backup_name, "default", "tbl")
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
node1.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")