Fix session timeout update

This commit is contained in:
alesapin 2021-03-06 17:14:38 +03:00
parent 00c0d75b60
commit a8b7469857
3 changed files with 7 additions and 5 deletions

View File

@ -632,6 +632,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
zxid = *new_last_zxid; zxid = *new_last_zxid;
} }
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) if (zk_request->getOpNum() == Coordination::OpNum::Close)
{ {
auto it = ephemerals.find(session_id); auto it = ephemerals.find(session_id);
@ -657,7 +658,6 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
} }
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
{ {
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request); NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id); auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
response->xid = zk_request->xid; response->xid = zk_request->xid;

View File

@ -348,8 +348,10 @@ void NuKeeperTCPHandler::runImpl()
else if (received_op == Coordination::OpNum::Heartbeat) else if (received_op == Coordination::OpNum::Heartbeat)
{ {
LOG_TRACE(log, "Received heartbeat for session #{}", session_id); LOG_TRACE(log, "Received heartbeat for session #{}", session_id);
session_stopwatch.restart();
} }
/// Each request restarts session stopwatch
session_stopwatch.restart();
} }
/// Process exact amount of responses from pipe /// Process exact amount of responses from pipe

View File

@ -63,10 +63,10 @@ def test_recover_from_snapshot(started_cluster):
node3.stop_clickhouse(kill=True) node3.stop_clickhouse(kill=True)
# at least we will have 2 snapshots # at least we will have 2 snapshots
for i in range(235): for i in range(435):
node1_zk.create("/test_snapshot_multinode_recover" + str(i), ("somedata" + str(i)).encode()) node1_zk.create("/test_snapshot_multinode_recover" + str(i), ("somedata" + str(i)).encode())
for i in range(235): for i in range(435):
if i % 10 == 0: if i % 10 == 0:
node1_zk.delete("/test_snapshot_multinode_recover" + str(i)) node1_zk.delete("/test_snapshot_multinode_recover" + str(i))
@ -91,7 +91,7 @@ def test_recover_from_snapshot(started_cluster):
assert node2_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata" assert node2_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
assert node3_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata" assert node3_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata"
for i in range(235): for i in range(435):
if i % 10 != 0: if i % 10 != 0:
assert node1_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode() assert node1_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode()
assert node2_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode() assert node2_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode()