mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 08:52:06 +00:00
3e68103ac8
Right if you are executing remote() and later the query will go to the cluster with interserver secret, then you should have the same user on the nodes from that cluster, otherwise the query will fail with: DB::NetException: Connection reset by peer And on the remote node: <Debug> TCPHandler: User (initial, interserver mode): new_user (client: 172.16.1.5:40536) <Debug> TCP_INTERSERVER-Session: d29ecf7d-2c1c-44d2-8cc9-4ab08175bf05 Authentication failed with error: new_user: Authentication failed: password is incorrect, or there is no user with such name. <Error> ServerErrorHandler: Code: 516. DB::Exception: new_user: Authentication failed: password is incorrect, or there is no user with such name. (AUTHENTICATION_FAILED), Stack trace (when copying this message, always include the lines below): The problem is that remote() will not use passed to it user in any form, and instead, the initial user will be used, i.e. "cli_user" not "query_user": chc --user cli_user -q "select * from remote(node, default, some_dist_table, 'query_user')" Fix this by using the user from query for the remote(). Note, that the Distributed over Distributed in case of tables still wont work, for this you have to have the same users on all nodes in all clusters that are involved in case of interserver secret is enabled (see also test). Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> v2: move client initial_user adjustment into ClusterProxy/executeQuery.cpp v3: we cannot check for interserver_mode in updateSettingsAndClientInfoForCluster() since it is not yet interserver in remote() context
467 lines
15 KiB
Python
467 lines
15 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
import pytest
|
|
import uuid
|
|
import time
|
|
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
|
|
|
|
# Module-level cluster object: instances are registered on it by make_instance()
# and it is started/stopped by the start_cluster fixture.
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
def make_instance(name, *args, **kwargs):
    """Register an instance called *name* on the module-level ``cluster``.

    ``configs/remote_servers.xml`` is appended to the instance's main configs
    and ``configs/users.xml`` to its user configs, and the instance is created
    with ``with_zookeeper=True``.

    The ``main_configs`` / ``user_configs`` keyword arguments (when given) are
    copied before the shared configs are appended, so a list owned by the
    caller is never modified.  All other positional and keyword arguments are
    forwarded to ``cluster.add_instance`` unchanged.

    Returns whatever ``cluster.add_instance`` returns.
    """
    # Build new lists instead of appending in place: the caller may reuse the
    # list it passed in for another instance.
    main_configs = [*kwargs.pop("main_configs", []), "configs/remote_servers.xml"]
    user_configs = [*kwargs.pop("user_configs", []), "configs/users.xml"]
    return cluster.add_instance(
        name,
        *args,
        with_zookeeper=True,
        main_configs=main_configs,
        user_configs=user_configs,
        **kwargs,
    )
|
|
|
|
|
|
# DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 added in 23.3, ensure that CLICKHOUSE_CI_MIN_TESTED_VERSION fits
#
# The components are compared as integers: a plain string comparison orders
# "23.10" before "23.3" and would silently accept a version that is too new.
# NOTE(review): assumes the constant is a dot-separated list of integers.
assert tuple(
    int(part) for part in CLICKHOUSE_CI_MIN_TESTED_VERSION.split(".")
) < (23, 3)
|
|
|
|
# _n1/_n2 contains cluster with different <secret> -- should fail
# only n1 contains new_user
# NOTE(review): the cluster and user definitions live in the referenced XML
# configs, which are not part of this file.
n1 = make_instance(
    "n1",
    main_configs=["configs/remote_servers_n1.xml"],
    user_configs=["configs/users.d/new_user.xml"],
)
n2 = make_instance("n2", main_configs=["configs/remote_servers_n2.xml"])
# Instance running an older server image, used by the *_backward tests.
backward = make_instance(
    "backward",
    main_configs=["configs/remote_servers_backward.xml"],
    image="clickhouse/clickhouse-server",
    # version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
    tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
    with_installed_binary=True,
)

# Decorator that runs a test once per (user, password) pair listed below.
# NOTE(review): the users themselves are presumably declared in configs/users.xml.
users = pytest.mark.parametrize(
    "user,password",
    [
        ("default", ""),
        ("nopass", ""),
        ("pass", "foo"),
    ],
)
|
|
|
|
|
|
def bootstrap():
    """Create the tables used by the tests on every instance of the cluster.

    On each instance this creates two Memory tables (``data`` and
    ``data_from_buffer``), Distributed tables over the ``insecure``,
    ``secure``, ``secure_backward`` and ``secure_disagree`` clusters, a
    Distributed-over-Distributed table, and a Buffer table in front of
    ``dist_secure_from_buffer`` that is only flushed manually.

    Every table that is created is dropped first (``IF EXISTS``), so the
    function can be run again on instances that already hold the tables.
    """
    # Reverse dependency order: wrappers first, plain Memory tables last.
    # "dist" is not created here; its drop is kept from the original cleanup.
    tables_to_drop = (
        "dist_secure_buffer",
        "dist_over_dist_secure",
        "dist_secure_disagree",
        "dist_secure_from_buffer",
        "dist_secure_backward",
        "dist_secure",
        "dist_insecure",
        "dist",
        "data_from_buffer",
        "data",
    )
    for n in cluster.instances.values():
        for table in tables_to_drop:
            n.query(f"DROP TABLE IF EXISTS {table}")
        n.query("CREATE TABLE data (key Int) Engine=Memory()")
        n.query("CREATE TABLE data_from_buffer (key Int) Engine=Memory()")
        n.query(
            """
        CREATE TABLE dist_insecure AS data
        Engine=Distributed(insecure, currentDatabase(), data, key)
        """
        )
        n.query(
            """
        CREATE TABLE dist_secure AS data
        Engine=Distributed(secure, currentDatabase(), data, key)
        """
        )
        n.query(
            """
        CREATE TABLE dist_secure_backward AS data
        Engine=Distributed(secure_backward, currentDatabase(), data, key)
        """
        )
        n.query(
            """
        CREATE TABLE dist_secure_from_buffer AS data_from_buffer
        Engine=Distributed(secure, currentDatabase(), data_from_buffer, key)
        """
        )
        n.query(
            """
        CREATE TABLE dist_secure_disagree AS data
        Engine=Distributed(secure_disagree, currentDatabase(), data, key)
        """
        )
        n.query(
            """
        CREATE TABLE dist_secure_buffer AS dist_secure_from_buffer
        Engine=Buffer(currentDatabase(), dist_secure_from_buffer,
            /* settings for manual flush only */
            1, /* num_layers */
            0, /* min_time, placeholder */
            0, /* max_time, placeholder */
            0, /* min_rows */
            0, /* max_rows */
            0, /* min_bytes */
            0  /* max_bytes */
        )
        """
        )
        n.query(
            """
        CREATE TABLE dist_over_dist_secure AS data
        Engine=Distributed(secure, currentDatabase(), dist_secure, key)
        """
        )
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-scoped, auto-used fixture: start the cluster and create the test tables.

    Yields the started ``cluster`` object.
    """
    try:
        cluster.start()
        bootstrap()
        yield cluster
    finally:
        # Shut down whether or not start-up, bootstrap or the tests succeeded.
        cluster.shutdown()
|
|
|
|
|
|
# @return -- [user, initial_user]
|
|
def get_query_user_info(node, query_pattern):
    """Return ``[user, initial_user]`` of a finished query found by text.

    Flushes the logs on *node*, then selects ``user`` and ``initial_user``
    from ``system.query_log`` for ``QueryFinish`` entries whose text contains
    *query_pattern* (entries that mention ``system.query_log`` themselves are
    excluded).  The raw output is stripped and split on tabs.
    """
    node.query("SYSTEM FLUSH LOGS")
    lookup = f"""
    SELECT user, initial_user
    FROM system.query_log
    WHERE
        query LIKE '%{query_pattern}%' AND
        query NOT LIKE '%system.query_log%' AND
        type = 'QueryFinish'
    """
    raw_output = node.query(lookup)
    return raw_output.strip().split("\t")
|
|
|
|
|
|
# @return -- [user, initial_user]
|
|
def get_query_user_info_by_id(node, query_id):
    """Return ``[user, initial_user]`` of a finished query found by its id.

    Flushes the logs on *node*, then selects ``user`` and ``initial_user``
    from ``system.query_log`` for the ``QueryFinish`` entry with the given
    *query_id*.  The raw output is stripped and split on tabs.
    """
    node.query("SYSTEM FLUSH LOGS")
    lookup = f"""
    SELECT user, initial_user
    FROM system.query_log
    WHERE
        query_id = '{query_id}' AND
        type = 'QueryFinish'
    """
    raw_output = node.query(lookup)
    return raw_output.strip().split("\t")
|
|
|
|
|
|
# @return -- settings
|
|
def get_query_setting_on_shard(node, query_pattern, setting):
    """Return the value of *setting* logged for a non-initial (shard-side) query.

    Flushes the logs on *node*, then reads ``Settings[setting]`` from
    ``system.query_log`` for one ``QueryFinish`` entry that is not an initial
    query and whose text contains *query_pattern* (entries that mention
    ``system.query_log`` themselves are excluded).  The output is returned
    stripped, as a string.
    """
    node.query("SYSTEM FLUSH LOGS")
    lookup = f"""
    SELECT Settings['{setting}']
    FROM system.query_log
    WHERE
        query LIKE '%{query_pattern}%' AND
        NOT is_initial_query AND
        query NOT LIKE '%system.query_log%' AND
        type = 'QueryFinish'
    LIMIT 1
    """
    raw_output = node.query(lookup)
    return raw_output.strip()
|
|
|
|
|
|
def test_insecure():
    """A SELECT through the Distributed table over the ``insecure`` cluster must not fail."""
    n1.query("SELECT * FROM dist_insecure")
|
|
|
|
|
|
def test_insecure_insert_async():
    """Insert via ``dist_insecure`` with default settings, flush, and count the rows.

    The shared ``data`` table is truncated before the insert and on the whole
    ``insecure`` cluster afterwards.
    """
    n1.query("TRUNCATE TABLE data")

    n1.query("INSERT INTO dist_insecure SELECT * FROM numbers(2)")
    n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure")

    row_count = int(n1.query("SELECT count() FROM dist_insecure"))
    assert row_count == 2

    n1.query("TRUNCATE TABLE data ON CLUSTER insecure")
|
|
|
|
|
|
def test_insecure_insert_sync():
    """Insert via ``dist_insecure`` in the foreground and count the rows.

    ``distributed_foreground_insert=1`` is passed with the INSERT, so no
    explicit flush is issued before counting.  The shared ``data`` table is
    truncated before the insert and on the whole cluster afterwards.
    """
    n1.query("TRUNCATE TABLE data")
    n1.query(
        "INSERT INTO dist_insecure SELECT * FROM numbers(2)",
        settings={"distributed_foreground_insert": 1},
    )
    assert int(n1.query("SELECT count() FROM dist_insecure")) == 2
    # Clean up through the same cluster the test exercises, matching
    # test_insecure_insert_async (this used to say "secure").
    n1.query("TRUNCATE TABLE data ON CLUSTER insecure")
|
|
|
|
|
|
def test_secure():
    """A SELECT through the Distributed table over the ``secure`` cluster must not fail."""
    n1.query("SELECT * FROM dist_secure")
|
|
|
|
|
|
def test_secure_insert_async():
    """Insert via ``dist_secure`` with default settings, flush, and count the rows.

    The shared ``data`` table is truncated before the insert and on the whole
    ``secure`` cluster afterwards.
    """
    n1.query("TRUNCATE TABLE data")

    n1.query("INSERT INTO dist_secure SELECT * FROM numbers(2)")
    n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure")

    row_count = int(n1.query("SELECT count() FROM dist_secure"))
    assert row_count == 2

    n1.query("TRUNCATE TABLE data ON CLUSTER secure")
|
|
|
|
|
|
def test_secure_insert_sync():
    """Insert via ``dist_secure`` in the foreground and count the rows.

    ``distributed_foreground_insert=1`` is passed with the INSERT, so no
    explicit flush is issued before counting.
    """
    foreground_insert = {"distributed_foreground_insert": 1}

    n1.query("TRUNCATE TABLE data")
    n1.query(
        "INSERT INTO dist_secure SELECT * FROM numbers(2)",
        settings=foreground_insert,
    )

    row_count = int(n1.query("SELECT count() FROM dist_secure"))
    assert row_count == 2

    n1.query("TRUNCATE TABLE data ON CLUSTER secure")
|
|
|
|
|
|
# INSERT without initial_user
|
|
#
|
|
# Buffer() flush happens with global context, that does not have user
|
|
# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
|
|
#
|
|
# This is the regression test for the subsequent query that it
|
|
# will not use user from the previous query.
|
|
#
|
|
# The test is a little bit complex, but I will try to explain:
|
|
# - first, we need to execute query with the readonly user (regular SELECT),
|
|
# and then we will execute INSERT, and if the bug is there, then INSERT will
|
|
# use the user from SELECT and will fail (since you cannot do INSERT with
|
|
# readonly=1/2)
|
|
#
|
|
# - the trick with generating random priority (via sed) is to avoid reusing
|
|
# connection from n1 to n2 from another test (and we cannot simply use
|
|
# another pool after ConnectionPoolFactory had been added [1]).
|
|
#
|
|
# [1]: https://github.com/ClickHouse/ClickHouse/pull/26318
|
|
#
|
|
# We need at least one change in one of fields of the node/shard definition,
|
|
# and this is the "priority" for us in this test.
|
|
#
|
|
# - after we will ensure that connection is really established from the context
|
|
# of SELECT query, and that the connection will not be established from the
|
|
# context of the INSERT query (but actually it is a no-op since the INSERT
|
|
# will be done in background, due to distributed_foreground_insert=false by
|
|
# default)
|
|
#
|
|
# - if the bug is there, then FLUSH DISTRIBUTED will fail, because it will go
|
|
# from n1 to n2 using previous user.
|
|
#
|
|
# I hope that this will clarify something for the reader.
|
|
def test_secure_insert_buffer_async():
    """Regression test: an INSERT flushed from a Buffer table must not reuse a previous query's user.

    Steps: rewrite the ``<priority>`` in n1's remote_servers.xml and reload the
    config, run a SELECT as user ``ro`` and check that it opened a connection
    to n2, INSERT into the Buffer table and check that it did not open a new
    connection, flush the Buffer/Distributed tables, then check n2's log for
    the INSERT being executed with an empty user and count the rows.
    """
    # Change cluster definition so that the SELECT will always create a new connection
    priority = int(time.time())
    n1.exec_in_container(
        [
            "bash",
            "-c",
            f'sed -i "s#<priority>.*</priority>#<priority>{priority}</priority>#" /etc/clickhouse-server/config.d/remote_servers.xml',
        ]
    )
    n1.query("SYSTEM RELOAD CONFIG")
    # ensure that SELECT creates new connection (we need separate table for
    # this, so that separate distributed pool will be used)
    query_id = uuid.uuid4().hex
    n1.query("SELECT * FROM dist_secure_from_buffer", user="ro", query_id=query_id)
    assert n1.contains_in_log(
        "{" + query_id + "} <Trace> Connection (n2:9000): Connecting."
    )

    # A fresh query id, so the log check below only matches the INSERT.
    query_id = uuid.uuid4().hex
    n1.query(
        "INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)", query_id=query_id
    )
    # ensure that INSERT does not create a new connection, so that it will use
    # the previous connection that was instantiated with the "ro" user (using
    # interserver secret)
    assert not n1.contains_in_log(
        "{" + query_id + "} <Trace> Connection (n2:9000): Connecting."
    )
    # The INSERT itself must be logged on n1 as user=default, initial_user=default.
    assert get_query_user_info_by_id(n1, query_id) == ["default", "default"]

    # And before the bug was fixed this query would fail with the following error:
    #
    # Code: 164. DB::Exception: Received from 172.16.2.5:9000. DB::Exception: There was an error on [n1:9000]: Code: 164. DB::Exception: Received from n2:9000. DB::Exception: ro: Cannot execute query in readonly mode. (READONLY)
    n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer")
    n1.query("OPTIMIZE TABLE dist_secure_buffer")
    n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer")

    # Check user from which the INSERT on the remote node will be executed
    #
    # Incorrect example:
    #
    # {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 172.16.1.1:44636, user: ro) INSERT INTO default.data_from_buffer (key) VALUES
    #
    # Correct example:
    #
    # {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES
    #
    assert n2.contains_in_log(
        "executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES"
    )

    assert int(n1.query("SELECT count() FROM dist_secure_from_buffer")) == 2
    n1.query("TRUNCATE TABLE data_from_buffer ON CLUSTER secure")
|
|
|
|
|
|
def test_secure_disagree():
    """A SELECT through ``dist_secure_disagree`` is expected to raise QueryRuntimeException."""
    with pytest.raises(QueryRuntimeException):
        n1.query("SELECT * FROM dist_secure_disagree")
|
|
|
|
|
|
def test_secure_disagree_insert():
    """Flushing and then reading ``dist_secure_disagree`` must both fail.

    The INSERT into the Distributed table is issued without expecting an
    error; the failure is expected from the explicit flush and from the
    following SELECT.
    """
    n1.query("TRUNCATE TABLE data")
    n1.query("INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)")
    with pytest.raises(QueryRuntimeException):
        n1.query(
            "SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree"
        )
    # check that the connection will be re-established
    # IOW that we will not get "Unknown BlockInfo field"
    #
    # The query itself must raise.  The former `assert int(...) == 0` around it
    # could never make the test pass and only obscured that expectation.
    with pytest.raises(QueryRuntimeException):
        n1.query("SELECT count() FROM dist_secure_disagree")
|
|
|
|
|
|
@users
def test_user_insecure_cluster(user, password):
    """Check the users logged for a query through the ``insecure`` cluster.

    n1 must log the query with both ``user`` and ``initial_user`` equal to the
    client user; n2 must log it with user ``default`` and the client user as
    ``initial_user``.
    """
    id_ = f"query-dist_insecure-{user}"
    n1.query(f"SELECT *, '{id_}' FROM dist_insecure", user=user, password=password)

    # due to prefer_localhost_replica
    on_initiator = get_query_user_info(n1, id_)
    assert on_initiator == [user, user]

    on_remote = get_query_user_info(n2, id_)
    assert on_remote == ["default", user]
|
|
|
|
|
|
@users
def test_user_secure_cluster(user, password):
    """Check the users logged for a query through the ``secure`` cluster.

    Both n1 and n2 must log the query with ``user`` and ``initial_user`` equal
    to the client user.
    """
    id_ = f"query-dist_secure-{user}"
    n1.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)

    expected = [user, user]
    assert get_query_user_info(n1, id_) == expected
    assert get_query_user_info(n2, id_) == expected
|
|
|
|
|
|
@users
def test_per_user_inline_settings_insecure_cluster(user, password):
    """Inline SETTINGS on the ``insecure`` cluster: the shard-side query must log
    an empty ``max_memory_usage_for_user``.
    """
    id_ = f"query-ddl-settings-dist_insecure-{user}"
    select_with_settings = f"""
    SELECT *, '{id_}' FROM dist_insecure
    SETTINGS
        prefer_localhost_replica=0,
        max_memory_usage_for_user=1e9,
        max_untracked_memory=0
    """
    n1.query(select_with_settings, user=user, password=password)

    logged_value = get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")
    assert logged_value == ""
|
|
|
|
|
|
@users
def test_per_user_inline_settings_secure_cluster(user, password):
    """Inline SETTINGS on the ``secure`` cluster: the shard-side query must log
    ``max_memory_usage_for_user`` equal to the value given in the query.
    """
    id_ = f"query-ddl-settings-dist_secure-{user}"
    select_with_settings = f"""
    SELECT *, '{id_}' FROM dist_secure
    SETTINGS
        prefer_localhost_replica=0,
        max_memory_usage_for_user=1e9,
        max_untracked_memory=0
    """
    n1.query(select_with_settings, user=user, password=password)

    logged_value = get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")
    expected_value = int(1e9)
    assert int(logged_value) == expected_value
|
|
|
|
|
|
@users
def test_per_user_protocol_settings_insecure_cluster(user, password):
    """Settings passed with the request on the ``insecure`` cluster: the shard-side
    query must log an empty ``max_memory_usage_for_user``.
    """
    id_ = f"query-protocol-settings-dist_insecure-{user}"
    request_settings = {
        "prefer_localhost_replica": 0,
        "max_memory_usage_for_user": int(1e9),
        "max_untracked_memory": 0,
    }
    n1.query(
        f"SELECT *, '{id_}' FROM dist_insecure",
        user=user,
        password=password,
        settings=request_settings,
    )

    logged_value = get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")
    assert logged_value == ""
|
|
|
|
|
|
@users
def test_per_user_protocol_settings_secure_cluster(user, password):
    """Settings passed with the request on the ``secure`` cluster: the shard-side
    query must log ``max_memory_usage_for_user`` equal to the value sent.
    """
    id_ = f"query-protocol-settings-dist_secure-{user}"
    request_settings = {
        "prefer_localhost_replica": 0,
        "max_memory_usage_for_user": int(1e9),
        "max_untracked_memory": 0,
    }
    n1.query(
        f"SELECT *, '{id_}' FROM dist_secure",
        user=user,
        password=password,
        settings=request_settings,
    )

    logged_value = get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")
    expected_value = int(1e9)
    assert int(logged_value) == expected_value
|
|
|
|
|
|
@users
def test_user_secure_cluster_with_backward(user, password):
    """Query ``dist_secure_backward`` from n1 and check the logged users.

    Both n1 and the ``backward`` instance must log the query with ``user`` and
    ``initial_user`` equal to the client user.
    """
    id_ = f"with-backward-query-dist_secure-{user}"
    select_query = f"SELECT *, '{id_}' FROM dist_secure_backward"
    n1.query(select_query, user=user, password=password)

    expected = [user, user]
    assert get_query_user_info(n1, id_) == expected
    assert get_query_user_info(backward, id_) == expected
|
|
|
|
|
|
@users
def test_user_secure_cluster_from_backward(user, password):
    """Query ``dist_secure`` from the ``backward`` instance and check the logged users.

    Both n1 and ``backward`` must log the query with ``user`` and
    ``initial_user`` equal to the client user, and n1's log must contain the
    deprecated-interserver-protocol message.
    """
    id_ = f"from-backward-query-dist_secure-{user}"
    select_query = f"SELECT *, '{id_}' FROM dist_secure"
    backward.query(select_query, user=user, password=password)

    expected = [user, user]
    assert get_query_user_info(n1, id_) == expected
    assert get_query_user_info(backward, id_) == expected

    deprecation_notice = "Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
    assert n1.contains_in_log(deprecation_notice)
|
|
|
|
|
|
def test_secure_cluster_distributed_over_distributed_different_users():
    """Exercise ``remote()`` and Distributed-over-Distributed with ``new_user``.

    One case is expected to succeed and two are expected to raise
    QueryRuntimeException; see the comments on each case.
    """
    # Works: no user is passed to remote(), so initial_user='default'.
    remote_without_user = "SELECT * FROM remote('n1', currentDatabase(), dist_secure)"
    n1.query(remote_without_user, user="new_user")

    # Broken: now initial_user='new_user', and n2 does not have that user.
    remote_with_user = (
        "SELECT * FROM remote('n1', currentDatabase(), dist_secure, 'new_user')"
    )
    with pytest.raises(QueryRuntimeException):
        n2.query(remote_with_user)

    # Still a problem, but let's assume that this is OK, since in case of
    # dist-over-dist we expect the clusters to be the same (users and stuff).
    with pytest.raises(QueryRuntimeException):
        n1.query("SELECT * FROM dist_over_dist_secure", user="new_user")
|