Simplier sessions expiration in Keeper

This commit is contained in:
alesapin 2021-09-02 14:40:54 +03:00
parent b592400e54
commit 4c613f30b3
5 changed files with 86 additions and 33 deletions

View File

@ -1078,7 +1078,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
zxid = *new_last_zxid;
}
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
@ -1115,6 +1114,9 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
{
/// Update session only for heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(*this, zxid, session_id);
response->xid = zk_request->xid;

View File

@ -123,7 +123,7 @@ public:
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.update(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
@ -131,7 +131,7 @@ public:
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.update(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
/// Process user request and return response.

View File

@ -1,5 +1,6 @@
#include <Coordination/SessionExpiryQueue.h>
#include <common/logger_useful.h>
namespace DB
{
@ -12,66 +13,80 @@ bool SessionExpiryQueue::remove(int64_t session_id)
if (set_it != expiry_to_sessions.end())
set_it->second.erase(session_id);
/// No more sessions in this bucket
if (set_it->second.empty())
expiry_to_sessions.erase(set_it);
session_to_timeout.erase(session_it);
return true;
}
return false;
}
bool SessionExpiryQueue::update(int64_t session_id, int64_t timeout_ms)
void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms)
{
auto session_it = session_to_timeout.find(session_id);
int64_t now = getNowMilliseconds();
/// round up to next interval
int64_t new_expiry_time = roundToNextInterval(now + timeout_ms);
auto session_it = session_to_timeout.find(session_id);
/// We already registered this session
if (session_it != session_to_timeout.end())
{
if (new_expiry_time == session_it->second)
return false;
int64_t prev_expiry_time = session_it->second;
session_it->second = new_expiry_time;
/// Nothing changed, session stay in the some bucket
if (new_expiry_time == prev_expiry_time)
return;
/// This bucket doesn't exist, let's create it
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
/// Add session to the next bucket
set_it->second.insert(session_id);
int64_t prev_expiry_time = session_it->second;
if (prev_expiry_time != new_expiry_time)
{
auto prev_set_it = expiry_to_sessions.find(prev_expiry_time);
/// Remove session from previous bucket
if (prev_set_it != expiry_to_sessions.end())
prev_set_it->second.erase(session_id);
}
session_it->second = new_expiry_time;
return true;
/// No more sessions in this bucket
if (prev_set_it->second.empty())
expiry_to_sessions.erase(prev_set_it);
}
else
{
/// Just add sessions to the new bucket
session_to_timeout[session_id] = new_expiry_time;
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
set_it->second.insert(session_id);
return false;
}
}
std::unordered_set<int64_t> SessionExpiryQueue::getExpiredSessions()
std::unordered_set<int64_t> SessionExpiryQueue::getExpiredSessions() const
{
int64_t now = getNowMilliseconds();
if (now < next_expiration_time)
return {};
std::unordered_set<int64_t> result;
auto set_it = expiry_to_sessions.find(next_expiration_time);
int64_t new_expiration_time = next_expiration_time + expiration_interval;
next_expiration_time = new_expiration_time;
if (set_it != expiry_to_sessions.end())
/// Check all buckets
for (auto it = expiry_to_sessions.begin(); it != expiry_to_sessions.end(); ++it)
{
auto result = set_it->second;
expiry_to_sessions.erase(set_it);
return result;
int64_t expire_time_for_sessions = it->first;
if (expire_time_for_sessions <= now)
result.insert(it->second.begin(), it->second.end());
else
break;
}
return {};
return result;
}
void SessionExpiryQueue::clear()

View File

@ -1,4 +1,5 @@
#pragma once
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <chrono>
@ -6,14 +7,25 @@
namespace DB
{
/// Simple class for checking expired sessions. Main idea -- to round sessions
/// timeouts and place all sessions into buckets rounded by their expired time.
/// So we will have not too many different buckets and can check expired
/// sessions quite fast.
/// So buckets looks like this:
/// [1630580418000] -> {1, 5, 6}
/// [1630580418500] -> {2, 3}
/// ...
/// When new session appear it's added to the existing bucket or create new bucket.
class SessionExpiryQueue
{
private:
/// Session -> timeout ms
std::unordered_map<int64_t, int64_t> session_to_timeout;
std::unordered_map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
/// Expire time -> session expire near this time
std::map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
int64_t expiration_interval;
int64_t next_expiration_time;
static int64_t getNowMilliseconds()
{
@ -21,23 +33,30 @@ private:
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
/// Round time to the next expiration interval. The result used as a key for
/// expiry_to_sessions map.
int64_t roundToNextInterval(int64_t time) const
{
return (time / expiration_interval + 1) * expiration_interval;
}
public:
/// expiration_interval -- how often we will check new sessions and how small
/// buckets we will have. In ZooKeeper normal session timeout is around 30 seconds
/// and expiration_interval is about 500ms.
explicit SessionExpiryQueue(int64_t expiration_interval_)
: expiration_interval(expiration_interval_)
, next_expiration_time(roundToNextInterval(getNowMilliseconds()))
{
}
/// Session was actually removed
bool remove(int64_t session_id);
bool update(int64_t session_id, int64_t timeout_ms);
/// Update session expiry time (must be called on hearbeats)
void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms);
std::unordered_set<int64_t> getExpiredSessions();
/// Get all expired sessions
std::unordered_set<int64_t> getExpiredSessions() const;
void clear();
};

View File

@ -1374,6 +1374,23 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
EXPECT_TRUE(fs::exists("./logs/changelog_141_145.bin"));
}
TEST(CoordinationTest, TestSessionExpiryQueue)
{
using namespace Coordination;
SessionExpiryQueue queue(500);
queue.addNewSessionOrUpdate(1, 1000);
for (size_t i = 0; i < 2; ++i)
{
EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set<int64_t>({}));
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::this_thread::sleep_for(std::chrono::milliseconds(700));
EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set<int64_t>({1}));
}
int main(int argc, char ** argv)
{