Better code around sessions, step 2

This commit is contained in:
Alexey Milovidov 2020-03-05 06:57:31 +03:00
parent 156e6246c1
commit aac2f98870
3 changed files with 70 additions and 57 deletions

View File

@ -273,7 +273,7 @@ void HTTPHandler::processQuery(
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
std::shared_ptr<Context> session;
std::shared_ptr<Session> session;
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
@ -287,15 +287,10 @@ void HTTPHandler::processQuery(
session = context.acquireSession(session_id, session_timeout, session_check == "1");
context = *session;
context.setSessionContext(*session);
context = session->context;
context.setSessionContext(session->context);
}
SCOPE_EXIT({
if (session_is_set)
session->releaseSession(session_id, session_timeout);
});
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
CompressionMethod http_response_compression_method = CompressionMethod::None;

View File

@ -96,11 +96,34 @@ namespace ErrorCodes
}
class Sessions;
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
struct Session
{
SessionKey key;
UInt64 close_cycle = 0;
bool is_used = false;
Context context;
std::chrono::steady_clock::duration timeout;
Sessions & parent;
Session(SessionKey key_, Context & context_, std::chrono::steady_clock::duration timeout_, Sessions & parent_)
: key(key_), context(context_), timeout(timeout_), parent(parent_)
{
}
~Session();
};
class Sessions
{
public:
using Key = Context::SessionKey;
using Key = SessionKey;
~Sessions()
{
@ -121,10 +144,17 @@ public:
}
/// Find existing session or create a new.
std::shared_ptr<Context> acquireSession(const Key & key, Context & context, std::chrono::steady_clock::duration timeout, bool throw_if_not_found)
std::shared_ptr<Session> acquireSession(const String & session_id, Context & context, std::chrono::steady_clock::duration timeout, bool throw_if_not_found)
{
std::unique_lock lock(mutex);
auto & user_name = context.client_info.current_user;
if (user_name.empty())
throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
Key key(user_name, session_id);
auto it = sessions.find(key);
if (it == sessions.end())
{
@ -132,29 +162,31 @@ public:
throw Exception("Session not found.", ErrorCodes::SESSION_NOT_FOUND);
/// Create a new session from current context.
auto new_session = std::make_shared<Context>(context);
scheduleCloseSession(key, *new_session, timeout, lock);
auto new_session = std::make_shared<Session>(key, context, timeout, *this);
scheduleCloseSession(*new_session, lock);
it = sessions.insert(std::make_pair(key, std::move(new_session))).first;
}
else if (it->second->client_info.current_user != context.client_info.current_user)
else if (it->second->key.first != context.client_info.current_user)
{
throw Exception("Session belongs to a different user", ErrorCodes::LOGICAL_ERROR);
}
/// Use existing session.
const auto & session = it->second;
if (session->session_is_used)
if (session->is_used)
throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
session->session_is_used = true;
session->is_used = true;
return session;
}
void releaseSession(const Key & key, Context & session, std::chrono::steady_clock::duration timeout)
void releaseSession(Session & session)
{
std::unique_lock lock(mutex);
session.session_is_used = false;
scheduleCloseSession(key, session, timeout, lock);
session.is_used = false;
scheduleCloseSession(session, lock);
}
private:
@ -170,7 +202,7 @@ private:
}
};
using Container = std::unordered_map<Key, std::shared_ptr<Context>, SessionKeyHash>;
using Container = std::unordered_map<Key, std::shared_ptr<Session>, SessionKeyHash>;
using CloseTimes = std::deque<std::vector<Key>>;
Container sessions;
CloseTimes close_times;
@ -178,17 +210,20 @@ private:
std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
UInt64 close_cycle = 0;
void scheduleCloseSession(const Key & key, Context & session, std::chrono::steady_clock::duration timeout, std::unique_lock<std::mutex> &)
void scheduleCloseSession(Session & session, std::unique_lock<std::mutex> &)
{
const UInt64 close_index = timeout / close_interval + 1;
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
/// (timeout is measured from current moment of time)
const UInt64 close_index = session.timeout / close_interval + 1;
const auto new_close_cycle = close_cycle + close_index;
if (session.session_close_cycle != new_close_cycle)
if (session.close_cycle != new_close_cycle)
{
session.session_close_cycle = new_close_cycle;
session.close_cycle = new_close_cycle;
if (close_times.size() < close_index + 1)
close_times.resize(close_index + 1);
close_times[close_index].emplace_back(key);
close_times[close_index].emplace_back(session.key);
}
}
@ -230,10 +265,13 @@ private:
{
const auto session = sessions.find(key);
if (session != sessions.end() && session->second->session_close_cycle <= current_cycle)
if (session != sessions.end() && session->second->close_cycle <= current_cycle)
{
if (session->second->session_is_used)
scheduleCloseSession(key, *session->second, std::chrono::seconds(0), lock);
if (session->second->is_used)
{
session->second->timeout = std::chrono::steady_clock::duration{0};
scheduleCloseSession(*session->second, lock);
}
else
sessions.erase(session);
}
@ -250,6 +288,12 @@ private:
};
Session::~Session()
{
parent.releaseSession(*this);
}
/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
*/
@ -494,26 +538,9 @@ Databases Context::getDatabases()
}
Context::SessionKey Context::getSessionKey(const String & session_id) const
std::shared_ptr<Session> Context::acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check)
{
auto & user_name = client_info.current_user;
if (user_name.empty())
throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
return SessionKey(user_name, session_id);
}
std::shared_ptr<Context> Context::acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check)
{
return shared->sessions.acquireSession(getSessionKey(session_id), *this, timeout, session_check);
}
void Context::releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout)
{
shared->sessions.releaseSession(getSessionKey(session_id), *this, timeout);
return shared->sessions.acquireSession(session_id, *this, timeout, session_check);
}

View File

@ -100,11 +100,10 @@ class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct Session;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
#endif
/// Table -> set of table-views that make SELECT from it.
@ -177,8 +176,6 @@ private:
Context * global_context = nullptr; /// Global context. Could be equal to this.
friend class Sessions;
UInt64 session_close_cycle = 0;
bool session_is_used = false;
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
@ -420,8 +417,7 @@ public:
const Databases getDatabases() const;
Databases getDatabases();
std::shared_ptr<Context> acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check);
void releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout);
std::shared_ptr<Session> acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check);
/// For methods below you may need to acquire a lock by yourself.
std::unique_lock<std::recursive_mutex> getLock() const;
@ -582,9 +578,6 @@ public:
String getFormatSchemaPath() const;
void setFormatSchemaPath(const String & path);
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
SampleBlockCache & getSampleBlockCache() const;
/// Query parameters for prepared statements.
@ -629,8 +622,6 @@ private:
StoragePtr getTableImpl(const StorageID & table_id, std::optional<Exception> * exception) const;
SessionKey getSessionKey(const String & session_id) const;
void checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const;
};