fix collector

This commit is contained in:
Mikhail Artemenko 2024-09-10 14:34:27 +00:00
parent 0bd7ef39c4
commit 1aacb48bcf
3 changed files with 123 additions and 28 deletions

View File

@ -832,6 +832,15 @@ std::shared_ptr<typename Container::Node> KeeperStorage<Container>::UncommittedS
return tryGetNodeFromStorage(path);
}
template<typename Container>
const typename Container::Node * KeeperStorage<Container>::UncommittedState::getActualNodeView(StringRef path, const Node & storage_node) const
{
if (auto node_it = nodes.find(path.toView()); node_it != nodes.end())
return node_it->second.node.get();
return &storage_node;
}
template<typename Container>
Coordination::ACLs KeeperStorage<Container>::UncommittedState::getACLs(StringRef path) const
{
@ -1686,7 +1695,7 @@ private:
{
std::deque<Step> steps;
if (checkLimits(root_node))
if (checkLimits(&root_node))
return true;
steps.push_back(Step{root_path.toString(), &root_node, 0});
@ -1698,17 +1707,20 @@ private:
StringRef path = step.path;
uint32_t level = step.level;
const SNode * node = nullptr;
const SNode * node_ptr = nullptr;
if (auto * rdb = std::get_if<SNode>(&step.node))
node = rdb;
node_ptr = rdb;
else
node = std::get<const SNode *>(step.node);
node_ptr = std::get<const SNode *>(step.node);
chassert(!path.empty());
chassert(node != nullptr);
chassert(node_ptr != nullptr);
if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level))
const auto & node = *node_ptr;
chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted
if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level))
return true;
}
@ -1736,13 +1748,18 @@ private:
std::filesystem::path root_fs_path(root_path.toString());
auto children = storage.container.getChildren(root_path.toString());
for (auto && [child_name, node] : children)
for (auto && [child_name, child_node] : children)
{
if (checkLimits(node))
auto child_path = (root_fs_path / child_name).generic_string();
const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node);
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
continue;
if (checkLimits(actual_child_node_ptr))
return true;
auto child_path = (root_fs_path / child_name).generic_string();
steps.push_back(Step{std::move(child_path), std::move(node), level + 1});
steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1});
}
}
@ -1753,25 +1770,30 @@ private:
{
if constexpr (!Storage::use_rocksdb)
{
std::filesystem::path root_fs_path(root_path.toString());
auto node_it = storage.container.find(root_path);
if (node_it == storage.container.end())
return false;
std::filesystem::path root_fs_path(root_path.toString());
const auto & children = node_it->value.getChildren();
for (auto && child_name : children)
for (const auto & child_name : children)
{
auto child_path = (root_fs_path / child_name.toView()).generic_string();
auto child_it = storage.container.find(child_path);
chassert(child_it != storage.container.end());
const auto & child_node = child_it->value;
if (checkLimits(child_it->value))
const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node);
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
continue;
if (checkLimits(actual_child_node_ptr))
return true;
steps.push_back(Step{std::move(child_path), child_it->value, level + 1});
steps.push_back(Step{std::move(child_path), &child_node, level + 1});
}
}
@ -1787,14 +1809,18 @@ private:
for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it)
{
chassert(it->second.node);
const String & path = it->first;
const SNode & node = *it->second.node;
const auto actual_child_node_ptr = it->second.node.get();
if (checkLimits(node))
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
continue;
if (checkLimits(actual_child_node_ptr))
return true;
steps.push_back(Step{path, &node, level + 1});
const String & child_path = it->first;
const SNode & child_node = *it->second.node;
steps.push_back(Step{child_path, &child_node, level + 1});
}
addDelta(root_path, root_node, level);
@ -1820,9 +1846,10 @@ private:
by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()});
}
bool checkLimits(const SNode & node)
bool checkLimits(const SNode * node)
{
nodes_observed += node.numChildren();
chassert(node != nullptr);
nodes_observed += node->numChildren();
return nodes_observed > limit;
}
};

View File

@ -566,6 +566,7 @@ public:
void rollback(int64_t rollback_zxid);
std::shared_ptr<Node> getNode(StringRef path) const;
const Node * getActualNodeView(StringRef path, const Node & storage_node) const;
Coordination::ACLs getACLs(StringRef path) const;
void applyDelta(const Delta & delta);

View File

@ -3563,11 +3563,15 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
Storage storage{500, "", this->keeper_context};
int zxid = 0;
Coordination::Requests ops;
ops.push_back(zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent));
ops.push_back(zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent));
ops.push_back(zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral));
ops.push_back(zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral));
auto prepare_create_tree = []()
{
return Coordination::Requests{
zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral),
zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral),
};
};
const auto exists = [&](const String & path)
{
@ -3597,6 +3601,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
{
SCOPED_TRACE("Remove In Multi Tx");
int new_zxid = ++zxid;
auto ops = prepare_create_tree();
ops.push_back(zkutil::makeRemoveRequest("/A", -1));
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
@ -3612,6 +3617,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
{
SCOPED_TRACE("Recursive Remove In Multi Tx");
int new_zxid = ++zxid;
auto ops = prepare_create_tree();
ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4));
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
@ -3627,6 +3633,67 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
ASSERT_FALSE(exists("/A/B"));
ASSERT_FALSE(exists("/A/B/D"));
}
{
SCOPED_TRACE("Recursive Remove With Regular In Multi Tx");
int new_zxid = ++zxid;
auto ops = prepare_create_tree();
ops.push_back(zkutil::makeRemoveRequest("/A/C", -1));
ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4));
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
storage.preprocessRequest(request, 1, 0, new_zxid);
auto responses = storage.processRequest(request, 1, new_zxid);
ops.pop_back();
ops.pop_back();
ASSERT_EQ(responses.size(), 1);
ASSERT_TRUE(is_multi_ok(responses[0].response));
ASSERT_FALSE(exists("/A"));
ASSERT_FALSE(exists("/A/C"));
ASSERT_FALSE(exists("/A/B"));
ASSERT_FALSE(exists("/A/B/D"));
}
{
SCOPED_TRACE("Recursive Remove From Committed and Uncommitted states");
int create_zxid = ++zxid;
auto ops = prepare_create_tree();
/// First create nodes
const auto create_request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
storage.preprocessRequest(create_request, 1, 0, create_zxid);
auto create_responses = storage.processRequest(create_request, 1, create_zxid);
ASSERT_EQ(create_responses.size(), 1);
ASSERT_TRUE(is_multi_ok(create_responses[0].response));
ASSERT_TRUE(exists("/A"));
ASSERT_TRUE(exists("/A/C"));
ASSERT_TRUE(exists("/A/B"));
ASSERT_TRUE(exists("/A/B/D"));
/// Remove node A/C as a single remove request.
/// Remove all other as remove recursive request.
/// In this case we should list storage to understand the tree topology
/// but ignore already deleted nodes in uncommitted state.
int remove_zxid = ++zxid;
ops = {
zkutil::makeRemoveRequest("/A/C", -1),
zkutil::makeRemoveRecursiveRequest("/A", 3),
};
const auto remove_request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
storage.preprocessRequest(remove_request, 1, 0, remove_zxid);
auto remove_responses = storage.processRequest(remove_request, 1, remove_zxid);
ASSERT_EQ(remove_responses.size(), 1);
ASSERT_TRUE(is_multi_ok(remove_responses[0].response));
ASSERT_FALSE(exists("/A"));
ASSERT_FALSE(exists("/A/C"));
ASSERT_FALSE(exists("/A/B"));
ASSERT_FALSE(exists("/A/B/D"));
}
}
TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches)