ClickHouse/tests/integration/test_distributed_respect_user_timeouts/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

183 lines
5.4 KiB
Python
Raw Normal View History

2019-06-01 05:43:02 +00:00
import itertools
2024-09-27 10:19:39 +00:00
import logging
import os.path
import timeit
2019-06-01 05:43:02 +00:00
import pytest
2024-09-27 10:19:39 +00:00
2019-06-01 05:43:02 +00:00
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
2020-02-06 17:23:05 +00:00
NODES = {"node" + str(i): None for i in (1, 2)}
2019-06-01 05:43:02 +00:00
2021-08-19 11:32:32 +00:00
IS_DEBUG = False
2019-06-01 05:43:02 +00:00
CREATE_TABLES_SQL = """
CREATE DATABASE test;
CREATE TABLE base_table(
node String
)
ENGINE = MergeTree
PARTITION BY node
ORDER BY node;
CREATE TABLE distributed_table
ENGINE = Distributed(test_cluster, default, base_table) AS base_table;
"""
2019-06-01 05:43:02 +00:00
INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}')"
SELECTS_SQL = {
"distributed": "SELECT node FROM distributed_table ORDER BY node",
"remote": (
"SELECT node FROM remote('node1,node2', default.base_table) " "ORDER BY node"
),
}
2021-07-12 05:59:01 +00:00
EXCEPTION_NETWORK = "DB::NetException: "
2019-06-01 05:43:02 +00:00
EXCEPTION_TIMEOUT = "Timeout exceeded while reading from socket ("
2023-03-07 15:34:39 +00:00
EXCEPTION_CONNECT_TIMEOUT = "Timeout exceeded while connecting to socket ("
2019-06-01 05:43:02 +00:00
TIMEOUT_MEASUREMENT_EPS = 0.01
EXPECTED_BEHAVIOR = {
"default": {
"times": 3,
"timeout": 1,
},
"ready_to_wait": {
"times": 5,
"timeout": 3,
},
}
2020-02-06 17:23:05 +00:00
TIMEOUT_DIFF_UPPER_BOUND = {
"default": {
"distributed": 5.5,
"remote": 2.5,
},
"ready_to_wait": {
"distributed": 3,
2021-07-28 10:29:15 +00:00
"remote": 2.0,
2020-02-06 17:23:05 +00:00
},
}
2019-06-01 05:43:02 +00:00
2019-06-01 05:43:02 +00:00
def _check_exception(exception, expected_tries=3):
lines = exception.split("\n")
assert len(lines) > 4, "Unexpected exception (expected: timeout info)"
assert lines[0].startswith("Received exception from server (version")
assert lines[1].startswith("Code: 279")
assert lines[1].endswith("All connection tries failed. Log: ")
assert lines[2] == "", "Unexpected exception text (expected: empty line)"
for i, line in enumerate(lines[3 : 3 + expected_tries]):
expected_lines = (
"Code: 209. " + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
"Code: 209. " + EXCEPTION_NETWORK + "Timeout: connect timed out",
2023-03-07 15:34:39 +00:00
EXCEPTION_CONNECT_TIMEOUT,
2021-02-21 14:03:24 +00:00
EXCEPTION_TIMEOUT,
2019-06-01 05:43:02 +00:00
)
assert any(
line.startswith(expected) for expected in expected_lines
), 'Unexpected exception "{}" at one of the connection attempts'.format(line)
2019-06-01 05:43:02 +00:00
assert lines[3 + expected_tries] == "", "Wrong number of connect attempts"
2020-02-06 17:23:05 +00:00
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def started_cluster(request):
cluster = ClickHouseCluster(__file__, request.param)
2020-02-06 17:23:05 +00:00
cluster.__with_ssl_config = request.param == "configs_secure"
main_configs = []
main_configs += [os.path.join(request.param, "config.d/remote_servers.xml")]
if cluster.__with_ssl_config:
main_configs += [os.path.join(request.param, "server.crt")]
main_configs += [os.path.join(request.param, "server.key")]
main_configs += [os.path.join(request.param, "dhparam.pem")]
main_configs += [os.path.join(request.param, "config.d/ssl_conf.xml")]
user_configs = [os.path.join(request.param, "users.d/set_distributed_defaults.xml")]
2020-02-06 17:23:05 +00:00
for name in NODES:
NODES[name] = cluster.add_instance(
name, main_configs=main_configs, user_configs=user_configs
)
2019-06-01 05:43:02 +00:00
try:
cluster.start()
2021-08-19 11:32:32 +00:00
if cluster.instances["node1"].is_debug_build():
global IS_DEBUG
IS_DEBUG = True
logging.warning(
"Debug build is too slow to show difference in timings. We disable checks."
)
2020-10-02 16:54:07 +00:00
for node_id, node in list(NODES.items()):
2019-06-01 05:43:02 +00:00
node.query(CREATE_TABLES_SQL)
node.query(INSERT_SQL_TEMPLATE.format(node_id=node_id))
yield cluster
finally:
cluster.shutdown()
2020-02-06 17:23:05 +00:00
def _check_timeout_and_exception(node, user, query_base, query):
2019-06-01 05:43:02 +00:00
repeats = EXPECTED_BEHAVIOR[user]["times"]
2022-07-21 11:57:24 +00:00
expected_timeout = EXPECTED_BEHAVIOR[user]["timeout"] * repeats
2019-06-01 05:43:02 +00:00
start = timeit.default_timer()
2020-02-06 17:23:05 +00:00
exception = node.query_and_get_error(query, user=user)
2019-06-01 05:43:02 +00:00
# And it should timeout no faster than:
measured_timeout = timeit.default_timer() - start
2021-08-19 11:32:32 +00:00
if not IS_DEBUG:
assert expected_timeout - measured_timeout <= TIMEOUT_MEASUREMENT_EPS
assert (
measured_timeout - expected_timeout
<= TIMEOUT_DIFF_UPPER_BOUND[user][query_base]
)
2019-06-01 05:43:02 +00:00
# And exception should reflect connection attempts:
_check_exception(exception, repeats)
@pytest.mark.parametrize(
("first_user", "node_name", "query_base"),
tuple(itertools.product(EXPECTED_BEHAVIOR, NODES, SELECTS_SQL)),
)
def test_reconnect(started_cluster, node_name, first_user, query_base):
node = NODES[node_name]
2020-02-06 17:23:05 +00:00
query = SELECTS_SQL[query_base]
if started_cluster.__with_ssl_config:
query = query.replace("remote(", "remoteSecure(")
2019-06-01 05:43:02 +00:00
# Everything is up, select should work:
assert TSV(node.query(query, user=first_user)) == TSV("node1\nnode2")
with PartitionManager() as pm:
# Break the connection.
2020-10-02 16:54:07 +00:00
pm.partition_instances(*list(NODES.values()))
2019-06-01 05:43:02 +00:00
# Now it shouldn't:
2020-02-06 17:23:05 +00:00
_check_timeout_and_exception(node, first_user, query_base, query)
2019-06-01 05:43:02 +00:00
# Other user should have different timeout and exception
_check_timeout_and_exception(
node,
"default" if first_user != "default" else "ready_to_wait",
query_base,
2020-02-06 17:23:05 +00:00
query,
2019-06-01 05:43:02 +00:00
)
# select should work again:
assert TSV(node.query(query, user=first_user)) == TSV("node1\nnode2")