Fix watches

This commit is contained in:
Antonio Andelic 2024-09-18 09:52:19 +02:00
parent 8db3dddb3d
commit 3106653852
2 changed files with 87 additions and 45 deletions

View File

@ -148,7 +148,8 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl(
watch_response->state = Coordination::State::CONNECTED; watch_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : watch_it->second) for (auto watcher_session : watch_it->second)
{ {
[[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path); [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(
KeeperStorageBase::WatchInfo{.path = path, .is_list_watch = false});
chassert(erased); chassert(erased);
result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response}); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_response});
} }
@ -188,7 +189,8 @@ KeeperStorageBase::ResponsesForSessions processWatchesImpl(
watch_list_response->state = Coordination::State::CONNECTED; watch_list_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : watch_it->second) for (auto watcher_session : watch_it->second)
{ {
[[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(path_to_check); [[maybe_unused]] auto erased = sessions_and_watchers[watcher_session].erase(
KeeperStorageBase::WatchInfo{.path = path_to_check, .is_list_watch = true});
chassert(erased); chassert(erased);
result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response}); result.push_back(KeeperStorageBase::ResponseForSession{watcher_session, watch_list_response});
} }
@ -828,18 +830,12 @@ void KeeperStorage<Container>::UncommittedState::rollback(std::list<Delta> rollb
if constexpr (std::same_as<DeltaType, CreateNodeDelta>) if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{ {
if (operation.stat.ephemeralOwner != 0) if (operation.stat.ephemeralOwner != 0)
{
std::lock_guard lock(storage.ephemeral_mutex);
unregisterEphemeralPath(storage.uncommitted_state.ephemerals, operation.stat.ephemeralOwner, delta.path, /*throw_if_missing=*/false); unregisterEphemeralPath(storage.uncommitted_state.ephemerals, operation.stat.ephemeralOwner, delta.path, /*throw_if_missing=*/false);
}
} }
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>) else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{ {
if (operation.stat.ephemeralOwner() != 0) if (operation.stat.ephemeralOwner() != 0)
{
std::lock_guard lock(storage.ephemeral_mutex);
storage.uncommitted_state.ephemerals[operation.stat.ephemeralOwner()].emplace(delta.path); storage.uncommitted_state.ephemerals[operation.stat.ephemeralOwner()].emplace(delta.path);
}
} }
}, },
delta.operation); delta.operation);
@ -1159,7 +1155,6 @@ bool KeeperStorage<Container>::createNode(
++committed_ephemeral_nodes; ++committed_ephemeral_nodes;
std::lock_guard lock(ephemeral_mutex); std::lock_guard lock(ephemeral_mutex);
committed_ephemerals[stat.ephemeralOwner].emplace(path); committed_ephemerals[stat.ephemeralOwner].emplace(path);
unregisterEphemeralPath(uncommitted_state.ephemerals, stat.ephemeralOwner, path, /*throw_if_missing=*/false);
} }
return true; return true;
@ -1454,10 +1449,7 @@ std::list<KeeperStorageBase::Delta> preprocess(
return {typename Storage::Delta{zxid, Coordination::Error::ZINVALIDACL}}; return {typename Storage::Delta{zxid, Coordination::Error::ZINVALIDACL}};
if (zk_request.is_ephemeral) if (zk_request.is_ephemeral)
{
std::lock_guard lock(storage.ephemeral_mutex);
storage.uncommitted_state.ephemerals[session_id].emplace(path_created); storage.uncommitted_state.ephemerals[session_id].emplace(path_created);
}
int32_t parent_cversion = zk_request.parent_cversion; int32_t parent_cversion = zk_request.parent_cversion;
@ -1721,7 +1713,6 @@ std::list<KeeperStorageBase::Delta> preprocess(
if (node->stats.isEphemeral()) if (node->stats.isEphemeral())
{ {
std::lock_guard ephemeral_lock(storage.ephemeral_mutex);
/// try deleting the ephemeral node from the uncommitted state /// try deleting the ephemeral node from the uncommitted state
unregisterEphemeralPath(storage.uncommitted_state.ephemerals, node->stats.ephemeralOwner(), zk_request.path, /*throw_if_missing=*/false); unregisterEphemeralPath(storage.uncommitted_state.ephemerals, node->stats.ephemeralOwner(), zk_request.path, /*throw_if_missing=*/false);
} }
@ -2027,7 +2018,6 @@ std::list<KeeperStorageBase::Delta> preprocess(
const auto * remove_delta = std::get_if<KeeperStorageBase::RemoveNodeDelta>(&delta.operation); const auto * remove_delta = std::get_if<KeeperStorageBase::RemoveNodeDelta>(&delta.operation);
if (remove_delta && remove_delta->stat.ephemeralOwner()) if (remove_delta && remove_delta->stat.ephemeralOwner())
{ {
std::lock_guard lock(storage.ephemeral_mutex);
unregisterEphemeralPath( unregisterEphemeralPath(
storage.uncommitted_state.ephemerals, remove_delta->stat.ephemeralOwner(), delta.path, /*throw_if_missing=*/false); storage.uncommitted_state.ephemerals, remove_delta->stat.ephemeralOwner(), delta.path, /*throw_if_missing=*/false);
} }
@ -2281,7 +2271,17 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
const auto children = get_children(); const auto children = get_children();
response->names.reserve(children->size()); response->names.reserve(children->size());
chassert(static_cast<size_t>(node_it->value.stats.numChildren()) == children->size()); #ifdef DEBUG_OR_SANITIZER_BUILD
if (!zk_request.path.starts_with(keeper_system_path) && static_cast<size_t>(node_it->value.stats.numChildren()) != children->size())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Difference between numChildren ({}) and actual children size ({}) for '{}'",
node_it->value.stats.numChildren(),
children->size(),
zk_request.path);
}
#endif
const auto add_child = [&](const auto & child) const auto add_child = [&](const auto & child)
{ {
@ -3004,13 +3004,25 @@ void KeeperStorage<Container>::preprocessRequest(
StringHashForHeterogeneousLookup, StringHashForHeterogeneousLookup,
StringHashForHeterogeneousLookup::transparent_key_equal> StringHashForHeterogeneousLookup::transparent_key_equal>
parent_updates; parent_updates;
const auto process_ephemerals_for_session = [&](auto & current_ephemerals)
const auto process_ephemerals_for_session
= [&](const auto & current_ephemerals, auto & processed_ephemeral_nodes, bool check_processed_nodes)
{ {
auto session_ephemerals = current_ephemerals.find(session_id); auto session_ephemerals = current_ephemerals.find(session_id);
if (session_ephemerals != current_ephemerals.end()) if (session_ephemerals != current_ephemerals.end())
{ {
for (const auto & ephemeral_path : session_ephemerals->second) for (const auto & ephemeral_path : session_ephemerals->second)
{ {
if (check_processed_nodes)
{
if (processed_ephemeral_nodes.contains(ephemeral_path))
continue;
}
else
{
processed_ephemeral_nodes.insert(ephemeral_path);
}
auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false); auto node = uncommitted_state.getNode(ephemeral_path, /*should_lock_storage=*/false);
/// maybe the node is deleted or recreated with different session_id in the uncommitted state /// maybe the node is deleted or recreated with different session_id in the uncommitted state
@ -3041,12 +3053,16 @@ void KeeperStorage<Container>::preprocessRequest(
{ {
/// storage lock should always be taken before ephemeral lock /// storage lock should always be taken before ephemeral lock
std::shared_lock storage_lock(storage_mutex); std::shared_lock storage_lock(storage_mutex);
std::unordered_set<std::string_view> processed_ephemeral_nodes;
process_ephemerals_for_session(uncommitted_state.ephemerals, processed_ephemeral_nodes, /*check_processed_nodes=*/false);
std::lock_guard ephemeral_lock(ephemeral_mutex); std::lock_guard ephemeral_lock(ephemeral_mutex);
process_ephemerals_for_session(committed_ephemerals); process_ephemerals_for_session(committed_ephemerals, processed_ephemeral_nodes, /*check_processed_nodes=*/true);
process_ephemerals_for_session(uncommitted_state.ephemerals);
uncommitted_state.ephemerals.erase(session_id);
} }
uncommitted_state.ephemerals.erase(session_id);
for (auto & [parent_path, parent_update_delta] : parent_updates) for (auto & [parent_path, parent_update_delta] : parent_updates)
{ {
new_deltas.emplace_back new_deltas.emplace_back
@ -3234,24 +3250,27 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
static constexpr std::array list_requests{ static constexpr std::array list_requests{
Coordination::OpNum::List, Coordination::OpNum::SimpleList, Coordination::OpNum::FilteredList}; Coordination::OpNum::List, Coordination::OpNum::SimpleList, Coordination::OpNum::FilteredList};
auto & watches_type = std::find(list_requests.begin(), list_requests.end(), zk_request->getOpNum()) != list_requests.end() auto is_list_watch
? list_watches = std::find(list_requests.begin(), list_requests.end(), zk_request->getOpNum()) != list_requests.end();
: watches;
auto add_watch_result = watches_type[zk_request->getPath()].emplace(session_id); auto & watches_type = is_list_watch ? list_watches : watches;
if (add_watch_result.second)
auto [watch_it, path_inserted] = watches_type.try_emplace(zk_request->getPath());
auto [path_it, session_inserted] = watch_it->second.emplace(session_id);
if (session_inserted)
{ {
++total_watches_count; ++total_watches_count;
sessions_and_watchers[session_id].emplace(zk_request->getPath()); sessions_and_watchers[session_id].emplace(WatchInfo{.path = watch_it->first, .is_list_watch = is_list_watch});
} }
} }
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists) else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{ {
auto add_watch_result = watches[zk_request->getPath()].emplace(session_id); auto [watch_it, path_inserted] = watches.try_emplace(zk_request->getPath());
if (add_watch_result.second) auto session_insert_info = watch_it->second.emplace(session_id);
if (session_insert_info.second)
{ {
++total_watches_count; ++total_watches_count;
sessions_and_watchers[session_id].emplace(zk_request->getPath()); sessions_and_watchers[session_id].emplace(WatchInfo{.path = watch_it->first, .is_list_watch = false});
} }
} }
} }
@ -3448,27 +3467,26 @@ void KeeperStorage<Container>::clearDeadWatches(int64_t session_id)
if (watches_it == sessions_and_watchers.end()) if (watches_it == sessions_and_watchers.end())
return; return;
for (const auto & watch_path : watches_it->second) for (const auto [watch_path, is_list_watch] : watches_it->second)
{ {
/// Maybe it's a normal watch if (is_list_watch)
auto watch = watches.find(watch_path);
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
watches_for_path.erase(session_id);
if (watches_for_path.empty())
watches.erase(watch);
}
/// Maybe it's a list watch
auto list_watch = list_watches.find(watch_path);
if (list_watch != list_watches.end())
{ {
auto list_watch = list_watches.find(watch_path);
chassert(list_watch != list_watches.end());
auto & list_watches_for_path = list_watch->second; auto & list_watches_for_path = list_watch->second;
list_watches_for_path.erase(session_id); list_watches_for_path.erase(session_id);
if (list_watches_for_path.empty()) if (list_watches_for_path.empty())
list_watches.erase(list_watch); list_watches.erase(list_watch);
} }
else
{
auto watch = watches.find(watch_path);
chassert(watch != watches.end());
auto & watches_for_path = watch->second;
watches_for_path.erase(session_id);
if (watches_for_path.empty())
watches.erase(watch);
}
} }
total_watches_count -= watches_it->second.size(); total_watches_count -= watches_it->second.size();
@ -3481,7 +3499,7 @@ void KeeperStorage<Container>::dumpWatches(WriteBufferFromOwnString & buf) const
for (const auto & [session_id, watches_paths] : sessions_and_watchers) for (const auto & [session_id, watches_paths] : sessions_and_watchers)
{ {
buf << "0x" << getHexUIntLowercase(session_id) << "\n"; buf << "0x" << getHexUIntLowercase(session_id) << "\n";
for (const String & path : watches_paths) for (const auto [path, is_list_watch] : watches_paths)
buf << "\t" << path << "\n"; buf << "\t" << path << "\n";
} }
} }

View File

@ -6,6 +6,7 @@
#include <Coordination/ACLMap.h> #include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h> #include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h> #include <Coordination/SnapshotableHashTable.h>
#include "Common/StringHashForHeterogeneousLookup.h"
#include <Common/SharedMutex.h> #include <Common/SharedMutex.h>
#include <Common/Concepts.h> #include <Common/Concepts.h>
@ -314,13 +315,36 @@ public:
}; };
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>; using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>; struct WatchInfo
{
std::string_view path;
bool is_list_watch;
bool operator==(const WatchInfo &) const = default;
};
struct WatchInfoHash
{
auto operator()(WatchInfo info) const
{
SipHash hash;
hash.update(info.path);
hash.update(info.is_list_watch);
return hash.get64();
}
};
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<WatchInfo, WatchInfoHash>>;
using SessionIDs = std::unordered_set<int64_t>; using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password /// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>; using AuthIDs = std::vector<AuthID>;
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>; using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>; using Watches = std::unordered_map<
String /* path, relative of root_path */,
SessionIDs,
StringHashForHeterogeneousLookup,
StringHashForHeterogeneousLookup::transparent_key_equal>;
// Applying ZooKeeper request to storage consists of two steps: // Applying ZooKeeper request to storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage, // - preprocessing which, instead of applying the changes directly to storage,