Rename uncommitted state

This commit is contained in:
Antonio Andelic 2022-05-11 09:08:39 +00:00
parent 15672a8374
commit 0fb11ab3ff
3 changed files with 40 additions and 41 deletions

View File

@ -29,7 +29,6 @@ public:
void preprocess(uint64_t log_idx, nuraft::buffer & data);
/// Currently not supported
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
@ -37,7 +36,6 @@ public:
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT
/// Currently not supported
void rollback(uint64_t log_idx, nuraft::buffer & data) override;
uint64_t last_commit_index() override { return last_committed_idx; }

View File

@ -189,7 +189,7 @@ struct Overloaded : Ts...
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
std::shared_ptr<KeeperStorage::Node> KeeperStorage::CurrentNodes::getNode(StringRef path)
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path)
{
std::shared_ptr<Node> node{nullptr};
@ -228,7 +228,7 @@ std::shared_ptr<KeeperStorage::Node> KeeperStorage::CurrentNodes::getNode(String
return node;
}
bool KeeperStorage::CurrentNodes::hasNode(StringRef path) const
bool KeeperStorage::UncommittedState::hasNode(StringRef path) const
{
bool exists = storage.container.contains(std::string{path});
applyDeltas(
@ -250,7 +250,7 @@ bool KeeperStorage::CurrentNodes::hasNode(StringRef path) const
return exists;
}
Coordination::ACLs KeeperStorage::CurrentNodes::getACLs(StringRef path) const
Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const
{
std::optional<uint64_t> acl_id;
if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end())
@ -290,7 +290,7 @@ namespace
Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_id)
{
for (auto & delta : current_nodes.deltas)
for (auto & delta : uncommitted_state.deltas)
{
if (delta.zxid > commit_zxid)
break;
@ -519,7 +519,7 @@ namespace
return storage.acl_map.convertNumber(node_it->value.acl_id);
}
return storage.current_nodes.getACLs(path);
return storage.uncommitted_state.getACLs(path);
}
}
@ -529,7 +529,7 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session
if (node_acls.empty())
return true;
if (current_nodes.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; }))
if (uncommitted_state.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; }))
return true;
@ -540,7 +540,7 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session
if (node_acl.scheme == "world" && node_acl.id == "anyone")
return true;
if (current_nodes.hasACL(
if (uncommitted_state.hasACL(
session_id,
is_local,
[&](const auto & auth_id) { return auth_id.scheme == node_acl.scheme && auth_id.id == node_acl.id; }))
@ -575,7 +575,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta> new_deltas;
auto parent_path = parentPath(request.path);
auto parent_node = storage.current_nodes.getNode(parent_path);
auto parent_node = storage.uncommitted_state.getNode(parent_path);
if (parent_node == nullptr)
return {{zxid, Coordination::Error::ZNONODE}};
@ -594,7 +594,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
path_created += seq_num_str.str();
}
if (storage.current_nodes.hasNode(path_created))
if (storage.uncommitted_state.hasNode(path_created))
return {{zxid, Coordination::Error::ZNODEEXISTS}};
if (getBaseName(path_created).size == 0)
@ -653,7 +653,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
return response_ptr;
}
const auto & deltas = storage.current_nodes.deltas;
const auto & deltas = storage.uncommitted_state.deltas;
auto create_delta_it = std::find_if(
deltas.begin(),
deltas.end(),
@ -683,7 +683,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
{
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {};
@ -759,7 +759,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
const auto update_parent_pzxid = [&]()
{
auto parent_path = parentPath(request.path);
if (!storage.current_nodes.hasNode(parent_path))
if (!storage.uncommitted_state.hasNode(parent_path))
return;
new_deltas.emplace_back(
@ -772,7 +772,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
}});
};
auto node = storage.current_nodes.getNode(request.path);
auto node = storage.uncommitted_state.getNode(request.path);
if (!node)
{
@ -827,7 +827,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
{
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {};
@ -897,10 +897,10 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta> new_deltas;
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
auto node = storage.current_nodes.getNode(request.path);
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, Coordination::Error::ZBADVERSION}};
@ -978,7 +978,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
{
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {};
@ -1058,10 +1058,10 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
{
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
auto node = storage.current_nodes.getNode(request.path);
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, Coordination::Error::ZBADVERSION}};
@ -1135,11 +1135,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
{
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
auto & current_nodes = storage.current_nodes;
if (!current_nodes.hasNode(request.path))
auto & uncommitted_state = storage.uncommitted_state;
if (!uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
auto node = current_nodes.getNode(request.path);
auto node = uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.aversion)
return {{zxid, Coordination::Error::ZBADVERSION}};
@ -1192,7 +1192,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
{
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
if (!storage.uncommitted_state.hasNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {};
@ -1292,7 +1292,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
// manually add deltas so that the result of previous request in the transaction is used in the next request
auto & saved_deltas = storage.current_nodes.deltas;
auto & saved_deltas = storage.uncommitted_state.deltas;
std::vector<Coordination::Error> response_errors;
response_errors.reserve(concrete_requests.size());
@ -1330,7 +1330,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
auto & deltas = storage.current_nodes.deltas;
auto & deltas = storage.uncommitted_state.deltas;
if (auto * failed_multi = std::get_if<KeeperStorage::FailedMultiDelta>(&deltas.front().operation))
{
for (size_t i = 0; i < concrete_requests.size(); ++i)
@ -1440,7 +1440,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
else
{
KeeperStorage::AuthID new_auth{auth_request.scheme, digest};
if (!storage.current_nodes.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; }))
if (!storage.uncommitted_state.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; }))
new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(new_auth)});
}
@ -1546,13 +1546,13 @@ void KeeperStorage::preprocessRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
auto & deltas = current_nodes.deltas;
auto & deltas = uncommitted_state.deltas;
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
for (const auto & ephemeral_path : session_ephemerals->second)
{
if (current_nodes.hasNode(ephemeral_path))
if (uncommitted_state.hasNode(ephemeral_path))
{
deltas.emplace_back(
parentPath(ephemeral_path).toString(),
@ -1573,13 +1573,13 @@ void KeeperStorage::preprocessRequest(
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
{
current_nodes.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
return;
}
auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time);
current_nodes.deltas.insert(
current_nodes.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
uncommitted_state.deltas.insert(
uncommitted_state.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
@ -1606,7 +1606,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
{
commit(zxid, session_id);
for (const auto & delta : current_nodes.deltas)
for (const auto & delta : uncommitted_state.deltas)
{
if (delta.zxid > zxid)
break;
@ -1618,7 +1618,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
}
}
std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
clearDeadWatches(session_id);
auto auth_it = session_and_auth.find(session_id);
@ -1663,7 +1663,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
else
{
response = request_processor->process(*this, zxid, session_id, time);
std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
}
/// Watches for this requests are added to the watches lists
@ -1706,8 +1706,8 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid)
{
// we can only rollback the last zxid (if there is any)
// if there is a delta with a larger zxid, we have invalid state
assert(current_nodes.deltas.empty() || current_nodes.deltas.back().zxid <= rollback_zxid);
std::erase_if(current_nodes.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
assert(uncommitted_state.deltas.empty() || uncommitted_state.deltas.back().zxid <= rollback_zxid);
std::erase_if(uncommitted_state.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
}
void KeeperStorage::clearDeadWatches(int64_t session_id)

View File

@ -136,6 +136,7 @@ public:
std::vector<Coordination::Error> error_codes;
};
// Denotes end of a subrequest in multi request
struct SubDeltaEnd
{
};
@ -162,9 +163,9 @@ public:
Operation operation;
};
struct CurrentNodes
struct UncommittedState
{
explicit CurrentNodes(KeeperStorage & storage_) : storage(storage_) { }
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
template <typename Visitor>
void applyDeltas(StringRef path, const Visitor & visitor) const
@ -207,7 +208,7 @@ public:
KeeperStorage & storage;
};
CurrentNodes current_nodes{*this};
UncommittedState uncommitted_state{*this};
Coordination::Error commit(int64_t zxid, int64_t session_id);