#pragma once

// NOTE(review): in this copy of the file the header names after each `#include`
// and the template argument lists of the aliases below (std::shared_ptr,
// std::function, std::vector, std::unordered_map, ...) are missing -- presumably
// lost during extraction. Confirm against version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB
{

struct KeeperStorageRequestProcessor;
using KeeperStorageRequestProcessorPtr = std::shared_ptr;
using ResponseCallback = std::function;
using ChildrenSet = std::unordered_set;
using SessionAndTimeout = std::unordered_map;

struct KeeperStorageSnapshot;

/// Keeper state machine almost equal to the ZooKeeper's state machine.
/// Implements all logic of operations, data changes, sessions allocation.
/// In-memory and not thread safe.
class KeeperStorage
{
public:
    /// A single node stored in the container.
    struct Node
    {
        String data;
        uint64_t acl_id = 0; /// 0 -- no ACL by default
        /// NOTE(review): the name is misspelled ("sequental"); it is a public field,
        /// so renaming it would have to be done together with all users.
        bool is_sequental = false;
        Coordination::Stat stat{};
        int32_t seq_num = 0;
        ChildrenSet children{};

        /// Object memory size
        uint64_t sizeInBytes() const;
    };

    /// A response paired with the id of the session it belongs to.
    struct ResponseForSession
    {
        int64_t session_id;
        Coordination::ZooKeeperResponsePtr response;
    };
    using ResponsesForSessions = std::vector;

    /// A request paired with the id of the session it belongs to.
    struct RequestForSession
    {
        int64_t session_id;
        Coordination::ZooKeeperRequestPtr request;
    };

    /// Authentication identity: a (scheme, id) pair.
    struct AuthID
    {
        std::string scheme;
        std::string id;

        /// Two identities are equal when both scheme and id match.
        bool operator==(const AuthID & other) const
        {
            return scheme == other.scheme && id == other.id;
        }
    };

    using RequestsForSessions = std::vector;

    using Container = SnapshotableHashTable;
    using Ephemerals = std::unordered_map>;
    using SessionAndWatcher = std::unordered_map>;
    using SessionIDs = std::vector;

    /// Just vector of SHA1 from user:password
    using AuthIDs = std::vector;
    using SessionAndAuth = std::unordered_map;

    using Watches = std::map;

public:
    /// Next session id to hand out; getSessionID() returns the current value and increments it.
    int64_t session_id_counter{1};

    SessionAndAuth session_and_auth;

    /// Main hashtable with nodes. Contains all information about data.
    /// All other structures except session_and_timeout can be restored from
    /// container.
    Container container;

    /// Mapping session_id -> set of ephemeral nodes paths
    Ephemerals ephemerals;
    /// Mapping session_id -> set of watched nodes paths
    SessionAndWatcher sessions_and_watchers;
    /// Expiration queue for session, allows to get dead sessions at some point of time
    SessionExpiryQueue session_expiry_queue;
    /// All active sessions with timeout
    SessionAndTimeout session_and_timeout;

    /// ACLMap for more compact ACLs storage inside nodes.
    ACLMap acl_map;

    /// Global id of all requests applied to storage
    int64_t zxid{0};
    bool finalized{false};

    /// Currently active watches (node_path -> subscribed sessions)
    Watches watches;
    Watches list_watches; /// Watches for 'list' request (watches on children).

    void clearDeadWatches(int64_t session_id);

    /// Get current zxid
    int64_t getZXID() const
    {
        return zxid;
    }

    const String superdigest;

public:
    KeeperStorage(int64_t tick_time_ms, const String & superdigest_);

    /// Allocate new session id with the specified timeout.
    /// Takes the current value of session_id_counter (post-incrementing it), records the
    /// timeout in session_and_timeout and registers the session in session_expiry_queue.
    int64_t getSessionID(int64_t session_timeout_ms)
    {
        auto result = session_id_counter++;
        session_and_timeout.emplace(result, session_timeout_ms);
        session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
        return result;
    }

    /// Add session id. Used when restoring KeeperStorage from snapshot.
    /// Unlike getSessionID(), does not touch session_id_counter.
    void addSessionID(int64_t session_id, int64_t session_timeout_ms)
    {
        session_and_timeout.emplace(session_id, session_timeout_ms);
        session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
    }

    /// Process user request and return response.
    /// check_acl = false only when converting data from ZooKeeper.
    ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid, bool check_acl = true);

    void finalize();

    /// Set of methods for creating snapshots

    /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
    void enableSnapshotMode()
    {
        container.enableSnapshotMode();
    }

    /// Turn off snapshot mode.
    void disableSnapshotMode()
    {
        container.disableSnapshotMode();
    }

    /// Iterator to the beginning of the container (container.begin()).
    Container::const_iterator getSnapshotIteratorBegin() const
    {
        return container.begin();
    }

    /// Clear outdated data from internal container.
    void clearGarbageAfterSnapshot()
    {
        container.clearOutdatedNodes();
    }

    /// Get all active sessions
    const SessionAndTimeout & getActiveSessions() const
    {
        return session_and_timeout;
    }

    /// Get all dead sessions
    std::vector getDeadSessions()
    {
        return session_expiry_queue.getExpiredSessions();
    }

    /// Introspection functions mostly used in 4-letter commands
    uint64_t getNodesCount() const
    {
        return container.size();
    }

    uint64_t getApproximateDataSize() const
    {
        return container.getApproximateDataSize();
    }

    uint64_t getTotalWatchesCount() const;

    /// Sum of the number of entries in watches and in list_watches.
    uint64_t getWatchedPathsCount() const
    {
        return watches.size() + list_watches.size();
    }

    uint64_t getSessionsWithWatchesCount() const;

    /// Number of entries in ephemerals.
    uint64_t getSessionWithEphemeralNodesCount() const
    {
        return ephemerals.size();
    }
    uint64_t getTotalEphemeralNodesCount() const;

    void dumpWatches(WriteBufferFromOwnString & buf) const;
    void dumpWatchesByPath(WriteBufferFromOwnString & buf) const;
    void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const;
};

using KeeperStoragePtr = std::unique_ptr;

}