This commit is contained in:
Alexander Tokmakov 2023-03-02 02:59:27 +01:00
parent 6ee65fa1dc
commit ad4a44df52
4 changed files with 52 additions and 9 deletions

View File

@ -140,6 +140,23 @@ public:
scheduleCloseSession(session, lock);
}
void closeSession(const UUID & user_id, const String & session_id)
{
std::unique_lock lock(mutex);
Key key{user_id, session_id};
auto it = sessions.find(key);
if (it == sessions.end())
{
LOG_INFO(log, "Session {} not found for user {}, probably it's already closed", session_id, user_id);
return;
}
if (!it->second.unique())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot close session {} with refcount {}", session_id, it->second.use_count());
sessions.erase(it);
}
private:
class SessionKeyHash
{
@ -408,7 +425,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
std::shared_ptr<NamedSessionData> new_named_session;
bool new_named_session_created = false;
std::tie(new_named_session, new_named_session_created)
= NamedSessionsStorage::instance().acquireSession(global_context, user_id.value_or(UUID{}), session_name_, timeout_, session_check_);
= NamedSessionsStorage::instance().acquireSession(global_context, *user_id, session_name_, timeout_, session_check_);
auto new_session_context = new_named_session->context;
new_session_context->makeSessionContext();
@ -533,5 +550,18 @@ void Session::releaseSessionID()
named_session = nullptr;
}
void Session::closeSession(const String & session_id)
{
if (!user_id) /// User was not authenticated
return;
/// named_session may be not set due to an early exception
if (!named_session)
return;
releaseSessionID();
NamedSessionsStorage::instance().closeSession(*user_id, session_id);
}
}

View File

@ -77,6 +77,9 @@ public:
/// Releases the currently used session ID so it becomes available for reuse by another session.
void releaseSessionID();
/// Closes and removes session
void closeSession(const String & session_id);
private:
std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;

View File

@ -24,6 +24,7 @@
#include <Common/logger_useful.h>
#include <Common/SettingsChanges.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTSetQuery.h>
@ -678,7 +679,7 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in;
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version"};
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session"};
Names reserved_param_suffixes;
@ -957,6 +958,14 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
/// In case of exception, send stack trace to client.
bool with_stacktrace = false;
/// Close http session (if any) after processing the request
bool close_session = false;
String session_id;
SCOPE_EXIT_SAFE({
if (close_session && !session_id.empty())
session->closeSession(session_id);
});
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
SCOPE_EXIT({
@ -1006,6 +1015,9 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
HTMLForm params(default_settings, request);
with_stacktrace = params.getParsed<bool>("stacktrace", false);
close_session = params.getParsed<bool>("close_session", false);
if (close_session)
session_id = params.get("session_id");
/// FIXME: maybe this check is already unnecessary.
/// Workaround. Poco does not detect 411 Length Required case.

View File

@ -16,22 +16,20 @@ $CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order
function insert_data
{
IMPLICIT=$(( RANDOM % 2 ))
SESSION_ID="${SESSION}_$RANDOM$RANDOM$RANDOM"
TXN_SETTINGS="session_id=$SESSION_ID&throw_on_unsupported_query_inside_transaction=0"
SESSION_ID="${SESSION}_$RANDOM.$RANDOM.$RANDOM"
TXN_SETTINGS="session_id=$SESSION_ID&throw_on_unsupported_query_inside_transaction=0&implicit_transaction=$IMPLICIT"
BEGIN=""
COMMIT=""
SETTINGS="query_id=$ID&$TXN_SETTINGS&max_insert_block_size=110000&min_insert_block_size_rows=110000"
if [[ "$IMPLICIT" -eq 0 ]]; then
$CLICKHOUSE_CURL -sS -d 'begin transaction' "$CLICKHOUSE_URL&$TXN_SETTINGS"
SETTINGS="$SETTINGS&session_check=1"
BEGIN="begin transaction;"
COMMIT=$(echo -ne "\n\ncommit")
else
TXN_SETTINGS="$TXN_SETTINGS&implicit_transaction=1"
fi
SETTINGS="query_id=$ID&$TXN_SETTINGS&max_insert_block_size=110000&min_insert_block_size_rows=110000"
# max_block_size=10000, so external table will contain smaller blocks that will be squashed on insert-select (more chances to catch a bug on query cancellation)
TRASH_SETTINGS="query_id=$ID&$TXN_SETTINGS&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_insert_block_size=110000&max_block_size=10000&min_insert_block_size_bytes=0&min_insert_block_size_rows=110000&max_insert_block_size=110000"
TRASH_SETTINGS="$SETTINGS&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_block_size=10000&min_insert_block_size_bytes=0"
TYPE=$(( RANDOM % 6 ))
if [[ "$TYPE" -eq 0 ]]; then
@ -49,7 +47,7 @@ function insert_data
fi
if [[ "$IMPLICIT" -eq 0 ]]; then
$CLICKHOUSE_CURL -sS -d 'commit' "$CLICKHOUSE_URL&$TXN_SETTINGS" | grep -Faq "Transaction is not in RUNNING state" && $CLICKHOUSE_CURL -sS -d 'rollback' "$CLICKHOUSE_URL&$TXN_SETTINGS"
$CLICKHOUSE_CURL -sS -d 'commit' "$CLICKHOUSE_URL&$TXN_SETTINGS&close_session=1" | grep -Faq "Transaction is not in RUNNING state" && $CLICKHOUSE_CURL -sS -d 'rollback' "$CLICKHOUSE_URL&$TXN_SETTINGS"
fi
}