From 6a962549d520e8af5734a0485a0ba749e53509eb Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 25 May 2022 16:45:32 +0200 Subject: [PATCH] Revert "Add support for preprocessing ZooKeeper operations in `clickhouse-keeper`" --- src/Coordination/KeeperServer.cpp | 17 - src/Coordination/KeeperStateMachine.cpp | 24 +- src/Coordination/KeeperStateMachine.h | 8 +- src/Coordination/KeeperStorage.cpp | 1658 ++++++----------- src/Coordination/KeeperStorage.h | 248 +-- .../WriteBufferFromNuraftBuffer.h | 1 + src/Coordination/ZooKeeperDataReader.cpp | 1 - src/Coordination/tests/gtest_coordination.cpp | 130 -- 8 files changed, 644 insertions(+), 1443 deletions(-) diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index d74ad173811..5f77e996744 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -316,22 +315,6 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items); - auto log_store = state_manager->load_log_store(); - auto next_log_idx = log_store->next_slot(); - if (next_log_idx > 0 && next_log_idx > state_machine->last_commit_index()) - { - auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, next_log_idx); - - auto idx = state_machine->last_commit_index() + 1; - for (const auto & entry : *log_entries) - { - if (entry && entry->get_val_type() == nuraft::log_val_type::app_log) - state_machine->preprocess(idx, entry->get_buf()); - - ++idx; - } - } - loadLatestConfig(); last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index fa3a5195226..be7110fa841 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -44,6 +44,7 @@ namespace else /// backward compatibility request_for_session.time = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + return request_for_session; } } @@ -113,21 +114,6 @@ void KeeperStateMachine::init() storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest); } -nuraft::ptr KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) -{ - preprocess(log_idx, data); - return nullptr; -} - -void KeeperStateMachine::preprocess(const uint64_t log_idx, nuraft::buffer & data) -{ - auto request_for_session = parseRequest(data); - if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) - return; - std::lock_guard lock(storage_and_responses_lock); - storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx); -} - nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) { auto request_for_session = parseRequest(data); @@ -196,12 +182,6 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr cluster_config = ClusterConfig::deserialize(*tmp); } -void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & /*data*/) -{ - std::lock_guard lock(storage_and_responses_lock); - storage->rollbackRequest(log_idx); -} - nuraft::ptr KeeperStateMachine::last_snapshot() { /// Just return the latest snapshot. @@ -363,7 +343,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi { /// Pure local request, just process it with storage std::lock_guard lock(storage_and_responses_lock); - auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt, true /*check_acl*/, true /*is_local*/); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt); for (const auto & response : responses) if (!responses_queue.push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index aed96a59c13..73578e6a2ba 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -27,16 +27,16 @@ public: /// Read state from the latest snapshot void init(); - void preprocess(uint64_t log_idx, nuraft::buffer & data); - - nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; + /// Currently not supported + nuraft::ptr pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; } nuraft::ptr commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT /// Save new cluster config to our snapshot (copy of the config stored in StateManager) void commit_config(const uint64_t log_idx, nuraft::ptr & new_conf) override; /// NOLINT - void rollback(uint64_t log_idx, nuraft::buffer & data) override; + /// Currently not supported + void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {} uint64_t last_commit_index() override { return last_committed_idx; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 6c0699be95c..f58776cf843 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1,21 +1,19 @@ -#include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include "Common/ZooKeeper/ZooKeeperConstants.h" -#include #include -#include -#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { @@ -51,10 +49,37 @@ String getSHA1(const String & userdata) String generateDigest(const String & userdata) { std::vector user_password; - boost::split(user_password, userdata, [](char character) { return character == ':'; }); + boost::split(user_password, userdata, [](char c) { return c == ':'; }); return user_password[0] + ":" + base64Encode(getSHA1(userdata)); } +bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector & session_auths) +{ + if (node_acls.empty()) + return true; + + for (const auto & session_auth : session_auths) + if (session_auth.scheme == "super") + return true; + + for (const auto & node_acl : node_acls) + { + if (node_acl.permissions & permission) + { + if (node_acl.scheme == "world" && node_acl.id == "anyone") + return true; + + for (const auto & session_auth : session_auths) + { + if (node_acl.scheme == session_auth.scheme && node_acl.id == session_auth.id) + return true; + } + } + } + + return false; +} + bool fixupACL( const std::vector & request_acls, const std::vector & current_ids, @@ -97,12 +122,11 @@ bool fixupACL( return valid_found; } -KeeperStorage::ResponsesForSessions processWatchesImpl( - const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) +KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) { KeeperStorage::ResponsesForSessions result; - auto watch_it = watches.find(path); - if (watch_it != watches.end()) + auto it = watches.find(path); + if (it != watches.end()) { std::shared_ptr watch_response = std::make_shared(); watch_response->path = path; @@ -110,10 +134,10 @@ KeeperStorage::ResponsesForSessions processWatchesImpl( watch_response->zxid = -1; watch_response->type = event_type; watch_response->state = Coordination::State::CONNECTED; - for (auto watcher_session : watch_it->second) + for (auto watcher_session : it->second) result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_response}); - watches.erase(watch_it); + watches.erase(it); } auto parent_path = parentPath(path); @@ -132,11 +156,10 @@ KeeperStorage::ResponsesForSessions processWatchesImpl( for (const auto & path_to_check : paths_to_check_for_list_watches) { - watch_it = list_watches.find(path_to_check); - if (watch_it != list_watches.end()) + it = list_watches.find(path_to_check); + if (it != list_watches.end()) { - std::shared_ptr watch_list_response - = std::make_shared(); + std::shared_ptr watch_list_response = std::make_shared(); watch_list_response->path = path_to_check; watch_list_response->xid = Coordination::WATCH_XID; watch_list_response->zxid = -1; @@ -146,15 +169,14 @@ KeeperStorage::ResponsesForSessions processWatchesImpl( watch_list_response->type = Coordination::Event::DELETED; watch_list_response->state = Coordination::State::CONNECTED; - for (auto watcher_session : watch_it->second) + for (auto watcher_session : it->second) result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response}); - list_watches.erase(watch_it); + list_watches.erase(it); } } return result; } - } void KeeperStorage::Node::setData(String new_data) @@ -176,322 +198,24 @@ void KeeperStorage::Node::removeChild(StringRef child_path) } KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_) - : session_expiry_queue(tick_time_ms), superdigest(superdigest_) + : session_expiry_queue(tick_time_ms) + , superdigest(superdigest_) { container.insert("/", Node()); } -template -struct Overloaded : Ts... -{ - using Ts::operator()...; -}; - -// explicit deduction guide -// https://en.cppreference.com/w/cpp/language/class_template_argument_deduction -template -Overloaded(Ts...) -> Overloaded; - -std::shared_ptr KeeperStorage::UncommittedState::getNode(StringRef path) -{ - std::shared_ptr node{nullptr}; - - if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end()) - { - const auto & committed_node = maybe_node_it->value; - node = std::make_shared(); - node->stat = committed_node.stat; - node->seq_num = committed_node.seq_num; - node->setData(committed_node.getData()); - } - - applyDeltas( - path, - Overloaded{ - [&](const CreateNodeDelta & create_delta) - { - assert(!node); - node = std::make_shared(); - node->stat = create_delta.stat; - node->setData(create_delta.data); - }, - [&](const RemoveNodeDelta & /*remove_delta*/) - { - assert(node); - node = nullptr; - }, - [&](const UpdateNodeDelta & update_delta) - { - assert(node); - update_delta.update_fn(*node); - }, - [&](auto && /*delta*/) {}, - }); - - return node; -} - -bool KeeperStorage::UncommittedState::hasNode(StringRef path) const -{ - bool exists = storage.container.contains(std::string{path}); - applyDeltas( - path, - Overloaded{ - [&](const CreateNodeDelta & /*create_delta*/) - { - assert(!exists); - exists = true; - }, - [&](const RemoveNodeDelta & /*remove_delta*/) - { - assert(exists); - exists = false; - }, - [&](auto && /*delta*/) {}, - }); - - return exists; -} - -Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const -{ - std::optional acl_id; - if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end()) - acl_id.emplace(maybe_node_it->value.acl_id); - - const Coordination::ACLs * acls{nullptr}; - applyDeltas( - path, - Overloaded{ - [&](const CreateNodeDelta & create_delta) - { - assert(!acl_id); - acls = &create_delta.acls; - }, - [&](const RemoveNodeDelta & /*remove_delta*/) - { - assert(acl_id || acls); - acl_id.reset(); - acls = nullptr; - }, - [&](const SetACLDelta & set_acl_delta) - { - assert(acl_id || acls); - acls = &set_acl_delta.acls; - }, - [&](auto && /*delta*/) {}, - }); - - if (acls) - return *acls; - - return acl_id ? storage.acl_map.convertNumber(*acl_id) : Coordination::ACLs{}; -} - -namespace -{ - -[[noreturn]] void onStorageInconsistency() -{ - LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Inconsistency found between uncommitted and committed data. Keeper will terminate to avoid undefined behaviour."); - std::terminate(); -} - -} - -Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_id) -{ - // Deltas are added with increasing ZXIDs - // If there are no deltas for the commit_zxid (e.g. read requests), we instantly return - // on first delta - for (auto & delta : uncommitted_state.deltas) - { - if (delta.zxid > commit_zxid) - break; - - bool finish_subdelta = false; - auto result = std::visit( - [&, &path = delta.path](DeltaType & operation) -> Coordination::Error - { - if constexpr (std::same_as) - { - if (!createNode( - path, - std::move(operation.data), - operation.stat, - operation.is_sequental, - operation.is_ephemeral, - std::move(operation.acls), - session_id)) - onStorageInconsistency(); - - return Coordination::Error::ZOK; - } - else if constexpr (std::same_as) - { - auto node_it = container.find(path); - if (node_it == container.end()) - onStorageInconsistency(); - - if (operation.version != -1 && operation.version != node_it->value.stat.version) - onStorageInconsistency(); - - container.updateValue(path, operation.update_fn); - return Coordination::Error::ZOK; - } - else if constexpr (std::same_as) - { - if (!removeNode(path, operation.version)) - onStorageInconsistency(); - - return Coordination::Error::ZOK; - } - else if constexpr (std::same_as) - { - auto node_it = container.find(path); - if (node_it == container.end()) - onStorageInconsistency(); - - if (operation.version != -1 && operation.version != node_it->value.stat.aversion) - onStorageInconsistency(); - - acl_map.removeUsage(node_it->value.acl_id); - - uint64_t acl_id = acl_map.convertACLs(operation.acls); - acl_map.addUsage(acl_id); - - container.updateValue(path, [acl_id](KeeperStorage::Node & node) { node.acl_id = acl_id; }); - - return Coordination::Error::ZOK; - } - else if constexpr (std::same_as) - return operation.error; - else if constexpr (std::same_as) - { - finish_subdelta = true; - return Coordination::Error::ZOK; - } - else if constexpr (std::same_as) - { - session_and_auth[operation.session_id].emplace_back(std::move(operation.auth_id)); - return Coordination::Error::ZOK; - } - else - { - // shouldn't be called in any process functions - onStorageInconsistency(); - } - }, - delta.operation); - - if (result != Coordination::Error::ZOK) - return result; - - if (finish_subdelta) - return Coordination::Error::ZOK; - } - - return Coordination::Error::ZOK; -} - -bool KeeperStorage::createNode( - const std::string & path, - String data, - const Coordination::Stat & stat, - bool is_sequental, - bool is_ephemeral, - Coordination::ACLs node_acls, - int64_t session_id) -{ - auto parent_path = parentPath(path); - auto node_it = container.find(parent_path); - - if (node_it == container.end()) - return false; - - if (node_it->value.stat.ephemeralOwner != 0) - return false; - - if (container.contains(path)) - return false; - - KeeperStorage::Node created_node; - - uint64_t acl_id = acl_map.convertACLs(node_acls); - acl_map.addUsage(acl_id); - - created_node.acl_id = acl_id; - created_node.stat = stat; - created_node.setData(std::move(data)); - created_node.is_sequental = is_sequental; - auto [map_key, _] = container.insert(path, created_node); - /// Take child path from key owned by map. - auto child_path = getBaseName(map_key->getKey()); - container.updateValue(parent_path, [child_path](KeeperStorage::Node & parent) { parent.addChild(child_path); }); - - if (is_ephemeral) - ephemerals[session_id].emplace(path); - - return true; -}; - -bool KeeperStorage::removeNode(const std::string & path, int32_t version) -{ - auto node_it = container.find(path); - if (node_it == container.end()) - return false; - - if (version != -1 && version != node_it->value.stat.version) - return false; - - if (node_it->value.stat.numChildren) - return false; - - auto prev_node = node_it->value; - if (prev_node.stat.ephemeralOwner != 0) - { - auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner); - ephemerals_it->second.erase(path); - if (ephemerals_it->second.empty()) - ephemerals.erase(ephemerals_it); - } - - acl_map.removeUsage(prev_node.acl_id); - - container.updateValue( - parentPath(path), - [child_basename = getBaseName(node_it->key)](KeeperStorage::Node & parent) { parent.removeChild(child_basename); }); - - container.erase(path); - return true; -} - +using Undo = std::function; struct KeeperStorageRequestProcessor { Coordination::ZooKeeperRequestPtr zk_request; - explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) { } - virtual Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0; - virtual std::vector - preprocess(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const - { - return {}; - } - - // process the request using locally committed data - virtual Coordination::ZooKeeperResponsePtr - processLocal(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const - { - throw Exception{DB::ErrorCodes::LOGICAL_ERROR, "Cannot process the request locally"}; - } - - virtual KeeperStorage::ResponsesForSessions - processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const - { - return {}; - } - virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; } + explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) + : zk_request(zk_request_) + {} + virtual std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0; + virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; } + virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; } virtual ~KeeperStorageRequestProcessor() = default; }; @@ -499,328 +223,331 @@ struct KeeperStorageRequestProcessor struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { - return zk_request->makeResponse(); + return {zk_request->makeResponse(), {}}; } }; struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { auto response = zk_request->makeResponse(); dynamic_cast(*response).path = dynamic_cast(*zk_request).path; - return response; + return {response, {}}; } }; -namespace -{ - - Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_local) - { - if (is_local) - { - auto node_it = storage.container.find(path); - if (node_it == storage.container.end()) - return {}; - - return storage.acl_map.convertNumber(node_it->value.acl_id); - } - - return storage.uncommitted_state.getACLs(path); - } - -} -bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session_id, bool is_local) -{ - const auto node_acls = getNodeACLs(*this, path, is_local); - if (node_acls.empty()) - return true; - - if (uncommitted_state.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; })) - return true; - - - for (const auto & node_acl : node_acls) - { - if (node_acl.permissions & permission) - { - if (node_acl.scheme == "world" && node_acl.id == "anyone") - return true; - - if (uncommitted_state.hasACL( - session_id, - is_local, - [&](const auto & auth_id) { return auth_id.scheme == node_acl.scheme && auth_id.id == node_acl.id; })) - return true; - } - } - - return false; -} - - struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - KeeperStorage::ResponsesForSessions - processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); } - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { + auto & container = storage.container; auto path = zk_request->getPath(); - return storage.checkACL(parentPath(path), Coordination::ACL::Create, session_id, is_local); + auto parent_path = parentPath(path); + + auto it = container.find(parent_path); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Create, node_acls, session_auths); } - std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { + auto & container = storage.container; + auto & ephemerals = storage.ephemerals; + + Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Undo undo; + Coordination::ZooKeeperCreateResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperCreateRequest & request = dynamic_cast(*zk_request); - std::vector new_deltas; - auto parent_path = parentPath(request.path); - auto parent_node = storage.uncommitted_state.getNode(parent_path); - if (parent_node == nullptr) - return {{zxid, Coordination::Error::ZNONODE}}; - - else if (parent_node->stat.ephemeralOwner != 0) - return {{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}}; + auto it = container.find(parent_path); + if (it == container.end()) + { + response.error = Coordination::Error::ZNONODE; + return { response_ptr, undo }; + } + else if (it->value.stat.ephemeralOwner != 0) + { + response.error = Coordination::Error::ZNOCHILDRENFOREPHEMERALS; + return { response_ptr, undo }; + } std::string path_created = request.path; if (request.is_sequential) { - auto seq_num = parent_node->seq_num; + auto seq_num = it->value.seq_num; - std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM seq_num_str.exceptions(std::ios::failbit); seq_num_str << std::setw(10) << std::setfill('0') << seq_num; path_created += seq_num_str.str(); } - - if (storage.uncommitted_state.hasNode(path_created)) - return {{zxid, Coordination::Error::ZNODEEXISTS}}; - - if (getBaseName(path_created).size == 0) - return {{zxid, Coordination::Error::ZBADARGUMENTS}}; - - Coordination::ACLs node_acls; - if (!fixupACL(request.acls, storage.session_and_auth[session_id], node_acls)) - return {{zxid, Coordination::Error::ZINVALIDACL}}; - - Coordination::Stat stat; - stat.czxid = zxid; - stat.mzxid = zxid; - stat.pzxid = zxid; - stat.ctime = time; - stat.mtime = time; - stat.numChildren = 0; - stat.version = 0; - stat.aversion = 0; - stat.cversion = 0; - stat.dataLength = request.data.length(); - stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; - - new_deltas.emplace_back( - std::move(path_created), - zxid, - KeeperStorage::CreateNodeDelta{stat, request.is_ephemeral, request.is_sequential, std::move(node_acls), request.data}); - - int32_t parent_cversion = request.parent_cversion; - - new_deltas.emplace_back( - std::string{parent_path}, - zxid, - KeeperStorage::UpdateNodeDelta{[parent_cversion, zxid](KeeperStorage::Node & node) - { - ++node.seq_num; - if (parent_cversion == -1) - ++node.stat.cversion; - else if (parent_cversion > node.stat.cversion) - node.stat.cversion = parent_cversion; - - if (zxid > node.stat.pzxid) - node.stat.pzxid = zxid; - ++node.stat.numChildren; - }}); - return new_deltas; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override - { - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); - Coordination::ZooKeeperCreateResponse & response = dynamic_cast(*response_ptr); - - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + if (container.contains(path_created)) { - response.error = result; - return response_ptr; + response.error = Coordination::Error::ZNODEEXISTS; + return { response_ptr, undo }; + } + if (getBaseName(path_created).size == 0) + { + response.error = Coordination::Error::ZBADARGUMENTS; + return { response_ptr, undo }; } - const auto & deltas = storage.uncommitted_state.deltas; - auto create_delta_it = std::find_if( - deltas.begin(), - deltas.end(), - [zxid](const auto & delta) - { return delta.zxid == zxid && std::holds_alternative(delta.operation); }); + auto & session_auth_ids = storage.session_and_auth[session_id]; - assert(create_delta_it != deltas.end()); + KeeperStorage::Node created_node; + + Coordination::ACLs node_acls; + if (!fixupACL(request.acls, session_auth_ids, node_acls)) + { + response.error = Coordination::Error::ZINVALIDACL; + return {response_ptr, {}}; + } + + uint64_t acl_id = storage.acl_map.convertACLs(node_acls); + storage.acl_map.addUsage(acl_id); + + created_node.acl_id = acl_id; + created_node.stat.czxid = zxid; + created_node.stat.mzxid = zxid; + created_node.stat.pzxid = zxid; + created_node.stat.ctime = time; + created_node.stat.mtime = time; + created_node.stat.numChildren = 0; + created_node.stat.dataLength = request.data.length(); + created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; + created_node.is_sequental = request.is_sequential; + created_node.setData(std::move(request.data)); + + auto [map_key, _] = container.insert(path_created, created_node); + /// Take child path from key owned by map. + auto child_path = getBaseName(map_key->getKey()); + + int32_t parent_cversion = request.parent_cversion; + int64_t prev_parent_zxid; + int32_t prev_parent_cversion; + container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid, + parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent) + { + parent.addChild(child_path); + prev_parent_cversion = parent.stat.cversion; + prev_parent_zxid = parent.stat.pzxid; + + /// Increment sequential number even if node is not sequential + ++parent.seq_num; + + if (parent_cversion == -1) + ++parent.stat.cversion; + else if (parent_cversion > parent.stat.cversion) + parent.stat.cversion = parent_cversion; + + if (zxid > parent.stat.pzxid) + parent.stat.pzxid = zxid; + ++parent.stat.numChildren; + }); + + response.path_created = path_created; + + if (request.is_ephemeral) + ephemerals[session_id].emplace(path_created); + + undo = [&storage, prev_parent_zxid, prev_parent_cversion, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id] + { + storage.acl_map.removeUsage(acl_id); + + if (is_ephemeral) + storage.ephemerals[session_id].erase(path_created); + + storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorage::Node & undo_parent) + { + --undo_parent.stat.numChildren; + --undo_parent.seq_num; + undo_parent.stat.cversion = prev_parent_cversion; + undo_parent.stat.pzxid = prev_parent_zxid; + undo_parent.removeChild(child_path); + }); + + storage.container.erase(path_created); + }; - response.path_created = create_delta_it->path; response.error = Coordination::Error::ZOK; - return response_ptr; + return { response_ptr, undo }; } }; struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local); + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Read, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override - { - Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); - - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - return {}; - } - - template - Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const + std::pair process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { + auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); - if constexpr (!local) + auto it = container.find(request.path); + if (it == container.end()) { - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } - } - - auto & container = storage.container; - auto node_it = container.find(request.path); - if (node_it == container.end()) - { - if constexpr (local) - response.error = Coordination::Error::ZNONODE; - else - onStorageInconsistency(); + response.error = Coordination::Error::ZNONODE; } else { - response.stat = node_it->value.stat; - response.data = node_it->value.getData(); + response.stat = it->value.stat; + response.data = it->value.getData(); response.error = Coordination::Error::ZOK; } - return response_ptr; - } - - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); + return { response_ptr, {} }; } }; +namespace +{ + /// Garbage required to apply log to "fuzzy" zookeeper snapshot + void updateParentPzxid(const std::string & child_path, int64_t zxid, KeeperStorage::Container & container) + { + auto parent_path = parentPath(child_path); + auto parent_it = container.find(parent_path); + if (parent_it != container.end()) + { + container.updateValue(parent_path, [zxid](KeeperStorage::Node & parent) + { + if (parent.stat.pzxid < zxid) + parent.stat.pzxid = zxid; + }); + } + } +} + struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(parentPath(zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local); + auto & container = storage.container; + auto it = container.find(parentPath(zk_request->getPath())); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Delete, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /* time */) const override { - Coordination::ZooKeeperRemoveRequest & request = dynamic_cast(*zk_request); + auto & container = storage.container; + auto & ephemerals = storage.ephemerals; - std::vector new_deltas; - - const auto update_parent_pzxid = [&]() - { - auto parent_path = parentPath(request.path); - if (!storage.uncommitted_state.hasNode(parent_path)) - return; - - new_deltas.emplace_back( - std::string{parent_path}, - zxid, - KeeperStorage::UpdateNodeDelta{[zxid](KeeperStorage::Node & parent) - { - if (parent.stat.pzxid < zxid) - parent.stat.pzxid = zxid; - }}); - }; - - auto node = storage.uncommitted_state.getNode(request.path); - - if (!node) - { - if (request.restored_from_zookeeper_log) - update_parent_pzxid(); - return {{zxid, Coordination::Error::ZNONODE}}; - } - else if (request.version != -1 && request.version != node->stat.version) - return {{zxid, Coordination::Error::ZBADVERSION}}; - else if (node->stat.numChildren) - return {{zxid, Coordination::Error::ZNOTEMPTY}}; - - if (request.restored_from_zookeeper_log) - update_parent_pzxid(); - - new_deltas.emplace_back( - std::string{parentPath(request.path)}, - zxid, - KeeperStorage::UpdateNodeDelta{[](KeeperStorage::Node & parent) - { - --parent.stat.numChildren; - ++parent.stat.cversion; - }}); - - new_deltas.emplace_back(request.path, zxid, KeeperStorage::RemoveNodeDelta{request.version}); - - return new_deltas; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override - { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperRemoveResponse & response = dynamic_cast(*response_ptr); + Coordination::ZooKeeperRemoveRequest & request = dynamic_cast(*zk_request); + Undo undo; - response.error = storage.commit(zxid, session_id); - return response_ptr; + auto it = container.find(request.path); + if (it == container.end()) + { + if (request.restored_from_zookeeper_log) + updateParentPzxid(request.path, zxid, container); + response.error = Coordination::Error::ZNONODE; + } + else if (request.version != -1 && request.version != it->value.stat.version) + { + response.error = Coordination::Error::ZBADVERSION; + } + else if (it->value.stat.numChildren) + { + response.error = Coordination::Error::ZNOTEMPTY; + } + else + { + if (request.restored_from_zookeeper_log) + updateParentPzxid(request.path, zxid, container); + + auto prev_node = it->value; + if (prev_node.stat.ephemeralOwner != 0) + { + auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner); + ephemerals_it->second.erase(request.path); + if (ephemerals_it->second.empty()) + ephemerals.erase(ephemerals_it); + } + + storage.acl_map.removeUsage(prev_node.acl_id); + + container.updateValue(parentPath(request.path), [child_basename = getBaseName(it->key)] (KeeperStorage::Node & parent) + { + --parent.stat.numChildren; + ++parent.stat.cversion; + parent.removeChild(child_basename); + }); + + response.error = Coordination::Error::ZOK; + /// Erase full path from container after child removed from parent + container.erase(request.path); + + undo = [prev_node, &storage, path = request.path] + { + if (prev_node.stat.ephemeralOwner != 0) + storage.ephemerals[prev_node.stat.ephemeralOwner].emplace(path); + + storage.acl_map.addUsage(prev_node.acl_id); + + /// Dangerous place: we are adding StringRef to child into children unordered_hash set. + /// That's why we are taking getBaseName from inserted key, not from the path from request object. + auto [map_key, _] = storage.container.insert(path, prev_node); + storage.container.updateValue(parentPath(path), [child_name = getBaseName(map_key->getKey())] (KeeperStorage::Node & parent) + { + ++parent.stat.numChildren; + --parent.stat.cversion; + parent.addChild(child_name); + }); + }; + } + + return { response_ptr, undo }; } - KeeperStorage::ResponsesForSessions - processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } @@ -829,140 +556,101 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override { - Coordination::ZooKeeperExistsRequest & request = dynamic_cast(*zk_request); + auto & container = storage.container; - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - return {}; - } - - template - Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const - { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperExistsResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperExistsRequest & request = dynamic_cast(*zk_request); - if constexpr (!local) + auto it = container.find(request.path); + if (it != container.end()) { - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } - } - - auto & container = storage.container; - auto node_it = container.find(request.path); - if (node_it == container.end()) - { - if constexpr (local) - response.error = Coordination::Error::ZNONODE; - else - onStorageInconsistency(); + response.stat = it->value.stat; + response.error = Coordination::Error::ZOK; } else { - response.stat = node_it->value.stat; - response.error = Coordination::Error::ZOK; + response.error = Coordination::Error::ZNONODE; } - return response_ptr; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); + return { response_ptr, {} }; } }; struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Write, session_id, is_local); + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Write, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t time) const override - { - Coordination::ZooKeeperSetRequest & request = dynamic_cast(*zk_request); - - std::vector new_deltas; - - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - auto node = storage.uncommitted_state.getNode(request.path); - - if (request.version != -1 && request.version != node->stat.version) - return {{zxid, Coordination::Error::ZBADVERSION}}; - - new_deltas.emplace_back( - request.path, - zxid, - KeeperStorage::UpdateNodeDelta{ - [zxid, data = request.data, time](KeeperStorage::Node & value) - { - value.stat.version++; - value.stat.mzxid = zxid; - value.stat.mtime = time; - value.stat.dataLength = data.length(); - value.setData(data); - }, - request.version}); - - new_deltas.emplace_back( - parentPath(request.path).toString(), - zxid, - KeeperStorage::UpdateNodeDelta - { - [](KeeperStorage::Node & parent) - { - parent.stat.cversion++; - } - } - ); - - return new_deltas; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */, int64_t time) const override { auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperSetResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperSetRequest & request = dynamic_cast(*zk_request); + Undo undo; - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + auto it = container.find(request.path); + if (it == container.end()) { - response.error = result; - return response_ptr; + response.error = Coordination::Error::ZNONODE; + } + else if (request.version == -1 || request.version == it->value.stat.version) + { + + auto prev_node = it->value; + + auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) mutable + { + value.stat.version++; + value.stat.mzxid = zxid; + value.stat.mtime = time; + value.stat.dataLength = request.data.length(); + value.setData(std::move(request.data)); + }); + + container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent) + { + parent.stat.cversion++; + }); + + response.stat = itr->value.stat; + response.error = Coordination::Error::ZOK; + + undo = [prev_node, &container, path = request.path] + { + container.updateValue(path, [&prev_node] (KeeperStorage::Node & value) { value = prev_node; }); + container.updateValue(parentPath(path), [] (KeeperStorage::Node & parent) + { + parent.stat.cversion--; + }); + }; + } + else + { + response.error = Coordination::Error::ZBADVERSION; } - auto node_it = container.find(request.path); - if (node_it == container.end()) - onStorageInconsistency(); - - response.stat = node_it->value.stat; - response.error = Coordination::Error::ZOK; - - return response_ptr; + return { response_ptr, undo }; } - KeeperStorage::ResponsesForSessions - processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); } @@ -970,48 +658,33 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local); + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Read, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override - { - Coordination::ZooKeeperListRequest & request = dynamic_cast(*zk_request); - - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - return {}; - } - - - template - Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { + auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperListRequest & request = dynamic_cast(*zk_request); - if constexpr (!local) + auto it = container.find(request.path); + if (it == container.end()) { - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } - } - - auto & container = storage.container; - auto node_it = container.find(request.path); - if (node_it == container.end()) - { - if constexpr (local) - response.error = Coordination::Error::ZNONODE; - else - onStorageInconsistency(); + response.error = Coordination::Error::ZNONODE; } else { @@ -1019,247 +692,174 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc if (path_prefix.empty()) throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); - const auto & children = node_it->value.getChildren(); + const auto & children = it->value.getChildren(); response.names.reserve(children.size()); for (const auto child : children) response.names.push_back(child.toString()); - response.stat = node_it->value.stat; + response.stat = it->value.stat; response.error = Coordination::Error::ZOK; } - return response_ptr; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); + return { response_ptr, {} }; } }; struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local); + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Read, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { - Coordination::ZooKeeperCheckRequest & request = dynamic_cast(*zk_request); + auto & container = storage.container; - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - auto node = storage.uncommitted_state.getNode(request.path); - if (request.version != -1 && request.version != node->stat.version) - return {{zxid, Coordination::Error::ZBADVERSION}}; - - return {}; - } - - template - Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const - { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperCheckResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperCheckRequest & request = dynamic_cast(*zk_request); - - if constexpr (!local) + auto it = container.find(request.path); + if (it == container.end()) { - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } + response.error = Coordination::Error::ZNONODE; } - - const auto on_error = [&]([[maybe_unused]] const auto error_code) + else if (request.version != -1 && request.version != it->value.stat.version) { - if constexpr (local) - response.error = error_code; - else - onStorageInconsistency(); - }; - - auto & container = storage.container; - auto node_it = container.find(request.path); - if (node_it == container.end()) - { - on_error(Coordination::Error::ZNONODE); - } - else if (request.version != -1 && request.version != node_it->value.stat.version) - { - on_error(Coordination::Error::ZBADVERSION); + response.error = Coordination::Error::ZBADVERSION; } else { response.error = Coordination::Error::ZOK; } - return response_ptr; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); + return { response_ptr, {} }; } }; struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Admin, session_id, is_local); + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Admin, node_acls, session_auths); } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { - Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); + auto & container = storage.container; - auto & uncommitted_state = storage.uncommitted_state; - if (!uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - auto node = uncommitted_state.getNode(request.path); - - if (request.version != -1 && request.version != node->stat.aversion) - return {{zxid, Coordination::Error::ZBADVERSION}}; - - - auto & session_auth_ids = storage.session_and_auth[session_id]; - Coordination::ACLs node_acls; - - if (!fixupACL(request.acls, session_auth_ids, node_acls)) - return {{zxid, Coordination::Error::ZINVALIDACL}}; - - return - { - { - request.path, - zxid, - KeeperStorage::SetACLDelta{std::move(node_acls), request.version} - }, - { - request.path, - zxid, - KeeperStorage::UpdateNodeDelta - { - [](KeeperStorage::Node & n) { ++n.stat.aversion; } - } - } - }; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override - { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperSetACLResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); - - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + auto it = container.find(request.path); + if (it == container.end()) { - response.error = result; - return response_ptr; + response.error = Coordination::Error::ZNONODE; + } + else if (request.version != -1 && request.version != it->value.stat.aversion) + { + response.error = Coordination::Error::ZBADVERSION; + } + else + { + auto & session_auth_ids = storage.session_and_auth[session_id]; + Coordination::ACLs node_acls; + + if (!fixupACL(request.acls, session_auth_ids, node_acls)) + { + response.error = Coordination::Error::ZINVALIDACL; + return {response_ptr, {}}; + } + + uint64_t acl_id = storage.acl_map.convertACLs(node_acls); + storage.acl_map.addUsage(acl_id); + + storage.container.updateValue(request.path, [acl_id] (KeeperStorage::Node & node) + { + node.acl_id = acl_id; + ++node.stat.aversion; + }); + + response.stat = it->value.stat; + response.error = Coordination::Error::ZOK; } - auto node_it = storage.container.find(request.path); - if (node_it == storage.container.end()) - onStorageInconsistency(); - response.stat = node_it->value.stat; - response.error = Coordination::Error::ZOK; - - return response_ptr; + /// It cannot be used insied multitransaction? + return { response_ptr, {} }; } }; struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { - return storage.checkACL(zk_request->getPath(), Coordination::ACL::Admin | Coordination::ACL::Read, session_id, is_local); - } + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + /// LOL, GetACL require more permissions, then SetACL... + return checkACL(Coordination::ACL::Admin | Coordination::ACL::Read, node_acls, session_auths); + } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector - preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override - { - Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); - - if (!storage.uncommitted_state.hasNode(request.path)) - return {{zxid, Coordination::Error::ZNONODE}}; - - return {}; - } - - template - Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetACLResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); - - if constexpr (!local) - { - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } - } - auto & container = storage.container; - auto node_it = container.find(request.path); - if (node_it == container.end()) + auto it = container.find(request.path); + if (it == container.end()) { - if constexpr (local) - response.error = Coordination::Error::ZNONODE; - else - onStorageInconsistency(); + response.error = Coordination::Error::ZNONODE; } else { - response.stat = node_it->value.stat; - response.acl = storage.acl_map.convertNumber(node_it->value.acl_id); + response.stat = it->value.stat; + response.acl = storage.acl_map.convertNumber(it->value.acl_id); } - return response_ptr; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - return processImpl(storage, zxid, session_id, time); + return {response_ptr, {}}; } }; struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor { - bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { for (const auto & concrete_request : concrete_requests) - if (!concrete_request->checkAuth(storage, session_id, is_local)) + if (!concrete_request->checkAuth(storage, session_id)) return false; return true; } @@ -1289,124 +889,65 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro concrete_requests.push_back(std::make_shared(sub_zk_request)); break; default: - throw DB::Exception( - ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); + throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); } } } - std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { - // manually add deltas so that the result of previous request in the transaction is used in the next request - auto & saved_deltas = storage.uncommitted_state.deltas; + Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); + std::vector undo_actions; - std::vector response_errors; - response_errors.reserve(concrete_requests.size()); - for (size_t i = 0; i < concrete_requests.size(); ++i) + try { - auto new_deltas = concrete_requests[i]->preprocess(storage, zxid, session_id, time); - - if (!new_deltas.empty()) + size_t i = 0; + for (const auto & concrete_request : concrete_requests) { - if (auto * error = std::get_if(&new_deltas.back().operation)) + auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id, time); + + response.responses[i] = cur_response; + if (cur_response->error != Coordination::Error::ZOK) { - std::erase_if(saved_deltas, [zxid](const auto & delta) { return delta.zxid == zxid; }); - - response_errors.push_back(error->error); - - for (size_t j = i + 1; j < concrete_requests.size(); ++j) + for (size_t j = 0; j <= i; ++j) { - response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY); + auto response_error = response.responses[j]->error; + response.responses[j] = std::make_shared(); + response.responses[j]->error = response_error; } - return {{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}}; + for (size_t j = i + 1; j < response.responses.size(); ++j) + { + response.responses[j] = std::make_shared(); + response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY; + } + + for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it) + if (*it) + (*it)(); + + return { response_ptr, {} }; } + else + undo_actions.emplace_back(std::move(undo_action)); + + ++i; } - new_deltas.emplace_back(zxid, KeeperStorage::SubDeltaEnd{}); - response_errors.push_back(Coordination::Error::ZOK); - saved_deltas.insert(saved_deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end())); + response.error = Coordination::Error::ZOK; + return { response_ptr, {} }; + } + catch (...) + { + for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it) + if (*it) + (*it)(); + throw; } - - return {}; } - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); - Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); - - auto & deltas = storage.uncommitted_state.deltas; - // the deltas will have at least SubDeltaEnd or FailedMultiDelta - assert(!deltas.empty()); - if (auto * failed_multi = std::get_if(&deltas.front().operation)) - { - for (size_t i = 0; i < concrete_requests.size(); ++i) - { - response.responses[i] = std::make_shared(); - response.responses[i]->error = failed_multi->error_codes[i]; - } - - return response_ptr; - } - - for (size_t i = 0; i < concrete_requests.size(); ++i) - { - auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time); - - while (!deltas.empty()) - { - if (std::holds_alternative(deltas.front().operation)) - { - deltas.pop_front(); - break; - } - - deltas.pop_front(); - } - - response.responses[i] = cur_response; - } - - response.error = Coordination::Error::ZOK; - return response_ptr; - } - - Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override - { - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); - Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); - - for (size_t i = 0; i < concrete_requests.size(); ++i) - { - auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time); - - response.responses[i] = cur_response; - if (cur_response->error != Coordination::Error::ZOK) - { - for (size_t j = 0; j <= i; ++j) - { - auto response_error = response.responses[j]->error; - response.responses[j] = std::make_shared(); - response.responses[j]->error = response_error; - } - - for (size_t j = i + 1; j < response.responses.size(); ++j) - { - response.responses[j] = std::make_shared(); - response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY; - } - - return response_ptr; - } - } - - response.error = Coordination::Error::ZOK; - return response_ptr; - } - - KeeperStorage::ResponsesForSessions - processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { KeeperStorage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) @@ -1421,7 +962,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - Coordination::ZooKeeperResponsePtr process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override + std::pair process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override { throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR); } @@ -1430,40 +971,36 @@ struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast(*zk_request); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast(*response_ptr); + auto & sessions_and_auth = storage.session_and_auth; if (auth_request.scheme != "digest" || std::count(auth_request.data.begin(), auth_request.data.end(), ':') != 1) - return {{zxid, Coordination::Error::ZAUTHFAILED}}; - - std::vector new_deltas; - auto digest = generateDigest(auth_request.data); - if (digest == storage.superdigest) { - KeeperStorage::AuthID auth{"super", ""}; - new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(auth)}); + auth_response.error = Coordination::Error::ZAUTHFAILED; } else { - KeeperStorage::AuthID new_auth{auth_request.scheme, digest}; - if (!storage.uncommitted_state.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; })) - new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(new_auth)}); + auto digest = generateDigest(auth_request.data); + if (digest == storage.superdigest) + { + KeeperStorage::AuthID auth{"super", ""}; + sessions_and_auth[session_id].emplace_back(auth); + } + else + { + KeeperStorage::AuthID auth{auth_request.scheme, digest}; + auto & session_ids = sessions_and_auth[session_id]; + if (std::find(session_ids.begin(), session_ids.end(), auth) == session_ids.end()) + sessions_and_auth[session_id].emplace_back(auth); + } + } - return new_deltas; - } - - Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override - { - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); - Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast(*response_ptr); - - if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) - auth_response.error = result; - - return response_ptr; + return { response_ptr, {} }; } }; @@ -1489,6 +1026,7 @@ void KeeperStorage::finalize() class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable { + public: using Creator = std::function; using OpNumToRequest = std::unordered_map; @@ -1501,11 +1039,11 @@ public: KeeperStorageRequestProcessorPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const { - auto request_it = op_num_to_request.find(zk_request->getOpNum()); - if (request_it == op_num_to_request.end()) + auto it = op_num_to_request.find(zk_request->getOpNum()); + if (it == op_num_to_request.end()) throw DB::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR); - return request_it->second(zk_request); + return it->second(zk_request); } void registerRequest(Coordination::OpNum op_num, Creator creator) @@ -1519,11 +1057,10 @@ private: KeeperStorageRequestProcessorsFactory(); }; -template +template void registerKeeperRequestProcessor(KeeperStorageRequestProcessorsFactory & factory) { - factory.registerRequest( - num, [](const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared(zk_request); }); + factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared(zk_request); }); } @@ -1547,66 +1084,13 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory() } -void KeeperStorage::preprocessRequest( - const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, int64_t time, int64_t new_last_zxid, bool check_acl) -{ - KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); - - if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special - { - auto & deltas = uncommitted_state.deltas; - auto session_ephemerals = ephemerals.find(session_id); - if (session_ephemerals != ephemerals.end()) - { - for (const auto & ephemeral_path : session_ephemerals->second) - { - // For now just add deltas for removing the node - // On commit, ephemerals nodes will be deleted from storage - // and removed from the session - if (uncommitted_state.hasNode(ephemeral_path)) - { - deltas.emplace_back( - parentPath(ephemeral_path).toString(), - new_last_zxid, - UpdateNodeDelta{[ephemeral_path](Node & parent) - { - --parent.stat.numChildren; - ++parent.stat.cversion; - }}); - - deltas.emplace_back(ephemeral_path, new_last_zxid, RemoveNodeDelta()); - } - } - } - - return; - } - - if (check_acl && !request_processor->checkAuth(*this, session_id, false)) - { - uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); - return; - } - - auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time); - uncommitted_state.deltas.insert( - uncommitted_state.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end())); -} - -KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( - const Coordination::ZooKeeperRequestPtr & zk_request, - int64_t session_id, - int64_t time, - std::optional new_last_zxid, - bool check_acl, - bool is_local) +KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl) { KeeperStorage::ResponsesForSessions results; if (new_last_zxid) { if (zxid >= *new_last_zxid) - throw Exception( - ErrorCodes::LOGICAL_ERROR, "Got new ZXID {} smaller or equal than current {}. It's a bug", *new_last_zxid, zxid); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got new ZXID {} smaller or equal than current {}. It's a bug", *new_last_zxid, zxid); zxid = *new_last_zxid; } @@ -1615,22 +1099,26 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { - commit(zxid, session_id); - - for (const auto & delta : uncommitted_state.deltas) + auto it = ephemerals.find(session_id); + if (it != ephemerals.end()) { - if (delta.zxid > zxid) - break; - - if (std::holds_alternative(delta.operation)) + for (const auto & ephemeral_path : it->second) { - auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); + container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorage::Node & parent) + { + --parent.stat.numChildren; + ++parent.stat.cversion; + auto base_name = getBaseName(ephemeral_path); + parent.removeChild(base_name); + }); + + container.erase(ephemeral_path); + + auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED); results.insert(results.end(), responses.begin(), responses.end()); } + ephemerals.erase(it); } - - std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); - clearDeadWatches(session_id); auto auth_it = session_and_auth.find(session_id); if (auth_it != session_and_auth.end()) @@ -1647,7 +1135,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special { KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); - auto response = storage_request->process(*this, zxid, session_id, time); + auto [response, _] = storage_request->process(*this, zxid, session_id, time); response->xid = zk_request->xid; response->zxid = getZXID(); @@ -1658,24 +1146,15 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); Coordination::ZooKeeperResponsePtr response; - if (is_local) + if (check_acl && !request_processor->checkAuth(*this, session_id)) { - assert(zk_request->isReadRequest()); - if (check_acl && !request_processor->checkAuth(*this, session_id, true)) - { - response = zk_request->makeResponse(); - /// Original ZooKeeper always throws no auth, even when user provided some credentials - response->error = Coordination::Error::ZNOAUTH; - } - else - { - response = request_processor->processLocal(*this, zxid, session_id, time); - } + response = zk_request->makeResponse(); + /// Original ZooKeeper always throws no auth, even when user provided some credentials + response->error = Coordination::Error::ZNOAUTH; } else { - response = request_processor->process(*this, zxid, session_id, time); - std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); + std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id, time); } /// Watches for this requests are added to the watches lists @@ -1683,8 +1162,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( { if (response->error == Coordination::Error::ZOK) { - auto & watches_type - = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList + auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList ? list_watches : watches; @@ -1714,16 +1192,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( return results; } -void KeeperStorage::rollbackRequest(int64_t rollback_zxid) -{ - // we can only rollback the last zxid (if there is any) - // if there is a delta with a larger zxid, we have invalid state - const auto last_zxid = uncommitted_state.deltas.back().zxid; - if (!uncommitted_state.deltas.empty() && last_zxid > rollback_zxid) - throw DB::Exception{DB::ErrorCodes::LOGICAL_ERROR, "Invalid state of deltas found while trying to rollback request. Last ZXID ({}) is larger than the requested ZXID ({})", last_zxid, rollback_zxid}; - - std::erase_if(uncommitted_state.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; }); -} void KeeperStorage::clearDeadWatches(int64_t session_id) { diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 7d26ae24dd9..ccbddcf6e19 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -1,14 +1,14 @@ #pragma once -#include -#include -#include +#include +#include +#include #include +#include #include #include -#include -#include -#include +#include +#include #include @@ -29,6 +29,7 @@ struct KeeperStorageSnapshot; class KeeperStorage { public: + struct Node { uint64_t acl_id = 0; /// 0 -- no ACL by default @@ -40,18 +41,26 @@ public: Node() : size_bytes(sizeof(Node)) { } /// Object memory size - uint64_t sizeInBytes() const { return size_bytes; } + uint64_t sizeInBytes() const + { + return size_bytes; + } void setData(String new_data); - const auto & getData() const noexcept { return data; } + const auto & getData() const noexcept + { + return data; + } void addChild(StringRef child_path); void removeChild(StringRef child_path); - const auto & getChildren() const noexcept { return children; } - + const auto & getChildren() const noexcept + { + return children; + } private: String data; ChildrenSet children{}; @@ -76,7 +85,10 @@ public: std::string scheme; std::string id; - bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; } + bool operator==(const AuthID & other) const + { + return scheme == other.scheme && id == other.id; + } }; using RequestsForSessions = std::vector; @@ -100,146 +112,6 @@ public: /// container. Container container; - // Applying ZooKeeper request to storage consists of two steps: - // - preprocessing which, instead of applying the changes directly to storage, - // generates deltas with those changes, denoted with the request ZXID - // - processing which applies deltas with the correct ZXID to the storage - // - // Delta objects allow us two things: - // - fetch the latest, uncommitted state of an object by getting the committed - // state of that same object from the storage and applying the deltas - // in the same order as they are defined - // - quickly commit the changes to the storage - struct CreateNodeDelta - { - Coordination::Stat stat; - bool is_ephemeral; - bool is_sequental; - Coordination::ACLs acls; - String data; - }; - - struct RemoveNodeDelta - { - int32_t version{-1}; - }; - - struct UpdateNodeDelta - { - std::function update_fn; - int32_t version{-1}; - }; - - struct SetACLDelta - { - Coordination::ACLs acls; - int32_t version{-1}; - }; - - struct ErrorDelta - { - Coordination::Error error; - }; - - struct FailedMultiDelta - { - std::vector error_codes; - }; - - // Denotes end of a subrequest in multi request - struct SubDeltaEnd - { - }; - - struct AddAuthDelta - { - int64_t session_id; - AuthID auth_id; - }; - - using Operation - = std::variant; - - struct Delta - { - Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { } - - Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { } - - Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { } - - String path; - int64_t zxid; - Operation operation; - }; - - struct UncommittedState - { - explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { } - - template - void applyDeltas(StringRef path, const Visitor & visitor) const - { - for (const auto & delta : deltas) - { - if (path.empty() || delta.path == path) - std::visit(visitor, delta.operation); - } - } - - bool hasACL(int64_t session_id, bool is_local, std::function predicate) - { - for (const auto & session_auth : storage.session_and_auth[session_id]) - { - if (predicate(session_auth)) - return true; - } - - if (is_local) - return false; - - - for (const auto & delta : deltas) - { - if (const auto * auth_delta = std::get_if(&delta.operation); - auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id)) - return true; - } - - return false; - } - - std::shared_ptr getNode(StringRef path); - bool hasNode(StringRef path) const; - Coordination::ACLs getACLs(StringRef path) const; - - std::deque deltas; - KeeperStorage & storage; - }; - - UncommittedState uncommitted_state{*this}; - - Coordination::Error commit(int64_t zxid, int64_t session_id); - - // Create node in the storage - // Returns false if it failed to create the node, true otherwise - // We don't care about the exact failure because we should've caught it during preprocessing - bool createNode( - const std::string & path, - String data, - const Coordination::Stat & stat, - bool is_sequental, - bool is_ephemeral, - Coordination::ACLs node_acls, - int64_t session_id); - - // Remove node in the storage - // Returns false if it failed to remove the node, true otherwise - // We don't care about the exact failure because we should've caught it during preprocessing - bool removeNode(const std::string & path, int32_t version); - - bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local); - /// Mapping session_id -> set of ephemeral nodes paths Ephemerals ephemerals; /// Mapping session_id -> set of watched nodes paths @@ -258,12 +130,15 @@ public: /// Currently active watches (node_path -> subscribed sessions) Watches watches; - Watches list_watches; /// Watches for 'list' request (watches on children). + Watches list_watches; /// Watches for 'list' request (watches on children). void clearDeadWatches(int64_t session_id); /// Get current zxid - int64_t getZXID() const { return zxid; } + int64_t getZXID() const + { + return zxid; + } const String superdigest; @@ -287,53 +162,78 @@ public: /// Process user request and return response. /// check_acl = false only when converting data from ZooKeeper. - ResponsesForSessions processRequest( - const Coordination::ZooKeeperRequestPtr & request, - int64_t session_id, - int64_t time, - std::optional new_last_zxid, - bool check_acl = true, - bool is_local = false); - void preprocessRequest( - const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid, bool check_acl = true); - void rollbackRequest(int64_t rollback_zxid); + ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl = true); void finalize(); /// Set of methods for creating snapshots /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version. - void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); } + void enableSnapshotMode(size_t up_to_version) + { + container.enableSnapshotMode(up_to_version); + + } /// Turn off snapshot mode. - void disableSnapshotMode() { container.disableSnapshotMode(); } + void disableSnapshotMode() + { + container.disableSnapshotMode(); + } - Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); } + Container::const_iterator getSnapshotIteratorBegin() const + { + return container.begin(); + } /// Clear outdated data from internal container. - void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); } + void clearGarbageAfterSnapshot() + { + container.clearOutdatedNodes(); + } /// Get all active sessions - const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; } + const SessionAndTimeout & getActiveSessions() const + { + return session_and_timeout; + } /// Get all dead sessions - std::vector getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); } + std::vector getDeadSessions() const + { + return session_expiry_queue.getExpiredSessions(); + } /// Introspection functions mostly used in 4-letter commands - uint64_t getNodesCount() const { return container.size(); } + uint64_t getNodesCount() const + { + return container.size(); + } - uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); } + uint64_t getApproximateDataSize() const + { + return container.getApproximateDataSize(); + } - uint64_t getArenaDataSize() const { return container.keyArenaSize(); } + uint64_t getArenaDataSize() const + { + return container.keyArenaSize(); + } uint64_t getTotalWatchesCount() const; - uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); } + uint64_t getWatchedPathsCount() const + { + return watches.size() + list_watches.size(); + } uint64_t getSessionsWithWatchesCount() const; - uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); } + uint64_t getSessionWithEphemeralNodesCount() const + { + return ephemerals.size(); + } uint64_t getTotalEphemeralNodesCount() const; void dumpWatches(WriteBufferFromOwnString & buf) const; diff --git a/src/Coordination/WriteBufferFromNuraftBuffer.h b/src/Coordination/WriteBufferFromNuraftBuffer.h index c9ca1e2a227..d52049edcff 100644 --- a/src/Coordination/WriteBufferFromNuraftBuffer.h +++ b/src/Coordination/WriteBufferFromNuraftBuffer.h @@ -12,6 +12,7 @@ public: WriteBufferFromNuraftBuffer(); nuraft::ptr getBuffer(); + bool isFinished() const { return finalized; } ~WriteBufferFromNuraftBuffer() override; diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 4d1745edc6a..e59c67329ff 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -520,7 +520,6 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request)) return true; - storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false); storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false); } } diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 2742f48f49e..cf4d1eaf9f2 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1,8 +1,6 @@ #include #include -#include "Common/ZooKeeper/IKeeper.h" -#include "Coordination/KeeperStorage.h" #include "config_core.h" #if USE_NURAFT @@ -1263,7 +1261,6 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint changelog.append(entry); changelog.end_of_append_batch(0, 0); - state_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); state_machine->commit(i, changelog.entry_at(i)->get_buf()); bool snapshot_created = false; if (i % settings->snapshot_distance == 0) @@ -1308,7 +1305,6 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i) { - restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); restore_machine->commit(i, changelog.entry_at(i)->get_buf()); } @@ -1411,7 +1407,6 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) request_c->path = "/hello"; request_c->is_ephemeral = true; auto entry_c = getLogEntryFromZKRequest(0, 1, request_c); - state_machine->pre_commit(1, entry_c->get_buf()); state_machine->commit(1, entry_c->get_buf()); const auto & storage = state_machine->getStorage(); @@ -1420,7 +1415,6 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) request_d->path = "/hello"; /// Delete from other session auto entry_d = getLogEntryFromZKRequest(0, 2, request_d); - state_machine->pre_commit(2, entry_d->get_buf()); state_machine->commit(2, entry_d->get_buf()); EXPECT_EQ(storage.ephemerals.size(), 0); @@ -1783,130 +1777,6 @@ TEST_P(CoordinationTest, TestLogGap) EXPECT_EQ(changelog1.next_slot(), 61); } -template -ResponseType getSingleResponse(const auto & responses) -{ - EXPECT_FALSE(responses.empty()); - return dynamic_cast(*responses[0].response); -} - -TEST_P(CoordinationTest, TestUncommittedStateBasicCrud) -{ - using namespace DB; - using namespace Coordination; - - DB::KeeperStorage storage{500, ""}; - - constexpr std::string_view path = "/test"; - - const auto get_committed_data = [&]() -> std::optional - { - auto request = std::make_shared(); - request->path = path; - auto responses = storage.processRequest(request, 0, 0, std::nullopt, true, true); - const auto & get_response = getSingleResponse(responses); - - if (get_response.error != Error::ZOK) - return std::nullopt; - - return get_response.data; - }; - - const auto preprocess_get = [&](int64_t zxid) - { - auto get_request = std::make_shared(); - get_request->path = path; - storage.preprocessRequest(get_request, 0, 0, zxid); - return get_request; - }; - - const auto create_request = std::make_shared(); - create_request->path = path; - create_request->data = "initial_data"; - storage.preprocessRequest(create_request, 0, 0, 1); - storage.preprocessRequest(create_request, 0, 0, 2); - - ASSERT_FALSE(get_committed_data()); - - const auto after_create_get = preprocess_get(3); - - ASSERT_FALSE(get_committed_data()); - - const auto set_request = std::make_shared(); - set_request->path = path; - set_request->data = "new_data"; - storage.preprocessRequest(set_request, 0, 0, 4); - - const auto after_set_get = preprocess_get(5); - - ASSERT_FALSE(get_committed_data()); - - const auto remove_request = std::make_shared(); - remove_request->path = path; - storage.preprocessRequest(remove_request, 0, 0, 6); - storage.preprocessRequest(remove_request, 0, 0, 7); - - const auto after_remove_get = preprocess_get(8); - - ASSERT_FALSE(get_committed_data()); - - { - const auto responses = storage.processRequest(create_request, 0, 0, 1); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(create_request, 0, 0, 2); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZNODEEXISTS); - } - - { - const auto responses = storage.processRequest(after_create_get, 0, 0, 3); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZOK); - ASSERT_EQ(get_response.data, "initial_data"); - } - - ASSERT_EQ(get_committed_data(), "initial_data"); - - { - const auto responses = storage.processRequest(set_request, 0, 0, 4); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(after_set_get, 0, 0, 5); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZOK); - ASSERT_EQ(get_response.data, "new_data"); - } - - ASSERT_EQ(get_committed_data(), "new_data"); - - { - const auto responses = storage.processRequest(remove_request, 0, 0, 6); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZOK); - } - - { - const auto responses = storage.processRequest(remove_request, 0, 0, 7); - const auto & create_response = getSingleResponse(responses); - ASSERT_EQ(create_response.error, Error::ZNONODE); - } - - { - const auto responses = storage.processRequest(after_remove_get, 0, 0, 8); - const auto & get_response = getSingleResponse(responses); - ASSERT_EQ(get_response.error, Error::ZNONODE); - } - - ASSERT_FALSE(get_committed_data()); -} - INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, CoordinationTest,