From 0fb11ab3ff4ebc09bf4cb3265d59e09ac35ce423 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 11 May 2022 09:08:39 +0000 Subject: [PATCH] Rename uncommitted state --- src/Coordination/KeeperStateMachine.h | 2 - src/Coordination/KeeperStorage.cpp | 72 +++++++++++++-------------- src/Coordination/KeeperStorage.h | 7 +-- 3 files changed, 40 insertions(+), 41 deletions(-) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 32dadab6570..aed96a59c13 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -29,7 +29,6 @@ public: void preprocess(uint64_t log_idx, nuraft::buffer & data); - /// Currently not supported nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; nuraft::ptr commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT @@ -37,7 +36,6 @@ public: /// Save new cluster config to our snapshot (copy of the config stored in StateManager) void commit_config(const uint64_t log_idx, nuraft::ptr & new_conf) override; /// NOLINT - /// Currently not supported void rollback(uint64_t log_idx, nuraft::buffer & data) override; uint64_t last_commit_index() override { return last_committed_idx; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 058f9c0ce7b..fab37aec1bf 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -189,7 +189,7 @@ struct Overloaded : Ts... template Overloaded(Ts...) -> Overloaded; -std::shared_ptr KeeperStorage::CurrentNodes::getNode(StringRef path) +std::shared_ptr KeeperStorage::UncommittedState::getNode(StringRef path) { std::shared_ptr node{nullptr}; @@ -228,7 +228,7 @@ std::shared_ptr KeeperStorage::CurrentNodes::getNode(String return node; } -bool KeeperStorage::CurrentNodes::hasNode(StringRef path) const +bool KeeperStorage::UncommittedState::hasNode(StringRef path) const { bool exists = storage.container.contains(std::string{path}); applyDeltas( @@ -250,7 +250,7 @@ bool KeeperStorage::CurrentNodes::hasNode(StringRef path) const return exists; } -Coordination::ACLs KeeperStorage::CurrentNodes::getACLs(StringRef path) const +Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const { std::optional acl_id; if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end()) @@ -290,7 +290,7 @@ namespace Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_id) { - for (auto & delta : current_nodes.deltas) + for (auto & delta : uncommitted_state.deltas) { if (delta.zxid > commit_zxid) break; @@ -519,7 +519,7 @@ namespace return storage.acl_map.convertNumber(node_it->value.acl_id); } - return storage.current_nodes.getACLs(path); + return storage.uncommitted_state.getACLs(path); } } @@ -529,7 +529,7 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session if (node_acls.empty()) return true; - if (current_nodes.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; })) + if (uncommitted_state.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; })) return true; @@ -540,7 +540,7 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session if (node_acl.scheme == "world" && node_acl.id == "anyone") return true; - if (current_nodes.hasACL( + if (uncommitted_state.hasACL( session_id, is_local, [&](const auto & auth_id) { return auth_id.scheme == node_acl.scheme && auth_id.id == node_acl.id; })) @@ -575,7 +575,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr std::vector new_deltas; auto parent_path = parentPath(request.path); - auto parent_node = storage.current_nodes.getNode(parent_path); + auto parent_node = storage.uncommitted_state.getNode(parent_path); if (parent_node == nullptr) return {{zxid, Coordination::Error::ZNONODE}}; @@ -594,7 +594,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr path_created += seq_num_str.str(); } - if (storage.current_nodes.hasNode(path_created)) + if (storage.uncommitted_state.hasNode(path_created)) return {{zxid, Coordination::Error::ZNODEEXISTS}}; if (getBaseName(path_created).size == 0) @@ -653,7 +653,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr return response_ptr; } - const auto & deltas = storage.current_nodes.deltas; + const auto & deltas = storage.uncommitted_state.deltas; auto create_delta_it = std::find_if( deltas.begin(), deltas.end(), @@ -683,7 +683,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce { Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; return {}; @@ -759,7 +759,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr const auto update_parent_pzxid = [&]() { auto parent_path = parentPath(request.path); - if (!storage.current_nodes.hasNode(parent_path)) + if (!storage.uncommitted_state.hasNode(parent_path)) return; new_deltas.emplace_back( @@ -772,7 +772,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr }}); }; - auto node = storage.current_nodes.getNode(request.path); + auto node = storage.uncommitted_state.getNode(request.path); if (!node) { @@ -827,7 +827,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr { Coordination::ZooKeeperExistsRequest & request = dynamic_cast(*zk_request); - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; return {}; @@ -897,10 +897,10 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce std::vector new_deltas; - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; - auto node = storage.current_nodes.getNode(request.path); + auto node = storage.uncommitted_state.getNode(request.path); if (request.version != -1 && request.version != node->stat.version) return {{zxid, Coordination::Error::ZBADVERSION}}; @@ -978,7 +978,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc { Coordination::ZooKeeperListRequest & request = dynamic_cast(*zk_request); - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; return {}; @@ -1058,10 +1058,10 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro { Coordination::ZooKeeperCheckRequest & request = dynamic_cast(*zk_request); - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; - auto node = storage.current_nodes.getNode(request.path); + auto node = storage.uncommitted_state.getNode(request.path); if (request.version != -1 && request.version != node->stat.version) return {{zxid, Coordination::Error::ZBADVERSION}}; @@ -1135,11 +1135,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr { Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); - auto & current_nodes = storage.current_nodes; - if (!current_nodes.hasNode(request.path)) + auto & uncommitted_state = storage.uncommitted_state; + if (!uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; - auto node = current_nodes.getNode(request.path); + auto node = uncommitted_state.getNode(request.path); if (request.version != -1 && request.version != node->stat.aversion) return {{zxid, Coordination::Error::ZBADVERSION}}; @@ -1192,7 +1192,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr { Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); - if (!storage.current_nodes.hasNode(request.path)) + if (!storage.uncommitted_state.hasNode(request.path)) return {{zxid, Coordination::Error::ZNONODE}}; return {}; @@ -1292,7 +1292,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { // manually add deltas so that the result of previous request in the transaction is used in the next request - auto & saved_deltas = storage.current_nodes.deltas; + auto & saved_deltas = storage.uncommitted_state.deltas; std::vector response_errors; response_errors.reserve(concrete_requests.size()); @@ -1330,7 +1330,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); - auto & deltas = storage.current_nodes.deltas; + auto & deltas = storage.uncommitted_state.deltas; if (auto * failed_multi = std::get_if(&deltas.front().operation)) { for (size_t i = 0; i < concrete_requests.size(); ++i) @@ -1440,7 +1440,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc else { KeeperStorage::AuthID new_auth{auth_request.scheme, digest}; - if (!storage.current_nodes.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; })) + if (!storage.uncommitted_state.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; })) new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(new_auth)}); } @@ -1546,13 +1546,13 @@ void KeeperStorage::preprocessRequest( if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { - auto & deltas = current_nodes.deltas; + auto & deltas = uncommitted_state.deltas; auto session_ephemerals = ephemerals.find(session_id); if (session_ephemerals != ephemerals.end()) { for (const auto & ephemeral_path : session_ephemerals->second) { - if (current_nodes.hasNode(ephemeral_path)) + if (uncommitted_state.hasNode(ephemeral_path)) { deltas.emplace_back( parentPath(ephemeral_path).toString(), @@ -1573,13 +1573,13 @@ void KeeperStorage::preprocessRequest( if (check_acl && !request_processor->checkAuth(*this, session_id, false)) { - current_nodes.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); + uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); return; } auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time); - current_nodes.deltas.insert( - current_nodes.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end())); + uncommitted_state.deltas.insert( + uncommitted_state.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end())); } KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( @@ -1606,7 +1606,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( { commit(zxid, session_id); - for (const auto & delta : current_nodes.deltas) + for (const auto & delta : uncommitted_state.deltas) { if (delta.zxid > zxid) break; @@ -1618,7 +1618,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( } } - std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); + std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); clearDeadWatches(session_id); auto auth_it = session_and_auth.find(session_id); @@ -1663,7 +1663,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( else { response = request_processor->process(*this, zxid, session_id, time); - std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); + std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); } /// Watches for this requests are added to the watches lists @@ -1706,8 +1706,8 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid) { // we can only rollback the last zxid (if there is any) // if there is a delta with a larger zxid, we have invalid state - assert(current_nodes.deltas.empty() || current_nodes.deltas.back().zxid <= rollback_zxid); - std::erase_if(current_nodes.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; }); + assert(uncommitted_state.deltas.empty() || uncommitted_state.deltas.back().zxid <= rollback_zxid); + std::erase_if(uncommitted_state.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; }); } void KeeperStorage::clearDeadWatches(int64_t session_id) diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 6a9239f4cee..cf85c366789 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -136,6 +136,7 @@ public: std::vector error_codes; }; + // Denotes end of a subrequest in multi request struct SubDeltaEnd { }; @@ -162,9 +163,9 @@ public: Operation operation; }; - struct CurrentNodes + struct UncommittedState { - explicit CurrentNodes(KeeperStorage & storage_) : storage(storage_) { } + explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { } template void applyDeltas(StringRef path, const Visitor & visitor) const @@ -207,7 +208,7 @@ public: KeeperStorage & storage; }; - CurrentNodes current_nodes{*this}; + UncommittedState uncommitted_state{*this}; Coordination::Error commit(int64_t zxid, int64_t session_id);