From 92bf7b9a9ab146fe63646dd8d96e2c24f68772b4 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 30 Oct 2024 11:08:19 +0000 Subject: [PATCH] Backport #70998 to 24.3: Fix named session leak. Simplify scheduling mechanism --- src/Interpreters/Session.cpp | 107 +++++++++--------- ..._expire_in_use_in_http_interface.reference | 3 + ...session_expire_in_use_in_http_interface.sh | 15 +++ 3 files changed, 72 insertions(+), 53 deletions(-) create mode 100644 tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.reference create mode 100755 tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh diff --git a/src/Interpreters/Session.cpp b/src/Interpreters/Session.cpp index 9f64380ab43..05cb1e1d779 100644 --- a/src/Interpreters/Session.cpp +++ b/src/Interpreters/Session.cpp @@ -44,9 +44,9 @@ using NamedSessionKey = std::pair; struct NamedSessionData { NamedSessionKey key; - UInt64 close_cycle = 0; ContextMutablePtr context; std::chrono::steady_clock::duration timeout; + std::chrono::steady_clock::time_point close_time_bucket{}; NamedSessionsStorage & parent; NamedSessionData(NamedSessionKey key_, ContextPtr context_, std::chrono::steady_clock::duration timeout_, NamedSessionsStorage & parent_) @@ -131,6 +131,18 @@ public: if (!session.unique()) throw Exception(ErrorCodes::SESSION_IS_LOCKED, "Session {} is locked by a concurrent client", session_id); + + if (session->close_time_bucket != std::chrono::steady_clock::time_point{}) + { + auto bucket_it = close_time_buckets.find(session->close_time_bucket); + auto & bucket_sessions = bucket_it->second; + bucket_sessions.erase(key); + if (bucket_sessions.empty()) + close_time_buckets.erase(bucket_it); + + session->close_time_bucket = std::chrono::steady_clock::time_point{}; + } + return {session, false}; } } @@ -174,33 +186,31 @@ private: } }; - /// TODO it's very complicated. Make simple std::map with time_t or boost::multi_index. using Container = std::unordered_map, SessionKeyHash>; - using CloseTimes = std::deque>; Container sessions; - CloseTimes close_times; - std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1); - std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now(); - UInt64 close_cycle = 0; + + // Ordered map of close times for sessions, grouped by the next multiple of close_interval + using CloseTimes = std::map>; + CloseTimes close_time_buckets; + + constexpr static std::chrono::steady_clock::duration close_interval = std::chrono::milliseconds(1000); + constexpr static std::chrono::nanoseconds::rep close_interval_ns = std::chrono::duration_cast(close_interval).count(); void scheduleCloseSession(NamedSessionData & session, std::unique_lock &) { - /// Push it on a queue of sessions to close, on a position corresponding to the timeout. - /// (timeout is measured from current moment of time) + chassert(session.close_time_bucket == std::chrono::steady_clock::time_point{}); - const UInt64 close_index = session.timeout / close_interval + 1; - const auto new_close_cycle = close_cycle + close_index; + const auto session_close_time = std::chrono::steady_clock::now() + session.timeout; + const auto session_close_time_ns = std::chrono::duration_cast(session_close_time.time_since_epoch()).count(); + const auto bucket_padding = close_interval - std::chrono::nanoseconds(session_close_time_ns % close_interval_ns); + const auto close_time_bucket = session_close_time + bucket_padding; - if (session.close_cycle != new_close_cycle) - { - session.close_cycle = new_close_cycle; - if (close_times.size() < close_index + 1) - close_times.resize(close_index + 1); - close_times[close_index].emplace_back(session.key); - } + session.close_time_bucket = close_time_bucket; + auto & bucket_sessions = close_time_buckets[close_time_bucket]; + bucket_sessions.insert(session.key); LOG_TEST(log, "Schedule closing session with session_id: {}, user_id: {}", - session.key.second, session.key.first); + session.key.second, session.key.first); } void cleanThread() @@ -209,55 +219,46 @@ private: std::unique_lock lock{mutex}; while (!quit) { - auto interval = closeSessions(lock); - if (cond.wait_for(lock, interval, [this]() -> bool { return quit; })) + closeSessions(lock); + if (cond.wait_for(lock, close_interval, [this]() -> bool { return quit; })) break; } } - /// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added. - std::chrono::steady_clock::duration closeSessions(std::unique_lock & lock) + void closeSessions(std::unique_lock & lock) { const auto now = std::chrono::steady_clock::now(); - /// The time to close the next session did not come - if (now < close_cycle_time) - return close_cycle_time - now; /// Will sleep until it comes. - - const auto current_cycle = close_cycle; - - ++close_cycle; - close_cycle_time = now + close_interval; - - if (close_times.empty()) - return close_interval; - - auto & sessions_to_close = close_times.front(); - - for (const auto & key : sessions_to_close) + for (auto bucket_it = close_time_buckets.begin(); bucket_it != close_time_buckets.end(); bucket_it = close_time_buckets.erase(bucket_it)) { - const auto session = sessions.find(key); + const auto & [time_bucket, session_keys] = *bucket_it; + if (time_bucket > now) + break; - if (session != sessions.end() && session->second->close_cycle <= current_cycle) + for (const auto & key : session_keys) { - if (session->second.use_count() != 1) - { - LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}", key.second, key.first); + const auto & session_it = sessions.find(key); - /// Skip but move it to close on the next cycle. - session->second->timeout = std::chrono::steady_clock::duration{0}; - scheduleCloseSession(*session->second, lock); - } - else + if (session_it == sessions.end()) + continue; + + const auto & session = session_it->second; + + if (session.use_count() != 1) { - LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first); - sessions.erase(session); + LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}, refcount: {}", + key.second, key.first, session.use_count()); + + session->timeout = std::chrono::steady_clock::duration{0}; + scheduleCloseSession(*session, lock); + continue; } + + LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first); + + sessions.erase(session_it); } } - - close_times.pop_front(); - return close_interval; } std::mutex mutex; diff --git a/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.reference b/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.reference new file mode 100644 index 00000000000..02a9f40656d --- /dev/null +++ b/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.reference @@ -0,0 +1,3 @@ +A session successfully closes when timeout first expires with refcount != 1 +45 +1 diff --git a/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh b/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh new file mode 100755 index 00000000000..f1782cd645b --- /dev/null +++ b/tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +# Tags: long, no-parallel +# shellcheck disable=SC2015 + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +echo "A session successfully closes when timeout first expires with refcount != 1" +# Here we do not want an infinite loop - because we want this mechanism to be reliable in all cases +# So it's better to give it enough time to complete even in constrained environments +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "CREATE TEMPORARY TABLE x (n UInt64) AS SELECT number FROM numbers(10)" +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "SELECT sum(n + sleep(3)) FROM x" # This query ensures timeout expires with refcount > 1 +sleep 15 +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_check=1" --data-binary "SELECT 1" | grep -c -F 'SESSION_NOT_FOUND'