diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index b6b0ab8cb72..9caea5354bf 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -357,7 +357,7 @@ void KeeperServer::waitInit() throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization"); } -std::unordered_set KeeperServer::getDeadSessions() +std::vector KeeperServer::getDeadSessions() { return state_machine->getDeadSessions(); } diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index d1138ccef1a..eb1f8437cc9 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -71,7 +71,7 @@ public: RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); /// Return set of the non-active sessions - std::unordered_set getDeadSessions(); + std::vector getDeadSessions(); bool isLeader() const; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index ffbac0656b9..9ef3c7b32f5 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -304,7 +304,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi responses_queue.push(response); } -std::unordered_set KeeperStateMachine::getDeadSessions() +std::vector KeeperStateMachine::getDeadSessions() { std::lock_guard lock(storage_and_responses_lock); return storage->getDeadSessions(); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 32beaaf69e6..0e032e29670 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -71,7 +71,7 @@ public: /// Process local read request void processReadRequest(const KeeperStorage::RequestForSession & request_for_session); - std::unordered_set getDeadSessions(); + std::vector getDeadSessions(); void shutdownStorage(); diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 3053ce17ad1..1e5a74ef5eb 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1078,7 +1078,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina zxid = *new_last_zxid; } - session_expiry_queue.update(session_id, session_and_timeout[session_id]); + /// ZooKeeper update sessions expirity for each request, not only for heartbeats + session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]); if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special { diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 1e925a0634e..bc9a81bc484 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -123,7 +123,7 @@ public: { auto result = session_id_counter++; session_and_timeout.emplace(result, session_timeout_ms); - session_expiry_queue.update(result, session_timeout_ms); + session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms); return result; } @@ -131,7 +131,7 @@ public: void addSessionID(int64_t session_id, int64_t session_timeout_ms) { session_and_timeout.emplace(session_id, session_timeout_ms); - session_expiry_queue.update(session_id, session_timeout_ms); + session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms); } /// Process user request and return response. @@ -172,7 +172,7 @@ public: } /// Get all dead sessions - std::unordered_set getDeadSessions() + std::vector getDeadSessions() { return session_expiry_queue.getExpiredSessions(); } diff --git a/src/Coordination/SessionExpiryQueue.cpp b/src/Coordination/SessionExpiryQueue.cpp index 51837087af5..b6d3843f1d7 100644 --- a/src/Coordination/SessionExpiryQueue.cpp +++ b/src/Coordination/SessionExpiryQueue.cpp @@ -1,82 +1,96 @@ #include #include + namespace DB { bool SessionExpiryQueue::remove(int64_t session_id) { - auto session_it = session_to_timeout.find(session_id); - if (session_it != session_to_timeout.end()) + auto session_it = session_to_expiration_time.find(session_id); + if (session_it != session_to_expiration_time.end()) { auto set_it = expiry_to_sessions.find(session_it->second); if (set_it != expiry_to_sessions.end()) set_it->second.erase(session_id); + /// No more sessions in this bucket + if (set_it->second.empty()) + expiry_to_sessions.erase(set_it); + + session_to_expiration_time.erase(session_it); + return true; } return false; } -bool SessionExpiryQueue::update(int64_t session_id, int64_t timeout_ms) +void SessionExpiryQueue::addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms) { - auto session_it = session_to_timeout.find(session_id); int64_t now = getNowMilliseconds(); + /// round up to next interval int64_t new_expiry_time = roundToNextInterval(now + timeout_ms); - if (session_it != session_to_timeout.end()) + auto session_it = session_to_expiration_time.find(session_id); + /// We already registered this session + if (session_it != session_to_expiration_time.end()) { - if (new_expiry_time == session_it->second) - return false; + int64_t prev_expiry_time = session_it->second; + session_it->second = new_expiry_time; + /// Nothing changed, session stay in the some bucket + if (new_expiry_time == prev_expiry_time) + return; + /// This bucket doesn't exist, let's create it auto set_it = expiry_to_sessions.find(new_expiry_time); if (set_it == expiry_to_sessions.end()) std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set()); + /// Add session to the next bucket set_it->second.insert(session_id); - int64_t prev_expiry_time = session_it->second; - if (prev_expiry_time != new_expiry_time) - { - auto prev_set_it = expiry_to_sessions.find(prev_expiry_time); - if (prev_set_it != expiry_to_sessions.end()) - prev_set_it->second.erase(session_id); - } - session_it->second = new_expiry_time; - return true; + auto prev_set_it = expiry_to_sessions.find(prev_expiry_time); + /// Remove session from previous bucket + if (prev_set_it != expiry_to_sessions.end()) + prev_set_it->second.erase(session_id); + + /// No more sessions in this bucket + if (prev_set_it->second.empty()) + expiry_to_sessions.erase(prev_set_it); } else { - session_to_timeout[session_id] = new_expiry_time; + /// Just add sessions to the new bucket + session_to_expiration_time[session_id] = new_expiry_time; + auto set_it = expiry_to_sessions.find(new_expiry_time); if (set_it == expiry_to_sessions.end()) std::tie(set_it, std::ignore) = expiry_to_sessions.emplace(new_expiry_time, std::unordered_set()); + set_it->second.insert(session_id); - return false; } } -std::unordered_set SessionExpiryQueue::getExpiredSessions() +std::vector SessionExpiryQueue::getExpiredSessions() const { int64_t now = getNowMilliseconds(); - if (now < next_expiration_time) - return {}; + std::vector result; - auto set_it = expiry_to_sessions.find(next_expiration_time); - int64_t new_expiration_time = next_expiration_time + expiration_interval; - next_expiration_time = new_expiration_time; - if (set_it != expiry_to_sessions.end()) + /// Check all buckets + for (const auto & [expire_time, expired_sessions] : expiry_to_sessions) { - auto result = set_it->second; - expiry_to_sessions.erase(set_it); - return result; + if (expire_time <= now) + result.insert(result.end(), expired_sessions.begin(), expired_sessions.end()); + else + break; } - return {}; + + return result; } void SessionExpiryQueue::clear() { - session_to_timeout.clear(); + session_to_expiration_time.clear(); expiry_to_sessions.clear(); } diff --git a/src/Coordination/SessionExpiryQueue.h b/src/Coordination/SessionExpiryQueue.h index dff629a2432..8581800834d 100644 --- a/src/Coordination/SessionExpiryQueue.h +++ b/src/Coordination/SessionExpiryQueue.h @@ -1,19 +1,32 @@ #pragma once +#include #include #include +#include #include namespace DB { +/// Simple class for checking expired sessions. Main idea -- to round sessions +/// timeouts and place all sessions into buckets rounded by their expired time. +/// So we will have not too many different buckets and can check expired +/// sessions quite fast. +/// So buckets looks like this: +/// [1630580418000] -> {1, 5, 6} +/// [1630580418500] -> {2, 3} +/// ... +/// When new session appear it's added to the existing bucket or create new bucket. class SessionExpiryQueue { private: - std::unordered_map session_to_timeout; - std::unordered_map> expiry_to_sessions; + /// Session -> timeout ms + std::unordered_map session_to_expiration_time; + + /// Expire time -> session expire near this time + std::map> expiry_to_sessions; int64_t expiration_interval; - int64_t next_expiration_time; static int64_t getNowMilliseconds() { @@ -21,23 +34,30 @@ private: return duration_cast(system_clock::now().time_since_epoch()).count(); } + /// Round time to the next expiration interval. The result used as a key for + /// expiry_to_sessions map. int64_t roundToNextInterval(int64_t time) const { return (time / expiration_interval + 1) * expiration_interval; } public: + /// expiration_interval -- how often we will check new sessions and how small + /// buckets we will have. In ZooKeeper normal session timeout is around 30 seconds + /// and expiration_interval is about 500ms. explicit SessionExpiryQueue(int64_t expiration_interval_) : expiration_interval(expiration_interval_) - , next_expiration_time(roundToNextInterval(getNowMilliseconds())) { } + /// Session was actually removed bool remove(int64_t session_id); - bool update(int64_t session_id, int64_t timeout_ms); + /// Update session expiry time (must be called on hearbeats) + void addNewSessionOrUpdate(int64_t session_id, int64_t timeout_ms); - std::unordered_set getExpiredSessions(); + /// Get all expired sessions + std::vector getExpiredSessions() const; void clear(); }; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 078d0743492..f12646ccb74 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1318,6 +1318,7 @@ TEST(CoordinationTest, TestRotateIntervalChanges) } } + EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin")); DB::KeeperLogStore changelog_1("./logs", 10, true); @@ -1347,6 +1348,7 @@ TEST(CoordinationTest, TestRotateIntervalChanges) } changelog_2.compact(105); + EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin")); @@ -1375,6 +1377,23 @@ TEST(CoordinationTest, TestRotateIntervalChanges) EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin")); } +TEST(CoordinationTest, TestSessionExpiryQueue) +{ + using namespace Coordination; + SessionExpiryQueue queue(500); + + queue.addNewSessionOrUpdate(1, 1000); + + for (size_t i = 0; i < 2; ++i) + { + EXPECT_EQ(queue.getExpiredSessions(), std::vector({})); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(700)); + EXPECT_EQ(queue.getExpiredSessions(), std::vector({1})); +} + int main(int argc, char ** argv) {