ClickHouse/src/Coordination/KeeperStorage.h

416 lines
13 KiB
C++
Raw Normal View History

2020-10-30 14:16:47 +00:00
#pragma once
2022-05-09 08:32:25 +00:00
#include <unordered_map>
#include <vector>
#include <Coordination/ACLMap.h>
2022-05-09 08:32:25 +00:00
#include <Coordination/SessionExpiryQueue.h>
2021-02-26 13:53:34 +00:00
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
2022-05-09 08:32:25 +00:00
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
2020-10-30 14:16:47 +00:00
2022-01-21 14:26:50 +00:00
#include <absl/container/flat_hash_set.h>
2020-10-30 14:16:47 +00:00
namespace DB
2020-10-30 14:16:47 +00:00
{
struct KeeperStorageRequestProcessor;
using KeeperStorageRequestProcessorPtr = std::shared_ptr<KeeperStorageRequestProcessor>;
2020-11-10 13:43:10 +00:00
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
2022-01-21 14:26:50 +00:00
using ChildrenSet = absl::flat_hash_set<StringRef, StringRefHash>;
2021-03-01 13:33:34 +00:00
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
2021-03-29 08:24:56 +00:00
struct KeeperStorageSnapshot;
2020-10-30 14:16:47 +00:00
/// Keeper state machine almost equal to the ZooKeeper's state machine.
/// Implements all logic of operations, data changes, sessions allocation.
/// In-memory and not thread safe.
2021-03-29 08:24:56 +00:00
class KeeperStorage
2020-10-30 14:16:47 +00:00
{
2020-10-30 19:57:30 +00:00
public:
2020-10-30 14:16:47 +00:00
struct Node
{
uint64_t acl_id = 0; /// 0 -- no ACL by default
2020-10-30 14:16:47 +00:00
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
2021-12-09 10:05:11 +00:00
uint64_t size_bytes; // save size to avoid calculate every time
2021-10-27 12:26:42 +00:00
2022-04-05 06:27:03 +00:00
Node() : size_bytes(sizeof(Node)) { }
2021-11-19 09:30:58 +00:00
/// Object memory size
2022-05-09 08:32:25 +00:00
uint64_t sizeInBytes() const { return size_bytes; }
2022-04-05 06:27:03 +00:00
void setData(String new_data);
2022-05-09 08:32:25 +00:00
const auto & getData() const noexcept { return data; }
2022-04-05 06:27:03 +00:00
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
2022-05-09 08:32:25 +00:00
const auto & getChildren() const noexcept { return children; }
2022-05-13 13:43:42 +00:00
void invalidateDigestCache() const;
UInt64 getDigest(std::string_view path) const;
void setDigest(UInt64 digest);
2022-04-05 06:27:03 +00:00
private:
String data;
ChildrenSet children{};
2022-05-13 13:43:42 +00:00
mutable std::optional<UInt64> cached_digest;
2020-10-30 14:16:47 +00:00
};
2022-05-16 12:12:29 +00:00
enum DigestVersion : uint8_t
{
NO_DIGEST = 0,
V0 = 1
};
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V0;
2021-01-19 14:22:28 +00:00
struct ResponseForSession
{
int64_t session_id;
Coordination::ZooKeeperResponsePtr response;
};
2021-01-19 14:45:45 +00:00
using ResponsesForSessions = std::vector<ResponseForSession>;
2021-01-19 14:22:28 +00:00
2022-05-16 12:12:29 +00:00
struct Digest
{
DigestVersion version{DigestVersion::NO_DIGEST};
uint64_t value{0};
};
static bool checkDigest(const Digest & first, const Digest & second)
{
if (first.version != second.version)
return true;
if (first.version == DigestVersion::NO_DIGEST)
return true;
return first.value == second.value;
}
2021-01-19 14:22:28 +00:00
struct RequestForSession
2020-11-26 14:57:32 +00:00
{
int64_t session_id;
2022-01-06 13:14:45 +00:00
int64_t time;
2021-01-19 14:22:28 +00:00
Coordination::ZooKeeperRequestPtr request;
2022-05-12 08:58:36 +00:00
int64_t zxid{0};
2022-05-16 12:12:29 +00:00
Digest digest;
2020-11-26 14:57:32 +00:00
};
2021-05-22 16:07:47 +00:00
struct AuthID
{
std::string scheme;
std::string id;
2022-05-09 08:32:25 +00:00
bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; }
2021-05-22 16:07:47 +00:00
};
2021-01-19 14:45:45 +00:00
using RequestsForSessions = std::vector<RequestForSession>;
2021-01-19 14:22:28 +00:00
2021-02-26 13:53:34 +00:00
using Container = SnapshotableHashTable<Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
2021-01-19 14:22:28 +00:00
using SessionIDs = std::vector<int64_t>;
2020-10-30 14:16:47 +00:00
2021-05-21 21:19:22 +00:00
/// Just vector of SHA1 from user:password
2021-05-22 16:07:47 +00:00
using AuthIDs = std::vector<AuthID>;
2021-05-21 21:19:22 +00:00
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
2021-01-19 14:22:28 +00:00
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
2020-10-30 14:16:47 +00:00
2021-11-18 20:17:22 +00:00
int64_t session_id_counter{1};
SessionAndAuth session_and_auth;
/// Main hashtable with nodes. Contain all information about data.
/// All other structures expect session_and_timeout can be restored from
/// container.
2020-10-30 14:16:47 +00:00
Container container;
2022-05-17 06:45:51 +00:00
// Applying ZooKeeper request to storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage,
// generates deltas with those changes, denoted with the request ZXID
// - processing which applies deltas with the correct ZXID to the storage
//
// Delta objects allow us two things:
// - fetch the latest, uncommitted state of an object by getting the committed
// state of that same object from the storage and applying the deltas
// in the same order as they are defined
// - quickly commit the changes to the storage
2022-05-09 08:32:25 +00:00
struct CreateNodeDelta
2022-05-05 10:32:41 +00:00
{
2022-05-06 12:25:25 +00:00
Coordination::Stat stat;
bool is_ephemeral;
bool is_sequental;
Coordination::ACLs acls;
String data;
};
struct RemoveNodeDelta
{
int32_t version{-1};
};
struct UpdateNodeDelta
{
std::function<void(Node &)> update_fn;
int32_t version{-1};
};
struct SetACLDelta
{
Coordination::ACLs acls;
int32_t version{-1};
};
struct ErrorDelta
{
Coordination::Error error;
};
2022-05-09 07:02:11 +00:00
struct FailedMultiDelta
{
std::vector<Coordination::Error> error_codes;
};
2022-05-11 09:08:39 +00:00
// Denotes end of a subrequest in multi request
2022-05-09 07:02:11 +00:00
struct SubDeltaEnd
{
};
struct AddAuthDelta
{
int64_t session_id;
AuthID auth_id;
};
2022-05-13 13:43:42 +00:00
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
2022-05-06 12:25:25 +00:00
struct Delta
{
2022-05-09 08:32:25 +00:00
Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
2022-05-05 10:32:41 +00:00
2022-05-09 08:32:25 +00:00
Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
2022-05-09 07:02:11 +00:00
Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { }
2022-05-06 12:25:25 +00:00
String path;
2022-05-05 10:32:41 +00:00
int64_t zxid;
2022-05-06 12:25:25 +00:00
Operation operation;
2022-05-05 10:32:41 +00:00
};
2022-05-11 09:08:39 +00:00
struct UncommittedState
2022-05-05 10:32:41 +00:00
{
2022-05-11 09:08:39 +00:00
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
template <typename Visitor>
2022-05-13 13:43:42 +00:00
void applyDeltas(StringRef path, const Visitor & visitor, std::optional<int64_t> last_zxid = std::nullopt) const
2022-05-06 12:25:25 +00:00
{
for (const auto & delta : deltas)
{
2022-05-13 13:43:42 +00:00
if (last_zxid && delta.zxid >= last_zxid)
break;
if (path.empty() || delta.path == path)
2022-05-06 12:25:25 +00:00
std::visit(visitor, delta.operation);
}
}
2022-05-17 06:45:51 +00:00
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
{
for (const auto & session_auth : storage.session_and_auth[session_id])
{
if (predicate(session_auth))
return true;
}
if (is_local)
return false;
for (const auto & delta : deltas)
{
2022-05-17 06:45:51 +00:00
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id))
return true;
}
return false;
}
2022-05-13 13:43:42 +00:00
std::shared_ptr<Node> getNode(StringRef path, std::optional<int64_t> last_zxid = std::nullopt) const;
2022-05-06 12:25:25 +00:00
bool hasNode(StringRef path) const;
2022-05-09 09:16:05 +00:00
Coordination::ACLs getACLs(StringRef path) const;
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
std::deque<Delta> deltas;
2022-05-05 10:32:41 +00:00
KeeperStorage & storage;
};
2022-05-11 09:08:39 +00:00
UncommittedState uncommitted_state{*this};
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
Coordination::Error commit(int64_t zxid, int64_t session_id);
2022-05-17 06:45:51 +00:00
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
// We don't care about the exact failure because we should've caught it during preprocessing
2022-05-09 08:32:25 +00:00
bool createNode(
const std::string & path,
String data,
const Coordination::Stat & stat,
bool is_sequental,
bool is_ephemeral,
Coordination::ACLs node_acls,
int64_t session_id);
2022-05-17 06:45:51 +00:00
// Remove node in the storage
// Returns false if it failed to remove the node, true otherwise
// We don't care about the exact failure because we should've caught it during preprocessing
2022-05-06 12:25:25 +00:00
bool removeNode(const std::string & path, int32_t version);
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
/// Mapping session_id -> set of ephemeral nodes paths
2020-11-19 16:06:19 +00:00
Ephemerals ephemerals;
2022-04-05 06:27:03 +00:00
/// Mapping session_id -> set of watched nodes paths
2020-11-26 14:57:32 +00:00
SessionAndWatcher sessions_and_watchers;
/// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue;
/// All active sessions with timeout
SessionAndTimeout session_and_timeout;
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
2020-10-30 14:16:47 +00:00
/// Global id of all requests applied to storage
2021-01-21 13:53:10 +00:00
int64_t zxid{0};
2022-05-13 13:43:42 +00:00
struct TransactionInfo
{
int64_t zxid;
2022-05-16 12:12:29 +00:00
Digest nodes_digest;
2022-05-13 13:43:42 +00:00
};
std::deque<TransactionInfo> uncommitted_transactions;
2022-05-16 12:12:29 +00:00
uint64_t nodes_digest{0};
2022-05-13 13:43:42 +00:00
2021-01-21 13:53:10 +00:00
bool finalized{false};
2020-10-30 14:16:47 +00:00
/// Currently active watches (node_path -> subscribed sessions)
2020-10-30 14:16:47 +00:00
Watches watches;
2022-05-09 08:32:25 +00:00
Watches list_watches; /// Watches for 'list' request (watches on children).
2020-10-30 14:16:47 +00:00
2021-01-19 14:22:28 +00:00
void clearDeadWatches(int64_t session_id);
2020-10-30 14:16:47 +00:00
2022-05-12 08:58:36 +00:00
/// Get current committed zxid
2022-05-09 08:32:25 +00:00
int64_t getZXID() const { return zxid; }
2020-10-30 14:16:47 +00:00
2022-05-12 08:58:36 +00:00
int64_t getNextZXID() const
{
2022-05-13 13:43:42 +00:00
if (uncommitted_transactions.empty())
2022-05-12 08:58:36 +00:00
return zxid + 1;
2022-05-13 13:43:42 +00:00
return uncommitted_transactions.back().zxid + 1;
2022-05-12 08:58:36 +00:00
}
2022-05-16 12:12:29 +00:00
Digest getNodesDigest(bool committed) const;
2022-05-12 08:58:36 +00:00
2022-05-16 13:08:10 +00:00
const bool digest_enabled;
2021-05-22 16:07:47 +00:00
const String superdigest;
2022-05-16 13:08:10 +00:00
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, bool digest_enabled_ = true);
2021-01-19 14:22:28 +00:00
/// Allocate new session id with the specified timeouts
2021-02-04 08:28:11 +00:00
int64_t getSessionID(int64_t session_timeout_ms)
2021-01-21 14:34:34 +00:00
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
2021-09-02 11:40:54 +00:00
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
2021-01-21 14:34:34 +00:00
}
2021-01-21 20:01:25 +00:00
/// Add session id. Used when restoring KeeperStorage from snapshot.
2021-03-01 13:33:34 +00:00
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
2021-09-02 11:40:54 +00:00
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
2021-03-01 13:33:34 +00:00
}
2022-05-16 12:12:29 +00:00
UInt64 calculateNodesDigest(UInt64 current_digest, int64_t current_zxid) const;
2022-05-13 13:43:42 +00:00
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
2022-05-09 08:32:25 +00:00
ResponsesForSessions processRequest(
const Coordination::ZooKeeperRequestPtr & request,
int64_t session_id,
int64_t time,
std::optional<int64_t> new_last_zxid,
bool check_acl = true,
bool is_local = false);
2022-05-09 09:16:05 +00:00
void preprocessRequest(
2022-05-13 13:43:42 +00:00
const Coordination::ZooKeeperRequestPtr & request,
int64_t session_id,
int64_t time,
int64_t new_last_zxid,
2022-05-16 13:08:10 +00:00
bool check_acl = true,
Digest digest = {DigestVersion::NO_DIGEST, 0});
2022-05-10 13:04:35 +00:00
void rollbackRequest(int64_t rollback_zxid);
void finalize();
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
2022-05-09 08:32:25 +00:00
void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
2021-02-26 14:54:59 +00:00
/// Turn off snapshot mode.
2022-05-09 08:32:25 +00:00
void disableSnapshotMode() { container.disableSnapshotMode(); }
2021-02-26 14:54:59 +00:00
2022-05-09 08:32:25 +00:00
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
2021-02-26 14:54:59 +00:00
/// Clear outdated data from internal container.
2022-05-09 08:32:25 +00:00
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
2021-02-26 14:54:59 +00:00
/// Get all active sessions
2022-05-09 08:32:25 +00:00
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
2021-02-26 14:54:59 +00:00
/// Get all dead sessions
2022-05-09 08:32:25 +00:00
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
2021-10-27 12:26:42 +00:00
2021-11-18 20:17:22 +00:00
/// Introspection functions mostly used in 4-letter commands
2022-05-09 08:32:25 +00:00
uint64_t getNodesCount() const { return container.size(); }
2021-10-27 12:26:42 +00:00
2022-05-09 08:32:25 +00:00
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
2021-10-27 12:26:42 +00:00
2022-05-09 08:32:25 +00:00
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
2022-01-19 11:46:29 +00:00
2021-11-18 20:17:22 +00:00
uint64_t getTotalWatchesCount() const;
2021-11-05 10:21:34 +00:00
2022-05-09 08:32:25 +00:00
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
2021-11-05 10:21:34 +00:00
2021-11-18 20:17:22 +00:00
uint64_t getSessionsWithWatchesCount() const;
2022-05-09 08:32:25 +00:00
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
2021-11-18 20:17:22 +00:00
uint64_t getTotalEphemeralNodesCount() const;
2021-10-27 12:26:42 +00:00
2021-11-05 10:21:34 +00:00
void dumpWatches(WriteBufferFromOwnString & buf) const;
void dumpWatchesByPath(WriteBufferFromOwnString & buf) const;
2021-11-18 20:17:22 +00:00
void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const;
2022-05-16 13:08:10 +00:00
private:
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);
2020-10-30 14:16:47 +00:00
};
2020-11-11 13:55:28 +00:00
2021-03-29 08:24:56 +00:00
using KeeperStoragePtr = std::unique_ptr<KeeperStorage>;
2021-03-18 20:55:11 +00:00
2020-10-30 14:16:47 +00:00
}