Merge pull request #28519 from ClickHouse/better_session_expiration

Simpler sessions expiration in Keeper and add comments
This commit is contained in:
alesapin 2021-09-04 15:44:55 +03:00 committed by GitHub
commit aa58f22207
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 44 deletions

View File

@ -357,7 +357,7 @@ void KeeperServer::waitInit()
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
}
/// Returns the ids of the dead sessions as reported by the state machine.
/// Pure forwarding call: no filtering or copying logic lives here.
std::unordered_set<int64_t> KeeperServer::getDeadSessions()
std::vector<int64_t> KeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}

View File

@ -71,7 +71,7 @@ public:
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
/// Return the ids of the non-active (dead) sessions
std::unordered_set<int64_t> getDeadSessions();
std::vector<int64_t> getDeadSessions();
bool isLeader() const;

View File

@ -304,7 +304,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
responses_queue.push(response);
}
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_and_responses_lock);
return storage->getDeadSessions();

View File

@ -71,7 +71,7 @@ public:
/// Process local read request
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::unordered_set<int64_t> getDeadSessions();
std::vector<int64_t> getDeadSessions();
void shutdownStorage();

View File

@ -1078,7 +1078,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
zxid = *new_last_zxid;
}
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
/// ZooKeeper updates the session expiration time on every request, not only on heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{

View File

@ -123,7 +123,7 @@ public:
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.update(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
@ -131,7 +131,7 @@ public:
/// Register a session with an id chosen by the caller:
/// remember its timeout and put it into the expiry queue.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.update(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
/// Process user request and return response.
@ -172,7 +172,7 @@ public:
}
/// Get all dead sessions, i.e. the session ids which the expiry queue
/// currently reports as expired.
std::unordered_set<int64_t> getDeadSessions()
std::vector<int64_t> getDeadSessions()
{
return session_expiry_queue.getExpiredSessions();
}

View File

@ -1,82 +1,96 @@
#include <Coordination/SessionExpiryQueue.h>
#include <common/logger_useful.h>
namespace DB
{
/// Remove the session from the queue.
/// The session is erased from its expiration bucket (the bucket itself is dropped
/// when it becomes empty) and its expiration time is forgotten.
/// Returns true if the session was registered and has been removed, false otherwise.
bool SessionExpiryQueue::remove(int64_t session_id)
{
    auto session_it = session_to_expiration_time.find(session_id);
    if (session_it == session_to_expiration_time.end())
        return false;

    auto set_it = expiry_to_sessions.find(session_it->second);
    /// Touch the bucket only if it really exists: the emptiness check must stay
    /// under this guard, because dereferencing end() is undefined behaviour.
    if (set_it != expiry_to_sessions.end())
    {
        set_it->second.erase(session_id);

        /// No more sessions in this bucket
        if (set_it->second.empty())
            expiry_to_sessions.erase(set_it);
    }

    session_to_expiration_time.erase(session_it);
    return true;
}
bool SessionExpiryQueue::update(int64_t session_id, int64_t timeout_ms)
void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms)
{
auto session_it = session_to_timeout.find(session_id);
int64_t now = getNowMilliseconds();
/// round up to next interval
int64_t new_expiry_time = roundToNextInterval(now + timeout_ms);
if (session_it != session_to_timeout.end())
auto session_it = session_to_expiration_time.find(session_id);
/// We already registered this session
if (session_it != session_to_expiration_time.end())
{
if (new_expiry_time == session_it->second)
return false;
int64_t prev_expiry_time = session_it->second;
session_it->second = new_expiry_time;
/// Nothing changed, session stay in the some bucket
if (new_expiry_time == prev_expiry_time)
return;
/// This bucket doesn't exist, let's create it
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
/// Add session to the next bucket
set_it->second.insert(session_id);
int64_t prev_expiry_time = session_it->second;
if (prev_expiry_time != new_expiry_time)
{
auto prev_set_it = expiry_to_sessions.find(prev_expiry_time);
if (prev_set_it != expiry_to_sessions.end())
prev_set_it->second.erase(session_id);
}
session_it->second = new_expiry_time;
return true;
auto prev_set_it = expiry_to_sessions.find(prev_expiry_time);
/// Remove session from previous bucket
if (prev_set_it != expiry_to_sessions.end())
prev_set_it->second.erase(session_id);
/// No more sessions in this bucket
if (prev_set_it->second.empty())
expiry_to_sessions.erase(prev_set_it);
}
else
{
session_to_timeout[session_id] = new_expiry_time;
/// Just add sessions to the new bucket
session_to_expiration_time[session_id] = new_expiry_time;
auto set_it = expiry_to_sessions.find(new_expiry_time);
if (set_it == expiry_to_sessions.end())
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set<int64_t>());
set_it->second.insert(session_id);
return false;
}
}
/// Returns the ids of the sessions stored in buckets whose expiration time
/// is not later than the current time.
std::unordered_set<int64_t> SessionExpiryQueue::getExpiredSessions()
std::vector<int64_t> SessionExpiryQueue::getExpiredSessions() const
{
int64_t now = getNowMilliseconds();
if (now < next_expiration_time)
return {};
std::vector<int64_t> result;
auto set_it = expiry_to_sessions.find(next_expiration_time);
int64_t new_expiration_time = next_expiration_time + expiration_interval;
next_expiration_time = new_expiration_time;
if (set_it != expiry_to_sessions.end())
/// Check all buckets. expiry_to_sessions is a std::map keyed by expiration time,
/// so the buckets are visited in ascending time order.
for (const auto & [expire_time, expired_sessions] : expiry_to_sessions)
{
auto result = set_it->second;
expiry_to_sessions.erase(set_it);
return result;
if (expire_time <= now)
result.insert(result.end(), expired_sessions.begin(), expired_sessions.end());
else
/// First bucket that has not expired yet: all following buckets are later still
break;
}
return {};
return result;
}
/// Forget all registered sessions and drop all expiration buckets.
void SessionExpiryQueue::clear()
{
session_to_timeout.clear();
session_to_expiration_time.clear();
expiry_to_sessions.clear();
}

View File

@ -1,19 +1,32 @@
#pragma once
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <chrono>
namespace DB
{
/// Simple class for checking expired sessions. Main idea -- round session
/// expiration times and place all sessions into buckets keyed by their rounded
/// expiration time. This way we don't have too many different buckets and can
/// check for expired sessions quite fast.
/// Buckets look like this:
/// [1630580418000] -> {1, 5, 6}
/// [1630580418500] -> {2, 3}
/// ...
/// When a new session appears, it's added to an existing bucket or a new bucket is created for it.
class SessionExpiryQueue
{
private:
std::unordered_map<int64_t, int64_t> session_to_timeout;
std::unordered_map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
/// Session -> rounded expiration time in ms (the key of its bucket in expiry_to_sessions)
std::unordered_map<int64_t, int64_t> session_to_expiration_time;
/// Rounded expiration time -> sessions which expire at this time (ordered by time)
std::map<int64_t, std::unordered_set<int64_t>> expiry_to_sessions;
int64_t expiration_interval;
int64_t next_expiration_time;
static int64_t getNowMilliseconds()
{
@ -21,23 +34,30 @@ private:
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
/// Round time up to the next expiration interval. The result is used as a key
/// for the expiry_to_sessions map.
int64_t roundToNextInterval(int64_t time) const
{
return (time / expiration_interval + 1) * expiration_interval;
}
public:
/// expiration_interval -- how often we will check new sessions and how small
/// buckets we will have. In ZooKeeper normal session timeout is around 30 seconds
/// and expiration_interval is about 500ms.
explicit SessionExpiryQueue(int64_t expiration_interval_)
: expiration_interval(expiration_interval_)
, next_expiration_time(roundToNextInterval(getNowMilliseconds()))
{
}
/// Remove session from the queue. Returns true if the session was actually removed.
bool remove(int64_t session_id);
bool update(int64_t session_id, int64_t timeout_ms);
/// Update session expiry time (must be called on heartbeats)
void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms);
std::unordered_set<int64_t> getExpiredSessions();
/// Get all expired sessions
std::vector<int64_t> getExpiredSessions() const;
void clear();
};

View File

@ -1318,6 +1318,7 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin"));
DB::KeeperLogStore changelog_1("./logs", 10, true);
@ -1347,6 +1348,7 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
changelog_2.compact(105);
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin"));
@ -1375,6 +1377,23 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin"));
}
/// Checks that a session is not reported as expired before its (rounded)
/// expiration time has passed and is reported once it has.
TEST(CoordinationTest, TestSessionExpiryQueue)
{
using namespace Coordination;
/// Buckets are 500 ms wide
SessionExpiryQueue queue(500);
/// Timeout is 1000 ms; after rounding up to the next interval the session
/// expires more than 1000 ms but at most 1500 ms from now.
queue.addNewSessionOrUpdate(1, 1000);
/// Checks at roughly 0 ms and 400 ms: both are before the expiration time
for (size_t i = 0; i < 2; ++i)
{
EXPECT_EQ(queue.getExpiredSessions(), std::vector<int64_t>({}));
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
/// Total sleep is 400 + 400 + 700 = 1500 ms, which is not less than the latest possible expiration time
std::this_thread::sleep_for(std::chrono::milliseconds(700));
EXPECT_EQ(queue.getExpiredSessions(), std::vector<int64_t>({1}));
}
int main(int argc, char ** argv)
{