diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 0aa6b6d7ca0..9b2c8947d95 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -357,7 +357,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi { /// Pure local request, just process it with storage std::lock_guard lock(storage_and_responses_lock); - auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt, true /*check_acl*/, true /*is_local*/); for (const auto & response : responses) if (!responses_queue.push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 3e30c1be26f..2f7f4357f8c 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -446,6 +446,14 @@ struct KeeperStorageRequestProcessor { return {}; } + + // process the request using locally committed data + virtual Coordination::ZooKeeperResponsePtr + processLocal(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const + { + throw Exception{DB::ErrorCodes::LOGICAL_ERROR, "Cannot process the request locally"}; + } + virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { @@ -499,11 +507,11 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr auto path = zk_request->getPath(); auto parent_path = parentPath(path); - auto it = container.find(parent_path); - if (it == container.end()) + auto node_it = container.find(parent_path); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -587,8 +595,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperCreateResponse & response = dynamic_cast(*response_ptr); - auto result = storage.commit(zxid, session_id); - if (result != Coordination::Error::ZOK) + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) { response.error = result; return response_ptr; @@ -615,11 +622,11 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -640,28 +647,56 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce return {}; } - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override + template + Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const { - auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); - auto it = container.find(request.path); - if (it == container.end()) + if constexpr (local) { - response.error = Coordination::Error::ZNONODE; + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + { + response.error = result; + return response_ptr; + } + } + + const auto on_error = [&]([[maybe_unused]] const auto error_code) + { + if constexpr (local) + response.error = error_code; + else + fail(); + }; + + auto & container = storage.container; + auto node_it = container.find(request.path); + if (node_it == container.end()) + { + on_error(Coordination::Error::ZNONODE); } else { - response.stat = it->value.stat; - response.data = it->value.getData(); + response.stat = node_it->value.stat; + response.data = node_it->value.getData(); response.error = Coordination::Error::ZOK; } return response_ptr; } + + + Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } }; struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor @@ -669,11 +704,11 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(parentPath(zk_request->getPath())); - if (it == container.end()) + auto node_it = container.find(parentPath(zk_request->getPath())); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -740,13 +775,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperRemoveResponse & response = dynamic_cast(*response_ptr); - auto result = storage.commit(zxid, session_id); - if (result != Coordination::Error::ZOK) - { - response.error = result; - return response_ptr; - } - + response.error = storage.commit(zxid, session_id); return response_ptr; } @@ -772,28 +801,54 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr return {}; } - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override + template + Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const { - auto & container = storage.container; - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperExistsResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperExistsRequest & request = dynamic_cast(*zk_request); - auto it = container.find(request.path); - if (it != container.end()) + if constexpr (local) { - response.stat = it->value.stat; - response.error = Coordination::Error::ZOK; + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + { + response.error = result; + return response_ptr; + } + } + + const auto on_error = [&]([[maybe_unused]] const auto error_code) + { + if constexpr (local) + response.error = error_code; + else + fail(); + }; + + auto & container = storage.container; + auto node_it = container.find(request.path); + if (node_it == container.end()) + { + on_error(Coordination::Error::ZNONODE); } else { - response.error = Coordination::Error::ZNONODE; + response.stat = node_it->value.stat; + response.error = Coordination::Error::ZOK; } return response_ptr; } + + Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } }; struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor @@ -801,11 +856,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -857,18 +912,17 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce Coordination::ZooKeeperSetResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperSetRequest & request = dynamic_cast(*zk_request); - auto result = storage.commit(zxid, session_id); - if (result != Coordination::Error::ZOK) + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) { response.error = result; return response_ptr; } - auto it = container.find(request.path); - if (it == container.end()) + auto node_it = container.find(request.path); + if (node_it == container.end()) fail(); - response.stat = it->value.stat; + response.stat = node_it->value.stat; response.error = Coordination::Error::ZOK; return response_ptr; @@ -886,11 +940,11 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -910,18 +964,36 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc return {}; } - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override + + template + Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const { - auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperListRequest & request = dynamic_cast(*zk_request); - auto it = container.find(request.path); - if (it == container.end()) + if constexpr (local) { - response.error = Coordination::Error::ZNONODE; + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + { + response.error = result; + return response_ptr; + } + } + + const auto on_error = [&]([[maybe_unused]] const auto error_code) + { + if constexpr (local) + response.error = error_code; + else + fail(); + }; + + auto & container = storage.container; + auto node_it = container.find(request.path); + if (node_it == container.end()) + { + on_error(Coordination::Error::ZNONODE); } else { @@ -929,18 +1001,28 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc if (path_prefix.empty()) throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); - const auto & children = it->value.getChildren(); + const auto & children = node_it->value.getChildren(); response.names.reserve(children.size()); for (const auto child : children) response.names.push_back(child.toString()); - response.stat = it->value.stat; + response.stat = node_it->value.stat; response.error = Coordination::Error::ZOK; } return response_ptr; } + + Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } }; struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor @@ -948,11 +1030,11 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -976,22 +1058,39 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro return {}; } - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override + template + Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const { - auto & container = storage.container; - Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperCheckResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperCheckRequest & request = dynamic_cast(*zk_request); - auto it = container.find(request.path); - if (it == container.end()) + + if constexpr (local) { - response.error = Coordination::Error::ZNONODE; + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + { + response.error = result; + return response_ptr; + } } - else if (request.version != -1 && request.version != it->value.stat.version) + + const auto on_error = [&]([[maybe_unused]] const auto error_code) { - response.error = Coordination::Error::ZBADVERSION; + if constexpr (local) + response.error = error_code; + else + fail(); + }; + + auto & container = storage.container; + auto node_it = container.find(request.path); + if (node_it == container.end()) + { + on_error(Coordination::Error::ZNONODE); + } + else if (request.version != -1 && request.version != node_it->value.stat.version) + { + on_error(Coordination::Error::ZBADVERSION); } else { @@ -1000,6 +1099,16 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro return response_ptr; } + + Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } }; @@ -1008,11 +1117,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -1053,17 +1162,16 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr Coordination::ZooKeeperSetACLResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); - auto result = storage.commit(zxid, session_id); - if (result != Coordination::Error::ZOK) + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) { response.error = result; return response_ptr; } - auto it = storage.container.find(request.path); - if (it == storage.container.end()) + auto node_it = storage.container.find(request.path); + if (node_it == storage.container.end()) fail(); - response.stat = it->value.stat; + response.stat = node_it->value.stat; response.error = Coordination::Error::ZOK; return response_ptr; @@ -1075,11 +1183,11 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto node_it = container.find(zk_request->getPath()); + if (node_it == container.end()) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id); if (node_acls.empty()) return true; @@ -1100,26 +1208,54 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr return {}; } - Coordination::ZooKeeperResponsePtr - process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override + template + Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetACLResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); - auto & container = storage.container; - auto it = container.find(request.path); - if (it == container.end()) + + if constexpr (local) { - response.error = Coordination::Error::ZNONODE; + if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK) + { + response.error = result; + return response_ptr; + } + } + + const auto on_error = [&]([[maybe_unused]] const auto error_code) + { + if constexpr (local) + response.error = error_code; + else + fail(); + }; + + auto & container = storage.container; + auto node_it = container.find(request.path); + if (node_it == container.end()) + { + on_error(Coordination::Error::ZNONODE); } else { - response.stat = it->value.stat; - response.acl = storage.acl_map.convertNumber(it->value.acl_id); + response.stat = node_it->value.stat; + response.acl = storage.acl_map.convertNumber(node_it->value.acl_id); } return response_ptr; } + + Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + return processImpl(storage, zxid, session_id, time); + } }; struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor @@ -1165,6 +1301,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro std::vector preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { + // manually add deltas so that the result of previous request in the transaction is used in the next request auto & saved_deltas = storage.current_nodes.deltas; std::vector response_errors; @@ -1211,10 +1348,9 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro return response_ptr; } - size_t i = 0; - for (const auto & concrete_request : concrete_requests) + for (size_t i = 0; i < concrete_requests.size(); ++i) { - auto cur_response = concrete_request->process(storage, zxid, session_id, time); + auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time); while (!deltas.empty()) { @@ -1228,8 +1364,39 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } response.responses[i] = cur_response; + } - ++i; + response.error = Coordination::Error::ZOK; + return response_ptr; + } + + Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override + { + Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); + + for (size_t i = 0; i < concrete_requests.size(); ++i) + { + auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time); + + response.responses[i] = cur_response; + if (cur_response->error != Coordination::Error::ZOK) + { + for (size_t j = 0; j <= i; ++j) + { + auto response_error = response.responses[j]->error; + response.responses[j] = std::make_shared(); + response.responses[j]->error = response_error; + } + + for (size_t j = i + 1; j < response.responses.size(); ++j) + { + response.responses[j] = std::make_shared(); + response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY; + } + + return response_ptr; + } } response.error = Coordination::Error::ZOK; @@ -1389,7 +1556,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( int64_t session_id, int64_t time, std::optional new_last_zxid, - bool check_acl) + bool check_acl, + bool is_local) { KeeperStorage::ResponsesForSessions results; if (new_last_zxid) @@ -1462,7 +1630,11 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( } else { - response = request_processor->process(*this, zxid, session_id, time); + if (is_local) + response = request_processor->processLocal(*this, zxid, session_id, time); + else + response = request_processor->process(*this, zxid, session_id, time); + std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 007c0e60b1d..e25539c7bad 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -1,14 +1,14 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include +#include +#include +#include #include @@ -29,7 +29,6 @@ struct KeeperStorageSnapshot; class KeeperStorage { public: - struct Node { uint64_t acl_id = 0; /// 0 -- no ACL by default @@ -41,26 +40,18 @@ public: Node() : size_bytes(sizeof(Node)) { } /// Object memory size - uint64_t sizeInBytes() const - { - return size_bytes; - } + uint64_t sizeInBytes() const { return size_bytes; } void setData(String new_data); - const auto & getData() const noexcept - { - return data; - } + const auto & getData() const noexcept { return data; } void addChild(StringRef child_path); void removeChild(StringRef child_path); - const auto & getChildren() const noexcept - { - return children; - } + const auto & getChildren() const noexcept { return children; } + private: String data; ChildrenSet children{}; @@ -85,10 +76,7 @@ public: std::string scheme; std::string id; - bool operator==(const AuthID & other) const - { - return scheme == other.scheme && id == other.id; - } + bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; } }; using RequestsForSessions = std::vector; @@ -112,7 +100,7 @@ public: /// container. Container container; - struct CreateNodeDelta + struct CreateNodeDelta { Coordination::Stat stat; bool is_ephemeral; @@ -152,27 +140,18 @@ public: { }; - using Operation = std::variant; + using Operation + = std::variant; struct Delta { - Delta(String path_, int64_t zxid_, Operation operation_) - : path(std::move(path_)) - , zxid(zxid_) - , operation(std::move(operation_)) - {} + Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { } - Delta(int64_t zxid_, Coordination::Error error) - : Delta("", zxid_, ErrorDelta{error}) - {} + Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { } - Delta(int64_t zxid_, SubDeltaEnd subdelta) - : Delta("", zxid_, subdelta) - {} + Delta(int64_t zxid_, SubDeltaEnd subdelta) : Delta("", zxid_, subdelta) { } - Delta(int64_t zxid_, FailedMultiDelta failed_multi) - : Delta("", zxid_, failed_multi) - {} + Delta(int64_t zxid_, FailedMultiDelta failed_multi) : Delta("", zxid_, failed_multi) { } String path; int64_t zxid; @@ -181,7 +160,7 @@ public: struct CurrentNodes { - explicit CurrentNodes(KeeperStorage & storage_) : storage(storage_) {} + explicit CurrentNodes(KeeperStorage & storage_) : storage(storage_) { } template void applyDeltas(StringRef path, const Visitor & visitor) const @@ -206,7 +185,14 @@ public: Coordination::Error commit(int64_t zxid, int64_t session_id); - bool createNode(const std::string & path, String data, const Coordination::Stat & stat, bool is_sequental, bool is_ephemeral, Coordination::ACLs node_acls, int64_t session_id); + bool createNode( + const std::string & path, + String data, + const Coordination::Stat & stat, + bool is_sequental, + bool is_ephemeral, + Coordination::ACLs node_acls, + int64_t session_id); bool removeNode(const std::string & path, int32_t version); /// Mapping session_id -> set of ephemeral nodes paths @@ -228,15 +214,12 @@ public: /// Currently active watches (node_path -> subscribed sessions) Watches watches; - Watches list_watches; /// Watches for 'list' request (watches on children). + Watches list_watches; /// Watches for 'list' request (watches on children). void clearDeadWatches(int64_t session_id); /// Get current zxid - int64_t getZXID() const - { - return zxid; - } + int64_t getZXID() const { return zxid; } const String superdigest; @@ -260,7 +243,13 @@ public: /// Process user request and return response. /// check_acl = false only when converting data from ZooKeeper. - ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl = true); + ResponsesForSessions processRequest( + const Coordination::ZooKeeperRequestPtr & request, + int64_t session_id, + int64_t time, + std::optional new_last_zxid, + bool check_acl = true, + bool is_local = false); void preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid); void finalize(); @@ -268,70 +257,37 @@ public: /// Set of methods for creating snapshots /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version. - void enableSnapshotMode(size_t up_to_version) - { - container.enableSnapshotMode(up_to_version); - } + void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); } /// Turn off snapshot mode. - void disableSnapshotMode() - { - container.disableSnapshotMode(); - } + void disableSnapshotMode() { container.disableSnapshotMode(); } - Container::const_iterator getSnapshotIteratorBegin() const - { - return container.begin(); - } + Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); } /// Clear outdated data from internal container. - void clearGarbageAfterSnapshot() - { - container.clearOutdatedNodes(); - } + void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); } /// Get all active sessions - const SessionAndTimeout & getActiveSessions() const - { - return session_and_timeout; - } + const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; } /// Get all dead sessions - std::vector getDeadSessions() const - { - return session_expiry_queue.getExpiredSessions(); - } + std::vector getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); } /// Introspection functions mostly used in 4-letter commands - uint64_t getNodesCount() const - { - return container.size(); - } + uint64_t getNodesCount() const { return container.size(); } - uint64_t getApproximateDataSize() const - { - return container.getApproximateDataSize(); - } + uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); } - uint64_t getArenaDataSize() const - { - return container.keyArenaSize(); - } + uint64_t getArenaDataSize() const { return container.keyArenaSize(); } uint64_t getTotalWatchesCount() const; - uint64_t getWatchedPathsCount() const - { - return watches.size() + list_watches.size(); - } + uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); } uint64_t getSessionsWithWatchesCount() const; - uint64_t getSessionWithEphemeralNodesCount() const - { - return ephemerals.size(); - } + uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); } uint64_t getTotalEphemeralNodesCount() const; void dumpWatches(WriteBufferFromOwnString & buf) const;