More mutex

This commit is contained in:
Antonio Andelic 2023-09-06 13:04:08 +00:00
parent 9238520490
commit 6be1d0724a
7 changed files with 299 additions and 152 deletions

View File

@ -45,6 +45,7 @@ uint64_t ACLMap::convertACLs(const Coordination::ACLs & acls)
if (acls.empty())
return 0;
std::lock_guard lock(map_mutex);
if (acl_to_num.contains(acls))
return acl_to_num[acls];
@ -62,6 +63,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
if (acls_id == 0)
return Coordination::ACLs{};
std::lock_guard lock(map_mutex);
if (!num_to_acl.contains(acls_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ACL id {}. It's a bug", acls_id);
@ -70,6 +72,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
{
std::lock_guard lock(map_mutex);
num_to_acl[acls_id] = acls;
acl_to_num[acls] = acls_id;
max_acl_id = std::max(acls_id + 1, max_acl_id); /// max_acl_id pointer next slot
@ -77,11 +80,13 @@ void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
void ACLMap::addUsage(uint64_t acl_id)
{
std::lock_guard lock(map_mutex);
usage_counter[acl_id]++;
}
void ACLMap::removeUsage(uint64_t acl_id)
{
std::lock_guard lock(map_mutex);
if (!usage_counter.contains(acl_id))
return;

View File

@ -32,6 +32,8 @@ private:
NumToACLMap num_to_acl;
UsageCounter usage_counter;
uint64_t max_acl_id{1};
mutable std::mutex map_mutex;
public:
/// Convert ACL to number. If it's new ACL than adds it to map

View File

@ -271,6 +271,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::shared_lock storage_lock(storage_mutex);
if (storage->isFinalized())
return false;
@ -291,14 +292,14 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
}
if (keeper_context->digestEnabled() && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
assertDigest(*request_for_session.digest, storage->getNodesDigest(false, /*lock_transaction_mutex=*/true), *request_for_session.request, false);
return true;
}
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session)
{
std::lock_guard _(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response))
{
@ -393,6 +394,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
}
};
std::shared_lock storage_lock(storage_mutex);
const auto op_num = request_for_session->request->getOpNum();
if (op_num == Coordination::OpNum::SessionID)
{
@ -406,7 +408,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
response_for_session.session_id = -1;
response_for_session.response = response;
std::lock_guard lock(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
@ -420,13 +422,14 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
parsed_request_cache.erase(request_for_session->session_id);
}
std::lock_guard lock(process_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
try_push(response_for_session);
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, true);
assertDigest(*request_for_session->digest, storage->getNodesDigest(true, /*lock_transaction_mutex=*/true), *request_for_session->request, true);
}
ProfileEvents::increment(ProfileEvents::KeeperCommits);
@ -462,8 +465,6 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
}
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
SnapshotDeserializationResult snapshot_deserialization_result;
if (latest_snapshot_ptr)
snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
@ -471,6 +472,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
snapshot_deserialization_result
= snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));
std::unique_lock storage_lock(storage_mutex);
/// maybe some logs were preprocessed with log idx larger than the snapshot idx
/// we have to apply them to the new storage
storage->applyUncommittedState(*snapshot_deserialization_result.storage, snapshot_deserialization_result.storage->getZXID());
@ -509,14 +511,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
void KeeperStateMachine::rollbackRequestNoLock(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::shared_lock lock(storage_mutex);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
@ -535,7 +530,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock);
std::unique_lock lock(storage_mutex);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
@ -582,8 +577,6 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
{
/// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot();
@ -732,8 +725,10 @@ int KeeperStateMachine::read_logical_snp_obj(
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
std::shared_lock storage_lock(storage_mutex);
/// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses)
@ -743,96 +738,97 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
void KeeperStateMachine::shutdownStorage()
{
std::lock_guard lock(storage_and_responses_lock);
std::unique_lock storage_lock(storage_mutex);
storage->finalize();
}
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getDeadSessions();
}
int64_t KeeperStateMachine::getNextZxid() const
{
std::shared_lock storage_lock(storage_mutex);
return storage->getNextZXID();
}
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const
{
std::lock_guard lock(storage_and_responses_lock);
return storage->getNodesDigest(false);
std::shared_lock storage_lock(storage_mutex);
return storage->getNodesDigest(false, /*lock_transaction_mutex=*/true);
}
uint64_t KeeperStateMachine::getLastProcessedZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getZXID();
}
uint64_t KeeperStateMachine::getNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getNodesCount();
}
uint64_t KeeperStateMachine::getTotalWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getTotalWatchesCount();
}
uint64_t KeeperStateMachine::getWatchedPathsCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getWatchedPathsCount();
}
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getSessionsWithWatchesCount();
}
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getTotalEphemeralNodesCount();
}
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getSessionWithEphemeralNodesCount();
}
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpWatches(buf);
}
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpWatchesByPath(buf);
}
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpSessionsAndEphemerals(buf);
}
uint64_t KeeperStateMachine::getApproximateDataSize() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getApproximateDataSize();
}
uint64_t KeeperStateMachine::getKeyArenaSize() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getArenaDataSize();
}
@ -858,7 +854,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
void KeeperStateMachine::recalculateStorageStats()
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");

View File

@ -132,6 +132,8 @@ public:
void reconfigure(const KeeperStorage::RequestForSession& request_for_session);
private:
mutable SharedMutex storage_mutex;
CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
@ -161,7 +163,7 @@ private:
/// we can get strange cases when, for example client send read request with
/// watch and after that receive watch response and only receive response
/// for request.
mutable std::mutex storage_and_responses_lock;
mutable std::mutex process_and_responses_lock;
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorage::RequestForSession>>> parsed_request_cache;
uint64_t min_request_size_to_cache{0};
@ -189,7 +191,6 @@ private:
KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration(
const KeeperStorage::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock);
const KeeperStorage::RequestForSession& request_for_session);
};
}

View File

@ -535,7 +535,10 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{
if (operation.stat.ephemeralOwner != 0)
{
std::lock_guard lock(storage.ephemerals_mutex);
storage.ephemerals[operation.stat.ephemeralOwner].emplace(delta.path);
}
}
},
delta.operation);
@ -635,12 +638,12 @@ namespace
/// Get current committed zxid
int64_t KeeperStorage::getZXID() const
{
{
std::lock_guard lock(transaction_mutex);
return zxid;
}
int64_t KeeperStorage::getNextZXIDLocked(std::lock_guard<std::mutex> &) const
int64_t KeeperStorage::getNextZXIDLocked() const
{
if (uncommitted_transactions.empty())
return zxid + 1;
@ -651,7 +654,7 @@ int64_t KeeperStorage::getNextZXIDLocked(std::lock_guard<std::mutex> &) const
int64_t KeeperStorage::getNextZXID() const
{
std::lock_guard lock(transaction_mutex);
return getNextZXIDLocked(lock);
return getNextZXIDLocked();
}
void KeeperStorage::applyUncommittedState(KeeperStorage & other, int64_t last_zxid) TSA_NO_THREAD_SAFETY_ANALYSIS
@ -683,7 +686,7 @@ Coordination::Error KeeperStorage::commit(std::list<Delta> deltas)
for (auto & delta : deltas)
{
auto result = std::visit(
[&, &path = delta.path]<typename DeltaType>(DeltaType & operation) -> Coordination::Error
[&, &path = delta.path]<typename DeltaType>(DeltaType & operation) TSA_REQUIRES(storage_mutex) -> Coordination::Error
{
if constexpr (std::same_as<DeltaType, KeeperStorage::CreateNodeDelta>)
{
@ -906,6 +909,7 @@ Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_
{
if (is_local)
{
std::lock_guard lock(storage.storage_mutex);
auto node_it = storage.container.find(path);
if (node_it == storage.container.end())
return {};
@ -966,6 +970,7 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session
void KeeperStorage::unregisterEphemeralPath(int64_t session_id, const std::string & path)
{
std::lock_guard ephemerals_lock(ephemerals_mutex);
auto ephemerals_it = ephemerals.find(session_id);
if (ephemerals_it == ephemerals.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session {} is missing ephemeral path");
@ -1038,7 +1043,10 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
return {KeeperStorage::Delta{zxid, Coordination::Error::ZINVALIDACL}};
if (request.is_ephemeral)
{
std::lock_guard lock(storage.ephemerals_mutex);
storage.ephemerals[session_id].emplace(path_created);
}
int32_t parent_cversion = request.parent_cversion;
@ -1080,7 +1088,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
@ -1133,7 +1141,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
@ -1176,12 +1184,12 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<false>(storage, std::move(deltas));
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
ProfileEvents::increment(ProfileEvents::KeeperGetRequest);
return processImpl<true>(storage, std::move(deltas));
@ -1271,7 +1279,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
@ -1304,7 +1312,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
}
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
@ -1337,12 +1345,12 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
return response_ptr;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<false>(storage, std::move(deltas));
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
ProfileEvents::increment(ProfileEvents::KeeperExistsRequest);
return processImpl<true>(storage, std::move(deltas));
@ -1405,7 +1413,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
auto & container = storage.container;
@ -1458,7 +1466,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
@ -1492,7 +1500,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
const auto & children = node_it->value.getChildren();
response.names.reserve(children.size());
const auto add_child = [&](const auto child)
const auto add_child = [&](const auto child) TSA_REQUIRES(storage.storage_mutex)
{
using enum Coordination::ListRequestType;
@ -1527,12 +1535,12 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
return response_ptr;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex) TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<false>(storage, std::move(deltas));
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex) TSA_REQUIRES(storage.storage_mutex)
{
ProfileEvents::increment(ProfileEvents::KeeperListRequest);
return processImpl<true>(storage, std::move(deltas));
@ -1579,7 +1587,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
}
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
@ -1625,12 +1633,12 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
return response_ptr;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<false>(storage, std::move(deltas));
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
return processImpl<true>(storage, std::move(deltas));
@ -1690,7 +1698,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr);
@ -1733,7 +1741,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
}
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
@ -1766,12 +1774,12 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
return response_ptr;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<false>(storage, std::move(deltas));
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
return processImpl<true>(storage, std::move(deltas));
}
@ -2015,7 +2023,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, std::list<KeeperStorage::Delta> deltas) const override TSA_REQUIRES(storage.storage_mutex)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast<Coordination::ZooKeeperAuthResponse &>(*response_ptr);
@ -2034,11 +2042,19 @@ void KeeperStorage::finalize()
finalized = true;
ephemerals.clear();
{
std::lock_guard lock(ephemerals_mutex);
ephemerals.clear();
}
watches.clear();
list_watches.clear();
sessions_and_watchers.clear();
{
std::lock_guard lock(watches_mutex);
watches.clear();
list_watches.clear();
sessions_and_watchers.clear();
}
std::lock_guard lock(session_mutex);
session_expiry_queue.clear();
}
@ -2206,7 +2222,8 @@ void KeeperStorage::preprocessRequest(
{
std::lock_guard lock(transaction_mutex);
int64_t last_zxid = getNextZXIDLocked(lock) - 1;
int64_t last_zxid = getNextZXIDLocked() - 1;
auto current_digest = getNodesDigest(false, /*lock_transaction_mutex=*/false);
if (uncommitted_transactions.empty())
{
@ -2219,7 +2236,7 @@ void KeeperStorage::preprocessRequest(
}
else
{
if (last_zxid == new_last_zxid && digest && checkDigest(*digest, getNodesDigest(false)))
if (last_zxid == new_last_zxid && digest && checkDigest(*digest, current_digest))
// we found the preprocessed request with the same ZXID, we can skip it
return;
@ -2230,8 +2247,8 @@ void KeeperStorage::preprocessRequest(
new_last_zxid, last_zxid);
}
new_digest = current_digest.value;
transaction = &uncommitted_transactions.emplace_back(TransactionInfo{.zxid = new_last_zxid, .nodes_digest = {}});
new_digest = getNodesDigest(false).value;
}
std::list<Delta> new_deltas;
@ -2252,32 +2269,35 @@ void KeeperStorage::preprocessRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
for (const auto & ephemeral_path : session_ephemerals->second)
std::lock_guard lock(ephemerals_mutex);
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
auto parent_node_path = parentNodePath(ephemeral_path);
auto parent_node = uncommitted_state.getNode(parent_node_path);
UpdateNodeStatDelta parent_update_delta(*parent_node);
++parent_update_delta.new_stats.cversion;
--parent_update_delta.new_stats.numChildren;
for (const auto & ephemeral_path : session_ephemerals->second)
{
auto parent_node_path = parentNodePath(ephemeral_path);
auto parent_node = uncommitted_state.getNode(parent_node_path);
UpdateNodeStatDelta parent_update_delta(*parent_node);
++parent_update_delta.new_stats.cversion;
--parent_update_delta.new_stats.numChildren;
new_deltas.emplace_back
(
parent_node_path.toString(),
new_last_zxid,
std::move(parent_update_delta)
);
new_deltas.emplace_back
(
parent_node_path.toString(),
new_last_zxid,
std::move(parent_update_delta)
);
auto node = uncommitted_state.getNode(ephemeral_path);
new_deltas.emplace_back(
ephemeral_path,
transaction->zxid,
RemoveNodeDelta{.stat = node->stat, .acls = uncommitted_state.getACLs(ephemeral_path), .data = node->getData()});
auto node = uncommitted_state.getNode(ephemeral_path);
new_deltas.emplace_back(
ephemeral_path,
transaction->zxid,
RemoveNodeDelta{.stat = node->stat, .acls = uncommitted_state.getACLs(ephemeral_path), .data = node->getData()});
}
ephemerals.erase(session_ephemerals);
}
ephemerals.erase(session_ephemerals);
}
new_digest = calculateNodesDigest(new_digest, new_deltas);
@ -2331,7 +2351,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
std::list<Delta> deltas;
{
std::lock_guard lock(uncommitted_state.deltas_mutex);
auto it = uncommitted_state.deltas.begin();
auto it = uncommitted_state.deltas.begin();
for (; it != uncommitted_state.deltas.end() && it->zxid == commit_zxid; ++it)
;
@ -2341,7 +2361,10 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
KeeperStorage::ResponsesForSessions results;
/// ZooKeeper update sessions expirity for each request, not only for heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
{
std::lock_guard lock(session_mutex);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
}
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
@ -2349,6 +2372,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
{
if (std::holds_alternative<RemoveNodeDelta>(delta.operation))
{
std::lock_guard lock(watches_mutex);
auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}
@ -2368,8 +2392,11 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = commit_zxid;
session_expiry_queue.remove(session_id);
session_and_timeout.erase(session_id);
{
std::lock_guard lock(session_mutex);
session_expiry_queue.remove(session_id);
session_and_timeout.erase(session_id);
}
results.push_back(ResponseForSession{session_id, response});
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
@ -2392,7 +2419,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
if (is_local)
{
std::lock_guard lock(storage_mutex);
assert(zk_request->isReadRequest());
if (check_acl && !request_processor->checkAuth(*this, session_id, true))
{
@ -2414,6 +2440,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
/// Watches for this requests are added to the watches lists
if (zk_request->has_watch)
{
std::lock_guard lock(watches_mutex);
if (response->error == Coordination::Error::ZOK)
{
static constexpr std::array list_requests{
@ -2436,6 +2463,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
/// If this requests processed successfully we need to check watches
if (response->error == Coordination::Error::ZOK)
{
std::lock_guard lock(watches_mutex);
auto watch_responses = request_processor->processWatches(watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}
@ -2456,7 +2484,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
return results;
}
void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS
{
std::unique_lock transaction_lock(transaction_mutex);
if (allow_missing && (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid < rollback_zxid))
@ -2484,13 +2512,28 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
}
}
KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed) const
KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed, bool lock_transaction_mutex) const TSA_NO_THREAD_SAFETY_ANALYSIS
{
if (!keeper_context->digestEnabled())
return {.version = DigestVersion::NO_DIGEST};
if (committed || uncommitted_transactions.empty())
if (committed)
{
std::lock_guard storage_lock(storage_mutex);
return {CURRENT_DIGEST_VERSION, nodes_digest};
}
std::unique_lock transaction_lock(transaction_mutex, std::defer_lock);
if (lock_transaction_mutex)
transaction_lock.lock();
if (uncommitted_transactions.empty())
{
if (lock_transaction_mutex)
transaction_lock.unlock();
std::lock_guard storage_lock(storage_mutex);
return {CURRENT_DIGEST_VERSION, nodes_digest};
}
return uncommitted_transactions.back().nodes_digest;
}
@ -2510,8 +2553,89 @@ void KeeperStorage::addDigest(const Node & node, const std::string_view path)
}
}
/// Allocate new session id with the specified timeouts
int64_t KeeperStorage::getSessionID(int64_t session_timeout_ms)
{
std::lock_guard lock(session_mutex);
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
/// Add session id. Used when restoring KeeperStorage from snapshot.
void KeeperStorage::addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
std::vector<int64_t> KeeperStorage::getDeadSessions() const
{
std::lock_guard lock(session_mutex);
return session_expiry_queue.getExpiredSessions();
}
const SessionAndTimeout & KeeperStorage::getActiveSessions() const
{
return session_and_timeout;
}
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void KeeperStorage::enableSnapshotMode(size_t up_to_version)
{
std::lock_guard lock(storage_mutex);
container.enableSnapshotMode(up_to_version);
}
/// Turn off snapshot mode.
void KeeperStorage::disableSnapshotMode()
{
std::lock_guard lock(storage_mutex);
container.disableSnapshotMode();
}
KeeperStorage::Container::const_iterator KeeperStorage::getSnapshotIteratorBegin() const
{
std::lock_guard lock(storage_mutex);
return container.begin();
}
/// Clear outdated data from internal container.
void KeeperStorage::clearGarbageAfterSnapshot()
{
std::lock_guard lock(storage_mutex);
container.clearOutdatedNodes();
}
/// Introspection functions mostly used in 4-letter commands
uint64_t KeeperStorage::getNodesCount() const
{
std::lock_guard lock(storage_mutex);
return container.size();
}
uint64_t KeeperStorage::getApproximateDataSize() const
{
std::lock_guard lock(storage_mutex);
return container.getApproximateDataSize();
}
uint64_t KeeperStorage::getArenaDataSize() const
{
std::lock_guard lock(storage_mutex);
return container.keyArenaSize();
}
uint64_t KeeperStorage::getWatchedPathsCount() const
{
std::lock_guard lock(watches_mutex);
return watches.size() + list_watches.size();
}
void KeeperStorage::clearDeadWatches(int64_t session_id)
{
std::lock_guard lock(watches_mutex);
/// Clear all watches for this session
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())
@ -2545,6 +2669,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(watches_mutex);
for (const auto & [session_id, watches_paths] : sessions_and_watchers)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
@ -2563,6 +2688,7 @@ void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
}
};
std::lock_guard lock(watches_mutex);
for (const auto & [watch_path, sessions] : watches)
{
buf << watch_path << "\n";
@ -2586,14 +2712,18 @@ void KeeperStorage::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) co
}
};
buf << "Sessions dump (" << session_and_timeout.size() << "):\n";
for (const auto & [session_id, _] : session_and_timeout)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
std::lock_guard lock(session_mutex);
buf << "Sessions dump (" << session_and_timeout.size() << "):\n";
for (const auto & [session_id, _] : session_and_timeout)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
}
}
buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCount() << "):\n";
std::lock_guard ephemerals_lock(ephemerals_mutex);
buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCountLocked() << "):\n";
for (const auto & [session_id, ephemeral_paths] : ephemerals)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
@ -2603,6 +2733,7 @@ void KeeperStorage::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) co
uint64_t KeeperStorage::getTotalWatchesCount() const
{
std::lock_guard lock(watches_mutex);
uint64_t ret = 0;
for (const auto & [path, subscribed_sessions] : watches)
ret += subscribed_sessions.size();
@ -2613,9 +2744,21 @@ uint64_t KeeperStorage::getTotalWatchesCount() const
return ret;
}
uint64_t KeeperStorage::getSessionWithEphemeralNodesCount() const
{
std::lock_guard ephemerals_lock(ephemerals_mutex);
return getSessionWithEphemeralNodesCountLocked();
}
uint64_t KeeperStorage::getSessionWithEphemeralNodesCountLocked() const
{
return ephemerals.size();
}
uint64_t KeeperStorage::getSessionsWithWatchesCount() const
{
std::unordered_set<int64_t> counter;
std::lock_guard lock(watches_mutex);
for (const auto & [path, subscribed_sessions] : watches)
counter.insert(subscribed_sessions.begin(), subscribed_sessions.end());
@ -2627,6 +2770,7 @@ uint64_t KeeperStorage::getSessionsWithWatchesCount() const
uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
{
std::lock_guard ephemerals_lock(ephemerals_mutex);
uint64_t ret = 0;
for (const auto & [session_id, nodes] : ephemerals)
ret += nodes.size();
@ -2636,6 +2780,7 @@ uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
void KeeperStorage::recalculateStats()
{
std::lock_guard lock(storage_mutex);
container.recalculateDataSize();
}

View File

@ -139,18 +139,14 @@ public:
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
mutable SharedMutex main_mutex;
mutable std::mutex storage_mutex;
int64_t session_id_counter{1};
SessionAndAuth session_and_auth;
SessionAndAuth session_and_auth TSA_GUARDED_BY(storage_mutex);
/// Main hashtable with nodes. Contain all information about data.
/// All other structures expect session_and_timeout can be restored from
/// container.
Container container;
Container container TSA_GUARDED_BY(storage_mutex);
// Applying ZooKeeper request to storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage,
@ -284,7 +280,10 @@ public:
};
if (is_local)
{
std::lock_guard lock(storage.storage_mutex);
return check_auth(storage.session_and_auth[session_id]);
}
// check if there are uncommitted
const auto auth_it = session_and_auth.find(session_id);
@ -347,7 +346,7 @@ public:
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_zxid);
Coordination::Error commit(std::list<Delta> deltas);
Coordination::Error commit(std::list<Delta> deltas) TSA_REQUIRES(storage_mutex);
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
@ -357,25 +356,27 @@ public:
String data,
const Coordination::Stat & stat,
bool is_sequental,
Coordination::ACLs node_acls);
Coordination::ACLs node_acls) TSA_REQUIRES(storage_mutex);
// Remove node in the storage
// Returns false if it failed to remove the node, true otherwise
// We don't care about the exact failure because we should've caught it during preprocessing
bool removeNode(const std::string & path, int32_t version);
bool removeNode(const std::string & path, int32_t version) TSA_REQUIRES(storage_mutex);
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
void unregisterEphemeralPath(int64_t session_id, const std::string & path);
mutable std::mutex ephemerals_mutex;
/// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers;
Ephemerals ephemerals TSA_GUARDED_BY(ephemerals_mutex);
mutable std::mutex session_mutex;
int64_t session_id_counter TSA_GUARDED_BY(session_mutex) = 1;
/// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue;
SessionExpiryQueue session_expiry_queue TSA_GUARDED_BY(session_mutex);
/// All active sessions with timeout
SessionAndTimeout session_and_timeout;
SessionAndTimeout session_and_timeout TSA_GUARDED_BY(session_mutex);
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
@ -383,7 +384,7 @@ public:
mutable std::mutex transaction_mutex;
/// Global id of all requests applied to storage
int64_t zxid = 0;
int64_t zxid TSA_GUARDED_BY(transaction_mutex) = 0;
// older Keeper node (pre V5 snapshots) can create snapshots and receive logs from newer Keeper nodes
// this can lead to some inconsistencies, e.g. from snapshot it will use log_idx as zxid
@ -398,15 +399,20 @@ public:
Digest nodes_digest;
};
std::list<TransactionInfo> uncommitted_transactions;
std::list<TransactionInfo> uncommitted_transactions TSA_GUARDED_BY(transaction_mutex);
uint64_t nodes_digest{0};
uint64_t nodes_digest TSA_GUARDED_BY(storage_mutex) = 0;
std::atomic<bool> finalized{false};
mutable std::mutex watches_mutex;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers TSA_GUARDED_BY(watches_mutex);
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
Watches watches TSA_GUARDED_BY(watches_mutex);
Watches list_watches TSA_GUARDED_BY(watches_mutex); /// Watches for 'list' request (watches on children).
void clearDeadWatches(int64_t session_id);
@ -414,9 +420,9 @@ public:
int64_t getZXID() const;
int64_t getNextZXID() const;
int64_t getNextZXIDLocked(std::lock_guard<std::mutex> & lock) const;
int64_t getNextZXIDLocked() const TSA_REQUIRES(transaction_mutex);
Digest getNodesDigest(bool committed) const;
Digest getNodesDigest(bool committed, bool lock_transaction_mutex) const;
KeeperContextPtr keeper_context;
@ -426,23 +432,13 @@ public:
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, bool initialize_system_nodes = true);
void initializeSystemNodes();
void initializeSystemNodes() TSA_NO_THREAD_SAFETY_ANALYSIS;
/// Allocate new session id with the specified timeouts
int64_t getSessionID(int64_t session_timeout_ms)
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
int64_t getSessionID(int64_t session_timeout_ms);
/// Add session id. Used when restoring KeeperStorage from snapshot.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
void addSessionID(int64_t session_id, int64_t session_timeout_ms) TSA_NO_THREAD_SAFETY_ANALYSIS;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::list<Delta> & new_deltas) const;
@ -470,36 +466,36 @@ public:
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
void enableSnapshotMode(size_t up_to_version);
/// Turn off snapshot mode.
void disableSnapshotMode() { container.disableSnapshotMode(); }
void disableSnapshotMode();
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
Container::const_iterator getSnapshotIteratorBegin() const;
/// Clear outdated data from internal container.
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
void clearGarbageAfterSnapshot();
/// Get all active sessions
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
const SessionAndTimeout & getActiveSessions() const;
/// Get all dead sessions
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
std::vector<int64_t> getDeadSessions() const;
/// Introspection functions mostly used in 4-letter commands
uint64_t getNodesCount() const { return container.size(); }
uint64_t getNodesCount() const;
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
uint64_t getApproximateDataSize() const;
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
uint64_t getArenaDataSize() const;
uint64_t getTotalWatchesCount() const;
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
uint64_t getWatchedPathsCount() const;
uint64_t getSessionsWithWatchesCount() const;
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
uint64_t getSessionWithEphemeralNodesCount() const;
uint64_t getTotalEphemeralNodesCount() const;
void dumpWatches(WriteBufferFromOwnString & buf) const;
@ -508,8 +504,10 @@ public:
void recalculateStats();
private:
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);
uint64_t getSessionWithEphemeralNodesCountLocked() const TSA_REQUIRES(ephemerals_mutex);
void removeDigest(const Node & node, std::string_view path) TSA_REQUIRES(storage_mutex);
void addDigest(const Node & node, std::string_view path) TSA_REQUIRES(storage_mutex);
};
using KeeperStoragePtr = std::unique_ptr<KeeperStorage>;

View File

@ -90,7 +90,7 @@ void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in)
}
}
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * log)
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t max_zxid = 0;
std::string path;