From ad4a44df52b6b04ac5977d12aa35b099a792133c Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 2 Mar 2023 02:59:27 +0100 Subject: [PATCH] fix --- src/Interpreters/Session.cpp | 32 ++++++++++++++++++- src/Interpreters/Session.h | 3 ++ src/Server/HTTPHandler.cpp | 14 +++++++- .../02435_rollback_cancelled_queries.sh | 12 +++---- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/Session.cpp b/src/Interpreters/Session.cpp index 7411050aa2d..70d4c0e6ae0 100644 --- a/src/Interpreters/Session.cpp +++ b/src/Interpreters/Session.cpp @@ -140,6 +140,23 @@ public: scheduleCloseSession(session, lock); } + void closeSession(const UUID & user_id, const String & session_id) + { + std::unique_lock lock(mutex); + Key key{user_id, session_id}; + auto it = sessions.find(key); + if (it == sessions.end()) + { + LOG_INFO(log, "Session {} not found for user {}, probably it's already closed", session_id, user_id); + return; + } + + if (!it->second.unique()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot close session {} with refcount {}", session_id, it->second.use_count()); + + sessions.erase(it); + } + private: class SessionKeyHash { @@ -408,7 +425,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std: std::shared_ptr new_named_session; bool new_named_session_created = false; std::tie(new_named_session, new_named_session_created) - = NamedSessionsStorage::instance().acquireSession(global_context, user_id.value_or(UUID{}), session_name_, timeout_, session_check_); + = NamedSessionsStorage::instance().acquireSession(global_context, *user_id, session_name_, timeout_, session_check_); auto new_session_context = new_named_session->context; new_session_context->makeSessionContext(); @@ -533,5 +550,18 @@ void Session::releaseSessionID() named_session = nullptr; } +void Session::closeSession(const String & session_id) +{ + if (!user_id) /// User was not authenticated + return; + + /// named_session may be not set due to an early exception + if (!named_session) + return; + + releaseSessionID(); + NamedSessionsStorage::instance().closeSession(*user_id, session_id); +} + } diff --git a/src/Interpreters/Session.h b/src/Interpreters/Session.h index 0f17c378915..443867806d6 100644 --- a/src/Interpreters/Session.h +++ b/src/Interpreters/Session.h @@ -77,6 +77,9 @@ public: /// Releases the currently used session ID so it becomes available for reuse by another session. void releaseSessionID(); + /// Closes and removes session + void closeSession(const String & session_id); + private: std::shared_ptr getSessionLog() const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index 702743ef1f0..f468167f782 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -678,7 +679,7 @@ void HTTPHandler::processQuery( std::unique_ptr in; static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", - "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version"}; + "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session"}; Names reserved_param_suffixes; @@ -957,6 +958,14 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse /// In case of exception, send stack trace to client. bool with_stacktrace = false; + /// Close http session (if any) after processing the request + bool close_session = false; + String session_id; + + SCOPE_EXIT_SAFE({ + if (close_session && !session_id.empty()) + session->closeSession(session_id); + }); OpenTelemetry::TracingContextHolderPtr thread_trace_context; SCOPE_EXIT({ @@ -1006,6 +1015,9 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse HTMLForm params(default_settings, request); with_stacktrace = params.getParsed("stacktrace", false); + close_session = params.getParsed("close_session", false); + if (close_session) + session_id = params.get("session_id"); /// FIXME: maybe this check is already unnecessary. /// Workaround. Poco does not detect 411 Length Required case. diff --git a/tests/queries/0_stateless/02435_rollback_cancelled_queries.sh b/tests/queries/0_stateless/02435_rollback_cancelled_queries.sh index b639cd5ef70..7c7ef037e02 100755 --- a/tests/queries/0_stateless/02435_rollback_cancelled_queries.sh +++ b/tests/queries/0_stateless/02435_rollback_cancelled_queries.sh @@ -16,22 +16,20 @@ $CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order function insert_data { IMPLICIT=$(( RANDOM % 2 )) - SESSION_ID="${SESSION}_$RANDOM$RANDOM$RANDOM" - TXN_SETTINGS="session_id=$SESSION_ID&throw_on_unsupported_query_inside_transaction=0" + SESSION_ID="${SESSION}_$RANDOM.$RANDOM.$RANDOM" + TXN_SETTINGS="session_id=$SESSION_ID&throw_on_unsupported_query_inside_transaction=0&implicit_transaction=$IMPLICIT" BEGIN="" COMMIT="" SETTINGS="query_id=$ID&$TXN_SETTINGS&max_insert_block_size=110000&min_insert_block_size_rows=110000" if [[ "$IMPLICIT" -eq 0 ]]; then $CLICKHOUSE_CURL -sS -d 'begin transaction' "$CLICKHOUSE_URL&$TXN_SETTINGS" + SETTINGS="$SETTINGS&session_check=1" BEGIN="begin transaction;" COMMIT=$(echo -ne "\n\ncommit") - else - TXN_SETTINGS="$TXN_SETTINGS&implicit_transaction=1" fi - SETTINGS="query_id=$ID&$TXN_SETTINGS&max_insert_block_size=110000&min_insert_block_size_rows=110000" # max_block_size=10000, so external table will contain smaller blocks that will be squashed on insert-select (more chances to catch a bug on query cancellation) - TRASH_SETTINGS="query_id=$ID&$TXN_SETTINGS&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_insert_block_size=110000&max_block_size=10000&min_insert_block_size_bytes=0&min_insert_block_size_rows=110000&max_insert_block_size=110000" + TRASH_SETTINGS="$SETTINGS&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_block_size=10000&min_insert_block_size_bytes=0" TYPE=$(( RANDOM % 6 )) if [[ "$TYPE" -eq 0 ]]; then @@ -49,7 +47,7 @@ function insert_data fi if [[ "$IMPLICIT" -eq 0 ]]; then - $CLICKHOUSE_CURL -sS -d 'commit' "$CLICKHOUSE_URL&$TXN_SETTINGS" | grep -Faq "Transaction is not in RUNNING state" && $CLICKHOUSE_CURL -sS -d 'rollback' "$CLICKHOUSE_URL&$TXN_SETTINGS" + $CLICKHOUSE_CURL -sS -d 'commit' "$CLICKHOUSE_URL&$TXN_SETTINGS&close_session=1" | grep -Faq "Transaction is not in RUNNING state" && $CLICKHOUSE_CURL -sS -d 'rollback' "$CLICKHOUSE_URL&$TXN_SETTINGS" fi }