/// Declaration of DB::KeeperStorage: the node container, session bookkeeping and watch tables
/// that requests are processed against.
///
/// NOTE(review): this copy of the header appears to be damaged by text extraction and will not compile as is:
///   * every `#include` directive below has lost its target;
///   * every template argument list is missing (`std::shared_ptr`, `std::function`, `std::vector`,
///     `std::unordered_map`, `std::unordered_set`, `std::map`, `std::optional`, `std::unique_ptr`,
///     `SnapshotableHashTable`);
///   * the file is collapsed onto two physical lines, so the first `///` on the next line turns the
///     rest of that line (from `using AuthIDs` through `list_watches`) into comment text.
/// The missing pieces cannot be recovered from what is visible here - restore the file from version
/// control before changing any code. The notes below describe the declarations as written.
///
/// Namespace scope:
///   * `using namespace DB;` sits inside `namespace DB` itself, so it is redundant.
///   * KeeperStorageRequest and KeeperStorageSnapshot are only forward-declared here.
///   * KeeperStorageRequestPtr, ResponseCallback, ChildrenSet and SessionAndTimeout are aliases over
///     std::shared_ptr, std::function, std::unordered_set and std::unordered_map respectively.
///
/// class KeeperStorage (both access sections are `public:`, so every member is public):
///   * session_id_counter - next session id to hand out; starts at 1.
///   * Node - one stored node: data, acls, is_sequental (sic - the spelling is part of the identifier),
///     stat, seq_num (starts at 0) and the set of children.
///   * ResponseForSession / RequestForSession - a session id paired with a response / request pointer;
///     ResponsesForSessions / RequestsForSessions are vectors.
///   * Container, Ephemerals, SessionAndWatcher, SessionIDs, AuthIDs, SessionAndAuth, Watches - container
///     aliases used by the data members that follow them.
///   * State: session_and_auth, container, ephemerals, sessions_and_watchers, session_expiry_queue,
///     session_and_timeout, zxid (starts at 0), finalized (starts false), watches and list_watches.
#pragma once #include #include #include #include #include #include #include #include #include namespace DB { using namespace DB; struct KeeperStorageRequest; using KeeperStorageRequestPtr = std::shared_ptr; using ResponseCallback = std::function; using ChildrenSet = std::unordered_set; using SessionAndTimeout = std::unordered_map; struct KeeperStorageSnapshot; class KeeperStorage { public: int64_t session_id_counter{1}; struct Node { String data; Coordination::ACLs acls{}; bool is_sequental = false; Coordination::Stat stat{}; int32_t seq_num = 0; ChildrenSet children{}; }; struct ResponseForSession { int64_t session_id; Coordination::ZooKeeperResponsePtr response; }; using ResponsesForSessions = std::vector; struct RequestForSession { int64_t session_id; Coordination::ZooKeeperRequestPtr request; }; using RequestsForSessions = std::vector; using Container = SnapshotableHashTable; using Ephemerals = std::unordered_map>; using SessionAndWatcher = std::unordered_map>; using SessionIDs = std::vector; /// Just vector of SHA1 from user:password using AuthIDs = std::vector; using SessionAndAuth = std::unordered_map; SessionAndAuth session_and_auth; using Watches = std::map; Container container; Ephemerals ephemerals; SessionAndWatcher sessions_and_watchers; SessionExpiryQueue session_expiry_queue; SessionAndTimeout session_and_timeout; int64_t zxid{0}; bool finalized{false}; Watches watches; Watches list_watches; /// Watches for 'list' request (watches on children). 
/// Member functions of KeeperStorage (the class body continues on the next line):
///   * clearDeadWatches(session_id), KeeperStorage(tick_time_ms), processRequest(...) and finalize()
///     are declared only; their definitions are not in this file.
///     NOTE(review): the single-argument constructor is not `explicit`.
///   * getZXID() returns the current zxid.
///   * getSessionID(session_timeout_ms) takes the current session_id_counter value as the new id
///     (post-increment), records the timeout in session_and_timeout, calls
///     session_expiry_queue.update(id, timeout) and returns the id.
///   * addSessionID(session_id, session_timeout_ms) does the same registration for a caller-supplied id
///     and leaves session_id_counter untouched.
///     NOTE(review): both use `emplace`; on a std::unordered_map that keeps an already-present entry, so
///     re-adding a known id would not change its stored timeout while session_expiry_queue.update is
///     still called - confirm this is intended.
///   * enableSnapshotMode() / disableSnapshotMode() / clearGarbageAfterSnapshot() forward to
///     container.enableSnapshotMode() / disableSnapshotMode() / clearOutdatedNodes().
///   * getSnapshotIteratorBegin() returns container.begin().
///   * getActiveSessions() returns session_and_timeout by const reference.
///   * getDeadSessions() returns session_expiry_queue.getExpiredSessions(); it is not const-qualified.
/// KeeperStoragePtr (namespace scope, after the class) is a std::unique_ptr alias.
void clearDeadWatches(int64_t session_id); int64_t getZXID() const { return zxid; } public: KeeperStorage(int64_t tick_time_ms); int64_t getSessionID(int64_t session_timeout_ms) { auto result = session_id_counter++; session_and_timeout.emplace(result, session_timeout_ms); session_expiry_queue.update(result, session_timeout_ms); return result; } void addSessionID(int64_t session_id, int64_t session_timeout_ms) { session_and_timeout.emplace(session_id, session_timeout_ms); session_expiry_queue.update(session_id, session_timeout_ms); } ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid); void finalize(); void enableSnapshotMode() { container.enableSnapshotMode(); } void disableSnapshotMode() { container.disableSnapshotMode(); } Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); } void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); } const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; } std::unordered_set getDeadSessions() { return session_expiry_queue.getExpiredSessions(); } }; using KeeperStoragePtr = std::unique_ptr; }