Initial implementation

This commit is contained in:
Antonio Andelic 2023-09-04 14:49:49 +00:00
parent 365bc584ec
commit b43c3d75a2
6 changed files with 638 additions and 459 deletions

View File

@ -277,7 +277,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
}
}
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context)
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context) TSA_NO_THREAD_SAFETY_ANALYSIS
{
uint8_t version;
readBinary(version, in);

View File

@ -271,8 +271,6 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::lock_guard lock(storage_and_responses_lock);
if (storage->isFinalized())
return false;
@ -422,7 +420,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
parsed_request_cache.erase(request_for_session->session_id);
}
std::lock_guard lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)

View File

@ -142,7 +142,7 @@ private:
CoordinationSettingsPtr coordination_settings;
/// Main state machine logic
KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);
KeeperStoragePtr storage;
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager snapshot_manager;

File diff suppressed because it is too large Load Diff

View File

@ -11,6 +11,8 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/KeeperContext.h>
#include <base/defines.h>
#include <absl/container/flat_hash_set.h>
namespace DB
@ -138,11 +140,13 @@ public:
int64_t session_id_counter{1};
SessionAndAuth session_and_auth;
mutable std::mutex auth_mutex;
SessionAndAuth session_and_auth TSA_GUARDED_BY(auth_mutex);
/// Main hashtable with nodes. Contain all information about data.
/// All other structures expect session_and_timeout can be restored from
/// container.
mutable std::mutex container_mutex;
Container container;
// Applying ZooKeeper request to storage consists of two steps:
@ -166,18 +170,33 @@ public:
struct RemoveNodeDelta
{
int32_t version{-1};
int64_t ephemeral_owner{0};
Coordination::Stat stat;
Coordination::ACLs acls;
String data;
};
struct UpdateNodeDelta
struct UpdateNodeStatDelta
{
std::function<void(Node &)> update_fn;
explicit UpdateNodeStatDelta(const KeeperStorage::Node & node);
Coordination::Stat old_stats;
Coordination::Stat new_stats;
int32_t old_seq_num;
int32_t new_seq_num;
int32_t version{-1};
};
struct UpdateNodeDataDelta
{
std::string old_data;
std::string new_data;
int32_t version{-1};
};
struct SetACLDelta
{
Coordination::ACLs acls;
Coordination::ACLs old_acls;
Coordination::ACLs new_acls;
int32_t version{-1};
};
@ -199,11 +218,19 @@ public:
struct AddAuthDelta
{
int64_t session_id;
AuthID auth_id;
std::shared_ptr<AuthID> auth_id;
};
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
using Operation = std::variant<
CreateNodeDelta,
RemoveNodeDelta,
UpdateNodeStatDelta,
UpdateNodeDataDelta,
SetACLDelta,
AddAuthDelta,
ErrorDelta,
SubDeltaEnd,
FailedMultiDelta>;
struct Delta
{
@ -222,15 +249,16 @@ public:
{
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
void addDelta(Delta new_delta);
void addDeltas(std::vector<Delta> new_deltas);
void addDeltas(std::list<Delta> new_deltas);
void commit(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
std::shared_ptr<Node> getNode(StringRef path) const;
Coordination::ACLs getACLs(StringRef path) const;
void applyDeltas(const std::list<Delta> & new_deltas);
void applyDelta(const Delta & delta);
void rollbackDelta(const Delta & delta);
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
{
@ -238,13 +266,13 @@ public:
{
for (const auto & auth : auth_ids)
{
using TAuth = std::remove_reference_t<decltype(auth)>;
using TAuth = std::remove_cvref_t<decltype(auth)>;
const AuthID * auth_ptr = nullptr;
if constexpr (std::is_pointer_v<TAuth>)
auth_ptr = auth;
else
if constexpr (std::same_as<TAuth, AuthID>)
auth_ptr = &auth;
else
auth_ptr = auth.get();
if (predicate(*auth_ptr))
return true;
@ -253,30 +281,34 @@ public:
};
if (is_local)
{
std::lock_guard lock(storage.auth_mutex);
return check_auth(storage.session_and_auth[session_id]);
if (check_auth(storage.session_and_auth[session_id]))
return true;
}
// check if there are uncommitted
const auto auth_it = session_and_auth.find(session_id);
if (auth_it == session_and_auth.end())
return false;
return check_auth(auth_it->second);
if (check_auth(auth_it->second))
return true;
std::lock_guard lock(storage.auth_mutex);
return check_auth(storage.session_and_auth[session_id]);
}
void forEachAuthInSession(int64_t session_id, std::function<void(const AuthID &)> func) const;
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
std::unordered_map<int64_t, std::list<std::shared_ptr<AuthID>>> session_and_auth;
struct UncommittedNode
{
std::shared_ptr<Node> node{nullptr};
Coordination::ACLs acls{};
int64_t zxid{0};
std::optional<Coordination::ACLs> acls{};
std::vector<int64_t> applied_zxids{};
};
struct Hash
@ -303,9 +335,9 @@ public:
};
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
std::list<Delta> deltas;
mutable std::mutex deltas_mutex;
std::list<Delta> deltas TSA_GUARDED_BY(deltas_mutex);
KeeperStorage & storage;
};
@ -315,7 +347,7 @@ public:
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_zxid);
Coordination::Error commit(int64_t zxid);
Coordination::Error commit(std::list<Delta> deltas);
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
@ -348,8 +380,10 @@ public:
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
mutable std::mutex transaction_mutex;
/// Global id of all requests applied to storage
int64_t zxid{0};
int64_t zxid = 0;
// older Keeper node (pre V5 snapshots) can create snapshots and receive logs from newer Keeper nodes
// this can lead to some inconsistencies, e.g. from snapshot it will use log_idx as zxid
@ -364,7 +398,7 @@ public:
Digest nodes_digest;
};
std::deque<TransactionInfo> uncommitted_transactions;
std::list<TransactionInfo> uncommitted_transactions;
uint64_t nodes_digest{0};
@ -377,15 +411,10 @@ public:
void clearDeadWatches(int64_t session_id);
/// Get current committed zxid
int64_t getZXID() const { return zxid; }
int64_t getZXID() const;
int64_t getNextZXID() const
{
if (uncommitted_transactions.empty())
return zxid + 1;
return uncommitted_transactions.back().zxid + 1;
}
int64_t getNextZXID() const;
int64_t getNextZXIDLocked(std::lock_guard<std::mutex> & lock) const;
Digest getNodesDigest(bool committed) const;
@ -393,7 +422,7 @@ public:
const String superdigest;
bool initialized{false};
std::atomic<bool> initialized{false};
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, bool initialize_system_nodes = true);
@ -415,7 +444,7 @@ public:
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
UInt64 calculateNodesDigest(UInt64 current_digest, const std::vector<Delta> & new_deltas) const;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::list<Delta> & new_deltas) const;
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.

View File

@ -147,7 +147,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
return max_zxid;
}
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log)
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path);
int64_t zxid = getZxidFromName(snapshot_path);
@ -474,7 +474,7 @@ bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
}
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*log*/)
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*log*/) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t checksum;
Coordination::read(checksum, in);
@ -553,7 +553,7 @@ void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string
LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter);
}
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log)
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_logs;