diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 28bc1bf112b..493841fc50e 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -148,7 +148,8 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl( watch_response->state = Coordination::State::CONNECTED; for (auto watcher_session : watch_it->second) { - [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path); + [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase( + KeeperStorageBase::WatchInfo{.path = path, .is_list_watch = false}); chassert(erased); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response}); } @@ -188,7 +189,8 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl( watch_list_response->state = Coordination::State::CONNECTED; for (auto watcher_session : watch_it->second) { - [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path_to_check); + [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase( + KeeperStorageBase::WatchInfo{.path = path_to_check, .is_list_watch = true}); chassert(erased); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response}); } @@ -828,18 +830,12 @@ void KeeperStorage::UncommittedState::rollback(std::list rollb if constexpr (std::same_as) { if (operation.stat.ephemeralOwner != 0) - { - std::lock_guard lock(storage.ephemeral_mutex); unregisterEphemeralPath(storage.uncommitted_state.ephemerals, operation.stat.ephemeralOwner, delta.path, /*throw_if_missing=*/false); - } } else if constexpr (std::same_as) { if (operation.stat.ephemeralOwner() != 0) - { - std::lock_guard lock(storage.ephemeral_mutex); storage.uncommitted_state.ephemerals[operation.stat.ephemeralOwner()].emplace(delta.path); - } } }, delta.operation); @@ -1159,7 +1155,6 @@ bool KeeperStorage::createNode( ++committed_ephemeral_nodes; std::lock_guard lock(ephemeral_mutex); committed_ephemerals[stat.ephemeralOwner].emplace(path); - unregisterEphemeralPath(uncommitted_state.ephemerals, stat.ephemeralOwner, path, /*throw_if_missing=*/false); } return true; @@ -1454,10 +1449,7 @@ std::list preprocess( return {typename Storage::Delta{zxid, Coordination::Error::ZINVALIDACL}}; if (zk_request.is_ephemeral) - { - std::lock_guard lock(storage.ephemeral_mutex); storage.uncommitted_state.ephemerals[session_id].emplace(path_created); - } int32_t parent_cversion = zk_request.parent_cversion; @@ -1721,7 +1713,6 @@ std::list preprocess( if (node->stats.isEphemeral()) { - std::lock_guard ephemeral_lock(storage.ephemeral_mutex); /// try deleting the ephemeral node from the uncommitted state unregisterEphemeralPath(storage.uncommitted_state.ephemerals, node->stats.ephemeralOwner(), zk_request.path, /*throw_if_missing=*/false); } @@ -2027,7 +2018,6 @@ std::list preprocess( const auto * remove_delta = std::get_if(&delta.operation); if (remove_delta && remove_delta->stat.ephemeralOwner()) { - std::lock_guard lock(storage.ephemeral_mutex); unregisterEphemeralPath( storage.uncommitted_state.ephemerals, remove_delta->stat.ephemeralOwner(), delta.path, /*throw_if_missing=*/false); } @@ -2281,7 +2271,17 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList const auto children = get_children(); response->names.reserve(children->size()); - chassert(static_cast(node_it->value.stats.numChildren()) == children->size()); +#ifdef DEBUG_OR_SANITIZER_BUILD + if (!zk_request.path.starts_with(keeper_system_path) && static_cast(node_it->value.stats.numChildren()) != children->size()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Difference between numChildren ({}) and actual children size ({}) for '{}'", + node_it->value.stats.numChildren(), + children->size(), + zk_request.path); + } +#endif const auto add_child = [&](const auto & child) { @@ -3004,13 +3004,25 @@ void KeeperStorage::preprocessRequest( StringHashForHeterogeneousLookup, StringHashForHeterogeneousLookup::transparent_key_equal> parent_updates; - const auto process_ephemerals_for_session = [&](auto & current_ephemerals) + + const auto process_ephemerals_for_session + = [&](const auto & current_ephemerals, auto & processed_ephemeral_nodes, bool check_processed_nodes) { auto session_ephemerals = current_ephemerals.find(session_id); if (session_ephemerals != current_ephemerals.end()) { for (const auto & ephemeral_path : session_ephemerals->second) { + if (check_processed_nodes) + { + if (processed_ephemeral_nodes.contains(ephemeral_path)) + continue; + } + else + { + processed_ephemeral_nodes.insert(ephemeral_path); + } + auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false); /// maybe the node is deleted or recreated with different session_id in the uncommitted state @@ -3041,12 +3053,16 @@ void KeeperStorage::preprocessRequest( { /// storage lock should always be taken before ephemeral lock std::shared_lock storage_lock(storage_mutex); + + std::unordered_set processed_ephemeral_nodes; + process_ephemerals_for_session(uncommitted_state.ephemerals, processed_ephemeral_nodes, /*check_processed_nodes=*/false); + std::lock_guard ephemeral_lock(ephemeral_mutex); - process_ephemerals_for_session(committed_ephemerals); - process_ephemerals_for_session(uncommitted_state.ephemerals); - uncommitted_state.ephemerals.erase(session_id); + process_ephemerals_for_session(committed_ephemerals, processed_ephemeral_nodes, /*check_processed_nodes=*/true); } + uncommitted_state.ephemerals.erase(session_id); + for (auto & [parent_path, parent_update_delta] : parent_updates) { new_deltas.emplace_back @@ -3234,24 +3250,27 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process static constexpr std::array list_requests{ Coordination::OpNum::List, Coordination::OpNum::SimpleList, Coordination::OpNum::FilteredList}; - auto & watches_type = std::find(list_requests.begin(), list_requests.end(), zk_request->getOpNum()) != list_requests.end() - ? list_watches - : watches; + auto is_list_watch + = std::find(list_requests.begin(), list_requests.end(), zk_request->getOpNum()) != list_requests.end(); - auto add_watch_result = watches_type[zk_request->getPath()].emplace(session_id); - if (add_watch_result.second) + auto & watches_type = is_list_watch ? list_watches : watches; + + auto [watch_it, path_inserted] = watches_type.try_emplace(zk_request->getPath()); + auto [path_it, session_inserted] = watch_it->second.emplace(session_id); + if (session_inserted) { ++total_watches_count; - sessions_and_watchers[session_id].emplace(zk_request->getPath()); + sessions_and_watchers[session_id].emplace(WatchInfo{.path = watch_it->first, .is_list_watch = is_list_watch}); } } else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) { - auto add_watch_result = watches[zk_request->getPath()].emplace(session_id); - if (add_watch_result.second) + auto [watch_it, path_inserted] = watches.try_emplace(zk_request->getPath()); + auto session_insert_info = watch_it->second.emplace(session_id); + if (session_insert_info.second) { ++total_watches_count; - sessions_and_watchers[session_id].emplace(zk_request->getPath()); + sessions_and_watchers[session_id].emplace(WatchInfo{.path = watch_it->first, .is_list_watch = false}); } } } @@ -3448,27 +3467,26 @@ void KeeperStorage::clearDeadWatches(int64_t session_id) if (watches_it == sessions_and_watchers.end()) return; - for (const auto & watch_path : watches_it->second) + for (const auto [watch_path, is_list_watch] : watches_it->second) { - /// Maybe it's a normal watch - auto watch = watches.find(watch_path); - if (watch != watches.end()) - { - auto & watches_for_path = watch->second; - watches_for_path.erase(session_id); - if (watches_for_path.empty()) - watches.erase(watch); - } - - /// Maybe it's a list watch - auto list_watch = list_watches.find(watch_path); - if (list_watch != list_watches.end()) + if (is_list_watch) { + auto list_watch = list_watches.find(watch_path); + chassert(list_watch != list_watches.end()); auto & list_watches_for_path = list_watch->second; list_watches_for_path.erase(session_id); if (list_watches_for_path.empty()) list_watches.erase(list_watch); } + else + { + auto watch = watches.find(watch_path); + chassert(watch != watches.end()); + auto & watches_for_path = watch->second; + watches_for_path.erase(session_id); + if (watches_for_path.empty()) + watches.erase(watch); + } } total_watches_count -= watches_it->second.size(); @@ -3481,7 +3499,7 @@ void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const for (const auto & [session_id, watches_paths] : sessions_and_watchers) { buf << "0x" << getHexUIntLowercase(session_id) << "\n"; - for (const String & path : watches_paths) + for (const auto [path, is_list_watch] : watches_paths) buf << "\t" << path << "\n"; } } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 993ed626f96..03f2084df7c 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -6,6 +6,7 @@ #include #include #include +#include "Common/StringHashForHeterogeneousLookup.h" #include #include @@ -314,13 +315,36 @@ public: }; using Ephemerals = std::unordered_map>; - using SessionAndWatcher = std::unordered_map>; + struct WatchInfo + { + std::string_view path; + bool is_list_watch; + + bool operator==(const WatchInfo &) const = default; + }; + + struct WatchInfoHash + { + auto operator()(WatchInfo info) const + { + SipHash hash; + hash.update(info.path); + hash.update(info.is_list_watch); + return hash.get64(); + } + }; + + using SessionAndWatcher = std::unordered_map>; using SessionIDs = std::unordered_set; /// Just vector of SHA1 from user:password using AuthIDs = std::vector; using SessionAndAuth = std::unordered_map; - using Watches = std::unordered_map; + using Watches = std::unordered_map< + String /* path, relative of root_path */, + SessionIDs, + StringHashForHeterogeneousLookup, + StringHashForHeterogeneousLookup::transparent_key_equal>; // Applying ZooKeeper request to storage consists of two steps: // - preprocessing which, instead of applying the changes directly to storage,