Define close session

This commit is contained in:
Antonio Andelic 2022-05-09 09:35:16 +00:00
parent 9796527890
commit 285bb21b91

View File

@ -1552,14 +1552,37 @@ void KeeperStorage::preprocessRequest(
current_nodes.current_zxid = new_last_zxid;
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
auto & deltas = current_nodes.deltas;
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
for (const auto & ephemeral_path : session_ephemerals->second)
{
if (current_nodes.hasNode(ephemeral_path))
{
deltas.emplace_back(parentPath(ephemeral_path).toString(), new_last_zxid, UpdateNodeDelta{[ephemeral_path](Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
}});
deltas.emplace_back(ephemeral_path, new_last_zxid, RemoveNodeDelta());
}
}
}
return;
}
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
{
current_nodes.deltas.push_back(Delta{new_last_zxid, Coordination::Error::ZNOAUTH});
current_nodes.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
return;
}
auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time);
current_nodes.deltas.insert(
current_nodes.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
}
@ -1586,28 +1609,24 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
ephemerals.erase(session_id);
commit(zxid, session_id);
for (const auto & delta : current_nodes.deltas)
{
for (const auto & ephemeral_path : session_ephemerals->second)
if (delta.zxid > zxid)
break;
if (std::holds_alternative<RemoveNodeDelta>(delta.operation))
{
container.updateValue(
parentPath(ephemeral_path),
[&ephemeral_path](KeeperStorage::Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
auto base_name = getBaseName(ephemeral_path);
parent.removeChild(base_name);
});
container.erase(ephemeral_path);
auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}
ephemerals.erase(session_ephemerals);
}
std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
clearDeadWatches(session_id);
auto auth_it = session_and_auth.find(session_id);
if (auth_it != session_and_auth.end())