mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #71246 from ClickHouse/backport/24.3/70998
Backport #70998 to 24.3: Fix named session leak. Simplify scheduling mechanism
This commit is contained in:
commit
02dc726823
@ -44,9 +44,9 @@ using NamedSessionKey = std::pair<UUID, String>;
|
||||
struct NamedSessionData
|
||||
{
|
||||
NamedSessionKey key;
|
||||
UInt64 close_cycle = 0;
|
||||
ContextMutablePtr context;
|
||||
std::chrono::steady_clock::duration timeout;
|
||||
std::chrono::steady_clock::time_point close_time_bucket{};
|
||||
NamedSessionsStorage & parent;
|
||||
|
||||
NamedSessionData(NamedSessionKey key_, ContextPtr context_, std::chrono::steady_clock::duration timeout_, NamedSessionsStorage & parent_)
|
||||
@ -131,6 +131,18 @@ public:
|
||||
|
||||
if (!session.unique())
|
||||
throw Exception(ErrorCodes::SESSION_IS_LOCKED, "Session {} is locked by a concurrent client", session_id);
|
||||
|
||||
if (session->close_time_bucket != std::chrono::steady_clock::time_point{})
|
||||
{
|
||||
auto bucket_it = close_time_buckets.find(session->close_time_bucket);
|
||||
auto & bucket_sessions = bucket_it->second;
|
||||
bucket_sessions.erase(key);
|
||||
if (bucket_sessions.empty())
|
||||
close_time_buckets.erase(bucket_it);
|
||||
|
||||
session->close_time_bucket = std::chrono::steady_clock::time_point{};
|
||||
}
|
||||
|
||||
return {session, false};
|
||||
}
|
||||
}
|
||||
@ -174,33 +186,31 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
/// TODO it's very complicated. Make simple std::map with time_t or boost::multi_index.
|
||||
using Container = std::unordered_map<Key, std::shared_ptr<NamedSessionData>, SessionKeyHash>;
|
||||
using CloseTimes = std::deque<std::vector<Key>>;
|
||||
Container sessions;
|
||||
CloseTimes close_times;
|
||||
std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
|
||||
std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
|
||||
UInt64 close_cycle = 0;
|
||||
|
||||
// Ordered map of close times for sessions, grouped by the next multiple of close_interval
|
||||
using CloseTimes = std::map<std::chrono::steady_clock::time_point, std::unordered_set<Key, SessionKeyHash>>;
|
||||
CloseTimes close_time_buckets;
|
||||
|
||||
constexpr static std::chrono::steady_clock::duration close_interval = std::chrono::milliseconds(1000);
|
||||
constexpr static std::chrono::nanoseconds::rep close_interval_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(close_interval).count();
|
||||
|
||||
void scheduleCloseSession(NamedSessionData & session, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
|
||||
/// (timeout is measured from current moment of time)
|
||||
chassert(session.close_time_bucket == std::chrono::steady_clock::time_point{});
|
||||
|
||||
const UInt64 close_index = session.timeout / close_interval + 1;
|
||||
const auto new_close_cycle = close_cycle + close_index;
|
||||
const auto session_close_time = std::chrono::steady_clock::now() + session.timeout;
|
||||
const auto session_close_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(session_close_time.time_since_epoch()).count();
|
||||
const auto bucket_padding = close_interval - std::chrono::nanoseconds(session_close_time_ns % close_interval_ns);
|
||||
const auto close_time_bucket = session_close_time + bucket_padding;
|
||||
|
||||
if (session.close_cycle != new_close_cycle)
|
||||
{
|
||||
session.close_cycle = new_close_cycle;
|
||||
if (close_times.size() < close_index + 1)
|
||||
close_times.resize(close_index + 1);
|
||||
close_times[close_index].emplace_back(session.key);
|
||||
}
|
||||
session.close_time_bucket = close_time_bucket;
|
||||
auto & bucket_sessions = close_time_buckets[close_time_bucket];
|
||||
bucket_sessions.insert(session.key);
|
||||
|
||||
LOG_TEST(log, "Schedule closing session with session_id: {}, user_id: {}",
|
||||
session.key.second, session.key.first);
|
||||
session.key.second, session.key.first);
|
||||
}
|
||||
|
||||
void cleanThread()
|
||||
@ -209,55 +219,46 @@ private:
|
||||
std::unique_lock lock{mutex};
|
||||
while (!quit)
|
||||
{
|
||||
auto interval = closeSessions(lock);
|
||||
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
|
||||
closeSessions(lock);
|
||||
if (cond.wait_for(lock, close_interval, [this]() -> bool { return quit; }))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
|
||||
std::chrono::steady_clock::duration closeSessions(std::unique_lock<std::mutex> & lock)
|
||||
void closeSessions(std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
|
||||
/// The time to close the next session did not come
|
||||
if (now < close_cycle_time)
|
||||
return close_cycle_time - now; /// Will sleep until it comes.
|
||||
|
||||
const auto current_cycle = close_cycle;
|
||||
|
||||
++close_cycle;
|
||||
close_cycle_time = now + close_interval;
|
||||
|
||||
if (close_times.empty())
|
||||
return close_interval;
|
||||
|
||||
auto & sessions_to_close = close_times.front();
|
||||
|
||||
for (const auto & key : sessions_to_close)
|
||||
for (auto bucket_it = close_time_buckets.begin(); bucket_it != close_time_buckets.end(); bucket_it = close_time_buckets.erase(bucket_it))
|
||||
{
|
||||
const auto session = sessions.find(key);
|
||||
const auto & [time_bucket, session_keys] = *bucket_it;
|
||||
if (time_bucket > now)
|
||||
break;
|
||||
|
||||
if (session != sessions.end() && session->second->close_cycle <= current_cycle)
|
||||
for (const auto & key : session_keys)
|
||||
{
|
||||
if (session->second.use_count() != 1)
|
||||
{
|
||||
LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}", key.second, key.first);
|
||||
const auto & session_it = sessions.find(key);
|
||||
|
||||
/// Skip but move it to close on the next cycle.
|
||||
session->second->timeout = std::chrono::steady_clock::duration{0};
|
||||
scheduleCloseSession(*session->second, lock);
|
||||
}
|
||||
else
|
||||
if (session_it == sessions.end())
|
||||
continue;
|
||||
|
||||
const auto & session = session_it->second;
|
||||
|
||||
if (session.use_count() != 1)
|
||||
{
|
||||
LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first);
|
||||
sessions.erase(session);
|
||||
LOG_TEST(log, "Delay closing session with session_id: {}, user_id: {}, refcount: {}",
|
||||
key.second, key.first, session.use_count());
|
||||
|
||||
session->timeout = std::chrono::steady_clock::duration{0};
|
||||
scheduleCloseSession(*session, lock);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Close session with session_id: {}, user_id: {}", key.second, key.first);
|
||||
|
||||
sessions.erase(session_it);
|
||||
}
|
||||
}
|
||||
|
||||
close_times.pop_front();
|
||||
return close_interval;
|
||||
}
|
||||
|
||||
std::mutex mutex;
|
||||
|
@ -0,0 +1,3 @@
|
||||
A session successfully closes when timeout first expires with refcount != 1
|
||||
45
|
||||
1
|
15
tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh
Executable file
15
tests/queries/0_stateless/03254_session_expire_in_use_in_http_interface.sh
Executable file
@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long, no-parallel
|
||||
# shellcheck disable=SC2015
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
echo "A session successfully closes when timeout first expires with refcount != 1"
|
||||
# Here we do not want an infinite loop - because we want this mechanism to be reliable in all cases
|
||||
# So it's better to give it enough time to complete even in constrained environments
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "CREATE TEMPORARY TABLE x (n UInt64) AS SELECT number FROM numbers(10)"
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_timeout=1" --data-binary "SELECT sum(n + sleep(3)) FROM x" # This query ensures timeout expires with refcount > 1
|
||||
sleep 15
|
||||
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${CLICKHOUSE_DATABASE}_10&session_check=1" --data-binary "SELECT 1" | grep -c -F 'SESSION_NOT_FOUND'
|
Loading…
Reference in New Issue
Block a user