mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 03:12:43 +00:00
237 lines
6.4 KiB
Python
237 lines
6.4 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)


def _add_node(name):
    """Register one instance that uses the shared server and user configs."""
    return cluster.add_instance(
        name,
        main_configs=["configs/remote_servers.xml"],
        user_configs=["configs/users.xml"],
    )


n1 = _add_node("n1")
n2 = _add_node("n2")
n3 = _add_node("n3")

# Number of instances registered above.
nodes = len(cluster.instances)
# Ten queries per instance for the tests that loop `queries` times.
queries = nodes * 10
|
|
|
|
|
|
# SYSTEM RELOAD CONFIG will reset some attributes of the nodes in cluster
|
|
# - error_count
|
|
# - last_used (round_robin)
|
|
#
|
|
# This is required to keep the results of one test from interfering with another
|
|
@pytest.fixture(scope="function", autouse=True)
def test_setup():
    """Issue SYSTEM RELOAD CONFIG on every cluster instance before each test."""
    instances = list(cluster.instances.values())
    for instance in instances:
        instance.query("SYSTEM RELOAD CONFIG")
|
|
|
|
|
|
def bootstrap():
    """Create the local ``data`` table and the Distributed tables on every instance.

    Every table is dropped first (``DROP TABLE IF EXISTS``), so the function
    can be run again against instances that already hold the tables. Three
    Distributed tables are created over ``data``, one per cluster name listed
    below.
    """
    # (Distributed table name, cluster it is created over)
    distributed_tables = (
        ("dist", "replicas_cluster"),
        ("dist_priority", "replicas_priority_cluster"),
        ("dist_priority_negative", "replicas_priority_negative_cluster"),
    )

    for n in cluster.instances.values():
        n.query("DROP TABLE IF EXISTS data")
        # Drop every table that is created below, not only `dist`; otherwise
        # a second bootstrap would fail on the CREATE statements.
        for table_name, _ in distributed_tables:
            n.query("DROP TABLE IF EXISTS " + table_name)

        n.query("CREATE TABLE data (key Int) Engine=Memory()")
        for table_name, cluster_name in distributed_tables:
            n.query(
                """
                CREATE TABLE {table} AS data
                Engine=Distributed(
                    {cluster},
                    currentDatabase(),
                    data)
                """.format(
                    table=table_name, cluster=cluster_name
                )
            )
|
|
|
|
|
|
def make_uuid():
    """Return a fresh random UUID as a 32-character hexadecimal string."""
    generated = uuid.uuid4()
    return generated.hex
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster and create the tables once per module.

    Yields the cluster object to the tests.
    """
    try:
        cluster.start()
        bootstrap()
        yield cluster
    finally:
        # Runs whether start-up, bootstrap or the tests failed or succeeded.
        cluster.shutdown()
|
|
|
|
|
|
def get_node(query_node, table="dist", *args, **kwargs):
    """Return the host name recorded for a ``SELECT * FROM <table>`` query.

    The SELECT is run on ``query_node`` under a freshly generated query id.
    Logs are then flushed on every instance, and ``system.query_log`` is
    searched for the latest finished, non-initial query with that initial
    query id.

    Args:
        query_node: instance the SELECT (and the log lookup) is issued from.
        table: name of the table to select from.
        *args, **kwargs: forwarded to ``query_node.query`` for the SELECT.
            A ``settings`` keyword is merged with the settings this helper
            needs; the helper's values win on conflict.

    Returns:
        The stripped output of the log lookup (``hostName()`` of the matching
        row), or an empty string if no row matched.
    """
    query_id = make_uuid()

    required_settings = {
        "query_id": query_id,
        "log_queries": 1,
        "log_queries_min_type": "QUERY_START",
        "prefer_localhost_replica": 0,
    }
    # Merge into a new dict instead of update()-ing the caller's dict in
    # place, so that query_id and the logging settings do not leak into a
    # settings dict the caller reuses. `or {}` also tolerates settings=None.
    merged_settings = dict(kwargs.get("settings") or {})
    merged_settings.update(required_settings)
    kwargs["settings"] = merged_settings

    query_node.query("SELECT * FROM " + table, *args, **kwargs)

    # Flush logs on every instance before reading system.query_log.
    for n in cluster.instances.values():
        n.query("SYSTEM FLUSH LOGS")

    rows = query_node.query(
        """
        SELECT hostName()
        FROM cluster(shards_cluster, system.query_log)
        WHERE
            initial_query_id = '{query_id}' AND
            is_initial_query = 0 AND
            type = 'QueryFinish'
        ORDER BY event_date DESC, event_time DESC
        LIMIT 1
        """.format(
            query_id=query_id
        )
    )
    return rows.strip()
|
|
|
|
|
|
# TODO: right now random distribution looks bad, but works
|
|
def test_load_balancing_default():
    """With load_balancing=random, `queries` SELECTs from n1 must reach every node."""
    seen = {
        get_node(n1, settings={"load_balancing": "random"}) for _ in range(queries)
    }
    assert len(seen) == nodes, seen
|
|
|
|
|
|
def test_load_balancing_nearest_hostname():
    """With load_balancing=nearest_hostname, every SELECT from n1 must land on n1."""
    seen = {
        get_node(n1, settings={"load_balancing": "nearest_hostname"})
        for _ in range(queries)
    }
    assert len(seen) == 1, seen
    assert seen == {"n1"}
|
|
|
|
|
|
def test_load_balancing_hostname_levenshtein_distance():
    """With load_balancing=hostname_levenshtein_distance, n1 must always pick n1."""
    seen = {
        get_node(n1, settings={"load_balancing": "hostname_levenshtein_distance"})
        for _ in range(queries)
    }
    assert len(seen) == 1, seen
    assert seen == {"n1"}
|
|
|
|
|
|
def test_load_balancing_in_order():
    """With load_balancing=in_order, every SELECT from n1 must land on n1."""
    seen = {
        get_node(n1, settings={"load_balancing": "in_order"}) for _ in range(queries)
    }
    assert len(seen) == 1, seen
    assert seen == {"n1"}
|
|
|
|
|
|
def test_load_balancing_first_or_random():
    """With load_balancing=first_or_random, every SELECT from n1 must land on n1."""
    seen = {
        get_node(n1, settings={"load_balancing": "first_or_random"})
        for _ in range(queries)
    }
    assert len(seen) == 1, seen
    assert seen == {"n1"}
|
|
|
|
|
|
def test_load_balancing_round_robin():
    """With load_balancing=round_robin, `nodes` consecutive SELECTs must hit each node once."""
    seen = {
        get_node(n1, settings={"load_balancing": "round_robin"}) for _ in range(nodes)
    }
    assert len(seen) == nodes, seen
    assert seen == {"n1", "n2", "n3"}
|
|
|
|
|
|
@pytest.mark.parametrize(
    "dist_table",
    [
        "dist_priority",
        "dist_priority_negative",
    ],
)
def test_load_balancing_priority_round_robin(dist_table):
    """Round-robin over the priority tables must only ever use n1 and n3."""
    seen = {
        get_node(n1, dist_table, settings={"load_balancing": "round_robin"})
        for _ in range(nodes)
    }
    assert len(seen) == 2, seen
    # n2 has bigger priority in config
    assert seen == {"n1", "n3"}
|
|
|
|
|
|
def test_distributed_replica_max_ignored_errors():
    """Check how distributed_replica_max_ignored_errors affects replica choice.

    n1 is paused while n2 queries `dist` in order, which must leave exactly
    one error recorded for n1 in system.clusters. After n1 is resumed, a
    query with distributed_replica_max_ignored_errors=0 must still go to n2,
    and with the setting raised to 1 it must go to n1.
    """
    settings = {
        "use_hedged_requests": 0,
        "load_balancing": "in_order",
        "prefer_localhost_replica": 0,
        "connect_timeout": 2,
        "receive_timeout": 2,
        "send_timeout": 2,
        "tcp_keep_alive_timeout": 2,
        "distributed_replica_max_ignored_errors": 0,
        "distributed_replica_error_half_life": 60,
    }

    # initiate connection (if only this test was started)
    n2.query("SELECT * FROM dist", settings=settings)
    cluster.pause_container("n1")
    try:
        # n1 paused -- it is skipped and error_count for n1 is incremented,
        # but the query succeeds, so there is no need for query_and_get_error()
        n2.query("SELECT * FROM dist", settings=settings)
        # XXX: due to config reloading we need to do it a second time (sigh)
        n2.query("SELECT * FROM dist", settings=settings)

        # check error_count for n1
        errors_count = int(
            n2.query(
                """
                SELECT errors_count FROM system.clusters
                WHERE cluster = 'replicas_cluster' AND host_name = 'n1'
                """,
                settings=settings,
            )
        )
        assert errors_count == 1
    finally:
        # Always resume n1, even if a query or the assertion above failed,
        # so a failure here does not leave the container paused.
        cluster.unpause_container("n1")

    # still n2
    assert get_node(n2, settings=settings) == "n2"
    # now n1
    settings["distributed_replica_max_ignored_errors"] = 1
    assert get_node(n2, settings=settings) == "n1"
|