Fix watches count and list request

This commit is contained in:
Antonio Andelic 2024-09-17 14:15:33 +02:00
parent f3654b8fc8
commit 8db3dddb3d
2 changed files with 121 additions and 87 deletions

View File

@ -130,7 +130,11 @@ void unregisterEphemeralPath(KeeperStorageBase::Ephemerals & ephemerals, int64_t
} }
KeeperStorageBase::ResponsesForSessions processWatchesImpl( KeeperStorageBase::ResponsesForSessions processWatchesImpl(
const String & path, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches, Coordination::Event event_type) const String & path,
KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers,
Coordination::Event event_type)
{ {
KeeperStorageBase::ResponsesForSessions result; KeeperStorageBase::ResponsesForSessions result;
auto watch_it = watches.find(path); auto watch_it = watches.find(path);
@ -143,7 +147,11 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl(
watch_response->type = event_type; watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED; watch_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : watch_it->second) for (auto watcher_session : watch_it->second)
{
[[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path);
chassert(erased);
result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response}); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response});
}
watches.erase(watch_it); watches.erase(watch_it);
} }
@ -179,7 +187,11 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl(
watch_list_response->state = Coordination::State::CONNECTED; watch_list_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : watch_it->second) for (auto watcher_session : watch_it->second)
{
[[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path_to_check);
chassert(erased);
result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response}); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response});
}
list_watches.erase(watch_it); list_watches.erase(watch_it);
} }
@ -1322,8 +1334,12 @@ std::list<KeeperStorageBase::Delta> preprocess(
} }
template <std::derived_from<Coordination::ZooKeeperRequest> T> template <std::derived_from<Coordination::ZooKeeperRequest> T>
KeeperStorageBase::ResponsesForSessions KeeperStorageBase::ResponsesForSessions processWatches(
processWatches(const T & /*zk_request*/, KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const T & /*zk_request*/,
KeeperStorageBase::DeltaRange /*deltas*/,
KeeperStorageBase::Watches & /*watches*/,
KeeperStorageBase::Watches & /*list_watches*/,
KeeperStorageBase::SessionAndWatcher & /*sessions_and_watchers*/)
{ {
return {}; return {};
} }
@ -1371,9 +1387,10 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const Coordination::ZooKeeperCreateRequest & zk_request, const Coordination::ZooKeeperCreateRequest & zk_request,
KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::DeltaRange /*deltas*/,
KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches) KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers)
{ {
return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::CREATED); return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::CREATED);
} }
template <typename Storage> template <typename Storage>
@ -1616,9 +1633,10 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const Coordination::ZooKeeperRemoveRequest & zk_request, const Coordination::ZooKeeperRemoveRequest & zk_request,
KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::DeltaRange /*deltas*/,
KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches) KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers)
{ {
return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::DELETED); return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED);
} }
template <typename Storage> template <typename Storage>
@ -1903,7 +1921,8 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const Coordination::ZooKeeperRemoveRecursiveRequest & /*zk_request*/, const Coordination::ZooKeeperRemoveRecursiveRequest & /*zk_request*/,
KeeperStorageBase::DeltaRange deltas, KeeperStorageBase::DeltaRange deltas,
KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches) KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers)
{ {
KeeperStorageBase::ResponsesForSessions responses; KeeperStorageBase::ResponsesForSessions responses;
for (const auto & delta : deltas) for (const auto & delta : deltas)
@ -1911,7 +1930,7 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const auto * remove_delta = std::get_if<KeeperStorageBase::RemoveNodeDelta>(&delta.operation); const auto * remove_delta = std::get_if<KeeperStorageBase::RemoveNodeDelta>(&delta.operation);
if (remove_delta) if (remove_delta)
{ {
auto new_responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); auto new_responses = processWatchesImpl(delta.path, watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED);
responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end())); responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end()));
} }
} }
@ -2110,9 +2129,10 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const Coordination::ZooKeeperSetRequest & zk_request, const Coordination::ZooKeeperSetRequest & zk_request,
KeeperStorageBase::DeltaRange /*deltas*/, KeeperStorageBase::DeltaRange /*deltas*/,
KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches) KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers)
{ {
return processWatchesImpl(zk_request.getPath(), watches, list_watches, Coordination::Event::CHANGED); return processWatchesImpl(zk_request.getPath(), watches, list_watches, sessions_and_watchers, Coordination::Event::CHANGED);
} }
template <typename Storage> template <typename Storage>
@ -2250,15 +2270,18 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
if (path_prefix.empty()) if (path_prefix.empty())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Path cannot be empty"); throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Path cannot be empty");
const auto & get_children = [&]() const auto get_children = [&]()
{ {
if constexpr (Storage::use_rocksdb) if constexpr (Storage::use_rocksdb)
return container.getChildren(zk_request.path); return std::optional{container.getChildren(zk_request.path)};
else else
return node_it->value.getChildren(); return &node_it->value.getChildren();
}; };
const auto & children = get_children();
response->names.reserve(children.size()); const auto children = get_children();
response->names.reserve(children->size());
chassert(static_cast<size_t>(node_it->value.stats.numChildren()) == children->size());
const auto add_child = [&](const auto & child) const auto add_child = [&](const auto & child)
{ {
@ -2290,7 +2313,7 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY);
}; };
for (const auto & child : children) for (const auto & child : *children)
{ {
if (add_child(child)) if (add_child(child))
{ {
@ -2572,7 +2595,8 @@ KeeperStorageBase::ResponsesForSessions processWatches(
const Coordination::ZooKeeperMultiRequest & zk_request, const Coordination::ZooKeeperMultiRequest & zk_request,
KeeperStorageBase::DeltaRange deltas, KeeperStorageBase::DeltaRange deltas,
KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & watches,
KeeperStorageBase::Watches & list_watches) KeeperStorageBase::Watches & list_watches,
KeeperStorageBase::SessionAndWatcher & sessions_and_watchers)
{ {
KeeperStorageBase::ResponsesForSessions result; KeeperStorageBase::ResponsesForSessions result;
@ -2581,7 +2605,7 @@ KeeperStorageBase::ResponsesForSessions processWatches(
{ {
auto subdeltas = extractSubdeltas(deltas); auto subdeltas = extractSubdeltas(deltas);
auto responses = callOnConcreteRequestType( auto responses = callOnConcreteRequestType(
*generic_request, [&](const auto & subrequest) { return processWatches(subrequest, subdeltas, watches, list_watches); }); *generic_request, [&](const auto & subrequest) { return processWatches(subrequest, subdeltas, watches, list_watches, sessions_and_watchers); });
result.insert(result.end(), responses.begin(), responses.end()); result.insert(result.end(), responses.begin(), responses.end());
} }
return result; return result;
@ -2974,52 +2998,47 @@ void KeeperStorage<Container>::preprocessRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{ {
std::unordered_map<
std::string,
UpdateNodeStatDelta,
StringHashForHeterogeneousLookup,
StringHashForHeterogeneousLookup::transparent_key_equal>
parent_updates;
const auto process_ephemerals_for_session = [&](auto & current_ephemerals)
{ {
const auto process_ephemerals_for_session = [&](auto & current_ephemerals) auto session_ephemerals = current_ephemerals.find(session_id);
if (session_ephemerals != current_ephemerals.end())
{ {
auto session_ephemerals = current_ephemerals.find(session_id); for (const auto & ephemeral_path : session_ephemerals->second)
if (session_ephemerals != current_ephemerals.end())
{ {
std::unordered_map<std::string_view, UpdateNodeStatDelta> parent_updates; auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false);
for (const auto & ephemeral_path : session_ephemerals->second)
/// maybe the node is deleted or recreated with different session_id in the uncommitted state
if (!node || node->stats.ephemeralOwner() != session_id)
continue;
auto parent_node_path = parentNodePath(ephemeral_path).toView();
auto parent_update_it = parent_updates.find(parent_node_path);
if (parent_update_it == parent_updates.end())
{ {
auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false); auto parent_node = uncommitted_state.getNode(StringRef{parent_node_path}, /*should_lock_storage=*/false);
std::tie(parent_update_it, std::ignore) = parent_updates.emplace(parent_node_path, *parent_node);
/// maybe the node is deleted or recreated with different session_id in the uncommitted state
if (!node || node->stats.ephemeralOwner() != session_id)
continue;
auto parent_node_path = parentNodePath(ephemeral_path).toView();
auto parent_update_it = parent_updates.find(parent_node_path);
if (parent_update_it == parent_updates.end())
{
auto parent_node = uncommitted_state.getNode(StringRef{parent_node_path}, /*should_lock_storage=*/false);
std::tie(parent_update_it, std::ignore) = parent_updates.emplace(parent_node_path, *parent_node);
}
auto & parent_update_delta = parent_update_it->second;
++parent_update_delta.new_stats.cversion;
parent_update_delta.new_stats.decreaseNumChildren();
new_deltas.emplace_back(
ephemeral_path,
transaction->zxid,
RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}});
} }
for (auto & [parent_path, parent_update_delta] : parent_updates) auto & parent_update_delta = parent_update_it->second;
{ ++parent_update_delta.new_stats.cversion;
new_deltas.emplace_back parent_update_delta.new_stats.decreaseNumChildren();
(
std::string{parent_path}, new_deltas.emplace_back(
new_last_zxid, ephemeral_path,
std::move(parent_update_delta) transaction->zxid,
); RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}});
}
} }
}; }
};
{
/// storage lock should always be taken before ephemeral lock /// storage lock should always be taken before ephemeral lock
std::shared_lock storage_lock(storage_mutex); std::shared_lock storage_lock(storage_mutex);
std::lock_guard ephemeral_lock(ephemeral_mutex); std::lock_guard ephemeral_lock(ephemeral_mutex);
@ -3028,6 +3047,16 @@ void KeeperStorage<Container>::preprocessRequest(
uncommitted_state.ephemerals.erase(session_id); uncommitted_state.ephemerals.erase(session_id);
} }
for (auto & [parent_path, parent_update_delta] : parent_updates)
{
new_deltas.emplace_back
(
parent_path,
new_last_zxid,
std::move(parent_update_delta)
);
}
new_deltas.emplace_back(transaction->zxid, CloseSessionDelta{session_id}); new_deltas.emplace_back(transaction->zxid, CloseSessionDelta{session_id});
new_digest = calculateNodesDigest(new_digest, new_deltas); new_digest = calculateNodesDigest(new_digest, new_deltas);
return; return;
@ -3132,7 +3161,7 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
{ {
if (std::holds_alternative<RemoveNodeDelta>(delta.operation)) if (std::holds_alternative<RemoveNodeDelta>(delta.operation))
{ {
auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); auto responses = processWatchesImpl(delta.path, watches, list_watches, sessions_and_watchers, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end()); results.insert(results.end(), responses.begin(), responses.end());
} }
} }
@ -3211,20 +3240,27 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
auto add_watch_result = watches_type[zk_request->getPath()].emplace(session_id); auto add_watch_result = watches_type[zk_request->getPath()].emplace(session_id);
if (add_watch_result.second) if (add_watch_result.second)
{
++total_watches_count;
sessions_and_watchers[session_id].emplace(zk_request->getPath()); sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
} }
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{ {
auto add_watch_result = watches[zk_request->getPath()].emplace(session_id); auto add_watch_result = watches[zk_request->getPath()].emplace(session_id);
if (add_watch_result.second) if (add_watch_result.second)
{
++total_watches_count;
sessions_and_watchers[session_id].emplace(zk_request->getPath()); sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
} }
} }
/// If this requests processed successfully we need to check watches /// If this requests processed successfully we need to check watches
if (response->error == Coordination::Error::ZOK) if (response->error == Coordination::Error::ZOK)
{ {
auto watch_responses = processWatches(concrete_zk_request, deltas_range, watches, list_watches); auto watch_responses = processWatches(concrete_zk_request, deltas_range, watches, list_watches, sessions_and_watchers);
total_watches_count -= watch_responses.size();
results.insert(results.end(), watch_responses.begin(), watch_responses.end()); results.insert(results.end(), watch_responses.begin(), watch_responses.end());
} }
@ -3409,33 +3445,34 @@ void KeeperStorage<Container>::clearDeadWatches(int64_t session_id)
{ {
/// Clear all watches for this session /// Clear all watches for this session
auto watches_it = sessions_and_watchers.find(session_id); auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end()) if (watches_it == sessions_and_watchers.end())
{ return;
for (const auto & watch_path : watches_it->second)
{
/// Maybe it's a normal watch
auto watch = watches.find(watch_path);
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
watches_for_path.erase(session_id);
if (watches_for_path.empty())
watches.erase(watch);
}
/// Maybe it's a list watch for (const auto & watch_path : watches_it->second)
auto list_watch = list_watches.find(watch_path); {
if (list_watch != list_watches.end()) /// Maybe it's a normal watch
{ auto watch = watches.find(watch_path);
auto & list_watches_for_path = list_watch->second; if (watch != watches.end())
list_watches_for_path.erase(session_id); {
if (list_watches_for_path.empty()) auto & watches_for_path = watch->second;
list_watches.erase(list_watch); watches_for_path.erase(session_id);
} if (watches_for_path.empty())
watches.erase(watch);
} }
sessions_and_watchers.erase(watches_it); /// Maybe it's a list watch
auto list_watch = list_watches.find(watch_path);
if (list_watch != list_watches.end())
{
auto & list_watches_for_path = list_watch->second;
list_watches_for_path.erase(session_id);
if (list_watches_for_path.empty())
list_watches.erase(list_watch);
}
} }
total_watches_count -= watches_it->second.size();
sessions_and_watchers.erase(watches_it);
} }
template<typename Container> template<typename Container>
@ -3521,11 +3558,7 @@ const KeeperStorageBase::Stats & KeeperStorage<Container>::getStorageStats() con
template<typename Container> template<typename Container>
uint64_t KeeperStorage<Container>::getTotalWatchesCount() const uint64_t KeeperStorage<Container>::getTotalWatchesCount() const
{ {
uint64_t ret = 0; return total_watches_count;
for (const auto & [session, paths] : sessions_and_watchers)
ret += paths.size();
return ret;
} }
template<typename Container> template<typename Container>

View File

@ -636,6 +636,7 @@ public:
/// Mapping session_id -> set of watched nodes paths /// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers; SessionAndWatcher sessions_and_watchers;
size_t total_watches_count = 0;
/// Currently active watches (node_path -> subscribed sessions) /// Currently active watches (node_path -> subscribed sessions)
Watches watches; Watches watches;