diff --git a/src/Interpreters/Session.cpp b/src/Interpreters/Session.cpp index de2a779b740..0a6435cff75 100644 --- a/src/Interpreters/Session.cpp +++ b/src/Interpreters/Session.cpp @@ -520,6 +520,8 @@ ContextMutablePtr Session::makeSessionContext() {}, session_context->getSettingsRef().max_sessions_for_user); + recordLoginSucess(session_context); + return session_context; } @@ -582,6 +584,8 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std: { session_name_ }, max_sessions_for_user); + recordLoginSucess(session_context); + return session_context; } @@ -655,24 +659,38 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t if (user_id) user = query_context->getUser(); - if (!notified_session_log_about_login) - { - if (auto session_log = getSessionLog()) - { - session_log->addLoginSuccess( - auth_id, - named_session ? std::optional(named_session->key.second) : std::nullopt, - *query_context, - user); - - notified_session_log_about_login = true; - } - } + /// Interserver does not create session context + recordLoginSucess(query_context); return query_context; } +void Session::recordLoginSucess(ContextPtr login_context) const +{ + if (notified_session_log_about_login) + return; + + if (!login_context) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Session or query context must be created"); + + if (auto session_log = getSessionLog()) + { + const auto & settings = login_context->getSettingsRef(); + const auto access = login_context->getAccess(); + + session_log->addLoginSuccess(auth_id, + named_session ? named_session->key.second : "", + settings, + access, + getClientInfo(), + user); + + notified_session_log_about_login = true; + } +} + + void Session::releaseSessionID() { if (!named_session) diff --git a/src/Interpreters/Session.h b/src/Interpreters/Session.h index 51c0e3c71fa..81ef987b428 100644 --- a/src/Interpreters/Session.h +++ b/src/Interpreters/Session.h @@ -97,6 +97,8 @@ public: private: std::shared_ptr getSessionLog() const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; + void recordLoginSucess(ContextPtr login_context) const; + mutable bool notified_session_log_about_login = false; const UUID auth_id; diff --git a/src/Interpreters/SessionLog.cpp b/src/Interpreters/SessionLog.cpp index 0a8a7fc18c5..b847eaf9824 100644 --- a/src/Interpreters/SessionLog.cpp +++ b/src/Interpreters/SessionLog.cpp @@ -199,12 +199,13 @@ void SessionLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length()); } -void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional session_id, const Context & login_context, const UserPtr & login_user) +void SessionLog::addLoginSuccess(const UUID & auth_id, + const String & session_id, + const Settings & settings, + const ContextAccessPtr & access, + const ClientInfo & client_info, + const UserPtr & login_user) { - const auto access = login_context.getAccess(); - const auto & settings = login_context.getSettingsRef(); - const auto & client_info = login_context.getClientInfo(); - DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS); log_entry.client_info = client_info; @@ -215,8 +216,7 @@ void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional ses } log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : ""; - if (session_id) - log_entry.session_id = *session_id; + log_entry.session_id = session_id; if (const auto roles_info = access->getRolesInfo()) log_entry.roles = roles_info->getCurrentRolesNames(); diff --git a/src/Interpreters/SessionLog.h b/src/Interpreters/SessionLog.h index 1282ac09c4d..8757bc12270 100644 --- a/src/Interpreters/SessionLog.h +++ b/src/Interpreters/SessionLog.h @@ -20,6 +20,7 @@ enum SessionLogElementType : int8_t class ContextAccess; struct User; using UserPtr = std::shared_ptr; +using ContextAccessPtr = std::shared_ptr; /** A struct which will be inserted as row into session_log table. * @@ -72,7 +73,13 @@ class SessionLog : public SystemLog using SystemLog::SystemLog; public: - void addLoginSuccess(const UUID & auth_id, std::optional session_id, const Context & login_context, const UserPtr & login_user); + void addLoginSuccess(const UUID & auth_id, + const String & session_id, + const Settings & settings, + const ContextAccessPtr & access, + const ClientInfo & client_info, + const UserPtr & login_user); + void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional & user, const Exception & reason); void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info); }; diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index ff5690a3b07..10d5e7a0242 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -561,8 +561,7 @@ void HTTPHandler::processQuery( session->makeSessionContext(); } - auto client_info = session->getClientInfo(); - auto context = session->makeQueryContext(std::move(client_info)); + auto context = session->makeQueryContext(); /// This parameter is used to tune the behavior of output formats (such as Native) for compatibility. if (params.has("client_protocol_version")) diff --git a/tests/integration/test_profile_max_sessions_for_user/test.py b/tests/integration/test_profile_max_sessions_for_user/test.py index 2930262f63e..65587933fed 100755 --- a/tests/integration/test_profile_max_sessions_for_user/test.py +++ b/tests/integration/test_profile_max_sessions_for_user/test.py @@ -27,10 +27,7 @@ proto_dir = os.path.join(SCRIPT_DIR, "./protos") gen_dir = os.path.join(SCRIPT_DIR, "./_gen") os.makedirs(gen_dir, exist_ok=True) run_and_check( - "python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \ - {proto_dir}/clickhouse_grpc.proto".format( - proto_dir=proto_dir, gen_dir=gen_dir - ), + f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto", shell=True, ) diff --git a/tests/integration/test_session_log/.gitignore b/tests/integration/test_session_log/.gitignore new file mode 100644 index 00000000000..edf565ec632 --- /dev/null +++ b/tests/integration/test_session_log/.gitignore @@ -0,0 +1 @@ +_gen diff --git a/tests/integration/test_session_log/__init__.py b/tests/integration/test_session_log/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_session_log/configs/log.xml b/tests/integration/test_session_log/configs/log.xml new file mode 100644 index 00000000000..7a079b81e69 --- /dev/null +++ b/tests/integration/test_session_log/configs/log.xml @@ -0,0 +1,9 @@ + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 + + \ No newline at end of file diff --git a/tests/integration/test_session_log/configs/ports.xml b/tests/integration/test_session_log/configs/ports.xml new file mode 100644 index 00000000000..fbaefc16b3a --- /dev/null +++ b/tests/integration/test_session_log/configs/ports.xml @@ -0,0 +1,9 @@ + + 5433 + 9001 + 9100 + + + false + + \ No newline at end of file diff --git a/tests/integration/test_session_log/configs/session_log.xml b/tests/integration/test_session_log/configs/session_log.xml new file mode 100644 index 00000000000..a0e4e3e2216 --- /dev/null +++ b/tests/integration/test_session_log/configs/session_log.xml @@ -0,0 +1,9 @@ + + + system + session_log
+ + toYYYYMM(event_date) + 7500 +
+
diff --git a/tests/integration/test_session_log/configs/users.xml b/tests/integration/test_session_log/configs/users.xml new file mode 100644 index 00000000000..0416dfadc8a --- /dev/null +++ b/tests/integration/test_session_log/configs/users.xml @@ -0,0 +1,23 @@ + + + + 0 + + + + + + + pass + + + pass + + + pass + + + pass + + + \ No newline at end of file diff --git a/tests/integration/test_session_log/protos/clickhouse_grpc.proto b/tests/integration/test_session_log/protos/clickhouse_grpc.proto new file mode 120000 index 00000000000..25d15f11e3b --- /dev/null +++ b/tests/integration/test_session_log/protos/clickhouse_grpc.proto @@ -0,0 +1 @@ +../../../../src/Server/grpc_protos/clickhouse_grpc.proto \ No newline at end of file diff --git a/tests/integration/test_session_log/test.py b/tests/integration/test_session_log/test.py new file mode 100644 index 00000000000..dbb39993ce3 --- /dev/null +++ b/tests/integration/test_session_log/test.py @@ -0,0 +1,289 @@ +import os + +import grpc +import pymysql.connections +import psycopg2 as py_psql +import pytest +import random +import sys +import threading + +from helpers.cluster import ClickHouseCluster, run_and_check + +POSTGRES_SERVER_PORT = 5433 +MYSQL_SERVER_PORT = 9001 +GRPC_PORT = 9100 +SESSION_LOG_MATCHING_FIELDS = "auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface" + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +DEFAULT_ENCODING = "utf-8" + +# Use grpcio-tools to generate *pb2.py files from *.proto. +proto_dir = os.path.join(SCRIPT_DIR, "./protos") +gen_dir = os.path.join(SCRIPT_DIR, "./_gen") +os.makedirs(gen_dir, exist_ok=True) +run_and_check( + f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto", + shell=True, +) + +sys.path.append(gen_dir) + +import clickhouse_grpc_pb2 +import clickhouse_grpc_pb2_grpc + +cluster = ClickHouseCluster(__file__) +instance = cluster.add_instance( + "node", + main_configs=[ + "configs/ports.xml", + "configs/log.xml", + "configs/session_log.xml", + ], + user_configs=["configs/users.xml"], + # Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387 + env_variables={ + "TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="") + }, +) + + +def grpc_get_url(): + return f"{instance.ip_address}:{GRPC_PORT}" + + +def grpc_create_insecure_channel(): + channel = grpc.insecure_channel(grpc_get_url()) + grpc.channel_ready_future(channel).result(timeout=2) + return channel + + +session_id_counter = 0 + + +def next_session_id(): + global session_id_counter + session_id = session_id_counter + session_id_counter += 1 + return str(session_id) + + +def grpc_query(query, user_, pass_, raise_exception): + try: + query_info = clickhouse_grpc_pb2.QueryInfo( + query=query, + session_id=next_session_id(), + user_name=user_, + password=pass_, + ) + channel = grpc_create_insecure_channel() + stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel) + result = stub.ExecuteQuery(query_info) + if result and result.HasField("exception"): + raise Exception(result.exception.display_text) + + return result.output.decode(DEFAULT_ENCODING) + except Exception: + assert raise_exception + + +def postgres_query(query, user_, pass_, raise_exception): + try: + client = py_psql.connect( + host=instance.ip_address, + port=POSTGRES_SERVER_PORT, + user=user_, + password=pass_, + database="default", + ) + cursor = client.cursor() + cursor.execute(query) + cursor.fetchall() + except Exception: + assert raise_exception + + +def mysql_query(query, user_, pass_, raise_exception): + try: + client = pymysql.connections.Connection( + host=instance.ip_address, + user=user_, + password=pass_, + database="default", + port=MYSQL_SERVER_PORT, + ) + cursor = client.cursor(pymysql.cursors.DictCursor) + if raise_exception: + with pytest.raises(Exception): + cursor.execute(query) + else: + cursor.execute(query) + cursor.fetchall() + except Exception: + assert raise_exception + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_grpc_session(started_cluster): + grpc_query("SELECT 1", "grpc_user", "pass", False) + grpc_query("SELECT 2", "grpc_user", "wrong_pass", True) + grpc_query("SELECT 3", "wrong_grpc_user", "pass", True) + + instance.query("SYSTEM FLUSH LOGS") + login_success_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginSuccess'" + ) + assert login_success_records == "grpc_user\t1\t1\n" + logout_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'Logout'" + ) + assert logout_records == "grpc_user\t1\t1\n" + login_failure_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginFailure'" + ) + assert login_failure_records == "grpc_user\t1\t1\n" + logins_and_logouts = instance.query( + f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'Logout')" + ) + assert logins_and_logouts == "1\n" + + +def test_mysql_session(started_cluster): + mysql_query("SELECT 1", "mysql_user", "pass", False) + mysql_query("SELECT 2", "mysql_user", "wrong_pass", True) + mysql_query("SELECT 3", "wrong_mysql_user", "pass", True) + + instance.query("SYSTEM FLUSH LOGS") + login_success_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginSuccess'" + ) + assert login_success_records == "mysql_user\t1\t1\n" + logout_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'Logout'" + ) + assert logout_records == "mysql_user\t1\t1\n" + login_failure_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginFailure'" + ) + assert login_failure_records == "mysql_user\t1\t1\n" + logins_and_logouts = instance.query( + f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'Logout')" + ) + assert logins_and_logouts == "1\n" + + +def test_postgres_session(started_cluster): + postgres_query("SELECT 1", "postgres_user", "pass", False) + postgres_query("SELECT 2", "postgres_user", "wrong_pass", True) + postgres_query("SELECT 3", "wrong_postgres_user", "pass", True) + + instance.query("SYSTEM FLUSH LOGS") + login_success_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginSuccess'" + ) + assert login_success_records == "postgres_user\t1\t1\n" + logout_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'Logout'" + ) + assert logout_records == "postgres_user\t1\t1\n" + login_failure_records = instance.query( + "SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginFailure'" + ) + assert login_failure_records == "postgres_user\t1\t1\n" + logins_and_logouts = instance.query( + f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'Logout')" + ) + assert logins_and_logouts == "1\n" + + +def test_parallel_sessions(started_cluster): + thread_list = [] + for _ in range(10): + # Sleep time does not significantly matter here, + # test should pass even without sleeping. + for function in [postgres_query, grpc_query, mysql_query]: + thread = threading.Thread( + target=function, + args=( + f"SELECT sleep({random.uniform(0.03, 0.04)})", + "parallel_user", + "pass", + False, + ), + ) + thread.start() + thread_list.append(thread) + thread = threading.Thread( + target=function, + args=( + f"SELECT sleep({random.uniform(0.03, 0.04)})", + "parallel_user", + "wrong_pass", + True, + ), + ) + thread.start() + thread_list.append(thread) + thread = threading.Thread( + target=function, + args=( + f"SELECT sleep({random.uniform(0.03, 0.04)})", + "wrong_parallel_user", + "pass", + True, + ), + ) + thread.start() + thread_list.append(thread) + + for thread in thread_list: + thread.join() + + instance.query("SYSTEM FLUSH LOGS") + port_0_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user'" + ) + assert port_0_sessions == "90\n" + + port_0_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_port = 0" + ) + assert port_0_sessions == "0\n" + + address_0_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_address = toIPv6('::')" + ) + assert address_0_sessions == "0\n" + + grpc_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'gRPC'" + ) + assert grpc_sessions == "30\n" + + mysql_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'MySQL'" + ) + assert mysql_sessions == "30\n" + + postgres_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'PostgreSQL'" + ) + assert postgres_sessions == "30\n" + + logins_and_logouts = instance.query( + f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'Logout')" + ) + assert logins_and_logouts == "30\n" + + logout_failure_sessions = instance.query( + f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginFailure'" + ) + assert logout_failure_sessions == "30\n" diff --git a/tests/queries/0_stateless/02833_concurrrent_sessions.reference b/tests/queries/0_stateless/02833_concurrrent_sessions.reference new file mode 100644 index 00000000000..bfe507e8eac --- /dev/null +++ b/tests/queries/0_stateless/02833_concurrrent_sessions.reference @@ -0,0 +1,34 @@ +sessions: +150 +port_0_sessions: +0 +address_0_sessions: +0 +tcp_sessions +60 +http_sessions +30 +http_with_session_id_sessions +30 +my_sql_sessions +30 +Corresponding LoginSuccess/Logout +10 +LoginFailure +10 +Corresponding LoginSuccess/Logout +10 +LoginFailure +10 +Corresponding LoginSuccess/Logout +10 +LoginFailure +10 +Corresponding LoginSuccess/Logout +10 +LoginFailure +10 +Corresponding LoginSuccess/Logout +10 +LoginFailure +10 diff --git a/tests/queries/0_stateless/02833_concurrrent_sessions.sh b/tests/queries/0_stateless/02833_concurrrent_sessions.sh new file mode 100755 index 00000000000..d1d571c6985 --- /dev/null +++ b/tests/queries/0_stateless/02833_concurrrent_sessions.sh @@ -0,0 +1,138 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, long + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +readonly PID=$$ + +# Each user uses a separate thread. +readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users +readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" ) +readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" ) +readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}") +readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" ) + +readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )" +readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )" +readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )" +readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )" +readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )" + +readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface" + +for user in "${ALL_USERS[@]}"; do + ${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'" + ${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}" + ${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}"; +done + +# All _session functions execute in separate threads. +# These functions try to create a session with successful login and logout. +# Sleep a small, random amount of time to make concurrency more intense. +# and try to login with an invalid password. +function tcp_session() +{ + local user=$1 + local i=0 + while (( (i++) < 10 )); do + # login logout + ${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass" + # login failure + ${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid' + done +} + +function http_session() +{ + local user=$1 + local i=0 + while (( (i++) < 10 )); do + # login logout + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})" + + # login failure + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4" + done +} + +function http_with_session_id_session() +{ + local user=$1 + local i=0 + while (( (i++) < 10 )); do + # login logout + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}" + + # login failure + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6" + done +} + +function mysql_session() +{ + local user=$1 + local i=0 + while (( (i++) < 10 )); do + # login logout + ${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')" + + # login failure + ${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)" + done +} + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" +${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})" + +export -f tcp_session; +export -f http_session; +export -f http_with_session_id_session; +export -f mysql_session; + +for user in "${TCP_USERS[@]}"; do + timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 & +done + +for user in "${HTTP_USERS[@]}"; do + timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 & +done + +for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do + timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 & +done + +for user in "${MYSQL_USERS[@]}"; do + timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 & +done + +wait + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" + +echo "sessions:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})" + +echo "port_0_sessions:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0" + +echo "address_0_sessions:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')" + +echo "tcp_sessions" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'" +echo "http_sessions" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'" +echo "http_with_session_id_sessions" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'" +echo "my_sql_sessions" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'" + +for user in "${ALL_USERS[@]}"; do + ${CLICKHOUSE_CLIENT} -q "DROP USER ${user}" + echo "Corresponding LoginSuccess/Logout" + ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')" + echo "LoginFailure" + ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'" + done diff --git a/tests/queries/0_stateless/02834_remote_session_log.reference b/tests/queries/0_stateless/02834_remote_session_log.reference new file mode 100644 index 00000000000..e2680982ab0 --- /dev/null +++ b/tests/queries/0_stateless/02834_remote_session_log.reference @@ -0,0 +1,13 @@ +0 +0 +0 +0 +client_port 0 connections: +0 +client_address '::' connections: +0 +login failures: +0 +TCP Login and logout count is equal +HTTP Login and logout count is equal +MySQL Login and logout count is equal diff --git a/tests/queries/0_stateless/02834_remote_session_log.sh b/tests/queries/0_stateless/02834_remote_session_log.sh new file mode 100755 index 00000000000..64f20a70cb1 --- /dev/null +++ b/tests/queries/0_stateless/02834_remote_session_log.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +readonly PID=$$ +readonly TEST_USER=$"02834_USER_${PID}" +readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface" + +${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'" +${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}" +${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}" +${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}" + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" +${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'" + +${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \ + -d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" + +${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \ + -d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" + +${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass" +${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass" + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" + +echo "client_port 0 connections:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0" + +echo "client_address '::' connections:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')" + +echo "login failures:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'" + +# remote(...) function sometimes reuses old cached sessions for query execution. +# This makes LoginSuccess/Logout entries count unstable, but success and logouts must always match. + +for interface in 'TCP' 'HTTP' 'MySQL' +do + LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"` + CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"` + + if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then + echo "${interface} Login and logout count is equal" + else + TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"` + echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}" + fi +done + +${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}" diff --git a/tests/queries/0_stateless/02835_drop_user_during_session.reference b/tests/queries/0_stateless/02835_drop_user_during_session.reference new file mode 100644 index 00000000000..7252faab8c6 --- /dev/null +++ b/tests/queries/0_stateless/02835_drop_user_during_session.reference @@ -0,0 +1,8 @@ +port_0_sessions: +0 +address_0_sessions: +0 +Corresponding LoginSuccess/Logout +9 +LoginFailure +0 diff --git a/tests/queries/0_stateless/02835_drop_user_during_session.sh b/tests/queries/0_stateless/02835_drop_user_during_session.sh new file mode 100755 index 00000000000..e848e1fe90d --- /dev/null +++ b/tests/queries/0_stateless/02835_drop_user_during_session.sh @@ -0,0 +1,113 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +readonly PID=$$ + +readonly TEST_USER="02835_USER_${PID}" +readonly TEST_ROLE="02835_ROLE_${PID}" +readonly TEST_PROFILE="02835_PROFILE_${PID}" +readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface" + +function tcp_session() +{ + local user=$1 + ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}" +} + +function http_session() +{ + local user=$1 + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers" +} + +function http_with_session_id_session() +{ + local user=$1 + ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers" +} + +# Busy-waits until user $1, specified amount of queries ($2) will run simultaneously. +function wait_for_queries_start() +{ + local user=$1 + local queries_count=$2 + # 10 seconds waiting + counter=0 retries=100 + while [[ $counter -lt $retries ]]; do + result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'") + if [[ $result == "${queries_count}" ]]; then + break; + fi + sleep 0.1 + ((++counter)) + done +} + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" +${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'" + +# DROP USE CASE +${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}" +${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}" + +export -f tcp_session; +export -f http_session; +export -f http_with_session_id_session; + +timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 & + +wait_for_queries_start $TEST_USER 3 +${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}" +${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null & + +wait + +# DROP ROLE CASE +${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}" +${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}" +${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}" + +timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 & + +wait_for_queries_start $TEST_USER 3 +${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}" +${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}" + +${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null & + +wait + +# DROP PROFILE CASE +${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'" +${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'" +${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}" + +timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 & +timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 & + +wait_for_queries_start $TEST_USER 3 +${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'" +${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}" + +${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null & + +wait + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" + +echo "port_0_sessions:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0" +echo "address_0_sessions:" +${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')" +echo "Corresponding LoginSuccess/Logout" +${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')" +echo "LoginFailure" +${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"