add remove recursive support

This commit is contained in:
Mikhail Artemenko 2024-09-04 22:59:18 +00:00
parent 6c57adee7c
commit c6777af485
2 changed files with 161 additions and 14 deletions

View File

@ -1513,29 +1513,37 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
}
else if (request.version != -1 && request.version != node->version)
if (request.version != -1 && request.version != node->version)
return {typename Storage::Delta{zxid, Coordination::Error::ZBADVERSION}};
else if (node->numChildren() != 0)
ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit);
bool limit_exceeded = collector.collect(request.path, *node);
if (limit_exceeded)
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
new_deltas.emplace_back(
std::string{parentNodePath(request.path)},
zxid,
typename Storage::UpdateNodeDelta{[](typename Storage::Node & parent)
{
++parent.cversion;
parent.decreaseNumChildren();
}});
auto delete_deltas = collector.extractDeltas();
new_deltas.emplace_back(request.path, zxid, typename Storage::RemoveNodeDelta{request.version, node->ephemeralOwner()});
for (const auto & delta : delete_deltas)
std::visit(
Overloaded{
[&](const typename Storage::RemoveNodeDelta & remove_delta)
{
if (remove_delta.ephemeral_owner)
storage.unregisterEphemeralPath(remove_delta.ephemeral_owner, delta.path);
},
[](auto && /* delta */) {},
},
delta.operation);
if (node->isEphemeral())
storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path);
new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end()));
digest = storage.calculateNodesDigest(digest, new_deltas);
@ -1556,6 +1564,134 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
private:
class ToDeleteTreeCollector
{
Storage & storage;
int64_t zxid;
uint32_t limit;
uint32_t nodes_observed = 0;
std::vector<typename Storage::Delta> deltas;
public:
ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_)
: storage(storage_)
, zxid(zxid_)
, limit(limit_)
{
}
bool collect(StringRef root_path, const typename Storage::Node & root_node)
{
if (updateStats(root_node))
return true;
if (visitRocksDBNode(root_path) || visitMemNode(root_path))
return true;
/// After checking committed nodes there might be some in uncommitted state,
/// for example made on the previouse step of multi transaction.
return visitRootAndUncommitted(root_path, root_node);
}
uint32_t getNumObserved() const
{
return nodes_observed;
}
std::vector<typename Storage::Delta> extractDeltas()
{
return std::move(deltas);
}
private:
bool visitRocksDBNode(StringRef root_path)
{
if constexpr (Storage::use_rocksdb)
{
auto children = storage.container.getChildren(root_path.toString());
for (auto && [child_path, node] : children)
if (collect(child_path, node))
return true;
}
return false;
}
bool visitMemNode(StringRef root_path)
{
if constexpr (!Storage::use_rocksdb)
{
std::filesystem::path root_fs_path(root_path.toString());
auto node_it = storage.container.find(root_path);
if (node_it != storage.container.end())
{
auto children = node_it->value.getChildren();
for (auto && child_name : children)
{
auto child_path = (root_fs_path / child_name.toView()).generic_string();
auto child_it = storage.container.find(child_path);
chassert(child_it != storage.container.end());
if (collect(child_path, child_it->value))
return true;
}
}
}
return false;
}
bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node)
{
const auto & nodes = storage.uncommitted_state.nodes;
/// nodes are sorted by paths with level locality
auto it = nodes.upper_bound(root_path.toView());
for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it)
{
chassert(it->second.node);
const typename Storage::Node & node = *it->second.node;
if (updateStats(node))
return true;
if (visitRootAndUncommitted(it->first, node)) /// if child is uncommitted then all subtree is also uncommitted
return true;
}
deltas.emplace_back(
parentNodePath(root_path).toString(),
zxid,
typename Storage::UpdateNodeDelta{
[](typename Storage::Node & parent)
{
++parent.cversion;
parent.decreaseNumChildren();
}
});
deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()});
return false;
}
bool updateStats(const typename Storage::Node & root_node)
{
nodes_observed += 1 + root_node.numChildren(); /// root + all known children
if (nodes_observed > limit)
return true;
return false;
}
};
};
template<typename Storage>

View File

@ -609,7 +609,18 @@ public:
using is_transparent = void; // required to make find() work with different type than key_type
};
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
struct PathCmp
{
using is_transparent = std::true_type;
auto operator()(const std::string_view a,
const std::string_view b) const
{
return a.size() < b.size() || (a.size() == b.size() && a < b);
}
};
mutable std::map<std::string, UncommittedNode, PathCmp> nodes;
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
std::list<Delta> deltas;