diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index bb6dd04eeec..24d95a89a4d 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1129,6 +1129,12 @@ struct KeeperStorageRequestProcessor return {}; } + virtual KeeperStorageBase::ResponsesForSessions + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const + { + return processWatches(watches, list_watches); + } + virtual bool checkAuth(Storage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; } virtual ~KeeperStorageRequestProcessor() = default; @@ -1635,10 +1641,23 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & storage, int64_t zxid, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { - /// TODO(michicosun) rewrite - return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); + /// need to iterate over zxid deltas and update watches for deleted tree. + const auto & deltas = storage.uncommitted_state.deltas; + + KeeperStorageBase::ResponsesForSessions responses; + for (auto it = deltas.rbegin(); it != deltas.rend() && it->zxid == zxid; ++it) + { + const auto * remove_delta = std::get_if(&it->operation); + if (remove_delta) + { + auto new_responses = processWatchesImpl(it->path, watches, list_watches, Coordination::Event::DELETED); + responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end())); + } + } + + return responses; } private: @@ -2511,12 +2530,12 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } KeeperStorageBase::ResponsesForSessions - processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override + processWatches(const Storage & storage, int64_t zxid, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override { typename Storage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) { - auto responses = generic_request->processWatches(watches, list_watches); + auto responses = generic_request->processWatches(storage, zxid, watches, list_watches); result.insert(result.end(), responses.begin(), responses.end()); } return result; @@ -2980,7 +2999,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process /// If this requests processed successfully we need to check watches if (response->error == Coordination::Error::ZOK) { - auto watch_responses = request_processor->processWatches(watches, list_watches); + auto watch_responses = request_processor->processWatches(*this, zxid, watches, list_watches); results.insert(results.end(), watch_responses.begin(), watch_responses.end()); }