From 8db3dddb3dc38dc4b75a1a62ece44e2c382482a6 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 17 Sep 2024 14:15:33 +0200 Subject: [PATCH] Fix watches count and list request --- src/Coordination/KeeperStorage.cpp | 207 +++++++++++++++++------------ src/Coordination/KeeperStorage.h | 1 + 2 files changed, 121 insertions(+), 87 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 397e5db1f71..28bc1bf112b 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -130,7 +130,11 @@ void unregisterEphemeralPath(KeeperStorageBase::Ephemerals & ephemerals, int64_t } KeeperStorageBase::ResponsesForSessions processWatchesImpl( - const String & path, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches, Coordination::Event event_type) + const String & path, + KeeperStorageBase::Watches & watches, + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers, + Coordination::Event event_type) { KeeperStorageBase::ResponsesForSessions result; auto watch_it = watches.find(path); @@ -143,7 +147,11 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl( watch_response->type = event_type; watch_response->state = Coordination::State::CONNECTED; for (auto watcher_session : watch_it->second) + { + [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path); + chassert(erased); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response}); + } watches.erase(watch_it); } @@ -179,7 +187,11 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl( watch_list_response->state = Coordination::State::CONNECTED; for (auto watcher_session : watch_it->second) + { + [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path_to_check); + chassert(erased); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response}); + } list_watches.erase(watch_it); } @@ -1322,8 +1334,12 @@ std::list preprocess( } template T> -KeeperStorageBase::ResponsesForSessions -processWatches(const T & /*zk_request*/, KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) +KeeperStorageBase::ResponsesForSessions processWatches( + const T & /*zk_request*/, + KeeperStorageBase::DeltaRange /*deltas*/, + KeeperStorageBase::Watches & /*watches*/, + KeeperStorageBase::Watches & /*list_watches*/, + KeeperStorageBase::SessionAndWatcher & /*sessions_and_watchers*/) { return {}; } @@ -1371,9 +1387,10 @@ KeeperStorageBase::ResponsesForSessions processWatches( const Coordination::ZooKeeperCreateRequest & zk_request, KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::Watches & watches, - KeeperStorageBase::Watches & list_watches) + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers) { - return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::CREATED); + return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::CREATED); } template @@ -1616,9 +1633,10 @@ KeeperStorageBase::ResponsesForSessions processWatches( const Coordination::ZooKeeperRemoveRequest & zk_request, KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::Watches & watches, - KeeperStorageBase::Watches & list_watches) + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers) { - return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::DELETED); + return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED); } template @@ -1903,7 +1921,8 @@ KeeperStorageBase::ResponsesForSessions processWatches( const Coordination::ZooKeeperRemoveRecursiveRequest & /*zk_request*/, KeeperStorageBase::DeltaRange deltas, KeeperStorageBase::Watches & watches, - KeeperStorageBase::Watches & list_watches) + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers) { KeeperStorageBase::ResponsesForSessions responses; for (const auto & delta : deltas) @@ -1911,7 +1930,7 @@ KeeperStorageBase::ResponsesForSessions processWatches( const auto * remove_delta = std::get_if(&delta.operation); if (remove_delta) { - auto new_responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); + auto new_responses = processWatchesImpl(delta.path, watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED); responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end())); } } @@ -2110,9 +2129,10 @@ KeeperStorageBase::ResponsesForSessions processWatches( const Coordination::ZooKeeperSetRequest & zk_request, KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::Watches & watches, - KeeperStorageBase::Watches & list_watches) + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers) { - return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::CHANGED); + return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::CHANGED); } template @@ -2250,15 +2270,18 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList if (path_prefix.empty()) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Path cannot be empty"); - const auto & get_children = [&]() + const auto get_children = [&]() { if constexpr (Storage::use_rocksdb) - return container.getChildren(zk_request.path); + return std::optional{container.getChildren(zk_request.path)}; else - return node_it->value.getChildren(); + return &node_it->value.getChildren(); }; - const auto & children = get_children(); - response->names.reserve(children.size()); + + const auto children = get_children(); + response->names.reserve(children->size()); + + chassert(static_cast(node_it->value.stats.numChildren()) == children->size()); const auto add_child = [&](const auto & child) { @@ -2290,7 +2313,7 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); }; - for (const auto & child : children) + for (const auto & child : *children) { if (add_child(child)) { @@ -2572,7 +2595,8 @@ KeeperStorageBase::ResponsesForSessions processWatches( const Coordination::ZooKeeperMultiRequest & zk_request, KeeperStorageBase::DeltaRange deltas, KeeperStorageBase::Watches & watches, - KeeperStorageBase::Watches & list_watches) + KeeperStorageBase::Watches & list_watches, + KeeperStorageBase::SessionAndWatcher & sessions_and_watchers) { KeeperStorageBase::ResponsesForSessions result; @@ -2581,7 +2605,7 @@ KeeperStorageBase::ResponsesForSessions processWatches( { auto subdeltas = extractSubdeltas(deltas); auto responses = callOnConcreteRequestType( - *generic_request, [&](const auto & subrequest) { return processWatches(subrequest, subdeltas, watches, list_watches); }); + *generic_request, [&](const auto & subrequest) { return processWatches(subrequest, subdeltas, watches, list_watches, sessions_and_watchers); }); result.insert(result.end(), responses.begin(), responses.end()); } return result; @@ -2974,52 +2998,47 @@ void KeeperStorage::preprocessRequest( if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { + std::unordered_map< + std::string, + UpdateNodeStatDelta, + StringHashForHeterogeneousLookup, + StringHashForHeterogeneousLookup::transparent_key_equal> + parent_updates; + const auto process_ephemerals_for_session = [&](auto & current_ephemerals) { - const auto process_ephemerals_for_session = [&](auto & current_ephemerals) + auto session_ephemerals = current_ephemerals.find(session_id); + if (session_ephemerals != current_ephemerals.end()) { - auto session_ephemerals = current_ephemerals.find(session_id); - if (session_ephemerals != current_ephemerals.end()) + for (const auto & ephemeral_path : session_ephemerals->second) { - std::unordered_map parent_updates; - for (const auto & ephemeral_path : session_ephemerals->second) + auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false); + + /// maybe the node is deleted or recreated with different session_id in the uncommitted state + if (!node || node->stats.ephemeralOwner() != session_id) + continue; + + auto parent_node_path = parentNodePath(ephemeral_path).toView(); + + auto parent_update_it = parent_updates.find(parent_node_path); + if (parent_update_it == parent_updates.end()) { - auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false); - - /// maybe the node is deleted or recreated with different session_id in the uncommitted state - if (!node || node->stats.ephemeralOwner() != session_id) - continue; - - auto parent_node_path = parentNodePath(ephemeral_path).toView(); - - auto parent_update_it = parent_updates.find(parent_node_path); - if (parent_update_it == parent_updates.end()) - { - auto parent_node = uncommitted_state.getNode(StringRef{parent_node_path}, /*should_lock_storage=*/false); - std::tie(parent_update_it, std::ignore) = parent_updates.emplace(parent_node_path, *parent_node); - } - - auto & parent_update_delta = parent_update_it->second; - ++parent_update_delta.new_stats.cversion; - parent_update_delta.new_stats.decreaseNumChildren(); - - new_deltas.emplace_back( - ephemeral_path, - transaction->zxid, - RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}}); + auto parent_node = uncommitted_state.getNode(StringRef{parent_node_path}, /*should_lock_storage=*/false); + std::tie(parent_update_it, std::ignore) = parent_updates.emplace(parent_node_path, *parent_node); } - for (auto & [parent_path, parent_update_delta] : parent_updates) - { - new_deltas.emplace_back - ( - std::string{parent_path}, - new_last_zxid, - std::move(parent_update_delta) - ); - } + auto & parent_update_delta = parent_update_it->second; + ++parent_update_delta.new_stats.cversion; + parent_update_delta.new_stats.decreaseNumChildren(); + + new_deltas.emplace_back( + ephemeral_path, + transaction->zxid, + RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}}); } - }; + } + }; + { /// storage lock should always be taken before ephemeral lock std::shared_lock storage_lock(storage_mutex); std::lock_guard ephemeral_lock(ephemeral_mutex); @@ -3028,6 +3047,16 @@ void KeeperStorage::preprocessRequest( uncommitted_state.ephemerals.erase(session_id); } + for (auto & [parent_path, parent_update_delta] : parent_updates) + { + new_deltas.emplace_back + ( + parent_path, + new_last_zxid, + std::move(parent_update_delta) + ); + } + new_deltas.emplace_back(transaction->zxid, CloseSessionDelta{session_id}); new_digest = calculateNodesDigest(new_digest, new_deltas); return; @@ -3132,7 +3161,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process { if (std::holds_alternative(delta.operation)) { - auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); + auto responses = processWatchesImpl(delta.path, watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED); results.insert(results.end(), responses.begin(), responses.end()); } } @@ -3211,20 +3240,27 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process auto add_watch_result = watches_type[zk_request->getPath()].emplace(session_id); if (add_watch_result.second) + { + ++total_watches_count; sessions_and_watchers[session_id].emplace(zk_request->getPath()); + } } else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) { auto add_watch_result = watches[zk_request->getPath()].emplace(session_id); if (add_watch_result.second) + { + ++total_watches_count; sessions_and_watchers[session_id].emplace(zk_request->getPath()); + } } } /// If this requests processed successfully we need to check watches if (response->error == Coordination::Error::ZOK) { - auto watch_responses = processWatches(concrete_zk_request, deltas_range, watches, list_watches); + auto watch_responses = processWatches(concrete_zk_request, deltas_range, watches, list_watches, sessions_and_watchers); + total_watches_count -= watch_responses.size(); results.insert(results.end(), watch_responses.begin(), watch_responses.end()); } @@ -3409,33 +3445,34 @@ void KeeperStorage::clearDeadWatches(int64_t session_id) { /// Clear all watches for this session auto watches_it = sessions_and_watchers.find(session_id); - if (watches_it != sessions_and_watchers.end()) - { - for (const auto & watch_path : watches_it->second) - { - /// Maybe it's a normal watch - auto watch = watches.find(watch_path); - if (watch != watches.end()) - { - auto & watches_for_path = watch->second; - watches_for_path.erase(session_id); - if (watches_for_path.empty()) - watches.erase(watch); - } + if (watches_it == sessions_and_watchers.end()) + return; - /// Maybe it's a list watch - auto list_watch = list_watches.find(watch_path); - if (list_watch != list_watches.end()) - { - auto & list_watches_for_path = list_watch->second; - list_watches_for_path.erase(session_id); - if (list_watches_for_path.empty()) - list_watches.erase(list_watch); - } + for (const auto & watch_path : watches_it->second) + { + /// Maybe it's a normal watch + auto watch = watches.find(watch_path); + if (watch != watches.end()) + { + auto & watches_for_path = watch->second; + watches_for_path.erase(session_id); + if (watches_for_path.empty()) + watches.erase(watch); } - sessions_and_watchers.erase(watches_it); + /// Maybe it's a list watch + auto list_watch = list_watches.find(watch_path); + if (list_watch != list_watches.end()) + { + auto & list_watches_for_path = list_watch->second; + list_watches_for_path.erase(session_id); + if (list_watches_for_path.empty()) + list_watches.erase(list_watch); + } } + + total_watches_count -= watches_it->second.size(); + sessions_and_watchers.erase(watches_it); } template @@ -3521,11 +3558,7 @@ const KeeperStorageBase::Stats & KeeperStorage::getStorageStats() con template uint64_t KeeperStorage::getTotalWatchesCount() const { - uint64_t ret = 0; - for (const auto & [session, paths] : sessions_and_watchers) - ret += paths.size(); - - return ret; + return total_watches_count; } template diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 9509ac79f2b..993ed626f96 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -636,6 +636,7 @@ public: /// Mapping session_id -> set of watched nodes paths SessionAndWatcher sessions_and_watchers; + size_t total_watches_count = 0; /// Currently active watches (node_path -> subscribed sessions) Watches watches;