fix watches logic

This commit is contained in:
Mikhail Artemenko 2024-09-06 09:33:47 +00:00
parent 231a7c97cc
commit 2bbe933531

View File

@ -1129,6 +1129,12 @@ struct KeeperStorageRequestProcessor
return {};
}
virtual KeeperStorageBase::ResponsesForSessions
processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const
{
return processWatches(watches, list_watches);
}
virtual bool checkAuth(Storage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; }
virtual ~KeeperStorageRequestProcessor() = default;
@ -1635,10 +1641,23 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage
}
KeeperStorageBase::ResponsesForSessions
processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
processWatches(const Storage & storage, int64_t zxid, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
{
/// TODO(michicosun) rewrite
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
/// need to iterate over zxid deltas and update watches for deleted tree.
const auto & deltas = storage.uncommitted_state.deltas;
KeeperStorageBase::ResponsesForSessions responses;
for (auto it = deltas.rbegin(); it != deltas.rend() && it->zxid == zxid; ++it)
{
const auto * remove_delta = std::get_if<typename Storage::RemoveNodeDelta>(&it->operation);
if (remove_delta)
{
auto new_responses = processWatchesImpl(it->path, watches, list_watches, Coordination::Event::DELETED);
responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end()));
}
}
return responses;
}
private:
@ -2511,12 +2530,12 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
KeeperStorageBase::ResponsesForSessions
processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
processWatches(const Storage & storage, int64_t zxid, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
{
typename Storage::ResponsesForSessions result;
for (const auto & generic_request : concrete_requests)
{
auto responses = generic_request->processWatches(watches, list_watches);
auto responses = generic_request->processWatches(storage, zxid, watches, list_watches);
result.insert(result.end(), responses.begin(), responses.end());
}
return result;
@ -2980,7 +2999,7 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
/// If this requests processed successfully we need to check watches
if (response->error == Coordination::Error::ZOK)
{
auto watch_responses = request_processor->processWatches(watches, list_watches);
auto watch_responses = request_processor->processWatches(*this, zxid, watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}