Merge branch 'master' into fix-00906_low_cardinality_cache

This commit is contained in:
Alexey Milovidov 2023-08-11 04:42:43 +03:00 committed by GitHub
commit f3186c939e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 72 additions and 759 deletions

View File

@ -1,13 +1,11 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h> #include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <Formats/ReadSchemaUtils.h> #include <Formats/ReadSchemaUtils.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Processors/Formats/ISchemaReader.h> #include <Processors/Formats/ISchemaReader.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <IO/WithFileName.h>
namespace DB namespace DB
{ {
@ -53,6 +51,7 @@ ColumnsDescription readSchemaFromFormat(
bool retry, bool retry,
ContextPtr & context, ContextPtr & context,
std::unique_ptr<ReadBuffer> & buf) std::unique_ptr<ReadBuffer> & buf)
try
{ {
NamesAndTypesList names_and_types; NamesAndTypesList names_and_types;
if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name)) if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
@ -209,12 +208,23 @@ ColumnsDescription readSchemaFromFormat(
ErrorCodes::BAD_ARGUMENTS, ErrorCodes::BAD_ARGUMENTS,
"{} file format doesn't support schema inference. You must specify the structure manually", "{} file format doesn't support schema inference. You must specify the structure manually",
format_name); format_name);
/// Some formats like CSVWithNames can contain empty column names. We don't support empty column names and further processing can fail with an exception. Let's just remove columns with empty names from the structure. /// Some formats like CSVWithNames can contain empty column names. We don't support empty column names and further processing can fail with an exception. Let's just remove columns with empty names from the structure.
names_and_types.erase( names_and_types.erase(
std::remove_if(names_and_types.begin(), names_and_types.end(), [](const NameAndTypePair & pair) { return pair.name.empty(); }), std::remove_if(names_and_types.begin(), names_and_types.end(), [](const NameAndTypePair & pair) { return pair.name.empty(); }),
names_and_types.end()); names_and_types.end());
return ColumnsDescription(names_and_types); return ColumnsDescription(names_and_types);
} }
catch (Exception & e)
{
if (!buf)
throw;
auto file_name = getFileNameFromReadBuffer(*buf);
if (!file_name.empty())
e.addMessage(fmt::format("(in file/uri {})", file_name));
throw;
}
ColumnsDescription readSchemaFromFormat( ColumnsDescription readSchemaFromFormat(
const String & format_name, const String & format_name,

View File

@ -520,8 +520,6 @@ ContextMutablePtr Session::makeSessionContext()
{}, {},
session_context->getSettingsRef().max_sessions_for_user); session_context->getSettingsRef().max_sessions_for_user);
recordLoginSucess(session_context);
return session_context; return session_context;
} }
@ -584,8 +582,6 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
{ session_name_ }, { session_name_ },
max_sessions_for_user); max_sessions_for_user);
recordLoginSucess(session_context);
return session_context; return session_context;
} }
@ -659,35 +655,21 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
if (user_id) if (user_id)
user = query_context->getUser(); user = query_context->getUser();
/// Interserver does not create session context if (!notified_session_log_about_login)
recordLoginSucess(query_context);
return query_context;
}
void Session::recordLoginSucess(ContextPtr login_context) const
{
if (notified_session_log_about_login)
return;
if (!login_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session or query context must be created");
if (auto session_log = getSessionLog())
{ {
const auto & settings = login_context->getSettingsRef(); if (auto session_log = getSessionLog())
const auto access = login_context->getAccess(); {
session_log->addLoginSuccess(
auth_id,
named_session ? std::optional<std::string>(named_session->key.second) : std::nullopt,
*query_context,
user);
session_log->addLoginSuccess(auth_id, notified_session_log_about_login = true;
named_session ? named_session->key.second : "", }
settings,
access,
getClientInfo(),
user);
} }
notified_session_log_about_login = true; return query_context;
} }

View File

@ -97,8 +97,6 @@ public:
private: private:
std::shared_ptr<SessionLog> getSessionLog() const; std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;
void recordLoginSucess(ContextPtr login_context) const;
mutable bool notified_session_log_about_login = false; mutable bool notified_session_log_about_login = false;
const UUID auth_id; const UUID auth_id;

View File

@ -199,13 +199,12 @@ void SessionLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length()); columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length());
} }
void SessionLog::addLoginSuccess(const UUID & auth_id, void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user)
const String & session_id,
const Settings & settings,
const ContextAccessPtr & access,
const ClientInfo & client_info,
const UserPtr & login_user)
{ {
const auto access = login_context.getAccess();
const auto & settings = login_context.getSettingsRef();
const auto & client_info = login_context.getClientInfo();
DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS); DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS);
log_entry.client_info = client_info; log_entry.client_info = client_info;
@ -216,7 +215,8 @@ void SessionLog::addLoginSuccess(const UUID & auth_id,
} }
log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : ""; log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : "";
log_entry.session_id = session_id; if (session_id)
log_entry.session_id = *session_id;
if (const auto roles_info = access->getRolesInfo()) if (const auto roles_info = access->getRolesInfo())
log_entry.roles = roles_info->getCurrentRolesNames(); log_entry.roles = roles_info->getCurrentRolesNames();

View File

@ -20,7 +20,6 @@ enum SessionLogElementType : int8_t
class ContextAccess; class ContextAccess;
struct User; struct User;
using UserPtr = std::shared_ptr<const User>; using UserPtr = std::shared_ptr<const User>;
using ContextAccessPtr = std::shared_ptr<const ContextAccess>;
/** A struct which will be inserted as row into session_log table. /** A struct which will be inserted as row into session_log table.
* *
@ -73,13 +72,7 @@ class SessionLog : public SystemLog<SessionLogElement>
using SystemLog<SessionLogElement>::SystemLog; using SystemLog<SessionLogElement>::SystemLog;
public: public:
void addLoginSuccess(const UUID & auth_id, void addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user);
const String & session_id,
const Settings & settings,
const ContextAccessPtr & access,
const ClientInfo & client_info,
const UserPtr & login_user);
void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional<String> & user, const Exception & reason); void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional<String> & user, const Exception & reason);
void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info); void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info);
}; };

View File

@ -86,7 +86,21 @@ void IRowInputFormat::logError()
Chunk IRowInputFormat::generate() Chunk IRowInputFormat::generate()
{ {
if (total_rows == 0) if (total_rows == 0)
readPrefix(); {
try
{
readPrefix();
}
catch (Exception & e)
{
auto file_name = getFileNameFromReadBuffer(getReadBuffer());
if (!file_name.empty())
e.addMessage(fmt::format("(in file/uri {})", file_name));
e.addMessage("(while reading header)");
throw;
}
}
const Block & header = getPort().getHeader(); const Block & header = getPort().getHeader();

View File

@ -561,7 +561,8 @@ void HTTPHandler::processQuery(
session->makeSessionContext(); session->makeSessionContext();
} }
auto context = session->makeQueryContext(); auto client_info = session->getClientInfo();
auto context = session->makeQueryContext(std::move(client_info));
/// This parameter is used to tune the behavior of output formats (such as Native) for compatibility. /// This parameter is used to tune the behavior of output formats (such as Native) for compatibility.
if (params.has("client_protocol_version")) if (params.has("client_protocol_version"))

View File

@ -114,7 +114,6 @@ test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly test_quota/test.py::test_users_xml_is_readonly
test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
test_profile_events_s3/test.py::test_profile_events test_profile_events_s3/test.py::test_profile_events
test_system_flush_logs/test.py::test_system_logs[system.text_log-0]
test_user_defined_object_persistence/test.py::test_persistence test_user_defined_object_persistence/test.py::test_persistence
test_settings_profile/test.py::test_show_profiles test_settings_profile/test.py::test_show_profiles
test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster

View File

@ -27,7 +27,10 @@ proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen") gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True) os.makedirs(gen_dir, exist_ok=True)
run_and_check( run_and_check(
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto", "python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \
{proto_dir}/clickhouse_grpc.proto".format(
proto_dir=proto_dir, gen_dir=gen_dir
),
shell=True, shell=True,
) )

View File

@ -1 +0,0 @@
_gen

View File

@ -1,9 +0,0 @@
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
</clickhouse>

View File

@ -1,9 +0,0 @@
<clickhouse>
<postgresql_port>5433</postgresql_port>
<mysql_port>9001</mysql_port>
<grpc_port>9100</grpc_port>
<grpc replace="replace">
<!-- Enable if you want very detailed logs -->
<verbose_logs>false</verbose_logs>
</grpc>
</clickhouse>

View File

@ -1,9 +0,0 @@
<clickhouse>
<session_log>
<database>system</database>
<table>session_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</session_log>
</clickhouse>

View File

@ -1,23 +0,0 @@
<clickhouse>
<profiles>
<default>
<function_sleep_max_microseconds_per_block>0</function_sleep_max_microseconds_per_block>
</default>
</profiles>
<users>
<default>
</default>
<mysql_user>
<password>pass</password>
</mysql_user>
<postgres_user>
<password>pass</password>
</postgres_user>
<grpc_user>
<password>pass</password>
</grpc_user>
<parallel_user>
<password>pass</password>
</parallel_user>
</users>
</clickhouse>

View File

@ -1 +0,0 @@
../../../../src/Server/grpc_protos/clickhouse_grpc.proto

View File

@ -1,289 +0,0 @@
import os
import grpc
import pymysql.connections
import psycopg2 as py_psql
import pytest
import random
import sys
import threading
from helpers.cluster import ClickHouseCluster, run_and_check
POSTGRES_SERVER_PORT = 5433
MYSQL_SERVER_PORT = 9001
GRPC_PORT = 9100
SESSION_LOG_MATCHING_FIELDS = "auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_ENCODING = "utf-8"
# Use grpcio-tools to generate *pb2.py files from *.proto.
proto_dir = os.path.join(SCRIPT_DIR, "./protos")
gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
os.makedirs(gen_dir, exist_ok=True)
run_and_check(
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
shell=True,
)
sys.path.append(gen_dir)
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"node",
main_configs=[
"configs/ports.xml",
"configs/log.xml",
"configs/session_log.xml",
],
user_configs=["configs/users.xml"],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
)
def grpc_get_url():
return f"{instance.ip_address}:{GRPC_PORT}"
def grpc_create_insecure_channel():
channel = grpc.insecure_channel(grpc_get_url())
grpc.channel_ready_future(channel).result(timeout=2)
return channel
session_id_counter = 0
def next_session_id():
global session_id_counter
session_id = session_id_counter
session_id_counter += 1
return str(session_id)
def grpc_query(query, user_, pass_, raise_exception):
try:
query_info = clickhouse_grpc_pb2.QueryInfo(
query=query,
session_id=next_session_id(),
user_name=user_,
password=pass_,
)
channel = grpc_create_insecure_channel()
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
result = stub.ExecuteQuery(query_info)
if result and result.HasField("exception"):
raise Exception(result.exception.display_text)
return result.output.decode(DEFAULT_ENCODING)
except Exception:
assert raise_exception
def postgres_query(query, user_, pass_, raise_exception):
try:
client = py_psql.connect(
host=instance.ip_address,
port=POSTGRES_SERVER_PORT,
user=user_,
password=pass_,
database="default",
)
cursor = client.cursor()
cursor.execute(query)
cursor.fetchall()
except Exception:
assert raise_exception
def mysql_query(query, user_, pass_, raise_exception):
try:
client = pymysql.connections.Connection(
host=instance.ip_address,
user=user_,
password=pass_,
database="default",
port=MYSQL_SERVER_PORT,
)
cursor = client.cursor(pymysql.cursors.DictCursor)
if raise_exception:
with pytest.raises(Exception):
cursor.execute(query)
else:
cursor.execute(query)
cursor.fetchall()
except Exception:
assert raise_exception
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_grpc_session(started_cluster):
grpc_query("SELECT 1", "grpc_user", "pass", False)
grpc_query("SELECT 2", "grpc_user", "wrong_pass", True)
grpc_query("SELECT 3", "wrong_grpc_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "grpc_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'Logout'"
)
assert logout_records == "grpc_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "grpc_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_mysql_session(started_cluster):
mysql_query("SELECT 1", "mysql_user", "pass", False)
mysql_query("SELECT 2", "mysql_user", "wrong_pass", True)
mysql_query("SELECT 3", "wrong_mysql_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "mysql_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'Logout'"
)
assert logout_records == "mysql_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "mysql_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_postgres_session(started_cluster):
postgres_query("SELECT 1", "postgres_user", "pass", False)
postgres_query("SELECT 2", "postgres_user", "wrong_pass", True)
postgres_query("SELECT 3", "wrong_postgres_user", "pass", True)
instance.query("SYSTEM FLUSH LOGS")
login_success_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginSuccess'"
)
assert login_success_records == "postgres_user\t1\t1\n"
logout_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'Logout'"
)
assert logout_records == "postgres_user\t1\t1\n"
login_failure_records = instance.query(
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginFailure'"
)
assert login_failure_records == "postgres_user\t1\t1\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'Logout')"
)
assert logins_and_logouts == "1\n"
def test_parallel_sessions(started_cluster):
thread_list = []
for _ in range(10):
# Sleep time does not significantly matter here,
# test should pass even without sleeping.
for function in [postgres_query, grpc_query, mysql_query]:
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"pass",
False,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"parallel_user",
"wrong_pass",
True,
),
)
thread.start()
thread_list.append(thread)
thread = threading.Thread(
target=function,
args=(
f"SELECT sleep({random.uniform(0.03, 0.04)})",
"wrong_parallel_user",
"pass",
True,
),
)
thread.start()
thread_list.append(thread)
for thread in thread_list:
thread.join()
instance.query("SYSTEM FLUSH LOGS")
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user'"
)
assert port_0_sessions == "90\n"
port_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_port = 0"
)
assert port_0_sessions == "0\n"
address_0_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_address = toIPv6('::')"
)
assert address_0_sessions == "0\n"
grpc_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'gRPC'"
)
assert grpc_sessions == "30\n"
mysql_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'MySQL'"
)
assert mysql_sessions == "30\n"
postgres_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'PostgreSQL'"
)
assert postgres_sessions == "30\n"
logins_and_logouts = instance.query(
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'Logout')"
)
assert logins_and_logouts == "30\n"
logout_failure_sessions = instance.query(
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginFailure'"
)
assert logout_failure_sessions == "30\n"

View File

@ -45,7 +45,11 @@ def test_system_logs(flush_logs, table, exists):
if exists: if exists:
node.query(q) node.query(q)
else: else:
assert "Table {} doesn't exist".format(table) in node.query_and_get_error(q) response = node.query_and_get_error(q)
assert (
"Table {} doesn't exist".format(table) in response
or "Unknown table expression identifier '{}'".format(table) in response
)
# Logic is tricky, let's check that there is no hang in case of message queue # Logic is tricky, let's check that there is no hang in case of message queue

View File

@ -1,34 +0,0 @@
sessions:
150
port_0_sessions:
0
address_0_sessions:
0
tcp_sessions
60
http_sessions
30
http_with_session_id_sessions
30
my_sql_sessions
30
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10
Corresponding LoginSuccess/Logout
10
LoginFailure
10

View File

@ -1,138 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
# Each user uses a separate thread.
readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users
readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" )
readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" )
readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}")
readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" )
readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}";
done
# All <type>_session functions execute in separate threads.
# These functions try to create a session with successful login and logout.
# Sleep a small, random amount of time to make concurrency more intense.
# and try to login with an invalid password.
function tcp_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid'
done
}
function http_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4"
done
}
function http_with_session_id_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}"
# login failure
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6"
done
}
function mysql_session()
{
local user=$1
local i=0
while (( (i++) < 10 )); do
# login logout
${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"
# login failure
${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)"
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
export -f mysql_session;
for user in "${TCP_USERS[@]}"; do
timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_USERS[@]}"; do
timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
done
for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
done
for user in "${MYSQL_USERS[@]}"; do
timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')"
echo "tcp_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'"
echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"
for user in "${ALL_USERS[@]}"; do
${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done

View File

@ -1,13 +0,0 @@
0
0
0
0
client_port 0 connections:
0
client_address '::' connections:
0
login failures:
0
TCP Login and logout count is equal
HTTP Login and logout count is equal
MySQL Login and logout count is equal

View File

@ -1,56 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER=$"02834_USER_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
-d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "client_port 0 connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0"
echo "client_address '::' connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')"
echo "login failures:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'"
# remote(...) function sometimes reuses old cached sessions for query execution.
# This makes LoginSuccess/Logout entries count unstable, but success and logouts must always match.
for interface in 'TCP' 'HTTP' 'MySQL'
do
LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"`
CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"`
if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then
echo "${interface} Login and logout count is equal"
else
TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"`
echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}"
fi
done
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"

View File

@ -1,8 +0,0 @@
port_0_sessions:
0
address_0_sessions:
0
Corresponding LoginSuccess/Logout
9
LoginFailure
0

View File

@ -1,114 +0,0 @@
#!/usr/bin/env bash
# Tags: no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
readonly PID=$$
readonly TEST_USER="02835_USER_${PID}"
readonly TEST_ROLE="02835_ROLE_${PID}"
readonly TEST_PROFILE="02835_PROFILE_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
function tcp_session()
{
local user=$1
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}"
}
function http_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
function http_with_session_id_session()
{
local user=$1
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}
# Busy-waits until user $1, specified amount of queries ($2) will run simultaneously.
function wait_for_queries_start()
{
local user=$1
local queries_count=$2
# 10 seconds waiting
counter=0 retries=100
while [[ $counter -lt $retries ]]; do
result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'")
if [[ $result == "${queries_count}" ]]; then
break;
fi
sleep 0.1
((++counter))
done
}
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"
# DROP USE CASE
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP ROLE CASE
${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
# DROP PROFILE CASE
${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
wait_for_queries_start $TEST_USER 3
${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'"
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
wait
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0"
echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')"
echo "Corresponding LoginSuccess/Logout"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')"
echo "LoginFailure"
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"

View File

@ -0,0 +1,2 @@
in file/uri
test.csv

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
FILENAME="${CLICKHOUSE_TMP}/test.csv"
printf 'Bad\rHeader\n123\n' > "${FILENAME}"
${CLICKHOUSE_LOCAL} --query "SELECT * FROM file('${CLICKHOUSE_TMP}/t*e*s*t.csv')" 2>&1 | grep -o -P 'in file/uri|test\.csv'
rm "${FILENAME}"