From 4c613f30b327c4093f7c70905a6cd0e3ff9a0f56 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Sep 2021 14:40:54 +0300 Subject: [PATCH 1/6] Simpler session expiration in Keeper --- src/Coordination/KeeperStorage.cpp | 4 +- src/Coordination/KeeperStorage.h | 4 +- src/Coordination/SessionExpiryQueue.cpp | 65 ++++++++++++------- src/Coordination/SessionExpiryQueue.h | 29 +++++++-- ...t_for_build.cpp => gtest_coordination.cpp} | 17 +++++ 5 files changed, 86 insertions(+), 33 deletions(-) rename src/Coordination/tests/{gtest_for_build.cpp => gtest_coordination.cpp} (98%) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 3053ce17ad1..207ab25ddd2 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1078,7 +1078,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina zxid = *new_last_zxid; } - session_expiry_queue.update(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { @@ -1115,6 +1114,9 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina } else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special { + /// Update session only for heartbeats + session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); + KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); auto [response, _] = storage_request->process(*this, zxid, session_id); response->xid = zk_request->xid; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 1e925a0634e..041dab05156 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -123,7 +123,7 @@ public: { auto result = session_id_counter++; session_and_timeout.emplace(result, session_timeout_ms); - 
session_expiry_queue.update(result, session_timeout_ms); + session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms); return result; } @@ -131,7 +131,7 @@ public: void addSessionID(int64_t session_id, int64_t session_timeout_ms) { session_and_timeout.emplace(session_id, session_timeout_ms); - session_expiry_queue.update(session_id, session_timeout_ms); + session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms); } /// Process user request and return response. diff --git a/src/Coordination/SessionExpiryQueue.cpp b/src/Coordination/SessionExpiryQueue.cpp index 51837087af5..e19a92c29d7 100644 --- a/src/Coordination/SessionExpiryQueue.cpp +++ b/src/Coordination/SessionExpiryQueue.cpp @@ -1,5 +1,6 @@ #include #include + namespace DB { @@ -12,66 +13,80 @@ bool SessionExpiryQueue::remove(int64_t session_id) if (set_it != expiry_to_sessions.end()) set_it->second.erase(session_id); + /// No more sessions in this bucket + if (set_it->second.empty()) + expiry_to_sessions.erase(set_it); + + session_to_timeout.erase(session_it); + return true; } return false; } -bool SessionExpiryQueue::update(int64_t session_id, int64_t timeout_ms) +void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms) { - auto session_it = session_to_timeout.find(session_id); int64_t now = getNowMilliseconds(); + /// round up to next interval int64_t new_expiry_time = roundToNextInterval(now + timeout_ms); + auto session_it = session_to_timeout.find(session_id); + /// We already registered this session if (session_it != session_to_timeout.end()) { - if (new_expiry_time == session_it->second) - return false; + int64_t prev_expiry_time = session_it->second; + session_it->second = new_expiry_time; + /// Nothing changed, session stays in the same bucket + if (new_expiry_time == prev_expiry_time) + return; + /// This bucket doesn't exist, let's create it auto set_it = expiry_to_sessions.find(new_expiry_time); if (set_it == expiry_to_sessions.end()) 
std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set()); + /// Add session to the next bucket set_it->second.insert(session_id); - int64_t prev_expiry_time = session_it->second; - if (prev_expiry_time != new_expiry_time) - { - auto prev_set_it = expiry_to_sessions.find(prev_expiry_time); - if (prev_set_it != expiry_to_sessions.end()) - prev_set_it->second.erase(session_id); - } - session_it->second = new_expiry_time; - return true; + auto prev_set_it = expiry_to_sessions.find(prev_expiry_time); + /// Remove session from previous bucket + if (prev_set_it != expiry_to_sessions.end()) + prev_set_it->second.erase(session_id); + + /// No more sessions in this bucket + if (prev_set_it->second.empty()) + expiry_to_sessions.erase(prev_set_it); } else { + /// Just add sessions to the new bucket session_to_timeout[session_id] = new_expiry_time; + auto set_it = expiry_to_sessions.find(new_expiry_time); if (set_it == expiry_to_sessions.end()) std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set()); + set_it->second.insert(session_id); - return false; } } -std::unordered_set SessionExpiryQueue::getExpiredSessions() +std::unordered_set SessionExpiryQueue::getExpiredSessions() const { int64_t now = getNowMilliseconds(); - if (now < next_expiration_time) - return {}; + std::unordered_set result; - auto set_it = expiry_to_sessions.find(next_expiration_time); - int64_t new_expiration_time = next_expiration_time + expiration_interval; - next_expiration_time = new_expiration_time; - if (set_it != expiry_to_sessions.end()) + /// Check all buckets + for (auto it = expiry_to_sessions.begin(); it != expiry_to_sessions.end(); ++it) { - auto result = set_it->second; - expiry_to_sessions.erase(set_it); - return result; + int64_t expire_time_for_sessions = it->first; + if (expire_time_for_sessions <= now) + result.insert(it->second.begin(), it->second.end()); + else + break; } - return {}; + + return result; } 
void SessionExpiryQueue::clear() diff --git a/src/Coordination/SessionExpiryQueue.h b/src/Coordination/SessionExpiryQueue.h index dff629a2432..8270c8fc6c8 100644 --- a/src/Coordination/SessionExpiryQueue.h +++ b/src/Coordination/SessionExpiryQueue.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -6,14 +7,25 @@ namespace DB { +/// Simple class for checking expired sessions. Main idea -- to round sessions +/// timeouts and place all sessions into buckets rounded by their expired time. +/// So we will have not too many different buckets and can check expired +/// sessions quite fast. +/// So buckets look like this: +/// [1630580418000] -> {1, 5, 6} +/// [1630580418500] -> {2, 3} +/// ... +/// When a new session appears, it is added to an existing bucket or a new bucket is created. class SessionExpiryQueue { private: + /// Session -> timeout ms std::unordered_map session_to_timeout; - std::unordered_map> expiry_to_sessions; + + /// Expire time -> session expire near this time + std::map> expiry_to_sessions; int64_t expiration_interval; - int64_t next_expiration_time; static int64_t getNowMilliseconds() { @@ -21,23 +33,30 @@ private: return duration_cast(system_clock::now().time_since_epoch()).count(); } + /// Round time to the next expiration interval. The result is used as a key for + /// expiry_to_sessions map. int64_t roundToNextInterval(int64_t time) const { return (time / expiration_interval + 1) * expiration_interval; } public: + /// expiration_interval -- how often we will check new sessions and how small + /// buckets we will have. In ZooKeeper normal session timeout is around 30 seconds + /// and expiration_interval is about 500ms. 
explicit SessionExpiryQueue(int64_t expiration_interval_) : expiration_interval(expiration_interval_) - , next_expiration_time(roundToNextInterval(getNowMilliseconds())) { } + /// Session was actually removed bool remove(int64_t session_id); - bool update(int64_t session_id, int64_t timeout_ms); + /// Update session expiry time (must be called on heartbeats) + void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms); - std::unordered_set getExpiredSessions(); + /// Get all expired sessions + std::unordered_set getExpiredSessions() const; void clear(); }; diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_coordination.cpp similarity index 98% rename from src/Coordination/tests/gtest_for_build.cpp rename to src/Coordination/tests/gtest_coordination.cpp index 47eadbf9720..2c1cddd124b 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1374,6 +1374,23 @@ TEST(CoordinationTest, TestRotateIntervalChanges) EXPECT_TRUE(fs::exists("./logs/changelog_141_145.bin")); } +TEST(CoordinationTest, TestSessionExpiryQueue) +{ + using namespace Coordination; + SessionExpiryQueue queue(500); + + queue.addNewSessionOrUpdate(1, 1000); + + for (size_t i = 0; i < 2; ++i) + { + EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set({})); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(700)); + EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set({1})); +} + int main(int argc, char ** argv) { From c5470864e86adbd3b5c2fc6db1dc884e87a6f170 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Sep 2021 14:43:34 +0300 Subject: [PATCH 2/6] Fixup --- src/Coordination/KeeperStorage.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 207ab25ddd2..7ad7fa6c779 100644 --- a/src/Coordination/KeeperStorage.cpp +++ 
b/src/Coordination/KeeperStorage.cpp @@ -1078,6 +1078,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina zxid = *new_last_zxid; } + session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { @@ -1114,9 +1115,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina } else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special { - /// Update session only for heartbeats - session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); - KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); auto [response, _] = storage_request->process(*this, zxid, session_id); response->xid = zk_request->xid; From 010985ce87f58d644c9a701bb0df327b4aa3f66a Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Sep 2021 14:54:32 +0300 Subject: [PATCH 3/6] Add comment --- src/Coordination/KeeperStorage.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 7ad7fa6c779..1e5a74ef5eb 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1078,6 +1078,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina zxid = *new_last_zxid; } + /// ZooKeeper updates session expiration on each request, not only on heartbeats session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special From 3a3c3acd18402a75b4435ab68c44037fcfec1314 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Sep 2021 23:19:44 +0300 Subject: [PATCH 4/6] Update src/Coordination/SessionExpiryQueue.h Co-authored-by: tavplubix --- src/Coordination/SessionExpiryQueue.h | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Coordination/SessionExpiryQueue.h b/src/Coordination/SessionExpiryQueue.h index 8270c8fc6c8..d8925449f2a 100644 --- a/src/Coordination/SessionExpiryQueue.h +++ b/src/Coordination/SessionExpiryQueue.h @@ -56,7 +56,7 @@ public: void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms); /// Get all expired sessions - std::unordered_set getExpiredSessions() const; + std::vector getExpiredSessions() const; void clear(); }; From a94932983025a174d44e10b285634fa0b38ff14d Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Sep 2021 23:37:34 +0300 Subject: [PATCH 5/6] Review fixes --- src/Coordination/KeeperServer.cpp | 2 +- src/Coordination/KeeperServer.h | 2 +- src/Coordination/KeeperStateMachine.cpp | 2 +- src/Coordination/KeeperStateMachine.h | 2 +- src/Coordination/KeeperStorage.h | 2 +- src/Coordination/SessionExpiryQueue.cpp | 25 +++++++++---------- src/Coordination/SessionExpiryQueue.h | 3 ++- src/Coordination/tests/gtest_coordination.cpp | 4 +-- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index b6b0ab8cb72..9caea5354bf 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -357,7 +357,7 @@ void KeeperServer::waitInit() throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization"); } -std::unordered_set KeeperServer::getDeadSessions() +std::vector KeeperServer::getDeadSessions() { return state_machine->getDeadSessions(); } diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index d1138ccef1a..eb1f8437cc9 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -71,7 +71,7 @@ public: RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); /// Return set of the non-active sessions - std::unordered_set getDeadSessions(); + std::vector getDeadSessions(); bool isLeader() 
const; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index ffbac0656b9..9ef3c7b32f5 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -304,7 +304,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi responses_queue.push(response); } -std::unordered_set KeeperStateMachine::getDeadSessions() +std::vector KeeperStateMachine::getDeadSessions() { std::lock_guard lock(storage_and_responses_lock); return storage->getDeadSessions(); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 32beaaf69e6..0e032e29670 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -71,7 +71,7 @@ public: /// Process local read request void processReadRequest(const KeeperStorage::RequestForSession & request_for_session); - std::unordered_set getDeadSessions(); + std::vector getDeadSessions(); void shutdownStorage(); diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 041dab05156..bc9a81bc484 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -172,7 +172,7 @@ public: } /// Get all dead sessions - std::unordered_set getDeadSessions() + std::vector getDeadSessions() { return session_expiry_queue.getExpiredSessions(); } diff --git a/src/Coordination/SessionExpiryQueue.cpp b/src/Coordination/SessionExpiryQueue.cpp index e19a92c29d7..90c17bf904f 100644 --- a/src/Coordination/SessionExpiryQueue.cpp +++ b/src/Coordination/SessionExpiryQueue.cpp @@ -6,8 +6,8 @@ namespace DB bool SessionExpiryQueue::remove(int64_t session_id) { - auto session_it = session_to_timeout.find(session_id); - if (session_it != session_to_timeout.end()) + auto session_it = session_to_expiration_time.find(session_id); + if (session_it != session_to_expiration_time.end()) { auto set_it = expiry_to_sessions.find(session_it->second); if 
(set_it != expiry_to_sessions.end()) @@ -17,7 +17,7 @@ bool SessionExpiryQueue::remove(int64_t session_id) if (set_it->second.empty()) expiry_to_sessions.erase(set_it); - session_to_timeout.erase(session_it); + session_to_expiration_time.erase(session_it); return true; } @@ -31,9 +31,9 @@ void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeo /// round up to next interval int64_t new_expiry_time = roundToNextInterval(now + timeout_ms); - auto session_it = session_to_timeout.find(session_id); + auto session_it = session_to_expiration_time.find(session_id); /// We already registered this session - if (session_it != session_to_timeout.end()) + if (session_it != session_to_expiration_time.end()) { int64_t prev_expiry_time = session_it->second; session_it->second = new_expiry_time; @@ -61,7 +61,7 @@ void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeo else { /// Just add sessions to the new bucket - session_to_timeout[session_id] = new_expiry_time; + session_to_expiration_time[session_id] = new_expiry_time; auto set_it = expiry_to_sessions.find(new_expiry_time); if (set_it == expiry_to_sessions.end()) @@ -71,17 +71,16 @@ void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeo } } -std::unordered_set SessionExpiryQueue::getExpiredSessions() const +std::vector SessionExpiryQueue::getExpiredSessions() const { int64_t now = getNowMilliseconds(); - std::unordered_set result; + std::vector result; /// Check all buckets - for (auto it = expiry_to_sessions.begin(); it != expiry_to_sessions.end(); ++it) + for (auto & [expire_time, expired_sessions] : expiry_to_sessions) { - int64_t expire_time_for_sessions = it->first; - if (expire_time_for_sessions <= now) - result.insert(it->second.begin(), it->second.end()); + if (expire_time <= now) + result.insert(result.end(), expired_sessions.begin(), expired_sessions.end()); else break; } @@ -91,7 +90,7 @@ std::unordered_set 
SessionExpiryQueue::getExpiredSessions() const void SessionExpiryQueue::clear() { - session_to_timeout.clear(); + session_to_expiration_time.clear(); expiry_to_sessions.clear(); } diff --git a/src/Coordination/SessionExpiryQueue.h b/src/Coordination/SessionExpiryQueue.h index d8925449f2a..8581800834d 100644 --- a/src/Coordination/SessionExpiryQueue.h +++ b/src/Coordination/SessionExpiryQueue.h @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace DB @@ -20,7 +21,7 @@ class SessionExpiryQueue { private: /// Session -> timeout ms - std::unordered_map session_to_timeout; + std::unordered_map session_to_expiration_time; /// Expire time -> session expire near this time std::map> expiry_to_sessions; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 2c1cddd124b..0dfede5927d 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1383,12 +1383,12 @@ TEST(CoordinationTest, TestSessionExpiryQueue) for (size_t i = 0; i < 2; ++i) { - EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set({})); + EXPECT_EQ(queue.getExpiredSessions(), std::vector({})); std::this_thread::sleep_for(std::chrono::milliseconds(400)); } std::this_thread::sleep_for(std::chrono::milliseconds(700)); - EXPECT_EQ(queue.getExpiredSessions(), std::unordered_set({1})); + EXPECT_EQ(queue.getExpiredSessions(), std::vector({1})); } From 9281c4786b8daaadb0ca57b5059a227be48ba2ad Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 3 Sep 2021 10:10:19 +0300 Subject: [PATCH 6/6] Fix queue test --- src/Coordination/SessionExpiryQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Coordination/SessionExpiryQueue.cpp b/src/Coordination/SessionExpiryQueue.cpp index 90c17bf904f..b6d3843f1d7 100644 --- a/src/Coordination/SessionExpiryQueue.cpp +++ b/src/Coordination/SessionExpiryQueue.cpp @@ -77,7 +77,7 @@ std::vector SessionExpiryQueue::getExpiredSessions() 
const std::vector result; /// Check all buckets - for (auto & [expire_time, expired_sessions] : expiry_to_sessions) + for (const auto & [expire_time, expired_sessions] : expiry_to_sessions) { if (expire_time <= now) result.insert(result.end(), expired_sessions.begin(), expired_sessions.end());