ClickHouse/tests/integration/test_zookeeper_fallback_session/test.py

102 lines
3.3 KiB
Python
Raw Normal View History

2023-07-27 17:13:58 +00:00
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
cluster = ClickHouseCluster(
__file__, zookeeper_config_path="configs/zookeeper_load_balancing.xml"
)
node1 = cluster.add_instance(
"node1",
with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_load_balancing.xml"],
)
node2 = cluster.add_instance(
"node2",
with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_load_balancing.xml"],
)
node3 = cluster.add_instance(
"node3",
with_zookeeper=True,
main_configs=["configs/remote_servers.xml", "configs/zookeeper_load_balancing.xml"],
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in [node1, node2, node3]:
node.query("DROP TABLE IF EXISTS simple SYNC")
node.query(
"""
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}') PARTITION BY toYYYYMM(date) ORDER BY id;
""".format(
replica=node.name
)
)
yield cluster
finally:
cluster.shutdown()
def assert_uses_zk_node(node: ClickHouseInstance, zk_node):
def check_callback(host):
return host.strip() == expected_zk_ip_addr
expected_zk_ip_addr = node.cluster.get_instance_ip(zk_node)
host = node.query_with_retry(
"select host from system.zookeeper_connection", check_callback=check_callback
)
assert host.strip() == expected_zk_ip_addr
def test_fallback_session(started_cluster: ClickHouseCluster):
# only leave connecting to zoo3 possible
with PartitionManager() as pm:
for node in started_cluster.instances.values():
for zk in ["zoo1", "zoo2"]:
pm._add_rule(
{
"source": node.ip_address,
"destination": cluster.get_instance_ip(zk),
"action": "REJECT --reject-with tcp-reset",
}
)
for node in [node1, node2, node3]:
# all nodes will have to switch to zoo3
assert_uses_zk_node(node, "zoo3")
node1.query_with_retry("INSERT INTO simple VALUES ({0}, {0})".format(1))
# and replication still works
for node in [node2, node3]:
assert (
node.query_with_retry(
"SELECT count() from simple",
check_callback=lambda count: count.strip() == "1",
)
== "1\n"
)
# at this point network partitioning has been reverted.
# the nodes should switch to zoo1 automatically because of `in_order` load-balancing.
# otherwise they would connect to a random replica
for node in [node1, node2, node3]:
assert_uses_zk_node(node, "zoo1")
node1.query_with_retry("INSERT INTO simple VALUES ({0}, {0})".format(2))
for node in [node2, node3]:
assert (
node.query_with_retry(
"SELECT count() from simple",
check_callback=lambda count: count.strip() == "2",
)
== "2\n"
)