ClickHouse/tests/integration/helpers/keeper_utils.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

96 lines
2.6 KiB
Python
Raw Normal View History

2022-09-06 10:58:14 +00:00
import socket
import time
2023-04-20 13:26:02 +00:00
from kazoo.client import KazooClient
2022-11-08 11:56:20 +00:00
2022-09-06 10:58:14 +00:00
def get_keeper_socket(cluster, node, port=9181):
hosts = cluster.get_instance_ip(node.name)
client = socket.socket()
client.settimeout(10)
client.connect((hosts, port))
return client
def send_4lw_cmd(cluster, node, cmd="ruok", port=9181):
client = None
try:
client = get_keeper_socket(cluster, node, port)
client.send(cmd.encode())
data = client.recv(100_000)
data = data.decode()
return data
finally:
if client is not None:
client.close()
NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"
2023-04-20 13:26:02 +00:00
def wait_until_connected(cluster, node, port=9181, timeout=30.0):
elapsed = 0.0
2022-09-06 10:58:14 +00:00
while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
time.sleep(0.1)
2023-04-20 13:26:02 +00:00
elapsed += 0.1
if elapsed >= timeout:
raise Exception(
f"{timeout}s timeout while waiting for {node.name} to start serving requests"
)
2022-09-06 10:58:14 +00:00
def wait_until_quorum_lost(cluster, node, port=9181):
while send_4lw_cmd(cluster, node, "mntr", port) != NOT_SERVING_REQUESTS_ERROR_MSG:
time.sleep(0.1)
def wait_nodes(cluster, nodes):
for node in nodes:
wait_until_connected(cluster, node)
2022-11-08 10:44:43 +00:00
def is_leader(cluster, node, port=9181):
stat = send_4lw_cmd(cluster, node, "stat", port)
2022-11-08 11:43:36 +00:00
return "Mode: leader" in stat
2022-11-16 01:33:46 +00:00
def get_leader(cluster, nodes):
for node in nodes:
if is_leader(cluster, node):
return node
raise Exception("No leader in Keeper cluster.")
2023-04-20 13:26:02 +00:00
def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
_fake = KazooClient(
hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout
)
_fake.start()
return _fake
def get_config_str(zk: KazooClient) -> str:
"""
Return decoded contents of /keeper/config node
"""
return zk.get("/keeper/config")[0].decode("utf-8")
def wait_configs_equal(left_config: str, right_zk: KazooClient, timeout: float = 30.0):
2023-04-20 13:26:02 +00:00
"""
Check whether get /keeper/config result in left_config is equal
to get /keeper/config on right_zk ZK connection.
2023-04-20 13:26:02 +00:00
"""
2023-07-03 17:52:57 +00:00
elapsed: float = 0.0
while sorted(left_config.split("\n")) != sorted(
get_config_str(right_zk).split("\n")
):
time.sleep(1)
elapsed += 1
if elapsed >= timeout:
raise Exception(
f"timeout while checking nodes configs to get equal. "
2023-07-03 17:52:57 +00:00
f"Left: {left_config}, right: {get_config_str(right_zk)}"
)