From 0fa45c39540129bd68b3e50c2c5084f6959d48b6 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 11 Jun 2024 14:35:34 +0200 Subject: [PATCH] More parallel storage --- src/Coordination/KeeperDispatcher.cpp | 19 ++-- src/Coordination/KeeperServer.cpp | 5 +- src/Coordination/KeeperServer.h | 2 +- src/Coordination/KeeperStateMachine.cpp | 92 +++++++------------- src/Coordination/KeeperStateMachine.h | 5 +- src/Coordination/KeeperStorage.cpp | 111 +++++++----------------- src/Coordination/KeeperStorage.h | 19 ++-- 7 files changed, 87 insertions(+), 166 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 0fe0b8cce1f..b9518c51893 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -449,18 +449,13 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid); request_queue_it != xid_to_request_queue.end()) { - Stopwatch watch; - if (server->isLeaderAlive()) - server->putLocalReadRequest(request_queue_it->second); - else - addErrorResponses(request_queue_it->second, Coordination::Error::ZCONNECTIONLOSS); - - if (request_queue_it->second.size() > 500) - LOG_INFO( - getLogger("Speed"), - "It took {}ms for {} requests", - watch.elapsedMilliseconds(), - request_queue_it->second.size()); + for (const auto & read_request : request_queue_it->second) + { + if (server->isLeaderAlive()) + server->putLocalReadRequest(read_request); + else + addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS); + } xid_to_request_queue.erase(request_queue_it); } diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index c14658ddccf..02b05c6346c 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -533,10 +533,9 @@ nuraft::ptr getZooKeeperLogEntry(const KeeperStorage::RequestFor } -void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestsForSessions & request_for_session) +void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session) { - if (std::any_of( - request_for_session.begin(), request_for_session.end(), [](const auto & request) { return !request.request->isReadRequest(); })) + if (!request_for_session.request->isReadRequest()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally"); state_machine->processReadRequest(request_for_session); diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index c43fb35efe1..5e45a552cba 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -84,7 +84,7 @@ public: /// Put local read request and execute in state machine directly and response into /// responses queue - void putLocalReadRequest(const KeeperStorage::RequestsForSessions & request); + void putLocalReadRequest(const KeeperStorage::RequestForSession & request); bool isRecovering() const { return is_recovering; } bool reconfigEnabled() const { return enable_reconfiguration; } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 3b5b7e07f06..a5402944701 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -278,12 +278,12 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig) return true; - std::shared_lock storage_lock(storage_mutex); if (storage->isFinalized()) return false; try { + std::shared_lock storage_lock(storage_mutex); storage->preprocessRequest( request_for_session.request, request_for_session.session_id, @@ -312,7 +312,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session) { - std::lock_guard lock(process_and_responses_lock); + std::lock_guard lock(storage_mutex); KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session); if (!responses_queue.push(response)) { @@ -414,7 +414,6 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n try { - std::shared_lock storage_lock(storage_mutex); const auto op_num = request_for_session->request->getOpNum(); if (op_num == Coordination::OpNum::SessionID) { @@ -428,7 +427,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n response_for_session.session_id = -1; response_for_session.response = response; - std::lock_guard lock(process_and_responses_lock); + std::unique_lock lock(storage_mutex); session_id = storage->getSessionID(session_id_request.session_timeout_ms); LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); response->session_id = session_id; @@ -438,15 +437,18 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n { if (op_num == Coordination::OpNum::Close) { - std::lock_guard lock(request_cache_mutex); + std::lock_guard cache_lock(request_cache_mutex); parsed_request_cache.erase(request_for_session->session_id); } - std::lock_guard lock(process_and_responses_lock); - KeeperStorage::ResponsesForSessions responses_for_sessions - = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); - for (auto & response_for_session : responses_for_sessions) - try_push(response_for_session); + { + std::shared_lock lock(storage_mutex); + std::lock_guard response_lock(process_and_responses_lock); + KeeperStorage::ResponsesForSessions responses_for_sessions + = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); + for (auto & response_for_session : responses_for_sessions) + try_push(response_for_session); + } if (keeper_context->digestEnabled() && request_for_session->digest) assertDigest( @@ -549,7 +551,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) return; - std::shared_lock lock(storage_mutex); + std::unique_lock lock(storage_mutex); storage->rollbackRequest(request_for_session.zxid, allow_missing); } @@ -767,42 +769,16 @@ int KeeperStateMachine::read_logical_snp_obj( return 1; } -void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session) +void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { std::shared_lock storage_lock(storage_mutex); + std::lock_guard response_lock(process_and_responses_lock); + auto responses = storage->processRequest( + request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/); + for (const auto & response : responses) + if (!responses_queue.push(response)) + LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id); - /// Pure local request, just process it with storage - std::lock_guard lock(process_and_responses_lock); - std::vector all_responses; - if (request_for_session.size() > 100) - { - all_responses.resize(request_for_session.size()); - //LOG_INFO(getLogger("Keeper"), "Has read requests {}", request_queue_it->second.size()); - for (size_t i = 0; i < request_for_session.size(); ++i) - { - read_pool.scheduleOrThrowOnError([&, i] - { - const auto & read_request = request_for_session[i]; - all_responses[i] = storage->processRequest( - read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/); - }); - } - read_pool.wait(); - } - else - { - all_responses.reserve(request_for_session.size()); - for (const auto & read_request : request_for_session) - { - all_responses.push_back(storage->processRequest( - read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/)); - } - } - - for (const auto & responses : all_responses) - for (const auto & response : responses) - if (!responses_queue.push(response)) - LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id); } void KeeperStateMachine::shutdownStorage() @@ -813,25 +789,23 @@ void KeeperStateMachine::shutdownStorage() std::vector KeeperStateMachine::getDeadSessions() { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getDeadSessions(); } int64_t KeeperStateMachine::getNextZxid() const { - std::shared_lock storage_lock(storage_mutex); return storage->getNextZXID(); } KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getNodesDigest(false, /*lock_transaction_mutex=*/true); } uint64_t KeeperStateMachine::getLastProcessedZxid() const { - std::shared_lock storage_lock(storage_mutex); return storage->getZXID(); } @@ -842,61 +816,61 @@ const KeeperStorage::Stats & KeeperStateMachine::getStorageStats() const TSA_NO_ uint64_t KeeperStateMachine::getTotalWatchesCount() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getTotalWatchesCount(); } uint64_t KeeperStateMachine::getWatchedPathsCount() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getWatchedPathsCount(); } uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getSessionsWithWatchesCount(); } uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getTotalEphemeralNodesCount(); } uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getSessionWithEphemeralNodesCount(); } void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); storage->dumpWatches(buf); } void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); storage->dumpWatchesByPath(buf); } void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); storage->dumpSessionsAndEphemerals(buf); } uint64_t KeeperStateMachine::getApproximateDataSize() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getApproximateDataSize(); } uint64_t KeeperStateMachine::getKeyArenaSize() const { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); return storage->getArenaDataSize(); } @@ -937,7 +911,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const void KeeperStateMachine::recalculateStorageStats() { - std::shared_lock storage_lock(storage_mutex); + std::unique_lock storage_lock(storage_mutex); LOG_INFO(log, "Recalculating storage stats"); storage->recalculateStats(); LOG_INFO(log, "Done recalculating storage stats"); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 68b9366611d..c1702394754 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -100,7 +100,7 @@ public: ClusterConfigPtr getClusterConfig() const; /// Process local read request - void processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session); + void processReadRequest(const KeeperStorage::RequestForSession & request_for_session); std::vector getDeadSessions(); @@ -132,8 +132,6 @@ public: void reconfigure(const KeeperStorage::RequestForSession& request_for_session); private: - mutable SharedMutex storage_mutex; - CommitCallback commit_callback; /// In our state machine we always have a single snapshot which is stored /// in memory in compressed (serialized) format. @@ -156,6 +154,7 @@ private: /// Mutex for snapshots mutable std::mutex snapshots_lock; + mutable SharedMutex storage_mutex; /// Lock for storage and responses_queue. It's important to process requests /// and push them to the responses queue while holding this lock. Otherwise /// we can get strange cases when, for example client send read request with diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index e62be70ede5..238ab07a276 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -412,7 +412,7 @@ Overloaded(Ts...) -> Overloaded; std::shared_ptr KeeperStorage::UncommittedState::tryGetNodeFromStorage(StringRef path) const { - std::lock_guard lock(storage.storage_mutex); + std::shared_lock lock(storage.storage_mutex); if (auto node_it = storage.container.find(path); node_it != storage.container.end()) { const auto & committed_node = node_it->value; @@ -532,7 +532,7 @@ bool KeeperStorage::UncommittedState::hasACL(int64_t session_id, bool is_local, void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta) { - assert(!delta.path.empty()); + chassert(!delta.path.empty()); std::visit( [&](const DeltaType & operation) @@ -541,12 +541,12 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta) if constexpr (std::same_as) { - assert(node); + chassert(node); node = nullptr; } else if constexpr (std::same_as) { - assert(!node); + chassert(!node); node = std::make_shared(); node->stats = operation.stat; node->setData(operation.data); @@ -554,13 +554,13 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta) } else if constexpr (std::same_as) { - assert(node); + chassert(node); node->invalidateDigestCache(); node->stats = operation.old_stats; } else if constexpr (std::same_as) { - assert(node); + chassert(node); node->invalidateDigestCache(); node->setData(operation.old_data); } @@ -569,7 +569,7 @@ void KeeperStorage::UncommittedState::rollbackDelta(const Delta & delta) acls = operation.old_acls; } - if (applied_zxids.back() != delta.zxid) + if (applied_zxids.back() == delta.zxid) applied_zxids.pop_back(); }, delta.operation); @@ -645,15 +645,16 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid) rollback_deltas.splice(rollback_deltas.end(), deltas, delta_it.base(), deltas.end()); } + rollback(std::move(rollback_deltas)); +} + +void KeeperStorage::UncommittedState::rollback(std::list rollback_deltas) +{ // we need to undo ephemeral mapping modifications // CreateNodeDelta added ephemeral for session id -> we need to remove it // RemoveNodeDelta removed ephemeral for session id -> we need to add it back for (auto & delta : rollback_deltas) { - if (delta.zxid < rollback_zxid) - break; - - chassert(delta.zxid == rollback_zxid); if (!delta.path.empty()) { std::visit( @@ -674,6 +675,8 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid) } }, delta.operation); + + rollbackDelta(delta); } else if (auto * add_auth = std::get_if(&delta.operation)) { @@ -685,8 +688,6 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid) session_and_auth.erase(add_auth->session_id); } } - - rollbackDelta(delta); } } @@ -849,13 +850,9 @@ Coordination::Error KeeperStorage::commit(std::list deltas) removeDigest(node_it->value, path); auto updated_node = container.updateValue(path, [&](auto & node) { if constexpr (std::same_as) - { node.stats = operation.new_stats; - } else - { node.setData(std::move(operation.new_data)); - } }); addDigest(updated_node->value, path); @@ -1044,7 +1041,7 @@ Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_ { if (is_local) { - std::lock_guard lock(storage.storage_mutex); + std::shared_lock lock(storage.storage_mutex); auto node_it = storage.container.find(path); if (node_it == storage.container.end()) return {}; @@ -1223,6 +1220,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr zxid, KeeperStorage::CreateNodeDelta{stat, std::move(node_acls), request.data}); + digest = storage.calculateNodesDigest(digest, new_deltas); return new_deltas; } @@ -2019,16 +2017,14 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro if (!new_subdeltas.empty()) { - if (auto * error = std::get_if(&new_deltas.back().operation); + if (auto * error = std::get_if(&new_subdeltas.back().operation); error && *operation_type == OperationType::Write) { - storage.uncommitted_state.rollback(zxid); + storage.uncommitted_state.rollback(std::move(new_deltas)); response_errors.push_back(error->error); for (size_t j = i + 1; j < concrete_requests.size(); ++j) - { response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY); - } return {KeeperStorage::Delta{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}}; } @@ -2043,9 +2039,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } digest = current_digest; - storage.uncommitted_state.addDeltas(std::move(new_deltas)); - return {}; } @@ -2188,19 +2182,12 @@ void KeeperStorage::finalize() finalized = true; - { - std::lock_guard lock(ephemerals_mutex); - ephemerals.clear(); - } + ephemerals.clear(); - { - std::lock_guard lock(watches_mutex); - watches.clear(); - list_watches.clear(); - sessions_and_watchers.clear(); - } + watches.clear(); + list_watches.clear(); + sessions_and_watchers.clear(); - std::lock_guard lock(session_mutex); session_expiry_queue.clear(); } @@ -2509,10 +2496,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( KeeperStorage::ResponsesForSessions results; /// ZooKeeper update sessions expirity for each request, not only for heartbeats - { - std::lock_guard lock(session_mutex); - session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); - } + session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { @@ -2520,7 +2504,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( { if (std::holds_alternative(delta.operation)) { - std::lock_guard lock(watches_mutex); auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); results.insert(results.end(), responses.begin(), responses.end()); } @@ -2540,11 +2523,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( auto response = std::make_shared(); response->xid = zk_request->xid; response->zxid = commit_zxid; - { - std::lock_guard lock(session_mutex); - session_expiry_queue.remove(session_id); - session_and_timeout.erase(session_id); - } + session_expiry_queue.remove(session_id); + session_and_timeout.erase(session_id); results.push_back(ResponseForSession{session_id, response}); } else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special @@ -2589,7 +2569,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( /// Watches for this requests are added to the watches lists if (zk_request->has_watch) { - std::lock_guard lock(watches_mutex); if (response->error == Coordination::Error::ZOK) { static constexpr std::array list_requests{ @@ -2614,7 +2593,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( /// If this requests processed successfully we need to check watches if (response->error == Coordination::Error::ZOK) { - std::lock_guard lock(watches_mutex); auto watch_responses = request_processor->processWatches(watches, list_watches); results.insert(results.end(), watch_responses.begin(), watch_responses.end()); } @@ -2631,6 +2609,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( std::lock_guard lock(transaction_mutex); if (new_last_zxid) uncommitted_transactions.pop_front(); + if (commit_zxid < zxid) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to commit smaller ZXID, commit ZXID: {}, current ZXID {}", commit_zxid, zxid); zxid = commit_zxid; } @@ -2639,7 +2619,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS { - std::unique_lock transaction_lock(transaction_mutex); if (allow_missing && (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid < rollback_zxid)) return; @@ -2655,7 +2634,6 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing) T try { uncommitted_transactions.pop_back(); - transaction_lock.unlock(); uncommitted_state.rollback(rollback_zxid); } catch (...) @@ -2672,7 +2650,7 @@ KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed, bool lock_tr if (committed) { - std::lock_guard storage_lock(storage_mutex); + std::shared_lock storage_lock(storage_mutex); return {CURRENT_DIGEST_VERSION, nodes_digest}; } @@ -2684,7 +2662,7 @@ KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed, bool lock_tr { if (lock_transaction_mutex) transaction_lock.unlock(); - std::lock_guard storage_lock(storage_mutex); + std::shared_lock storage_lock(storage_mutex); return {CURRENT_DIGEST_VERSION, nodes_digest}; } @@ -2709,7 +2687,6 @@ void KeeperStorage::addDigest(const Node & node, const std::string_view path) /// Allocate new session id with the specified timeouts int64_t KeeperStorage::getSessionID(int64_t session_timeout_ms) { - std::lock_guard lock(session_mutex); auto result = session_id_counter++; session_and_timeout.emplace(result, session_timeout_ms); session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms); @@ -2725,71 +2702,60 @@ void KeeperStorage::addSessionID(int64_t session_id, int64_t session_timeout_ms) std::vector KeeperStorage::getDeadSessions() const { - std::lock_guard lock(session_mutex); return session_expiry_queue.getExpiredSessions(); } SessionAndTimeout KeeperStorage::getActiveSessions() const { - std::lock_guard lock(session_mutex); return session_and_timeout; } /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version. void KeeperStorage::enableSnapshotMode(size_t up_to_version) { - std::lock_guard lock(storage_mutex); container.enableSnapshotMode(up_to_version); } /// Turn off snapshot mode. void KeeperStorage::disableSnapshotMode() { - std::lock_guard lock(storage_mutex); container.disableSnapshotMode(); } KeeperStorage::Container::const_iterator KeeperStorage::getSnapshotIteratorBegin() const { - std::lock_guard lock(storage_mutex); return container.begin(); } /// Clear outdated data from internal container. void KeeperStorage::clearGarbageAfterSnapshot() { - std::lock_guard lock(storage_mutex); container.clearOutdatedNodes(); } /// Introspection functions mostly used in 4-letter commands uint64_t KeeperStorage::getNodesCount() const { - std::lock_guard lock(storage_mutex); return container.size(); } uint64_t KeeperStorage::getApproximateDataSize() const { - std::lock_guard lock(storage_mutex); return container.getApproximateDataSize(); } uint64_t KeeperStorage::getArenaDataSize() const { - std::lock_guard lock(storage_mutex); return container.keyArenaSize(); } uint64_t KeeperStorage::getWatchedPathsCount() const { - std::lock_guard lock(watches_mutex); return watches.size() + list_watches.size(); } void KeeperStorage::clearDeadWatches(int64_t session_id) { - std::lock_guard lock(watches_mutex); /// Clear all watches for this session auto watches_it = sessions_and_watchers.find(session_id); if (watches_it != sessions_and_watchers.end()) @@ -2823,7 +2789,6 @@ void KeeperStorage::clearDeadWatches(int64_t session_id) void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const { - std::lock_guard lock(watches_mutex); for (const auto & [session_id, watches_paths] : sessions_and_watchers) { buf << "0x" << getHexUIntLowercase(session_id) << "\n"; @@ -2842,7 +2807,6 @@ void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const } }; - std::lock_guard lock(watches_mutex); for (const auto & [watch_path, sessions] : watches) { buf << watch_path << "\n"; @@ -2866,17 +2830,13 @@ void KeeperStorage::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) co } }; - { - std::lock_guard lock(session_mutex); - buf << "Sessions dump (" << session_and_timeout.size() << "):\n"; + buf << "Sessions dump (" << session_and_timeout.size() << "):\n"; - for (const auto & [session_id, _] : session_and_timeout) - { - buf << "0x" << getHexUIntLowercase(session_id) << "\n"; - } + for (const auto & [session_id, _] : session_and_timeout) + { + buf << "0x" << getHexUIntLowercase(session_id) << "\n"; } - std::lock_guard ephemerals_lock(ephemerals_mutex); buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCountLocked() << "):\n"; for (const auto & [session_id, ephemeral_paths] : ephemerals) { @@ -2904,7 +2864,6 @@ const KeeperStorage::Stats & KeeperStorage::getStorageStats() const uint64_t KeeperStorage::getTotalWatchesCount() const { - std::lock_guard lock(watches_mutex); uint64_t ret = 0; for (const auto & [session, paths] : sessions_and_watchers) ret += paths.size(); @@ -2914,7 +2873,6 @@ uint64_t KeeperStorage::getTotalWatchesCount() const uint64_t KeeperStorage::getSessionWithEphemeralNodesCount() const { - std::lock_guard ephemerals_lock(ephemerals_mutex); return getSessionWithEphemeralNodesCountLocked(); } @@ -2925,13 +2883,11 @@ uint64_t KeeperStorage::getSessionWithEphemeralNodesCountLocked() const uint64_t KeeperStorage::getSessionsWithWatchesCount() const { - std::lock_guard lock(watches_mutex); return sessions_and_watchers.size(); } uint64_t KeeperStorage::getTotalEphemeralNodesCount() const { - std::lock_guard ephemerals_lock(ephemerals_mutex); uint64_t ret = 0; for (const auto & [session_id, nodes] : ephemerals) ret += nodes.size(); @@ -2941,7 +2897,6 @@ uint64_t KeeperStorage::getTotalEphemeralNodesCount() const void KeeperStorage::recalculateStats() { - std::lock_guard lock(storage_mutex); container.recalculateDataSize(); } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 0e773fe68ef..d72ae46dee2 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -370,6 +370,7 @@ public: void addDeltas(std::list new_deltas); void cleanup(int64_t commit_zxid); void rollback(int64_t rollback_zxid); + void rollback(std::list rollback_deltas); std::shared_ptr getNode(StringRef path) const; Coordination::ACLs getACLs(StringRef path) const; @@ -451,14 +452,13 @@ public: mutable std::mutex ephemerals_mutex; /// Mapping session_id -> set of ephemeral nodes paths - Ephemerals ephemerals TSA_GUARDED_BY(ephemerals_mutex); + Ephemerals ephemerals; - mutable std::mutex session_mutex; - int64_t session_id_counter TSA_GUARDED_BY(session_mutex) = 1; + int64_t session_id_counter = 1; /// Expiration queue for session, allows to get dead sessions at some point of time - SessionExpiryQueue session_expiry_queue TSA_GUARDED_BY(session_mutex); + SessionExpiryQueue session_expiry_queue; /// All active sessions with timeout - SessionAndTimeout session_and_timeout TSA_GUARDED_BY(session_mutex); + SessionAndTimeout session_and_timeout; /// ACLMap for more compact ACLs storage inside nodes. ACLMap acl_map; @@ -490,13 +490,12 @@ public: std::atomic finalized{false}; - mutable std::mutex watches_mutex; /// Mapping session_id -> set of watched nodes paths - SessionAndWatcher sessions_and_watchers TSA_GUARDED_BY(watches_mutex); + SessionAndWatcher sessions_and_watchers; /// Currently active watches (node_path -> subscribed sessions) - Watches watches TSA_GUARDED_BY(watches_mutex); - Watches list_watches TSA_GUARDED_BY(watches_mutex); /// Watches for 'list' request (watches on children). + Watches watches; + Watches list_watches; /// Watches for 'list' request (watches on children). void clearDeadWatches(int64_t session_id); @@ -606,7 +605,7 @@ public: void recalculateStats(); private: - uint64_t getSessionWithEphemeralNodesCountLocked() const TSA_REQUIRES(ephemerals_mutex); + uint64_t getSessionWithEphemeralNodesCountLocked() const; void removeDigest(const Node & node, std::string_view path); void addDigest(const Node & node, std::string_view path);