Ignore system paths from snapshots and logstore

This commit is contained in:
Antonio Andelic 2022-07-23 13:46:59 +00:00
parent 46a6fbd7aa
commit 6aff87d4b5
3 changed files with 46 additions and 10 deletions

View File

@ -58,6 +58,8 @@ struct Stat
int32_t dataLength{0}; /// NOLINT
int32_t numChildren{0}; /// NOLINT
int64_t pzxid{0};
bool operator==(const Stat &) const = default;
};
enum class Error : int32_t

View File

@ -148,10 +148,26 @@ namespace
namespace
{
bool isChildSystemPath(const std::string_view path)
enum class PathMatchResult
{
auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), keeper_system_path.begin(), keeper_system_path.end());
return first_it != path.end() && *first_it == '/' && second_it == keeper_system_path.end();
NOT_MATCH,
EXACT,
IS_CHILD
};
PathMatchResult matchPath(const std::string_view path, const std::string_view match_to)
{
using enum PathMatchResult;
auto [first_it, second_it] = std::mismatch(path.begin(), path.end(), match_to.begin(), match_to.end());
if (second_it != match_to.end())
return NOT_MATCH;
if (first_it == path.end())
return EXACT;
return *first_it == '/' ? IS_CHILD : NOT_MATCH;
}
}
@ -200,7 +216,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
const auto & path = it->key;
// write only the root system path because of digest
if (isChildSystemPath(path.toView()))
if (matchPath(path.toView(), keeper_system_path) == PathMatchResult::IS_CHILD)
{
++it;
continue;
@ -336,20 +352,35 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
if (recalculate_digest)
storage.nodes_digest = 0;
size_t current_size = 0;
while (current_size < snapshot_container_size)
const auto is_node_empty = [](const auto & node)
{
return node.getData().empty() && node.stat == Coordination::Stat{};
};
for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
{
std::string path;
readBinary(path, in);
KeeperStorage::Node node{};
readNode(node, in, current_version, storage.acl_map);
using enum PathMatchResult;
auto match_result = matchPath(path, keeper_system_path);
if ((match_result == EXACT && !is_node_empty(node)) || match_result == IS_CHILD)
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "Cannot read node on path {} from a snapshot because it is used as a system node.", path);
if (match_result == IS_CHILD)
continue;
node = KeeperStorage::Node{};
}
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
current_size++;
if (recalculate_digest)
storage.nodes_digest += node.getDigest(path);
}

View File

@ -808,8 +808,11 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
path_created += seq_num_str.str();
}
if (storage.uncommitted_state.getNode(path_created))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
if (path_created.starts_with(keeper_system_path))
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to create a node inside the internal Keeper path ({}) which is not allowed. Path: {}", keeper_system_path, path_created);
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
if (getBaseName(path_created).size == 0)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};