Fix making backup when multiple shards are used.

This commit is contained in:
Vitaly Baranov 2023-12-08 16:51:35 +01:00
parent 3a789d7fb7
commit e09530ab75
14 changed files with 100 additions and 138 deletions

View File

@ -33,42 +33,42 @@ Strings BackupCoordinationLocal::waitForStage(const String &, std::chrono::milli
return {};
}
void BackupCoordinationLocal::addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<PartNameAndChecksum> & part_names_and_checksums)
void BackupCoordinationLocal::addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name, const std::vector<PartNameAndChecksum> & part_names_and_checksums)
{
std::lock_guard lock{replicated_tables_mutex};
replicated_tables.addPartNames({table_shared_id, table_name_for_logs, replica_name, part_names_and_checksums});
replicated_tables.addPartNames({table_zk_path, table_name_for_logs, replica_name, part_names_and_checksums});
}
Strings BackupCoordinationLocal::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
Strings BackupCoordinationLocal::getReplicatedPartNames(const String & table_zk_path, const String & replica_name) const
{
std::lock_guard lock{replicated_tables_mutex};
return replicated_tables.getPartNames(table_shared_id, replica_name);
return replicated_tables.getPartNames(table_zk_path, replica_name);
}
void BackupCoordinationLocal::addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations)
void BackupCoordinationLocal::addReplicatedMutations(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations)
{
std::lock_guard lock{replicated_tables_mutex};
replicated_tables.addMutations({table_shared_id, table_name_for_logs, replica_name, mutations});
replicated_tables.addMutations({table_zk_path, table_name_for_logs, replica_name, mutations});
}
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationLocal::getReplicatedMutations(const String & table_shared_id, const String & replica_name) const
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationLocal::getReplicatedMutations(const String & table_zk_path, const String & replica_name) const
{
std::lock_guard lock{replicated_tables_mutex};
return replicated_tables.getMutations(table_shared_id, replica_name);
return replicated_tables.getMutations(table_zk_path, replica_name);
}
void BackupCoordinationLocal::addReplicatedDataPath(const String & table_shared_id, const String & data_path)
void BackupCoordinationLocal::addReplicatedDataPath(const String & table_zk_path, const String & data_path)
{
std::lock_guard lock{replicated_tables_mutex};
replicated_tables.addDataPath({table_shared_id, data_path});
replicated_tables.addDataPath({table_zk_path, data_path});
}
Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_shared_id) const
Strings BackupCoordinationLocal::getReplicatedDataPaths(const String & table_zk_path) const
{
std::lock_guard lock{replicated_tables_mutex};
return replicated_tables.getDataPaths(table_shared_id);
return replicated_tables.getDataPaths(table_zk_path);
}

View File

@ -29,16 +29,16 @@ public:
Strings waitForStage(const String & stage_to_wait) override;
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const override;
Strings getReplicatedPartNames(const String & table_zk_path, const String & replica_name) const override;
void addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
void addReplicatedMutations(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<MutationInfo> & mutations) override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_zk_path, const String & replica_name) const override;
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedDataPath(const String & table_zk_path, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_zk_path) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;

View File

@ -358,7 +358,7 @@ String BackupCoordinationRemote::deserializeFromMultipleZooKeeperNodes(const Str
void BackupCoordinationRemote::addReplicatedPartNames(
const String & table_shared_id,
const String & table_zk_path,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums)
@ -374,22 +374,22 @@ void BackupCoordinationRemote::addReplicatedPartNames(
[&, &zk = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_part_names/" + escapeForFileName(table_shared_id);
String path = zookeeper_path + "/repl_part_names/" + escapeForFileName(table_zk_path);
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(replica_name);
zk->createIfNotExists(path, ReplicatedPartNames::serialize(part_names_and_checksums, table_name_for_logs));
});
}
Strings BackupCoordinationRemote::getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const
Strings BackupCoordinationRemote::getReplicatedPartNames(const String & table_zk_path, const String & replica_name) const
{
std::lock_guard lock{replicated_tables_mutex};
prepareReplicatedTables();
return replicated_tables->getPartNames(table_shared_id, replica_name);
return replicated_tables->getPartNames(table_zk_path, replica_name);
}
void BackupCoordinationRemote::addReplicatedMutations(
const String & table_shared_id,
const String & table_zk_path,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations)
@ -405,23 +405,23 @@ void BackupCoordinationRemote::addReplicatedMutations(
[&, &zk = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_mutations/" + escapeForFileName(table_shared_id);
String path = zookeeper_path + "/repl_mutations/" + escapeForFileName(table_zk_path);
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(replica_name);
zk->createIfNotExists(path, ReplicatedMutations::serialize(mutations, table_name_for_logs));
});
}
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationRemote::getReplicatedMutations(const String & table_shared_id, const String & replica_name) const
std::vector<IBackupCoordination::MutationInfo> BackupCoordinationRemote::getReplicatedMutations(const String & table_zk_path, const String & replica_name) const
{
std::lock_guard lock{replicated_tables_mutex};
prepareReplicatedTables();
return replicated_tables->getMutations(table_shared_id, replica_name);
return replicated_tables->getMutations(table_zk_path, replica_name);
}
void BackupCoordinationRemote::addReplicatedDataPath(
const String & table_shared_id, const String & data_path)
const String & table_zk_path, const String & data_path)
{
{
std::lock_guard lock{replicated_tables_mutex};
@ -434,18 +434,18 @@ void BackupCoordinationRemote::addReplicatedDataPath(
[&, &zk = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_data_paths/" + escapeForFileName(table_shared_id);
String path = zookeeper_path + "/repl_data_paths/" + escapeForFileName(table_zk_path);
zk->createIfNotExists(path, "");
path += "/" + escapeForFileName(data_path);
zk->createIfNotExists(path, "");
});
}
Strings BackupCoordinationRemote::getReplicatedDataPaths(const String & table_shared_id) const
Strings BackupCoordinationRemote::getReplicatedDataPaths(const String & table_zk_path) const
{
std::lock_guard lock{replicated_tables_mutex};
prepareReplicatedTables();
return replicated_tables->getDataPaths(table_shared_id);
return replicated_tables->getDataPaths(table_zk_path);
}
@ -464,16 +464,16 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_part_names";
for (const String & escaped_table_shared_id : zk->getChildren(path))
for (const String & escaped_table_zk_path : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
String table_zk_path = unescapeForFileName(escaped_table_zk_path);
String path2 = path + "/" + escaped_table_zk_path;
for (const String & escaped_replica_name : zk->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto part_names = ReplicatedPartNames::deserialize(zk->get(path2 + "/" + escaped_replica_name));
part_names_for_replicated_tables.push_back(
{table_shared_id, part_names.table_name_for_logs, replica_name, part_names.part_names_and_checksums});
{table_zk_path, part_names.table_name_for_logs, replica_name, part_names.part_names_and_checksums});
}
}
});
@ -489,16 +489,16 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_mutations";
for (const String & escaped_table_shared_id : zk->getChildren(path))
for (const String & escaped_table_zk_path : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
String table_zk_path = unescapeForFileName(escaped_table_zk_path);
String path2 = path + "/" + escaped_table_zk_path;
for (const String & escaped_replica_name : zk->getChildren(path2))
{
String replica_name = unescapeForFileName(escaped_replica_name);
auto mutations = ReplicatedMutations::deserialize(zk->get(path2 + "/" + escaped_replica_name));
mutations_for_replicated_tables.push_back(
{table_shared_id, mutations.table_name_for_logs, replica_name, mutations.mutations});
{table_zk_path, mutations.table_name_for_logs, replica_name, mutations.mutations});
}
}
});
@ -514,14 +514,14 @@ void BackupCoordinationRemote::prepareReplicatedTables() const
with_retries.renewZooKeeper(zk);
String path = zookeeper_path + "/repl_data_paths";
for (const String & escaped_table_shared_id : zk->getChildren(path))
for (const String & escaped_table_zk_path : zk->getChildren(path))
{
String table_shared_id = unescapeForFileName(escaped_table_shared_id);
String path2 = path + "/" + escaped_table_shared_id;
String table_zk_path = unescapeForFileName(escaped_table_zk_path);
String path2 = path + "/" + escaped_table_zk_path;
for (const String & escaped_data_path : zk->getChildren(path2))
{
String data_path = unescapeForFileName(escaped_data_path);
data_paths_for_replicated_tables.push_back({table_shared_id, data_path});
data_paths_for_replicated_tables.push_back({table_zk_path, data_path});
}
}
});

View File

@ -41,23 +41,23 @@ public:
Strings waitForStage(const String & stage_to_wait, std::chrono::milliseconds timeout) override;
void addReplicatedPartNames(
const String & table_shared_id,
const String & table_zk_path,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;
Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const override;
Strings getReplicatedPartNames(const String & table_zk_path, const String & replica_name) const override;
void addReplicatedMutations(
const String & table_shared_id,
const String & table_zk_path,
const String & table_name_for_logs,
const String & replica_name,
const std::vector<MutationInfo> & mutations) override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const override;
std::vector<MutationInfo> getReplicatedMutations(const String & table_zk_path, const String & replica_name) const override;
void addReplicatedDataPath(const String & table_shared_id, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_shared_id) const override;
void addReplicatedDataPath(const String & table_zk_path, const String & data_path) override;
Strings getReplicatedDataPaths(const String & table_zk_path) const override;
void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) override;
Strings getReplicatedAccessFilePaths(const String & access_zk_path, AccessEntityType access_entity_type) const override;

View File

@ -151,7 +151,7 @@ BackupCoordinationReplicatedTables::~BackupCoordinationReplicatedTables() = defa
void BackupCoordinationReplicatedTables::addPartNames(PartNamesForTableReplica && part_names)
{
const auto & table_shared_id = part_names.table_shared_id;
const auto & table_zk_path = part_names.table_zk_path;
const auto & table_name_for_logs = part_names.table_name_for_logs;
const auto & replica_name = part_names.replica_name;
const auto & part_names_and_checksums = part_names.part_names_and_checksums;
@ -159,7 +159,7 @@ void BackupCoordinationReplicatedTables::addPartNames(PartNamesForTableReplica &
if (prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addPartNames() must not be called after preparing");
auto & table_info = table_infos[table_shared_id];
auto & table_info = table_infos[table_zk_path];
table_info.table_name_for_logs = table_name_for_logs;
if (!table_info.covered_parts_finder)
@ -200,11 +200,11 @@ void BackupCoordinationReplicatedTables::addPartNames(PartNamesForTableReplica &
}
}
Strings BackupCoordinationReplicatedTables::getPartNames(const String & table_shared_id, const String & replica_name) const
Strings BackupCoordinationReplicatedTables::getPartNames(const String & table_zk_path, const String & replica_name) const
{
prepare();
auto it = table_infos.find(table_shared_id);
auto it = table_infos.find(table_zk_path);
if (it == table_infos.end())
return {};
@ -218,7 +218,7 @@ Strings BackupCoordinationReplicatedTables::getPartNames(const String & table_sh
void BackupCoordinationReplicatedTables::addMutations(MutationsForTableReplica && mutations_for_table_replica)
{
const auto & table_shared_id = mutations_for_table_replica.table_shared_id;
const auto & table_zk_path = mutations_for_table_replica.table_zk_path;
const auto & table_name_for_logs = mutations_for_table_replica.table_name_for_logs;
const auto & replica_name = mutations_for_table_replica.replica_name;
const auto & mutations = mutations_for_table_replica.mutations;
@ -226,7 +226,7 @@ void BackupCoordinationReplicatedTables::addMutations(MutationsForTableReplica &
if (prepared)
throw Exception(ErrorCodes::LOGICAL_ERROR, "addMutations() must not be called after preparing");
auto & table_info = table_infos[table_shared_id];
auto & table_info = table_infos[table_zk_path];
table_info.table_name_for_logs = table_name_for_logs;
for (const auto & [mutation_id, mutation_entry] : mutations)
table_info.mutations.emplace(mutation_id, mutation_entry);
@ -236,11 +236,11 @@ void BackupCoordinationReplicatedTables::addMutations(MutationsForTableReplica &
}
std::vector<MutationInfo>
BackupCoordinationReplicatedTables::getMutations(const String & table_shared_id, const String & replica_name) const
BackupCoordinationReplicatedTables::getMutations(const String & table_zk_path, const String & replica_name) const
{
prepare();
auto it = table_infos.find(table_shared_id);
auto it = table_infos.find(table_zk_path);
if (it == table_infos.end())
return {};
@ -257,16 +257,16 @@ BackupCoordinationReplicatedTables::getMutations(const String & table_shared_id,
void BackupCoordinationReplicatedTables::addDataPath(DataPathForTableReplica && data_path_for_table_replica)
{
const auto & table_shared_id = data_path_for_table_replica.table_shared_id;
const auto & table_zk_path = data_path_for_table_replica.table_zk_path;
const auto & data_path = data_path_for_table_replica.data_path;
auto & table_info = table_infos[table_shared_id];
auto & table_info = table_infos[table_zk_path];
table_info.data_paths.emplace(data_path);
}
Strings BackupCoordinationReplicatedTables::getDataPaths(const String & table_shared_id) const
Strings BackupCoordinationReplicatedTables::getDataPaths(const String & table_zk_path) const
{
auto it = table_infos.find(table_shared_id);
auto it = table_infos.find(table_zk_path);
if (it == table_infos.end())
return {};

View File

@ -40,7 +40,7 @@ public:
struct PartNamesForTableReplica
{
String table_shared_id;
String table_zk_path;
String table_name_for_logs;
String replica_name;
std::vector<PartNameAndChecksum> part_names_and_checksums;
@ -55,13 +55,13 @@ public:
/// Returns the names of the parts which a specified replica of a replicated table should put to the backup.
/// This is the same list as it was added by call of the function addPartNames() but without duplications and without
/// parts covered by another parts.
Strings getPartNames(const String & table_shared_id, const String & replica_name) const;
Strings getPartNames(const String & table_zk_path, const String & replica_name) const;
using MutationInfo = IBackupCoordination::MutationInfo;
struct MutationsForTableReplica
{
String table_shared_id;
String table_zk_path;
String table_name_for_logs;
String replica_name;
std::vector<MutationInfo> mutations;
@ -71,11 +71,11 @@ public:
void addMutations(MutationsForTableReplica && mutations_for_table_replica);
/// Returns all mutations of a replicated table which are not finished for some data parts added by addReplicatedPartNames().
std::vector<MutationInfo> getMutations(const String & table_shared_id, const String & replica_name) const;
std::vector<MutationInfo> getMutations(const String & table_zk_path, const String & replica_name) const;
struct DataPathForTableReplica
{
String table_shared_id;
String table_zk_path;
String data_path;
};
@ -85,7 +85,7 @@ public:
void addDataPath(DataPathForTableReplica && data_path_for_table_replica);
/// Returns all the data paths in backup added for a replicated table (see also addReplicatedDataPath()).
Strings getDataPaths(const String & table_shared_id) const;
Strings getDataPaths(const String & table_zk_path) const;
private:
void prepare() const;
@ -110,7 +110,7 @@ private:
std::unordered_set<String> data_paths;
};
std::map<String /* table_shared_id */, TableInfo> table_infos; /// Should be ordered because we need this map to be in the same order on every replica.
std::map<String /* table_zk_path */, TableInfo> table_infos; /// Should be ordered because we need this map to be in the same order on every replica.
mutable bool prepared = false;
};

View File

@ -11,6 +11,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <base/chrono_io.h>
#include <base/insertAtEnd.h>
#include <base/scope_guard.h>
@ -758,7 +759,7 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
checkIsQueryCancelled();
ASTPtr new_create_query = database_info.create_database_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), nullptr);
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext());
renameDatabaseAndTableNameInCreateQuery(new_create_query, renaming_map, context->getGlobalContext());
const String & metadata_path_in_backup = database_info.metadata_path_in_backup;
@ -775,7 +776,8 @@ void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
checkIsQueryCancelled();
ASTPtr new_create_query = table_info.create_table_query;
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext(), &table_info.replicated_table_shared_id);
table_info.replicated_table_zk_path = tryExtractZkPathFromCreateQuery(*new_create_query, context->getGlobalContext());
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext());
renameDatabaseAndTableNameInCreateQuery(new_create_query, renaming_map, context->getGlobalContext());
const String & metadata_path_in_backup = table_info.metadata_path_in_backup;
@ -814,8 +816,8 @@ void BackupEntriesCollector::makeBackupEntriesForTableData(const QualifiedTableN
/// If this table is replicated in this case we call IBackupCoordination::addReplicatedDataPath() which will cause
/// other replicas to fill the storage's data in the backup.
/// If this table is not replicated we'll do nothing leaving the storage's data empty in the backup.
if (table_info.replicated_table_shared_id)
backup_coordination->addReplicatedDataPath(*table_info.replicated_table_shared_id, data_path_in_backup);
if (table_info.replicated_table_zk_path)
backup_coordination->addReplicatedDataPath(*table_info.replicated_table_zk_path, data_path_in_backup);
return;
}

View File

@ -164,7 +164,7 @@ private:
ASTPtr create_table_query;
String metadata_path_in_backup;
std::filesystem::path data_path_in_backup;
std::optional<String> replicated_table_shared_id;
std::optional<String> replicated_table_zk_path;
std::optional<ASTs> partitions;
};

View File

@ -103,7 +103,7 @@ bool compareRestoredTableDef(const IAST & restored_table_create_query, const IAS
auto adjust_before_comparison = [&](const IAST & query) -> ASTPtr
{
auto new_query = query.clone();
adjustCreateQueryForBackup(new_query, global_context, nullptr);
adjustCreateQueryForBackup(new_query, global_context);
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*new_query);
create.setUUID({});
create.if_not_exists = false;

View File

@ -27,9 +27,6 @@ namespace
{
/// Precondition: engine_name.starts_with("Replicated") && engine_name.ends_with("MergeTree")
if (data.replicated_table_shared_id)
*data.replicated_table_shared_id = StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQuery(*data.create_query, data.global_context);
/// Before storing the metadata in a backup we have to find a zookeeper path in its definition and turn the table's UUID in there
/// back into "{uuid}", and also we probably can remove the zookeeper path and replica name if they're default.
/// So we're kind of reverting what we had done to the table's definition in registerStorageMergeTree.cpp before we created this table.
@ -98,12 +95,9 @@ void DDLAdjustingForBackupVisitor::visit(ASTPtr ast, const Data & data)
visitCreateQuery(*create, data);
}
void adjustCreateQueryForBackup(ASTPtr ast, const ContextPtr & global_context, std::optional<String> * replicated_table_shared_id)
void adjustCreateQueryForBackup(ASTPtr ast, const ContextPtr & global_context)
{
if (replicated_table_shared_id)
*replicated_table_shared_id = {};
DDLAdjustingForBackupVisitor::Data data{ast, global_context, replicated_table_shared_id};
DDLAdjustingForBackupVisitor::Data data{ast, global_context};
DDLAdjustingForBackupVisitor::Visitor{data}.visit(ast);
}

View File

@ -12,9 +12,7 @@ class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Changes a create query to a form which is appropriate or suitable for saving in a backup.
/// Also extracts a replicated table's shared ID from the create query if this is a create query for a replicated table.
/// `replicated_table_shared_id` can be null if you don't need that.
void adjustCreateQueryForBackup(ASTPtr ast, const ContextPtr & global_context, std::optional<String> * replicated_table_shared_id);
void adjustCreateQueryForBackup(ASTPtr ast, const ContextPtr & global_context);
/// Visits ASTCreateQuery and changes it to a form which is appropriate or suitable for saving in a backup.
class DDLAdjustingForBackupVisitor
@ -24,7 +22,6 @@ public:
{
ASTPtr create_query;
ContextPtr global_context;
std::optional<String> * replicated_table_shared_id = nullptr;
};
using Visitor = InDepthNodeVisitor<DDLAdjustingForBackupVisitor, false>;

View File

@ -36,13 +36,13 @@ public:
/// Multiple replicas of the replicated table call this function and then the added part names can be returned by call of the function
/// getReplicatedPartNames().
/// Checksums are used only to control that parts under the same names on different replicas are the same.
virtual void addReplicatedPartNames(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name,
virtual void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) = 0;
/// Returns the names of the parts which a specified replica of a replicated table should put to the backup.
/// This is the same list as it was added by call of the function addReplicatedPartNames() but without duplications and without
/// parts covered by another parts.
virtual Strings getReplicatedPartNames(const String & table_shared_id, const String & replica_name) const = 0;
virtual Strings getReplicatedPartNames(const String & table_zk_path, const String & replica_name) const = 0;
struct MutationInfo
{
@ -51,10 +51,10 @@ public:
};
/// Adds information about mutations of a replicated table.
virtual void addReplicatedMutations(const String & table_shared_id, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations) = 0;
virtual void addReplicatedMutations(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name, const std::vector<MutationInfo> & mutations) = 0;
/// Returns all mutations of a replicated table which are not finished for some data parts added by addReplicatedPartNames().
virtual std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const = 0;
virtual std::vector<MutationInfo> getReplicatedMutations(const String & table_zk_path, const String & replica_name) const = 0;
/// Adds information about KeeperMap tables
virtual void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) = 0;
@ -65,10 +65,10 @@ public:
/// Adds a data path in backup for a replicated table.
/// Multiple replicas of the replicated table call this function and then all the added paths can be returned by call of the function
/// getReplicatedDataPaths().
virtual void addReplicatedDataPath(const String & table_shared_id, const String & data_path) = 0;
virtual void addReplicatedDataPath(const String & table_zk_path, const String & data_path) = 0;
/// Returns all the data paths in backup added for a replicated table (see also addReplicatedDataPath()).
virtual Strings getReplicatedDataPaths(const String & table_shared_id) const = 0;
virtual Strings getReplicatedDataPaths(const String & table_zk_path) const = 0;
/// Adds a path to access.txt file keeping access entities of a ReplicatedAccessStorage.
virtual void addReplicatedAccessFilePath(const String & access_zk_path, AccessEntityType access_entity_type, const String & file_path) = 0;

View File

@ -187,7 +187,6 @@ namespace ErrorCodes
extern const int NOT_INITIALIZED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int TABLE_IS_DROPPED;
extern const int CANNOT_BACKUP_TABLE;
extern const int SUPPORT_IS_DISABLED;
extern const int FAULT_INJECTED;
extern const int CANNOT_FORGET_PARTITION;
@ -310,8 +309,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
mode,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_name(zkutil::extractZooKeeperName(zookeeper_path_))
, zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
, full_zookeeper_path(zookeeper_path_)
, zookeeper_name(zkutil::extractZooKeeperName(full_zookeeper_path))
, zookeeper_path(zkutil::extractZooKeeperPath(full_zookeeper_path, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
, replica_name(replica_name_)
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
, reader(*this)
@ -9242,24 +9242,6 @@ void StorageReplicatedMergeTree::createTableSharedID() const
}
std::optional<String> StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context)
{
auto zk_path = tryExtractZkPathFromCreateQuery(create_query, global_context);
if (!zk_path)
return {};
String zk_name = zkutil::extractZooKeeperName(*zk_path);
zk_path = zkutil::extractZooKeeperPath(*zk_path, false, nullptr);
zkutil::ZooKeeperPtr zookeeper = (zk_name == getDefaultZooKeeperName()) ? global_context->getZooKeeper() : global_context->getAuxiliaryZooKeeper(zk_name);
String id;
if (!zookeeper->tryGet(fs::path(*zk_path) / "table_shared_id", id))
return {};
return id;
}
zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
auto settings = getSettings();
@ -10419,21 +10401,10 @@ void StorageReplicatedMergeTree::adjustCreateQueryForBackup(ASTPtr & create_quer
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this, current_metadata).checkAndFindDiff(metadata_from_entry, current_metadata->getColumns(), getContext());
auto adjusted_metadata = metadata_diff.getNewMetadata(columns_from_entry, getContext(), *current_metadata);
applyMetadataChangesToCreateQuery(create_query, adjusted_metadata);
/// Check that tryGetTableSharedIDFromCreateQuery() works for this storage.
auto actual_table_shared_id = getTableSharedID();
auto expected_table_shared_id = tryGetTableSharedIDFromCreateQuery(*create_query, getContext());
if (actual_table_shared_id != expected_table_shared_id)
{
throw Exception(ErrorCodes::CANNOT_BACKUP_TABLE, "Table {} has its shared ID different from one from the create query: "
"actual shared id = {}, expected shared id = {}, create query = {}",
getStorageID().getNameForLogs(), actual_table_shared_id, expected_table_shared_id.value_or("nullopt"),
create_query);
}
}
catch (...)
{
/// We can continue making a backup with non-adjusted name.
/// We can continue making a backup with non-adjusted query.
tryLogCurrentException(log, "Failed to adjust the create query of this table for backup");
}
}
@ -10459,8 +10430,8 @@ void StorageReplicatedMergeTree::backupData(
auto parts_backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", backup_settings, read_settings, local_context);
auto coordination = backup_entries_collector.getBackupCoordination();
String shared_id = getTableSharedID();
coordination->addReplicatedDataPath(shared_id, data_path_in_backup);
coordination->addReplicatedDataPath(full_zookeeper_path, data_path_in_backup);
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
std::vector<PartNameAndChecksum> part_names_with_hashes;
@ -10469,7 +10440,7 @@ void StorageReplicatedMergeTree::backupData(
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
/// Send our list of part names to the coordination (to compare with other replicas).
coordination->addReplicatedPartNames(shared_id, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
coordination->addReplicatedPartNames(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
/// Send a list of mutations to the coordination too (we need to find the mutations which are not finished for added part names).
{
@ -10511,25 +10482,25 @@ void StorageReplicatedMergeTree::backupData(
}
if (!mutation_infos.empty())
coordination->addReplicatedMutations(shared_id, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
coordination->addReplicatedMutations(full_zookeeper_path, getStorageID().getFullTableName(), getReplicaName(), mutation_infos);
}
}
/// This task will be executed after all replicas have collected their parts and the coordination is ready to
/// give us the final list of parts to add to the BackupEntriesCollector.
auto post_collecting_task = [shared_id,
auto post_collecting_task = [my_full_zookeeper_path = full_zookeeper_path,
my_replica_name = getReplicaName(),
coordination,
my_parts_backup_entries = std::move(parts_backup_entries),
&backup_entries_collector]()
{
Strings data_paths = coordination->getReplicatedDataPaths(shared_id);
Strings data_paths = coordination->getReplicatedDataPaths(my_full_zookeeper_path);
std::vector<fs::path> data_paths_fs;
data_paths_fs.reserve(data_paths.size());
for (const auto & data_path : data_paths)
data_paths_fs.push_back(data_path);
Strings part_names = coordination->getReplicatedPartNames(shared_id, my_replica_name);
Strings part_names = coordination->getReplicatedPartNames(my_full_zookeeper_path, my_replica_name);
std::unordered_set<std::string_view> part_names_set{part_names.begin(), part_names.end()};
for (const auto & part_backup_entries : my_parts_backup_entries)
@ -10542,7 +10513,7 @@ void StorageReplicatedMergeTree::backupData(
}
}
auto mutation_infos = coordination->getReplicatedMutations(shared_id, my_replica_name);
auto mutation_infos = coordination->getReplicatedMutations(my_full_zookeeper_path, my_replica_name);
for (const auto & mutation_info : mutation_infos)
{
auto backup_entry = ReplicatedMergeTreeMutationEntry::parse(mutation_info.entry, mutation_info.id).backup();
@ -10556,8 +10527,7 @@ void StorageReplicatedMergeTree::backupData(
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
String full_zk_path = getZooKeeperName() + getZooKeeperPath();
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zk_path))
if (!restorer.getRestoreCoordination()->acquireInsertingDataIntoReplicatedTable(full_zookeeper_path))
{
/// Other replica is already restoring the data of this table.
/// We'll get them later due to replication, it's not necessary to read it from the backup.

View File

@ -330,17 +330,14 @@ public:
// Return default or custom zookeeper name for table
const String & getZooKeeperName() const { return zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_path; }
const String & getFullZooKeeperPath() const { return full_zookeeper_path; }
// Return table id, common for different replicas
String getTableSharedID() const override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
/// Returns the same as getTableSharedID(), but extracts it from a create query.
static std::optional<String> tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);
static const String & getDefaultZooKeeperName() { return default_zookeeper_name; }
/// Check if there are new broken disks and enqueue part recovery tasks.
@ -420,9 +417,11 @@ private:
bool is_readonly_metric_set = false;
const String full_zookeeper_path;
static const String default_zookeeper_name;
const String zookeeper_name;
const String zookeeper_path;
const String replica_name;
const String replica_path;