Merge pull request #52958 from arenadata/ADQM-1011

Added new tests for session_log and fixed the inconsistency between login and logout.
This commit is contained in:
Alexander Tokmakov 2023-08-07 15:18:16 +03:00 committed by GitHub
commit 73d75cde11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 753 additions and 27 deletions

View File

@ -520,6 +520,8 @@ ContextMutablePtr Session::makeSessionContext()
{}, {},
session_context->getSettingsRef().max_sessions_for_user); session_context->getSettingsRef().max_sessions_for_user);
recordLoginSucess(session_context);
return session_context; return session_context;
} }
@ -582,6 +584,8 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
{ session_name_ }, { session_name_ },
max_sessions_for_user); max_sessions_for_user);
recordLoginSucess(session_context);
return session_context; return session_context;
} }
@ -655,24 +659,38 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
if (user_id) if (user_id)
user = query_context->getUser(); user = query_context->getUser();
if (!notified_session_log_about_login) /// Interserver does not create session context
{ recordLoginSucess(query_context);
if (auto session_log = getSessionLog())
{
session_log->addLoginSuccess(
auth_id,
named_session ? std::optional<std::string>(named_session->key.second) : std::nullopt,
*query_context,
user);
notified_session_log_about_login = true;
}
}
return query_context; return query_context;
} }
void Session::recordLoginSucess(ContextPtr login_context) const
{
if (notified_session_log_about_login)
return;
if (!login_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session or query context must be created");
if (auto session_log = getSessionLog())
{
const auto & settings = login_context->getSettingsRef();
const auto access = login_context->getAccess();
session_log->addLoginSuccess(auth_id,
named_session ? named_session->key.second : "",
settings,
access,
getClientInfo(),
user);
notified_session_log_about_login = true;
}
}
void Session::releaseSessionID() void Session::releaseSessionID()
{ {
if (!named_session) if (!named_session)

View File

@ -97,6 +97,8 @@ public:
private: private:
std::shared_ptr<SessionLog> getSessionLog() const; std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;
void recordLoginSucess(ContextPtr login_context) const;
mutable bool notified_session_log_about_login = false; mutable bool notified_session_log_about_login = false;
const UUID auth_id; const UUID auth_id;

View File

@ -199,12 +199,13 @@ void SessionLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length()); columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length());
} }
void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user) void SessionLog::addLoginSuccess(const UUID & auth_id,
const String & session_id,
const Settings & settings,
const ContextAccessPtr & access,
const ClientInfo & client_info,
const UserPtr & login_user)
{ {
const auto access = login_context.getAccess();
const auto & settings = login_context.getSettingsRef();
const auto & client_info = login_context.getClientInfo();
DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS); DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS);
log_entry.client_info = client_info; log_entry.client_info = client_info;
@ -215,8 +216,7 @@ void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional<String> ses
} }
log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : ""; log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : "";
if (session_id) log_entry.session_id = session_id;
log_entry.session_id = *session_id;
if (const auto roles_info = access->getRolesInfo()) if (const auto roles_info = access->getRolesInfo())
log_entry.roles = roles_info->getCurrentRolesNames(); log_entry.roles = roles_info->getCurrentRolesNames();

View File

@ -20,6 +20,7 @@ enum SessionLogElementType : int8_t
class ContextAccess; class ContextAccess;
struct User; struct User;
using UserPtr = std::shared_ptr<const User>; using UserPtr = std::shared_ptr<const User>;
using ContextAccessPtr = std::shared_ptr<const ContextAccess>;
/** A struct which will be inserted as row into session_log table. /** A struct which will be inserted as row into session_log table.
* *
@ -72,7 +73,13 @@ class SessionLog : public SystemLog<SessionLogElement>
using SystemLog<SessionLogElement>::SystemLog; using SystemLog<SessionLogElement>::SystemLog;
public: public:
void addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user); void addLoginSuccess(const UUID & auth_id,
const String & session_id,
const Settings & settings,
const ContextAccessPtr & access,
const ClientInfo & client_info,
const UserPtr & login_user);
void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional<String> & user, const Exception & reason); void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional<String> & user, const Exception & reason);
void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info); void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info);
}; };

View File

@ -561,8 +561,7 @@ void HTTPHandler::processQuery(
session->makeSessionContext(); session->makeSessionContext();
} }
auto client_info = session->getClientInfo(); auto context = session->makeQueryContext();
auto context = session->makeQueryContext(std::move(client_info));
/// This parameter is used to tune the behavior of output formats (such as Native) for compatibility. /// This parameter is used to tune the behavior of output formats (such as Native) for compatibility.
if (params.has("client_protocol_version")) if (params.has("client_protocol_version"))

View File

@ -27,10 +27,7 @@ proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen") gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True) os.makedirs(gen_dir, exist_ok=True)
run_and_check( run_and_check(
"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \ f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
{proto_dir}/clickhouse_grpc.proto".format(
proto_dir=proto_dir, gen_dir=gen_dir
),
shell=True, shell=True,
) )

View File

@ -0,0 +1 @@
_gen

View File

@ -0,0 +1,9 @@
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
</clickhouse>

View File

@ -0,0 +1,9 @@
<clickhouse>
<postgresql_port>5433</postgresql_port>
<mysql_port>9001</mysql_port>
<grpc_port>9100</grpc_port>
<grpc replace="replace">
<!-- Enable if you want very detailed logs -->
<verbose_logs>false</verbose_logs>
</grpc>
</clickhouse>

View File

@ -0,0 +1,9 @@
<clickhouse>
<session_log>
<database>system</database>
<table>session_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</session_log>
</clickhouse>

View File

@ -0,0 +1,23 @@
<clickhouse>
<profiles>
<default>
<function_sleep_max_microseconds_per_block>0</function_sleep_max_microseconds_per_block>
</default>
</profiles>
<users>
<default>
</default>
<mysql_user>
<password>pass</password>
</mysql_user>
<postgres_user>
<password>pass</password>
</postgres_user>
<grpc_user>
<password>pass</password>
</grpc_user>
<parallel_user>
<password>pass</password>
</parallel_user>
</users>
</clickhouse>

View File

@ -0,0 +1 @@
../../../../src/Server/grpc_protos/clickhouse_grpc.proto

View File

@ -0,0 +1,289 @@
import os
import grpc
import pymysql.connections
import psycopg2 as py_psql
import pytest
import random
import sys
import threading
from helpers.cluster import ClickHouseCluster, run_and_check
POSTGRES_SERVER_PORT = 5433
MYSQL_SERVER_PORT = 9001
GRPC_PORT = 9100
SESSION_LOG_MATCHING_FIELDS = "auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_ENCODING = "utf-8"
# Use grpcio-tools to generate *pb2.py files from *.proto.
proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True)
run_and_check(
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
shell=True,
)
sys.path.append(gen_dir)
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"node",
main_configs=[
"configs/ports.xml",
"configs/log.xml",
"configs/session_log.xml",
],
user_configs=["configs/users.xml"],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
)
def grpc_get_url():
return f"{instance.ip_address}:{GRPC_PORT}"
def grpc_create_insecure_channel():
channel = grpc.insecure_channel(grpc_get_url())
grpc.channel_ready_future(channel).result(timeout=2)
return channel
session_id_counter = 0
def next_session_id():
global session_id_counter
session_id = session_id_counter
session_id_counter += 1
return str(session_id)
def grpc_query(query, user_, pass_, raise_exception):
try:
query_info = clickhouse_grpc_pb2.QueryInfo(
query=query,
session_id=next_session_id(),
user_name=user_,
password=pass_,
)
channel = grpc_create_insecure_channel()
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
result = stub.ExecuteQuery(query_info)
if result and result.HasField("exception"):
raise Exception(result.exception.display_text)
return result.output.decode(DEFAULT_ENCODING)
except Exception:
assert raise_exception
def postgres_query(query, user_, pass_, raise_exception):
try:
client = py_psql.connect(
host=instance.ip_address,
port=POSTGRES_SERVER_PORT,
user=user_,
password=pass_,
database="default",
)
cursor = client.cursor()
cursor.execute(query)
cursor.fetchall()
except Exception:
assert raise_exception
def mysql_query(query, user_, pass_, raise_exception):
try:
client = pymysql.connections.Connection(
host=instance.ip_address,
user=user_,
password=pass_,
database="default",
port=MYSQL_SERVER_PORT,
)
cursor = client.cursor(pymysql.cursors.DictCursor)
if raise_exception:
with pytest.raises(Exception):
cursor.execute(query)
else:
cursor.execute(query)
cursor.fetchall()
except Exception:
assert raise_exception
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_grpc_session(started_cluster):
grpc_query("SELECT 1", "grpc_user", "pass", False)
grpc_query("SELECT 2", "grpc_user", "wrong_pass", True)
grpc_query("SELECT 3", "wrong_grpc_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "grpc_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'Logout'"
)
assert logout_records == "grpc_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "grpc_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_mysql_session(started_cluster):
mysql_query("SELECT 1", "mysql_user", "pass", False)
mysql_query("SELECT 2", "mysql_user", "wrong_pass", True)
mysql_query("SELECT 3", "wrong_mysql_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "mysql_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'Logout'"
)
assert logout_records == "mysql_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "mysql_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_postgres_session(started_cluster):
postgres_query("SELECT 1", "postgres_user", "pass", False)
postgres_query("SELECT 2", "postgres_user", "wrong_pass", True)
postgres_query("SELECT 3", "wrong_postgres_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "postgres_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'Logout'"
)
assert logout_records == "postgres_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "postgres_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_parallel_sessions(started_cluster):
thread_list = []
for _ in range(10):
# Sleep time does not significantly matter here,
# test should pass even without sleeping.
for function in [postgres_query, grpc_query, mysql_query]:
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"pass",
False,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"wrong_pass",
True,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"wrong_parallel_user",
"pass",
True,
),
)
thread.start()
thread_list.append(thread)
for thread in thread_list:
thread.join()
instance.query("SYSTEM FLUSH LOGS")
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user'"
)
assert port_0_sessions == "90\n"
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_port = 0"
)
assert port_0_sessions == "0\n"
address_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_address = toIPv6('::')"
)
assert address_0_sessions == "0\n"
grpc_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'gRPC'"
)
assert grpc_sessions == "30\n"
mysql_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'MySQL'"
)
assert mysql_sessions == "30\n"
postgres_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'PostgreSQL'"
)
assert postgres_sessions == "30\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'Logout')"
)
assert logins_and_logouts == "30\n"
logout_failure_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginFailure'"
)
assert logout_failure_sessions == "30\n"

View File

@ -0,0 +1,34 @@
sessions:
150
port_0_sessions:
0
address_0_sessions:
0
tcp_sessions
60
http_sessions
30
http_with_session_id_sessions
30
my_sql_sessions
30
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10

View File

@ -0,0 +1,138 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
# Each user uses a separate thread.
readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users
readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" )
readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" )
readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}")
readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" )
readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}";
done
# All <type>_session functions execute in separate threads.
# These functions try to create a session with successful login and logout.
# Sleep a small, random amount of time to make concurrency more intense.
# and try to login with an invalid password.
function tcp_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid'
done
}
function http_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4"
done
}
function http_with_session_id_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6"
done
}
function mysql_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)"
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
export -f mysql_session;
for user in "${TCP_USERS[@]}"; do
timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_USERS[@]}"; do
timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
done
for user in "${MYSQL_USERS[@]}"; do
timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')"
echo "tcp_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'"
echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done

View File

@ -0,0 +1,13 @@
0
0
0
0
client_port 0 connections:
0
client_address '::' connections:
0
login failures:
0
TCP Login and logout count is equal
HTTP Login and logout count is equal
MySQL Login and logout count is equal

View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER=$"02834_USER_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "client_port 0 connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0"
echo "client_address '::' connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')"
echo "login failures:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'"
# remote(...) function sometimes reuses old cached sessions for query execution.
# This makes LoginSuccess/Logout entries count unstable, but success and logouts must always match.
for interface in 'TCP' 'HTTP' 'MySQL'
do
LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"`
CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"`
if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then
echo "${interface} Login and logout count is equal"
else
TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"`
echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}"
fi
done
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"

View File

@ -0,0 +1,8 @@
port_0_sessions:
0
address_0_sessions:
0
Corresponding LoginSuccess/Logout
9
LoginFailure
0

View File

@ -0,0 +1,113 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER="02835_USER_${PID}"
readonly TEST_ROLE="02835_ROLE_${PID}"
readonly TEST_PROFILE="02835_PROFILE_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
function tcp_session()
{
local user=$1
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}"
}
function http_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
function http_with_session_id_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
# Busy-waits until user $1, specified amount of queries ($2) will run simultaneously.
function wait_for_queries_start()
{
local user=$1
local queries_count=$2
# 10 seconds waiting
counter=0 retries=100
while [[ $counter -lt $retries ]]; do
result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'")
if [[ $result == "${queries_count}" ]]; then
break;
fi
sleep 0.1
((++counter))
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
# DROP USE CASE
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP ROLE CASE
${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP PROFILE CASE
${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"