diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index fd065bd1249..8b12b59e50b 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1552,14 +1552,37 @@ void KeeperStorage::preprocessRequest( current_nodes.current_zxid = new_last_zxid; KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); + if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special + { + auto & deltas = current_nodes.deltas; + auto session_ephemerals = ephemerals.find(session_id); + if (session_ephemerals != ephemerals.end()) + { + for (const auto & ephemeral_path : session_ephemerals->second) + { + if (current_nodes.hasNode(ephemeral_path)) + { + deltas.emplace_back(parentPath(ephemeral_path).toString(), new_last_zxid, UpdateNodeDelta{[ephemeral_path](Node & parent) + { + --parent.stat.numChildren; + ++parent.stat.cversion; + }}); + + deltas.emplace_back(ephemeral_path, new_last_zxid, RemoveNodeDelta()); + } + } + } + + return; + } + if (check_acl && !request_processor->checkAuth(*this, session_id, false)) { - current_nodes.deltas.push_back(Delta{new_last_zxid, Coordination::Error::ZNOAUTH}); + current_nodes.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); return; } auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time); - current_nodes.deltas.insert( current_nodes.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end())); } @@ -1586,28 +1609,24 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { - auto session_ephemerals = ephemerals.find(session_id); - if (session_ephemerals != ephemerals.end()) + ephemerals.erase(session_id); + + commit(zxid, session_id); + + for (const auto & delta : current_nodes.deltas) { - for (const auto & ephemeral_path : session_ephemerals->second) + if (delta.zxid > zxid) + break; + + if (std::holds_alternative(delta.operation)) { - container.updateValue( - parentPath(ephemeral_path), - [&ephemeral_path](KeeperStorage::Node & parent) - { - --parent.stat.numChildren; - ++parent.stat.cversion; - auto base_name = getBaseName(ephemeral_path); - parent.removeChild(base_name); - }); - - container.erase(ephemeral_path); - - auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED); + auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED); results.insert(results.end(), responses.begin(), responses.end()); } - ephemerals.erase(session_ephemerals); } + + std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); + clearDeadWatches(session_id); auto auth_it = session_and_auth.find(session_id); if (auth_it != session_and_auth.end())