From a8b74698573e448d60a7323753555102ef51ef20 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 6 Mar 2021 17:14:38 +0300 Subject: [PATCH] Fix session timeout update --- src/Coordination/NuKeeperStorage.cpp | 2 +- src/Server/NuKeeperTCPHandler.cpp | 4 +++- .../test_testkeeper_restore_from_snapshot/test.py | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Coordination/NuKeeperStorage.cpp b/src/Coordination/NuKeeperStorage.cpp index 2efb1030994..fff44163b71 100644 --- a/src/Coordination/NuKeeperStorage.cpp +++ b/src/Coordination/NuKeeperStorage.cpp @@ -632,6 +632,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor zxid = *new_last_zxid; } + session_expiry_queue.update(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) { auto it = ephemerals.find(session_id); @@ -657,7 +658,6 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor } else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) { - session_expiry_queue.update(session_id, session_and_timeout[session_id]); NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request); auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id); response->xid = zk_request->xid; diff --git a/src/Server/NuKeeperTCPHandler.cpp b/src/Server/NuKeeperTCPHandler.cpp index cc8f8f2ff2d..66c620ae65e 100644 --- a/src/Server/NuKeeperTCPHandler.cpp +++ b/src/Server/NuKeeperTCPHandler.cpp @@ -348,8 +348,10 @@ void NuKeeperTCPHandler::runImpl() else if (received_op == Coordination::OpNum::Heartbeat) { LOG_TRACE(log, "Received heartbeat for session #{}", session_id); - session_stopwatch.restart(); } + + /// Each request restarts session stopwatch + session_stopwatch.restart(); } /// Process exact amount of responses from pipe diff --git a/tests/integration/test_testkeeper_restore_from_snapshot/test.py b/tests/integration/test_testkeeper_restore_from_snapshot/test.py index e77f8d18b7d..d399200b163 100644 --- a/tests/integration/test_testkeeper_restore_from_snapshot/test.py +++ b/tests/integration/test_testkeeper_restore_from_snapshot/test.py @@ -63,10 +63,10 @@ def test_recover_from_snapshot(started_cluster): node3.stop_clickhouse(kill=True) # at least we will have 2 snapshots - for i in range(235): + for i in range(435): node1_zk.create("/test_snapshot_multinode_recover" + str(i), ("somedata" + str(i)).encode()) - for i in range(235): + for i in range(435): if i % 10 == 0: node1_zk.delete("/test_snapshot_multinode_recover" + str(i)) @@ -91,7 +91,7 @@ def test_recover_from_snapshot(started_cluster): assert node2_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata" assert node3_zk.get("/test_snapshot_multinode_recover")[0] == b"somedata" - for i in range(235): + for i in range(435): if i % 10 != 0: assert node1_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode() assert node2_zk.get("/test_snapshot_multinode_recover" + str(i))[0] == ("somedata" + str(i)).encode()