Backport #70998 to 24.3: Fix named session leak. Simplify scheduling mechanism

This commit is contained in:
robot-clickhouse 2024-10-30 11:08:19 +00:00
parent f27180875f
commit 92bf7b9a9a
3 changed files with 72 additions and 53 deletions

View File

@ -44,9 +44,9 @@ using NamedSessionKey = std::pair<UUID, String>;
struct NamedSessionData
{
NamedSessionKey key;
UInt64 close_cycle = 0;
ContextMutablePtr context;
std::chrono::steady_clock::duration timeout;
std::chrono::steady_clock::time_point close_time_bucket{};
NamedSessionsStorage & parent;
NamedSessionData(NamedSessionKey key_, ContextPtr context_, std::chrono::steady_clock::duration timeout_, NamedSessionsStorage & parent_)
@ -131,6 +131,18 @@ public:
if (!session.unique())
throw Exception(ErrorCodes::SESSION_IS_LOCKED, "Session {} is locked by a concurrent client", session_id);
if (session->close_time_bucket != std::chrono::steady_clock::time_point{})
{
auto bucket_it = close_time_buckets.find(session->close_time_bucket);
auto & bucket_sessions = bucket_it->second;
bucket_sessions.erase(key);
if (bucket_sessions.empty())
close_time_buckets.erase(bucket_it);
session->close_time_bucket = std::chrono::steady_clock::time_point{};
}
return {session, false};
}
}
@ -174,33 +186,31 @@ private:
}
};
/// TODO it's very complicated. Make simple std::map with time_t or boost::multi_index.
using Container = std::unordered_map<Key, std::shared_ptr<NamedSessionData>, SessionKeyHash>;
using CloseTimes = std::deque<std::vector<Key>>;
Container sessions;
CloseTimes close_times;
std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
UInt64 close_cycle = 0;
// Ordered map of close times for sessions, grouped by the next multiple of close_interval
using CloseTimes = std::map<std::chrono::steady_clock::time_point, std::unordered_set<Key, SessionKeyHash>>;
CloseTimes close_time_buckets;
constexpr static std::chrono::steady_clock::duration close_interval = std::chrono::milliseconds(1000);
constexpr static std::chrono::nanoseconds::rep close_interval_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(close_interval).count();
void scheduleCloseSession(NamedSessionData & session, std::unique_lock<std::mutex> &)
{
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
/// (timeout is measured from current moment of time)
chassert(session.close_time_bucket == std::chrono::steady_clock::time_point{});
const UInt64 close_index = session.timeout / close_interval + 1;
const auto new_close_cycle = close_cycle + close_index;
const auto session_close_time = std::chrono::steady_clock::now() + session.timeout;
const auto session_close_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(session_close_time.time_since_epoch()).count();
const auto bucket_padding = close_interval - std::chrono::nanoseconds(session_close_time_ns % close_interval_ns);
const auto close_time_bucket = session_close_time + bucket_padding;
if (session.close_cycle != new_close_cycle)
{
session.close_cycle = new_close_cycle;
if (close_times.size() < close_index + 1)
close_times.resize(close_index + 1);
close_times[close_index].emplace_back(session.key);
}
session.close_time_bucket = close_time_bucket;
auto & bucket_sessions = close_time_buckets[close_time_bucket];
bucket_sessions.insert(session.key);
LOG_TEST(log, "Schedule closing session with session_id: {}, user_id: {}",
session.key.second, session.key.first);
session.key.second, session.key.first);
}
void cleanThread()
@ -209,55 +219,46 @@ private:
std::unique_lock lock{mutex};
while (!quit)
{
auto interval = closeSessions(lock);
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
closeSessions(lock);
if (cond.wait_for(lock, close_interval, [this]() -> bool { return quit; }))
break;
}
}
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
std::chrono::steady_clock::duration closeSessions(std::unique_lock<std::mutex> & lock)
void closeSessions(std::unique_lock<std::mutex> & lock)
{
const auto now = std::chrono::steady_clock::now();
/// The time to close the next session did not come
if (now < close_cycle_time)
return close_cycle_time - now; /// Will sleep until it comes.
const auto current_cycle = close_cycle;
++close_cycle;
close_cycle_time = now + close_interval;
if (close_times.empty())
return close_interval;
auto & sessions_to_close = close_times.front();
for (const auto & key : sessions_to_close)
for (auto bucket_it = close_time_buckets.begin(); bucket_it != close_time_buckets.end(); bucket_it = close_time_buckets.erase(bucket_it))
{
const auto session = sessions.find(key);
const auto & [time_bucket, session_keys] = *bucket_it;
if (time_bucket > now)
break;
if (session != sessions.end() && session->second->close_cycle <= current_cycle)
for (const auto & key : session_keys)
{
if (session->second.use_count() != 1)
{
LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}", key.second, key.first);
const auto & session_it = sessions.find(key);
/// Skip but move it to close on the next cycle.
session->second->timeout = std::chrono::steady_clock::duration{0};
scheduleCloseSession(*session->second, lock);
}
else
if (session_it == sessions.end())
continue;
const auto & session = session_it->second;
if (session.use_count() != 1)
{
LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first);
sessions.erase(session);
LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}, refcount: {}",
key.second, key.first, session.use_count());
session->timeout = std::chrono::steady_clock::duration{0};
scheduleCloseSession(*session, lock);
continue;
}
LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first);
sessions.erase(session_it);
}
}
close_times.pop_front();
return close_interval;
}
std::mutex mutex;

View File

@ -0,0 +1,3 @@
A session successfully closes when timeout first expires with refcount != 1
45
1

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Tags: long, no-parallel
# shellcheck disable=SC2015
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "A session successfully closes when timeout first expires with refcount != 1"
# Here we do not want an infinite loop - because we want this mechanism to be reliable in all cases
# So it's better to give it enough time to complete even in constrained environments
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "CREATE TEMPORARY TABLE x (n UInt64) AS SELECT number FROM numbers(10)"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "SELECT sum(n + sleep(3)) FROM x" # This query ensures timeout expires with refcount > 1
sleep 15
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_check=1" --data-binary "SELECT 1" | grep -c -F 'SESSION_NOT_FOUND'