change traverse

This commit is contained in:
Mikhail Artemenko 2024-09-06 00:08:08 +00:00
parent c3cc2a3fb1
commit cb1c11c74a

View File

@ -1642,14 +1642,17 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage
}
private:
using SNode = typename Storage::Node;
class ToDeleteTreeCollector
{
Storage & storage;
int64_t zxid;
uint32_t limit;
uint32_t nodes_observed = 0;
std::vector<typename Storage::Delta> deltas;
uint32_t max_level = 0;
uint32_t nodes_observed = 1; /// root node
std::unordered_map<uint32_t, std::vector<typename Storage::Delta>> by_level_deltas;
public:
ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_)
@ -1659,31 +1662,60 @@ private:
{
}
bool collect(StringRef root_path, const typename Storage::Node & root_node)
bool collect(StringRef root_path, const SNode & root_node)
{
if (updateStats(root_node))
std::deque<Step> steps;
steps.push_back(Step{root_path.toString(), &root_node, 0});
if (observeNode(root_node))
return true;
if (visitRocksDBNode(root_path) || visitMemNode(root_path))
return true;
while (!steps.empty())
{
Step step = std::move(steps.front());
steps.pop_front();
/// After checking committed nodes there might be some in uncommitted state,
/// for example made on the previouse step of multi transaction.
return visitRootAndUncommitted(root_path, root_node);
StringRef path = step.path;
uint32_t level = step.level;
const SNode * node = nullptr;
if (auto * rdb = std::get_if<SNode>(&step.node))
node = rdb;
else
node = std::get<const SNode *>(step.node);
chassert(!path.empty());
chassert(node != nullptr);
if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level))
return true;
}
uint32_t getNumObserved() const
{
return nodes_observed;
return false;
}
std::vector<typename Storage::Delta> extractDeltas()
{
std::vector<typename Storage::Delta> deltas;
for (ssize_t level = max_level; level >= 0; --level)
{
auto & level_deltas = by_level_deltas[static_cast<uint32_t>(level)];
deltas.insert(deltas.end(), std::make_move_iterator(level_deltas.begin()), std::make_move_iterator(level_deltas.end()));
}
return std::move(deltas);
}
private:
bool visitRocksDBNode(StringRef root_path)
struct Step
{
String path;
std::variant<SNode, const SNode *> node;
uint32_t level;
};
bool visitRocksDBNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
{
if constexpr (Storage::use_rocksdb)
{
@ -1692,25 +1724,28 @@ private:
for (auto && [child_name, node] : children)
{
auto child_path = (root_fs_path / child_name).generic_string();
if (collect(child_path, node))
if (observeNode(node))
return true;
auto child_path = (root_fs_path / child_name).generic_string();
steps.push_back(Step{std::move(child_path), std::move(node), level + 1});
}
}
return false;
}
bool visitMemNode(StringRef root_path)
bool visitMemNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
{
if constexpr (!Storage::use_rocksdb)
{
std::filesystem::path root_fs_path(root_path.toString());
auto node_it = storage.container.find(root_path);
if (node_it != storage.container.end())
{
if (node_it == storage.container.end())
return false;
auto children = node_it->value.getChildren();
for (auto && child_name : children)
{
@ -1719,59 +1754,62 @@ private:
auto child_it = storage.container.find(child_path);
chassert(child_it != storage.container.end());
if (collect(child_path, child_it->value))
if (observeNode(child_it->value))
return true;
}
steps.push_back(Step{std::move(child_path), child_it->value, level + 1});
}
}
return false;
}
bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node)
bool visitRootAndUncommitted(std::deque<Step> & steps, StringRef root_path, const SNode & root_node, uint32_t level)
{
const auto & nodes = storage.uncommitted_state.nodes;
/// nodes are sorted by paths with level locality
auto it = nodes.upper_bound(root_path.toView());
auto it = nodes.upper_bound(root_path.toString() + "/");
for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it)
{
chassert(it->second.node);
const String & path = it->first;
const typename Storage::Node & node = *it->second.node;
const SNode & node = *it->second.node;
if (updateStats(node))
if (observeNode(node))
return true;
if (visitRootAndUncommitted(path, node)) /// if child is uncommitted then all subtree is also uncommitted
return true;
steps.push_back(Step{path, &node, level + 1});
}
deltas.emplace_back(
addDelta(root_path, root_node, level);
return false;
}
void addDelta(StringRef root_path, const SNode & root_node, uint32_t level)
{
max_level = std::max(max_level, level);
by_level_deltas[level].emplace_back(
parentNodePath(root_path).toString(),
zxid,
typename Storage::UpdateNodeDelta{
[](typename Storage::Node & parent)
[](SNode & parent)
{
++parent.cversion;
parent.decreaseNumChildren();
}
});
deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()});
return false;
by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()});
}
bool updateStats(const typename Storage::Node & root_node)
bool observeNode(const SNode & node)
{
nodes_observed += 1;
if (nodes_observed + root_node.numChildren() > limit)
return true;
return false;
nodes_observed += node.numChildren();
return nodes_observed > limit;
}
};
};