ClickHouse/tests/integration/test_distributed_inter_server_secret/test.py
Azat Khuzhin c25d6cd624
Rename directory monitor concept into background INSERT (#55978)
* Limit log frequency for "Skipping send data over distributed table" message

Otherwise, after SYSTEM STOP DISTRIBUTED SENDS the server will constantly
print this message.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename directory monitor concept into async INSERT

Rename the following query settings (preserving backward
compatibility by keeping the old name as an alias):
- distributed_directory_monitor_sleep_time_ms -> distributed_async_insert_sleep_time_ms
- distributed_directory_monitor_max_sleep_time_ms -> distributed_async_insert_max_sleep_time_ms
- distributed_directory_monitor_batch_inserts -> distributed_async_insert_batch
- distributed_directory_monitor_split_batch_on_failure -> distributed_async_insert_split_batch_on_failure

Rename the following table settings (preserving backward
compatibility by keeping the old name as an alias):
- monitor_batch_inserts -> async_insert_batch
- monitor_split_batch_on_failure -> async_insert_split_batch_on_failure
- directory_monitor_sleep_time_ms -> async_insert_sleep_time_ms
- directory_monitor_max_sleep_time_ms -> async_insert_max_sleep_time_ms

And also update all the references:

    $ gg -e directory_monitor_ -e monitor_ tests docs | cut -d: -f1 | sort -u | xargs sed -e 's/distributed_directory_monitor_sleep_time_ms/distributed_async_insert_sleep_time_ms/g' -e 's/distributed_directory_monitor_max_sleep_time_ms/distributed_async_insert_max_sleep_time_ms/g' -e 's/distributed_directory_monitor_batch_inserts/distributed_async_insert_batch/g' -e 's/distributed_directory_monitor_split_batch_on_failure/distributed_async_insert_split_batch_on_failure/g' -e 's/monitor_batch_inserts/async_insert_batch/g' -e 's/monitor_split_batch_on_failure/async_insert_split_batch_on_failure/g' -e 's/monitor_sleep_time_ms/async_insert_sleep_time_ms/g' -e 's/monitor_max_sleep_time_ms/async_insert_max_sleep_time_ms/g' -i

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename async_insert for Distributed into background_insert

This avoids ambiguity between general async INSERTs and INSERTs into
Distributed tables, which are indeed background operations, so the new term
expresses it even better.

Mostly done with:

    $ git di HEAD^ --name-only | xargs sed -i -e 's/distributed_async_insert/distributed_background_insert/g' -e 's/async_insert_batch/background_insert_batch/g' -e 's/async_insert_split_batch_on_failure/background_insert_split_batch_on_failure/g' -e 's/async_insert_sleep_time_ms/background_insert_sleep_time_ms/g' -e 's/async_insert_max_sleep_time_ms/background_insert_max_sleep_time_ms/g'
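
As a minimal sketch of the end result (using a hypothetical `node` handle and
`dist` table in the style of the test below), the new setting name and its
deprecated alias should both be accepted:

    # new name introduced by this change
    node.query(
        "INSERT INTO dist SELECT * FROM numbers(2)",
        settings={"distributed_background_insert_batch": 1},
    )
    # old name, kept as a backward-compatible alias
    node.query(
        "INSERT INTO dist SELECT * FROM numbers(2)",
        settings={"distributed_directory_monitor_batch_inserts": 1},
    )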

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Mark 02417_opentelemetry_insert_on_distributed_table as long

CI: https://s3.amazonaws.com/clickhouse-test-reports/55978/7a6abb03a0b507e29e999cb7e04f246a119c6f28/stateless_tests_flaky_check__asan_.html
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

---------

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-11-01 15:09:39 +01:00

# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import pytest
import uuid
import time
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def make_instance(name, cfg, *args, **kwargs):
return cluster.add_instance(
name,
with_zookeeper=True,
main_configs=["configs/remote_servers.xml", cfg],
user_configs=["configs/users.xml"],
*args,
**kwargs,
)
# the _n1/_n2 configs define a cluster with different <secret> values -- queries over it should fail
n1 = make_instance("n1", "configs/remote_servers_n1.xml")
n2 = make_instance("n2", "configs/remote_servers_n2.xml")
backward = make_instance(
"backward",
"configs/remote_servers_backward.xml",
image="clickhouse/clickhouse-server",
# version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
tag="23.2.3",
with_installed_binary=True,
allow_analyzer=False,
)
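# A rough sketch of what the per-node cluster configs are assumed to contain
# (the actual definitions live in configs/remote_servers_n1.xml and
# configs/remote_servers_n2.xml, not shown here):
#
#   <remote_servers>
#     <secure>
#       <secret>some_secret</secret> <!-- must be identical on every node -->
#       <shard>...</shard>
#     </secure>
#     <insecure>
#       <!-- no <secret>: the plain interserver protocol is used -->
#       <shard>...</shard>
#     </insecure>
#   </remote_servers>
#
# The "secure_disagree" cluster is assumed to use a different <secret> on n1 and
# n2, which is what the test_secure_disagree* cases below rely on.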
users = pytest.mark.parametrize(
"user,password",
[
("default", ""),
("nopass", ""),
("pass", "foo"),
],
)
def bootstrap():
for n in list(cluster.instances.values()):
n.query("DROP TABLE IF EXISTS data")
n.query("DROP TABLE IF EXISTS data_from_buffer")
n.query("DROP TABLE IF EXISTS dist")
n.query("CREATE TABLE data (key Int) Engine=Memory()")
n.query("CREATE TABLE data_from_buffer (key Int) Engine=Memory()")
n.query(
"""
CREATE TABLE dist_insecure AS data
Engine=Distributed(insecure, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure AS data
Engine=Distributed(secure, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_backward AS data
Engine=Distributed(secure_backward, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_from_buffer AS data_from_buffer
Engine=Distributed(secure, currentDatabase(), data_from_buffer, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_disagree AS data
Engine=Distributed(secure_disagree, currentDatabase(), data, key)
"""
)
n.query(
"""
CREATE TABLE dist_secure_buffer AS dist_secure_from_buffer
Engine=Buffer(currentDatabase(), dist_secure_from_buffer,
/* settings for manual flush only */
1, /* num_layers */
0, /* min_time, placeholder */
0, /* max_time, placeholder */
0, /* min_rows */
0, /* max_rows */
0, /* min_bytes */
0 /* max_bytes */
)
"""
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
bootstrap()
yield cluster
finally:
cluster.shutdown()
# @return -- [user, initial_user]
def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS")
return (
node.query(
"""
SELECT user, initial_user
FROM system.query_log
WHERE
query LIKE '%{}%' AND
query NOT LIKE '%system.query_log%' AND
type = 'QueryFinish'
""".format(
query_pattern
)
)
.strip()
.split("\t")
)
# @return -- [user, initial_user]
def get_query_user_info_by_id(node, query_id):
node.query("SYSTEM FLUSH LOGS")
return (
node.query(
"""
SELECT user, initial_user
FROM system.query_log
WHERE
query_id = '{}' AND
type = 'QueryFinish'
""".format(
query_id
)
)
.strip()
.split("\t")
)
# @return -- settings
def get_query_setting_on_shard(node, query_pattern, setting):
node.query("SYSTEM FLUSH LOGS")
return node.query(
"""
SELECT Settings['{}']
FROM system.query_log
WHERE
query LIKE '%{}%' AND
NOT is_initial_query AND
query NOT LIKE '%system.query_log%' AND
type = 'QueryFinish'
LIMIT 1
""".format(
setting, query_pattern
)
).strip()
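# Illustrative expectations for the helpers above (values taken from the
# assertions in the tests below): on a secure cluster the remote side keeps the
# initiating user,
#   get_query_user_info(n2, "query-dist_secure-pass") == ["pass", "pass"]
# while on an insecure cluster the remote side falls back to "default",
#   get_query_user_info(n2, "query-dist_insecure-pass") == ["default", "pass"]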
def test_insecure():
n1.query("SELECT * FROM dist_insecure")
def test_insecure_insert_async():
n1.query("TRUNCATE TABLE data")
n1.query("INSERT INTO dist_insecure SELECT * FROM numbers(2)")
n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure")
assert int(n1.query("SELECT count() FROM dist_insecure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER insecure")
def test_insecure_insert_sync():
n1.query("TRUNCATE TABLE data")
n1.query(
"INSERT INTO dist_insecure SELECT * FROM numbers(2)",
settings={"distributed_foreground_insert": 1},
)
assert int(n1.query("SELECT count() FROM dist_insecure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER secure")
def test_secure():
n1.query("SELECT * FROM dist_secure")
def test_secure_insert_async():
n1.query("TRUNCATE TABLE data")
n1.query("INSERT INTO dist_secure SELECT * FROM numbers(2)")
n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure")
assert int(n1.query("SELECT count() FROM dist_secure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER secure")
def test_secure_insert_sync():
n1.query("TRUNCATE TABLE data")
n1.query(
"INSERT INTO dist_secure SELECT * FROM numbers(2)",
settings={"distributed_foreground_insert": 1},
)
assert int(n1.query("SELECT count() FROM dist_secure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER secure")
# INSERT without initial_user
#
# Buffer() flush happens with the global context, which does not have a user,
# and so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
#
# This is a regression test checking that the subsequent query
# will not use the user from the previous query.
#
# The test is a little bit complex, but here is an attempt to explain it:
# - first, we need to execute a query with the readonly user (a regular SELECT),
# and then execute an INSERT; if the bug is there, the INSERT will
# use the user from the SELECT and will fail (since you cannot do an INSERT with
# readonly=1/2)
#
# - the trick with generating a random priority (via sed) is to avoid reusing a
# connection from n1 to n2 left over from another test (and we cannot simply use
# another pool after ConnectionPoolFactory had been added [1]).
#
# [1]: https://github.com/ClickHouse/ClickHouse/pull/26318
#
# We need at least one change in one of the fields of the node/shard definition,
# and "priority" serves that purpose in this test.
#
# - after that we ensure that the connection is really established from the context
# of the SELECT query, and that the connection will not be established from the
# context of the INSERT query (but this is actually a no-op, since the INSERT
# will be done in the background due to distributed_foreground_insert=false by
# default)
#
# - if the bug is there, then FLUSH DISTRIBUTED will fail, because it will go
# from n1 to n2 using the previous user.
#
# I hope that this will clarify something for the reader.
def test_secure_insert_buffer_async():
# Change the cluster definition so that the SELECT always creates a new connection
priority = int(time.time())
n1.exec_in_container(
[
"bash",
"-c",
f'sed -i "s#<priority>.*</priority>#<priority>{priority}</priority>#" /etc/clickhouse-server/config.d/remote_servers.xml',
]
)
n1.query("SYSTEM RELOAD CONFIG")
# ensure that the SELECT creates a new connection (we need a separate table for
# this, so that a separate distributed pool is used)
query_id = uuid.uuid4().hex
n1.query("SELECT * FROM dist_secure_from_buffer", user="ro", query_id=query_id)
assert n1.contains_in_log(
"{" + query_id + "} <Trace> Connection (n2:9000): Connecting."
)
query_id = uuid.uuid4().hex
n1.query(
"INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)", query_id=query_id
)
# ensure that the INSERT does not create a new connection, so that it reuses
# the previous connection that was established under the "ro" user (using the
# interserver secret)
assert not n1.contains_in_log(
"{" + query_id + "} <Trace> Connection (n2:9000): Connecting."
)
assert get_query_user_info_by_id(n1, query_id) == ["default", "default"]
# Before the bug was fixed, this query would fail with the following error:
#
# Code: 164. DB::Exception: Received from 172.16.2.5:9000. DB::Exception: There was an error on [n1:9000]: Code: 164. DB::Exception: Received from n2:9000. DB::Exception: ro: Cannot execute query in readonly mode. (READONLY)
n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer")
n1.query("OPTIMIZE TABLE dist_secure_buffer")
n1.query("SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer")
# Check the user under which the INSERT on the remote node is executed
#
# Incorrect example:
#
# {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 172.16.1.1:44636, user: ro) INSERT INTO default.data_from_buffer (key) VALUES
#
# Correct example:
#
# {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES
#
assert n2.contains_in_log(
"executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES"
)
assert int(n1.query("SELECT count() FROM dist_secure_from_buffer")) == 2
n1.query("TRUNCATE TABLE data_from_buffer ON CLUSTER secure")
def test_secure_disagree():
with pytest.raises(
QueryRuntimeException, match=".*Interserver authentication failed.*"
):
n1.query("SELECT * FROM dist_secure_disagree")
def test_secure_disagree_insert():
n1.query("TRUNCATE TABLE data")
n1.query("INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)")
with pytest.raises(
QueryRuntimeException, match=".*Interserver authentication failed.*"
):
n1.query(
"SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree"
)
# check that the connection will be re-established
# IOW that we will not get "Unknown BlockInfo field"
with pytest.raises(
QueryRuntimeException, match=".*Interserver authentication failed.*"
):
assert int(n1.query("SELECT count() FROM dist_secure_disagree")) == 0
@users
def test_user_insecure_cluster(user, password):
id_ = "query-dist_insecure-" + user
n1.query(f"SELECT *, '{id_}' FROM dist_insecure", user=user, password=password)
assert get_query_user_info(n1, id_) == [
user,
user,
] # due to prefer_localhost_replica
assert get_query_user_info(n2, id_) == ["default", user]
@users
def test_user_secure_cluster(user, password):
id_ = "query-dist_secure-" + user
n1.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
@users
def test_per_user_inline_settings_insecure_cluster(user, password):
id_ = "query-ddl-settings-dist_insecure-" + user
n1.query(
f"""
SELECT *, '{id_}' FROM dist_insecure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
user=user,
password=password,
)
assert get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user") == ""
@users
def test_per_user_inline_settings_secure_cluster(user, password):
id_ = "query-ddl-settings-dist_secure-" + user
n1.query(
f"""
SELECT *, '{id_}' FROM dist_secure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
max_untracked_memory=0
""",
user=user,
password=password,
)
assert int(get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")) == int(
1e9
)
@users
def test_per_user_protocol_settings_insecure_cluster(user, password):
id_ = "query-protocol-settings-dist_insecure-" + user
n1.query(
f"SELECT *, '{id_}' FROM dist_insecure",
user=user,
password=password,
settings={
"prefer_localhost_replica": 0,
"max_memory_usage_for_user": int(1e9),
"max_untracked_memory": 0,
},
)
assert get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user") == ""
@users
def test_per_user_protocol_settings_secure_cluster(user, password):
id_ = "query-protocol-settings-dist_secure-" + user
n1.query(
f"SELECT *, '{id_}' FROM dist_secure",
user=user,
password=password,
settings={
"prefer_localhost_replica": 0,
"max_memory_usage_for_user": int(1e9),
"max_untracked_memory": 0,
},
)
assert int(get_query_setting_on_shard(n1, id_, "max_memory_usage_for_user")) == int(
1e9
)
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
n1.query(
f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
assert n1.contains_in_log(
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
)