More parallel storage

Antonio Andelic 2024-06-11 14:35:34 +02:00
parent c802d7d58a
commit 0fa45c3954
7 changed files with 87 additions and 166 deletions

View File: src/Coordination/KeeperDispatcher.cpp

@ -449,18 +449,13 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
request_queue_it != xid_to_request_queue.end())
{
Stopwatch watch;
if (server->isLeaderAlive())
server->putLocalReadRequest(request_queue_it->second);
else
addErrorResponses(request_queue_it->second, Coordination::Error::ZCONNECTIONLOSS);
if (request_queue_it->second.size() > 500)
LOG_INFO(
getLogger("Speed"),
"It took {}ms for {} requests",
watch.elapsedMilliseconds(),
request_queue_it->second.size());
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
xid_to_request_queue.erase(request_queue_it);
}
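
Editor's note: the dispatcher now hands each queued read to putLocalReadRequest individually instead of passing the whole queue, and the ad-hoc Stopwatch timing log is gone. A minimal standalone sketch of the resulting per-request loop, with hypothetical Request type and leader check (not code from this commit):

#include <deque>
#include <functional>
#include <iostream>

struct Request { int xid = 0; };   // hypothetical stand-in for RequestForSession

// Each queued read is routed on its own: the leader check is re-evaluated per
// request, so a batch degrades request-by-request if leadership is lost mid-batch.
void drainReadQueue(std::deque<Request> & queue, const std::function<bool()> & leader_alive)
{
    for (const auto & read_request : queue)
    {
        if (leader_alive())
            std::cout << "serve xid=" << read_request.xid << " locally\n";      // putLocalReadRequest(read_request)
        else
            std::cout << "xid=" << read_request.xid << " -> ZCONNECTIONLOSS\n"; // addErrorResponses({read_request}, ...)
    }
    queue.clear();   // xid_to_request_queue.erase(request_queue_it)
}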

View File: src/Coordination/KeeperServer.cpp

@ -533,10 +533,9 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
}
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestsForSessions & request_for_session)
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
if (std::any_of(
request_for_session.begin(), request_for_session.end(), [](const auto & request) { return !request.request->isReadRequest(); }))
if (!request_for_session.request->isReadRequest())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");
state_machine->processReadRequest(request_for_session);

View File: src/Coordination/KeeperServer.h

@ -84,7 +84,7 @@ public:
/// Put local read request, execute it in the state machine directly and push
/// the response into the responses queue
void putLocalReadRequest(const KeeperStorage::RequestsForSessions & request);
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
bool isRecovering() const { return is_recovering; }
bool reconfigEnabled() const { return enable_reconfiguration; }

View File: src/Coordination/KeeperStateMachine.cpp

@ -278,12 +278,12 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::shared_lock storage_lock(storage_mutex);
if (storage->isFinalized())
return false;
try
{
std::shared_lock storage_lock(storage_mutex);
storage->preprocessRequest(
request_for_session.request,
request_for_session.session_id,
@ -312,7 +312,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session)
{
std::lock_guard lock(process_and_responses_lock);
std::lock_guard lock(storage_mutex);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response))
{
@ -414,7 +414,6 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
try
{
std::shared_lock storage_lock(storage_mutex);
const auto op_num = request_for_session->request->getOpNum();
if (op_num == Coordination::OpNum::SessionID)
{
@ -428,7 +427,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
response_for_session.session_id = -1;
response_for_session.response = response;
std::lock_guard lock(process_and_responses_lock);
std::unique_lock lock(storage_mutex);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
@ -438,15 +437,18 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
{
if (op_num == Coordination::OpNum::Close)
{
std::lock_guard lock(request_cache_mutex);
std::lock_guard cache_lock(request_cache_mutex);
parsed_request_cache.erase(request_for_session->session_id);
}
std::lock_guard lock(process_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
try_push(response_for_session);
{
std::shared_lock lock(storage_mutex);
std::lock_guard response_lock(process_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
try_push(response_for_session);
}
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(
@ -549,7 +551,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::shared_lock lock(storage_mutex);
std::unique_lock lock(storage_mutex);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
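
Editor's note: in commit() above, the old process_and_responses_lock-only section becomes a nested scope that takes a shared lock on storage_mutex together with the response lock and releases both before the digest assertion, while rollbackRequest upgrades to an exclusive lock because it mutates state. A standalone sketch of that scoping pattern, using plain std::shared_mutex and hypothetical commitOne/rollbackOne names:

#include <mutex>
#include <shared_mutex>

std::shared_mutex storage_mutex;         // readers share, writers exclude
std::mutex process_and_responses_lock;   // couples processing to the queue push

void commitOne()
{
    {
        std::shared_lock storage_lock(storage_mutex);
        std::lock_guard response_lock(process_and_responses_lock);
        // storage->processRequest(...); push responses in order
    }
    // both locks are released here, before assertDigest(...) runs
}

void rollbackOne()
{
    std::unique_lock storage_lock(storage_mutex);   // exclusive: mutates state
    // storage->rollbackRequest(...)
}

int main() { commitOne(); rollbackOne(); }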
@ -767,42 +769,16 @@ int KeeperStateMachine::read_logical_snp_obj(
return 1;
}
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session)
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
std::shared_lock storage_lock(storage_mutex);
std::lock_guard response_lock(process_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
/// Pure local request, just process it with storage
std::lock_guard lock(process_and_responses_lock);
std::vector<KeeperStorage::ResponsesForSessions> all_responses;
if (request_for_session.size() > 100)
{
all_responses.resize(request_for_session.size());
//LOG_INFO(getLogger("Keeper"), "Has read requests {}", request_queue_it->second.size());
for (size_t i = 0; i < request_for_session.size(); ++i)
{
read_pool.scheduleOrThrowOnError([&, i]
{
const auto & read_request = request_for_session[i];
all_responses[i] = storage->processRequest(
read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
});
}
read_pool.wait();
}
else
{
all_responses.reserve(request_for_session.size());
for (const auto & read_request : request_for_session)
{
all_responses.push_back(storage->processRequest(
read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/));
}
}
for (const auto & responses : all_responses)
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
}
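
Editor's note: this is the heart of the commit. Batches of more than 100 local reads fan out over read_pool into a pre-sized result vector (each task writes only its own slot, so results need no locking), the pool is awaited, and responses are pushed afterwards in request order. A standalone approximation with std::async standing in for the pool, and hypothetical ReadRequest/Response types:

#include <future>
#include <iostream>
#include <string>
#include <vector>

struct ReadRequest { std::string path; };   // hypothetical stand-ins
struct Response { std::string data; };

Response processRead(const ReadRequest & request)
{
    return Response{"value at " + request.path};
}

std::vector<Response> processBatch(const std::vector<ReadRequest> & batch)
{
    std::vector<Response> all_responses(batch.size());
    if (batch.size() > 100)   // same threshold as the commit
    {
        // Fan out: slot i is owned exclusively by task i.
        std::vector<std::future<void>> tasks;
        tasks.reserve(batch.size());
        for (size_t i = 0; i < batch.size(); ++i)
            tasks.push_back(std::async(std::launch::async, [&, i] { all_responses[i] = processRead(batch[i]); }));
        for (auto & task : tasks)
            task.wait();   // read_pool.wait()
    }
    else
    {
        // Small batches stay sequential: scheduling would cost more than it saves.
        for (size_t i = 0; i < batch.size(); ++i)
            all_responses[i] = processRead(batch[i]);
    }
    return all_responses;   // pushed to responses_queue in request order
}

int main()
{
    std::vector<ReadRequest> batch{{"/a"}, {"/b"}};
    for (const auto & response : processBatch(batch))
        std::cout << response.data << '\n';
}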
void KeeperStateMachine::shutdownStorage()
@ -813,25 +789,23 @@ void KeeperStateMachine::shutdownStorage()
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getDeadSessions();
}
int64_t KeeperStateMachine::getNextZxid() const
{
std::shared_lock storage_lock(storage_mutex);
return storage->getNextZXID();
}
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getNodesDigest(false, /*lock_transaction_mutex=*/true);
}
uint64_t KeeperStateMachine::getLastProcessedZxid() const
{
std::shared_lock storage_lock(storage_mutex);
return storage->getZXID();
}
@ -842,61 +816,61 @@ const KeeperStorage::Stats & KeeperStateMachine::getStorageStats() const TSA_NO_
uint64_t KeeperStateMachine::getTotalWatchesCount() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getTotalWatchesCount();
}
uint64_t KeeperStateMachine::getWatchedPathsCount() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getWatchedPathsCount();
}
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getSessionsWithWatchesCount();
}
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getTotalEphemeralNodesCount();
}
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getSessionWithEphemeralNodesCount();
}
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
storage->dumpWatches(buf);
}
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
storage->dumpWatchesByPath(buf);
}
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
storage->dumpSessionsAndEphemerals(buf);
}
uint64_t KeeperStateMachine::getApproximateDataSize() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getApproximateDataSize();
}
uint64_t KeeperStateMachine::getKeyArenaSize() const
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
return storage->getArenaDataSize();
}
@ -937,7 +911,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
void KeeperStateMachine::recalculateStorageStats()
{
std::shared_lock storage_lock(storage_mutex);
std::unique_lock storage_lock(storage_mutex);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");
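
Editor's note: the long run of introspection getters above flips from std::shared_lock to std::unique_lock on storage_mutex, while the hot request paths keep shared locks. With a shared mutex, shared_lock admits any number of concurrent readers and unique_lock excludes everyone. A standalone refresher using std::shared_mutex:

#include <cstdio>
#include <shared_mutex>
#include <thread>
#include <vector>

std::shared_mutex storage_mutex;
int node_count = 0;

int readNodeCount()   // any number of readers may hold this at once
{
    std::shared_lock lock(storage_mutex);
    return node_count;
}

void addNode()        // writers wait for all readers and other writers
{
    std::unique_lock lock(storage_mutex);
    ++node_count;
}

int main()
{
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i)
        workers.emplace_back([] { addNode(); std::printf("count=%d\n", readNodeCount()); });
    for (auto & worker : workers)
        worker.join();
}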

View File: src/Coordination/KeeperStateMachine.h

@ -100,7 +100,7 @@ public:
ClusterConfigPtr getClusterConfig() const;
/// Process local read request
void processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session);
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::vector<int64_t> getDeadSessions();
@ -132,8 +132,6 @@ public:
void reconfigure(const KeeperStorage::RequestForSession& request_for_session);
private:
mutable SharedMutex storage_mutex;
CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
@ -156,6 +154,7 @@ private:
/// Mutex for snapshots
mutable std::mutex snapshots_lock;
mutable SharedMutex storage_mutex;
/// Lock for storage and responses_queue. It's important to process requests
/// and push them to the responses queue while holding this lock. Otherwise
/// we can get strange cases when, for example client send read request with
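
Editor's note: the comment above is cut off by the diff context, but the visible part states the invariant: a request must be processed and its response enqueued under one lock. A minimal sketch of the reordering that could otherwise occur, with a hypothetical process() function:

#include <mutex>
#include <queue>

std::mutex process_and_responses_lock;
std::queue<int> responses_queue;

int process(int request) { return request; }   // hypothetical

// Without this single critical section, thread A could process request 1,
// get preempted, and thread B could process and push request 2 first; the
// client would then observe responses out of order relative to its requests
// and any watches they registered.
void processAndPush(int request)
{
    std::lock_guard lock(process_and_responses_lock);
    responses_queue.push(process(request));
}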

View File: src/Coordination/KeeperStorage.cpp

@ -412,7 +412,7 @@ Overloaded(Ts...) -> Overloaded<Ts...>;
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::tryGetNodeFromStorage(StringRef path) const
{
std::lock_guard lock(storage.storage_mutex);
std::shared_lock lock(storage.storage_mutex);
if (auto node_it = storage.container.find(path); node_it != storage.container.end())
{
const auto & committed_node = node_it->value;
@ -532,7 +532,7 @@ bool KeeperStorage::UncommittedState::hasACL(int64_t session_id, bool is_local,
void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta)
{
assert(!delta.path.empty());
chassert(!delta.path.empty());
std::visit(
[&]<typename DeltaType>(const DeltaType & operation)
@ -541,12 +541,12 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta)
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{
assert(node);
chassert(node);
node = nullptr;
}
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{
assert(!node);
chassert(!node);
node = std::make_shared<Node>();
node->stats = operation.stat;
node->setData(operation.data);
@ -554,13 +554,13 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta)
}
else if constexpr (std::same_as<DeltaType, UpdateNodeStatDelta>)
{
assert(node);
chassert(node);
node->invalidateDigestCache();
node->stats = operation.old_stats;
}
else if constexpr (std::same_as<DeltaType, UpdateNodeDataDelta>)
{
assert(node);
chassert(node);
node->invalidateDigestCache();
node->setData(operation.old_data);
}
@ -569,7 +569,7 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta)
acls = operation.old_acls;
}
if (applied_zxids.back() != delta.zxid)
if (applied_zxids.back() == delta.zxid)
applied_zxids.pop_back();
},
delta.operation);
@ -645,15 +645,16 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
rollback_deltas.splice(rollback_deltas.end(), deltas, delta_it.base(), deltas.end());
}
rollback(std::move(rollback_deltas));
}
void KeeperStorage::UncommittedState::rollback(std::list<Delta> rollback_deltas)
{
// we need to undo ephemeral mapping modifications
// CreateNodeDelta added ephemeral for session id -> we need to remove it
// RemoveNodeDelta removed ephemeral for session id -> we need to add it back
for (auto & delta : rollback_deltas)
{
if (delta.zxid < rollback_zxid)
break;
chassert(delta.zxid == rollback_zxid);
if (!delta.path.empty())
{
std::visit(
@ -674,6 +675,8 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
}
},
delta.operation);
rollbackDelta(delta);
}
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta.operation))
{
@ -685,8 +688,6 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
session_and_auth.erase(add_auth->session_id);
}
}
rollbackDelta(delta);
}
}
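
Editor's note: rollback(int64_t) now detaches the matching tail of the delta list with std::list::splice and forwards it to the new rollback(std::list<Delta>) overload, which also lets the multi-request processor roll back exactly the deltas it produced. A standalone sketch of the splice-and-delegate shape, with a simplified Delta and a forward scan instead of the reverse scan used in the source:

#include <cstdint>
#include <list>
#include <utility>

struct Delta { int64_t zxid = 0; };   // simplified

std::list<Delta> deltas;              // uncommitted deltas, oldest first

void rollback(std::list<Delta> rollback_deltas)
{
    for (auto & delta : rollback_deltas)
        (void)delta;                  // rollbackDelta(delta): undo nodes, ephemerals, auth
}

void rollback(int64_t rollback_zxid)
{
    auto first = deltas.begin();
    while (first != deltas.end() && first->zxid != rollback_zxid)
        ++first;
    std::list<Delta> rollback_deltas;
    // splice is O(1) per node: the tail changes owner without copies
    rollback_deltas.splice(rollback_deltas.end(), deltas, first, deltas.end());
    rollback(std::move(rollback_deltas));
}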
@ -849,13 +850,9 @@ Coordination::Error KeeperStorage::commit(std::list<Delta> deltas)
removeDigest(node_it->value, path);
auto updated_node = container.updateValue(path, [&](auto & node) {
if constexpr (std::same_as<DeltaType, KeeperStorage::UpdateNodeStatDelta>)
{
node.stats = operation.new_stats;
}
else
{
node.setData(std::move(operation.new_data));
}
});
addDigest(updated_node->value, path);
@ -1044,7 +1041,7 @@ Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_
{
if (is_local)
{
std::lock_guard lock(storage.storage_mutex);
std::shared_lock lock(storage.storage_mutex);
auto node_it = storage.container.find(path);
if (node_it == storage.container.end())
return {};
@ -1223,6 +1220,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
zxid,
KeeperStorage::CreateNodeDelta{stat, std::move(node_acls), request.data});
digest = storage.calculateNodesDigest(digest, new_deltas);
return new_deltas;
}
@ -2019,16 +2017,14 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
if (!new_subdeltas.empty())
{
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation);
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_subdeltas.back().operation);
error && *operation_type == OperationType::Write)
{
storage.uncommitted_state.rollback(zxid);
storage.uncommitted_state.rollback(std::move(new_deltas));
response_errors.push_back(error->error);
for (size_t j = i + 1; j < concrete_requests.size(); ++j)
{
response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
return {KeeperStorage::Delta{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}};
}
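
Editor's note: on the first failing write inside a multi request, the processor now inspects new_subdeltas (fixing the old check against new_deltas) and rolls back the deltas it accumulated by value rather than by zxid. A hedged sketch of the ZooKeeper-style multi error fan-out, with a hypothetical encoding where a nonzero sub-request marks an error:

#include <list>
#include <vector>

struct Delta { int error = 0; };   // hypothetical: nonzero marks an ErrorDelta

std::vector<int> processMulti(const std::vector<int> & sub_requests)
{
    std::list<Delta> new_deltas;
    std::vector<int> response_errors;
    for (size_t i = 0; i < sub_requests.size(); ++i)
    {
        new_deltas.push_back(Delta{sub_requests[i]});
        if (sub_requests[i] != 0)   // first failing write sub-request
        {
            new_deltas.clear();     // rollback(std::move(new_deltas)): undo only our own deltas
            response_errors.push_back(sub_requests[i]);
            for (size_t j = i + 1; j < sub_requests.size(); ++j)
                response_errors.push_back(-1);   // ZRUNTIMEINCONSISTENCY for the rest
            return response_errors; // becomes a single FailedMultiDelta
        }
        response_errors.push_back(0);            // ZOK
    }
    return response_errors;
}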
@ -2043,9 +2039,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
digest = current_digest;
storage.uncommitted_state.addDeltas(std::move(new_deltas));
return {};
}
@ -2188,19 +2182,12 @@ void KeeperStorage::finalize()
finalized = true;
{
std::lock_guard lock(ephemerals_mutex);
ephemerals.clear();
}
ephemerals.clear();
{
std::lock_guard lock(watches_mutex);
watches.clear();
list_watches.clear();
sessions_and_watchers.clear();
}
watches.clear();
list_watches.clear();
sessions_and_watchers.clear();
std::lock_guard lock(session_mutex);
session_expiry_queue.clear();
}
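
Editor's note: finalize() no longer wraps the ephemerals and watches cleanup in their per-structure mutexes; together with the TSA_GUARDED_BY removals in the header below, those structures are evidently expected to be synchronized externally. A sketch that makes that assumption explicit, with hypothetical container shapes:

#include <map>
#include <set>
#include <shared_mutex>

std::shared_mutex storage_mutex;            // assumed external guard
std::map<long, std::set<int>> ephemerals;   // hypothetical shapes
std::map<int, std::set<long>> watches;

// Assumption made explicit: the caller either holds storage_mutex
// exclusively or is the last thread running during shutdown, so the
// clears need no per-container locks of their own.
void finalize()
{
    std::unique_lock lock(storage_mutex);
    ephemerals.clear();
    watches.clear();
}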
@ -2509,10 +2496,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
KeeperStorage::ResponsesForSessions results;
/// ZooKeeper updates session expiry for each request, not only for heartbeats
{
std::lock_guard lock(session_mutex);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
}
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
@ -2520,7 +2504,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
{
if (std::holds_alternative<RemoveNodeDelta>(delta.operation))
{
std::lock_guard lock(watches_mutex);
auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}
@ -2540,11 +2523,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
response->zxid = commit_zxid;
{
std::lock_guard lock(session_mutex);
session_expiry_queue.remove(session_id);
session_and_timeout.erase(session_id);
}
session_expiry_queue.remove(session_id);
session_and_timeout.erase(session_id);
results.push_back(ResponseForSession{session_id, response});
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
@ -2589,7 +2569,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
/// Watches for these requests are added to the watches lists
if (zk_request->has_watch)
{
std::lock_guard lock(watches_mutex);
if (response->error == Coordination::Error::ZOK)
{
static constexpr std::array list_requests{
@ -2614,7 +2593,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
/// If this request was processed successfully we need to check watches
if (response->error == Coordination::Error::ZOK)
{
std::lock_guard lock(watches_mutex);
auto watch_responses = request_processor->processWatches(watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}
@ -2631,6 +2609,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
std::lock_guard lock(transaction_mutex);
if (new_last_zxid)
uncommitted_transactions.pop_front();
if (commit_zxid < zxid)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to commit smaller ZXID, commit ZXID: {}, current ZXID {}", commit_zxid, zxid);
zxid = commit_zxid;
}
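
Editor's note: the hunk above also adds a sanity check under transaction_mutex: the committed ZXID must never decrease. The standalone shape of the guard:

#include <cstdint>
#include <stdexcept>

int64_t zxid = 0;

void publishCommit(int64_t commit_zxid)
{
    // ZXIDs are monotonic; committing a smaller one means bookkeeping went wrong
    if (commit_zxid < zxid)
        throw std::logic_error("Trying to commit smaller ZXID");
    zxid = commit_zxid;
}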
@ -2639,7 +2619,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS
{
std::unique_lock transaction_lock(transaction_mutex);
if (allow_missing && (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid < rollback_zxid))
return;
@ -2655,7 +2634,6 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing) T
try
{
uncommitted_transactions.pop_back();
transaction_lock.unlock();
uncommitted_state.rollback(rollback_zxid);
}
catch (...)
@ -2672,7 +2650,7 @@ KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed, bool lock_tr
if (committed)
{
std::lock_guard storage_lock(storage_mutex);
std::shared_lock storage_lock(storage_mutex);
return {CURRENT_DIGEST_VERSION, nodes_digest};
}
@ -2684,7 +2662,7 @@ KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed, bool lock_tr
{
if (lock_transaction_mutex)
transaction_lock.unlock();
std::lock_guard storage_lock(storage_mutex);
std::shared_lock storage_lock(storage_mutex);
return {CURRENT_DIGEST_VERSION, nodes_digest};
}
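
Editor's note: getNodesDigest() keeps the rule of never holding transaction_mutex and storage_mutex at once: it releases the former before taking a (now shared rather than exclusive) lock on the latter. A standalone sketch of the hand-off, with a hypothetical digest variable:

#include <cstdint>
#include <mutex>
#include <shared_mutex>

std::mutex transaction_mutex;
std::shared_mutex storage_mutex;
uint64_t nodes_digest = 0;        // hypothetical committed digest

uint64_t getCommittedDigest(bool lock_transaction_mutex)
{
    std::unique_lock<std::mutex> transaction_lock(transaction_mutex, std::defer_lock);
    if (lock_transaction_mutex)
        transaction_lock.lock();
    // ... inspect uncommitted transactions under transaction_mutex ...
    if (lock_transaction_mutex)
        transaction_lock.unlock();                // drop it *before* storage_mutex
    std::shared_lock storage_lock(storage_mutex); // digest readers share
    return nodes_digest;
}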
@ -2709,7 +2687,6 @@ void KeeperStorage::addDigest(const Node & node, const std::string_view path)
/// Allocate new session id with the specified timeouts
int64_t KeeperStorage::getSessionID(int64_t session_timeout_ms)
{
std::lock_guard lock(session_mutex);
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
@ -2725,71 +2702,60 @@ void KeeperStorage::addSessionID(int64_t session_id, int64_t session_timeout_ms)
std::vector<int64_t> KeeperStorage::getDeadSessions() const
{
std::lock_guard lock(session_mutex);
return session_expiry_queue.getExpiredSessions();
}
SessionAndTimeout KeeperStorage::getActiveSessions() const
{
std::lock_guard lock(session_mutex);
return session_and_timeout;
}
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void KeeperStorage::enableSnapshotMode(size_t up_to_version)
{
std::lock_guard lock(storage_mutex);
container.enableSnapshotMode(up_to_version);
}
/// Turn off snapshot mode.
void KeeperStorage::disableSnapshotMode()
{
std::lock_guard lock(storage_mutex);
container.disableSnapshotMode();
}
KeeperStorage::Container::const_iterator KeeperStorage::getSnapshotIteratorBegin() const
{
std::lock_guard lock(storage_mutex);
return container.begin();
}
/// Clear outdated data from internal container.
void KeeperStorage::clearGarbageAfterSnapshot()
{
std::lock_guard lock(storage_mutex);
container.clearOutdatedNodes();
}
/// Introspection functions mostly used in 4-letter commands
uint64_t KeeperStorage::getNodesCount() const
{
std::lock_guard lock(storage_mutex);
return container.size();
}
uint64_t KeeperStorage::getApproximateDataSize() const
{
std::lock_guard lock(storage_mutex);
return container.getApproximateDataSize();
}
uint64_t KeeperStorage::getArenaDataSize() const
{
std::lock_guard lock(storage_mutex);
return container.keyArenaSize();
}
uint64_t KeeperStorage::getWatchedPathsCount() const
{
std::lock_guard lock(watches_mutex);
return watches.size() + list_watches.size();
}
void KeeperStorage::clearDeadWatches(int64_t session_id)
{
std::lock_guard lock(watches_mutex);
/// Clear all watches for this session
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())
@ -2823,7 +2789,6 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(watches_mutex);
for (const auto & [session_id, watches_paths] : sessions_and_watchers)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
@ -2842,7 +2807,6 @@ void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
}
};
std::lock_guard lock(watches_mutex);
for (const auto & [watch_path, sessions] : watches)
{
buf << watch_path << "\n";
@ -2866,17 +2830,13 @@ void KeeperStorage::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) co
}
};
{
std::lock_guard lock(session_mutex);
buf << "Sessions dump (" << session_and_timeout.size() << "):\n";
buf << "Sessions dump (" << session_and_timeout.size() << "):\n";
for (const auto & [session_id, _] : session_and_timeout)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
}
for (const auto & [session_id, _] : session_and_timeout)
{
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
}
std::lock_guard ephemerals_lock(ephemerals_mutex);
buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCountLocked() << "):\n";
for (const auto & [session_id, ephemeral_paths] : ephemerals)
{
@ -2904,7 +2864,6 @@ const KeeperStorage::Stats & KeeperStorage::getStorageStats() const
uint64_t KeeperStorage::getTotalWatchesCount() const
{
std::lock_guard lock(watches_mutex);
uint64_t ret = 0;
for (const auto & [session, paths] : sessions_and_watchers)
ret += paths.size();
@ -2914,7 +2873,6 @@ uint64_t KeeperStorage::getTotalWatchesCount() const
uint64_t KeeperStorage::getSessionWithEphemeralNodesCount() const
{
std::lock_guard ephemerals_lock(ephemerals_mutex);
return getSessionWithEphemeralNodesCountLocked();
}
@ -2925,13 +2883,11 @@ uint64_t KeeperStorage::getSessionWithEphemeralNodesCountLocked() const
uint64_t KeeperStorage::getSessionsWithWatchesCount() const
{
std::lock_guard lock(watches_mutex);
return sessions_and_watchers.size();
}
uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
{
std::lock_guard ephemerals_lock(ephemerals_mutex);
uint64_t ret = 0;
for (const auto & [session_id, nodes] : ephemerals)
ret += nodes.size();
@ -2941,7 +2897,6 @@ uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
void KeeperStorage::recalculateStats()
{
std::lock_guard lock(storage_mutex);
container.recalculateDataSize();
}

View File: src/Coordination/KeeperStorage.h

@ -370,6 +370,7 @@ public:
void addDeltas(std::list<Delta> new_deltas);
void cleanup(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
void rollback(std::list<Delta> rollback_deltas);
std::shared_ptr<Node> getNode(StringRef path) const;
Coordination::ACLs getACLs(StringRef path) const;
@ -451,14 +452,13 @@ public:
mutable std::mutex ephemerals_mutex;
/// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals TSA_GUARDED_BY(ephemerals_mutex);
Ephemerals ephemerals;
mutable std::mutex session_mutex;
int64_t session_id_counter TSA_GUARDED_BY(session_mutex) = 1;
int64_t session_id_counter = 1;
/// Expiration queue for sessions, used to collect dead sessions at a given point in time
SessionExpiryQueue session_expiry_queue TSA_GUARDED_BY(session_mutex);
SessionExpiryQueue session_expiry_queue;
/// All active sessions with timeout
SessionAndTimeout session_and_timeout TSA_GUARDED_BY(session_mutex);
SessionAndTimeout session_and_timeout;
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
@ -490,13 +490,12 @@ public:
std::atomic<bool> finalized{false};
mutable std::mutex watches_mutex;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers TSA_GUARDED_BY(watches_mutex);
SessionAndWatcher sessions_and_watchers;
/// Currently active watches (node_path -> subscribed sessions)
Watches watches TSA_GUARDED_BY(watches_mutex);
Watches list_watches TSA_GUARDED_BY(watches_mutex); /// Watches for 'list' request (watches on children).
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
void clearDeadWatches(int64_t session_id);
@ -606,7 +605,7 @@ public:
void recalculateStats();
private:
uint64_t getSessionWithEphemeralNodesCountLocked() const TSA_REQUIRES(ephemerals_mutex);
uint64_t getSessionWithEphemeralNodesCountLocked() const;
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);
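
Editor's note: the header strips the TSA_GUARDED_BY annotations from ephemerals, the session fields, and the watch maps. Those macros wrap Clang's thread safety analysis, which statically rejects unlocked access to a guarded member; dropping them trades that compile-time check for the new locking scheme. A sketch of what was traded away, with the macro spelled out the way such wrappers are usually defined (the warning fires under clang -Wthread-safety with a thread-safety-annotated mutex type, e.g. libc++'s std::mutex built with annotations):

#include <mutex>

#if defined(__clang__)
#  define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__)))
#else
#  define TSA_GUARDED_BY(...)   // other compilers: annotation is a no-op
#endif

class SessionRegistry
{
public:
    int count()
    {
        std::lock_guard lock(mutex);   // OK: the analysis sees the lock
        return session_count;
    }

    int countUnlocked() { return session_count; }   // clang -Wthread-safety: warning

private:
    std::mutex mutex;
    int session_count TSA_GUARDED_BY(mutex) = 0;
};

int main() { return SessionRegistry{}.count(); }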