mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
225 lines
6.5 KiB
Python
225 lines
6.5 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
# One shared cluster object for the whole module; the instances are registered
# here and actually started by the module-scoped fixture.
cluster = ClickHouseCluster(__file__)

# Three instances, all using the same remote_servers configuration.
n1, n2, n3 = (
    cluster.add_instance(name, main_configs=["configs/remote_servers.xml"])
    for name in ("n1", "n2", "n3")
)

# Number of registered instances, and how many queries the "sticky"/random
# load-balancing tests issue (ten per instance).
nodes = len(cluster.instances)
queries = nodes * 10
|
|
|
|
|
|
def bootstrap():
    """Reload the config and (re)create the test tables on every instance.

    On each instance of ``cluster`` this issues ``SYSTEM RELOAD CONFIG``,
    drops any leftover test tables, and creates the local ``data`` table
    plus one Distributed table per tested cluster definition.
    """
    # Distributed table name -> cluster name it is built on top of.
    distributed_tables = {
        "dist": "replicas_cluster",
        "dist_priority": "replicas_priority_cluster",
        "dist_priority_negative": "replicas_priority_negative_cluster",
    }

    for n in cluster.instances.values():
        # At startup, server loads configuration files.
        #
        # However ConfigReloader does not know about already loaded files
        # (files is empty()), hence it will always reload the configuration
        # just after server starts (+ 2 seconds, reload timeout).
        #
        # And on configuration reload the clusters will be re-created, so some
        # internal stuff will be reset:
        # - error_count
        # - last_used (round_robin)
        #
        # And if the reload will happen during round_robin test it will start
        # querying from the beginning, so let's issue config reload just after
        # start to avoid reload in the middle of the test execution.
        n.query("SYSTEM RELOAD CONFIG")

        # Drop every table created below (not only `data` and `dist`), so
        # that running bootstrap again does not fail on CREATE TABLE.
        n.query("DROP TABLE IF EXISTS data")
        for table in distributed_tables:
            n.query("DROP TABLE IF EXISTS {table}".format(table=table))

        n.query("CREATE TABLE data (key Int) Engine=Memory()")
        for table, cluster_name in distributed_tables.items():
            n.query(
                """
                CREATE TABLE {table} AS data
                Engine=Distributed(
                    {cluster_name},
                    currentDatabase(),
                    data)
                """.format(
                    table=table, cluster_name=cluster_name
                )
            )
|
|
|
|
|
|
def make_uuid():
    """Return a newly generated random UUID as a hex string (no dashes)."""
    fresh = uuid.uuid4()
    return fresh.hex
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-wide, auto-used fixture: start the cluster, bootstrap, yield it.

    The cluster is shut down in ``finally``, so teardown also runs when
    startup or bootstrap raises.
    """
    try:
        # Setup: bring the instances up, then prepare config and tables.
        cluster.start()
        bootstrap()

        # Hand the running cluster over to the tests of this module.
        yield cluster
    finally:
        # Teardown, executed regardless of how the block above ended.
        cluster.shutdown()
|
|
|
|
|
|
def get_node(query_node, table="dist", *args, **kwargs):
    """Run ``SELECT * FROM <table>`` and return the host that served it.

    The SELECT is issued on ``query_node`` under a freshly generated
    ``query_id``. After flushing logs on every instance, ``system.query_log``
    is searched through ``cluster(shards_cluster, ...)`` for the most recent
    finished non-initial query with that ``initial_query_id``; its
    ``_shard_num`` is then mapped to a ``host_name`` via ``system.clusters``.

    NOTE(review): this presumably relies on ``shards_cluster`` listing every
    instance as its own shard (defined in configs/remote_servers.xml, not
    visible here) -- confirm against the config.

    Args:
        query_node: instance on which both the SELECT and the lookup run.
        table: table to select from (``dist`` by default).
        *args: extra positional arguments forwarded to ``query_node.query``.
        **kwargs: extra keyword arguments forwarded to ``query_node.query``.
            A ``settings`` mapping, if given, is merged with the settings
            this helper needs; the helper's values win on conflict. The
            caller's mapping is not modified.

    Returns:
        The stripped output of the lookup query (the matching ``host_name``).
    """
    query_id = make_uuid()

    forced_settings = {
        "query_id": query_id,
        "log_queries": 1,
        "log_queries_min_type": "QUERY_START",
        "prefer_localhost_replica": 0,
    }
    # Merge into a new dict instead of update()-ing in place, so that the
    # caller's settings dict does not silently pick up query_id & co.
    caller_settings = kwargs.get("settings") or {}
    kwargs["settings"] = {**caller_settings, **forced_settings}

    query_node.query("SELECT * FROM " + table, *args, **kwargs)

    # Make sure query_log entries are flushed on all instances before the
    # lookup below reads them.
    for n in cluster.instances.values():
        n.query("SYSTEM FLUSH LOGS")

    rows = query_node.query(
        """
        SELECT c.host_name
        FROM (
            SELECT _shard_num
            FROM cluster(shards_cluster, system.query_log)
            WHERE
                initial_query_id = '{query_id}' AND
                is_initial_query = 0 AND
                type = 'QueryFinish'
            ORDER BY event_date DESC, event_time DESC
            LIMIT 1
        ) a
        JOIN system.clusters c
        ON a._shard_num = c.shard_num WHERE cluster = 'shards_cluster'
        """.format(
            query_id=query_id
        )
    )
    return rows.strip()
|
|
|
|
|
|
# TODO: right now random distribution looks bad, but works
def test_load_balancing_default():
    """With ``random`` balancing, queries from n1 must reach every node."""
    seen_hosts = {
        get_node(n1, settings={"load_balancing": "random"})
        for _ in range(queries)
    }
    assert len(seen_hosts) == nodes, seen_hosts
|
|
|
|
|
|
def test_load_balancing_nearest_hostname():
    """With ``nearest_hostname``, every query from n1 must be served by n1."""
    seen_hosts = {
        get_node(n1, settings={"load_balancing": "nearest_hostname"})
        for _ in range(queries)
    }
    assert len(seen_hosts) == 1, seen_hosts
    assert seen_hosts == {"n1"}
|
|
|
|
|
|
def test_load_balancing_in_order():
    """With ``in_order``, every query from n1 must be served by n1."""
    seen_hosts = {
        get_node(n1, settings={"load_balancing": "in_order"})
        for _ in range(queries)
    }
    assert len(seen_hosts) == 1, seen_hosts
    assert seen_hosts == {"n1"}
|
|
|
|
|
|
def test_load_balancing_first_or_random():
    """With ``first_or_random``, every query from n1 must be served by n1."""
    seen_hosts = {
        get_node(n1, settings={"load_balancing": "first_or_random"})
        for _ in range(queries)
    }
    assert len(seen_hosts) == 1, seen_hosts
    assert seen_hosts == {"n1"}
|
|
|
|
|
|
def test_load_balancing_round_robin():
    """With ``round_robin``, one query per node must hit n1, n2 and n3."""
    # Exactly `nodes` queries: each one is expected to land on a new host.
    seen_hosts = {
        get_node(n1, settings={"load_balancing": "round_robin"})
        for _ in range(nodes)
    }
    assert len(seen_hosts) == nodes, seen_hosts
    assert seen_hosts == {"n1", "n2", "n3"}
|
|
|
|
|
|
@pytest.mark.parametrize(
    "dist_table",
    ["dist_priority", "dist_priority_negative"],
)
def test_load_balancing_priority_round_robin(dist_table):
    """With ``round_robin`` on the priority tables, only n1 and n3 are used."""
    seen_hosts = {
        get_node(n1, dist_table, settings={"load_balancing": "round_robin"})
        for _ in range(nodes)
    }
    assert len(seen_hosts) == 2, seen_hosts
    # n2 has bigger priority in config
    assert seen_hosts == {"n1", "n3"}
|
|
|
|
|
|
def test_distributed_replica_max_ignored_errors():
    """Check ``distributed_replica_max_ignored_errors`` with ``in_order``.

    Steps:
      1. Pause the n1 container and query ``dist`` from n2, which bumps
         ``errors_count`` of n1 in ``system.clusters`` to 1.
      2. Resume n1. With ``distributed_replica_max_ignored_errors = 0``
         queries from n2 are still served by n2.
      3. Set ``distributed_replica_max_ignored_errors = 1`` and expect the
         query to be served by n1 again.
    """
    settings = {
        "use_hedged_requests": 0,
        "load_balancing": "in_order",
        "prefer_localhost_replica": 0,
        "connect_timeout": 2,
        "receive_timeout": 2,
        "send_timeout": 2,
        "idle_connection_timeout": 2,
        "tcp_keep_alive_timeout": 2,
        "distributed_replica_max_ignored_errors": 0,
        "distributed_replica_error_half_life": 60,
    }

    # initiate connection (if started only this test)
    n2.query("SELECT * FROM dist", settings=settings)
    cluster.pause_container("n1")

    try:
        # n1 paused -- skipping, and increment error_count for n1
        # but the query succeeds, no need in query_and_get_error()
        n2.query("SELECT * FROM dist", settings=settings)
        # XXX: due to config reloading we need second time (sigh)
        n2.query("SELECT * FROM dist", settings=settings)
        # check error_count for n1
        errors_count = int(
            n2.query(
                """
                SELECT errors_count FROM system.clusters
                WHERE cluster = 'replicas_cluster' AND host_name = 'n1'
                """,
                settings=settings,
            )
        )
        assert errors_count == 1
    finally:
        # Always resume n1, even if a query or the assertion above failed;
        # otherwise the module-scoped cluster would be left with a paused
        # container.
        cluster.unpause_container("n1")

    # still n2
    assert get_node(n2, settings=settings) == "n2"
    # now n1
    settings["distributed_replica_max_ignored_errors"] = 1
    assert get_node(n2, settings=settings) == "n1"
|