mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 03:25:15 +00:00
96 lines
2.6 KiB
Python
96 lines
2.6 KiB
Python
import socket
|
|
import time
|
|
from kazoo.client import KazooClient
|
|
|
|
|
|
def get_keeper_socket(cluster, node, port=9181):
|
|
hosts = cluster.get_instance_ip(node.name)
|
|
client = socket.socket()
|
|
client.settimeout(10)
|
|
client.connect((hosts, port))
|
|
return client
|
|
|
|
|
|
def send_4lw_cmd(cluster, node, cmd="ruok", port=9181):
|
|
client = None
|
|
try:
|
|
client = get_keeper_socket(cluster, node, port)
|
|
client.send(cmd.encode())
|
|
data = client.recv(100_000)
|
|
data = data.decode()
|
|
return data
|
|
finally:
|
|
if client is not None:
|
|
client.close()
|
|
|
|
|
|
NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"
|
|
|
|
|
|
def wait_until_connected(cluster, node, port=9181, timeout=30.0):
|
|
elapsed = 0.0
|
|
|
|
while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
|
|
time.sleep(0.1)
|
|
elapsed += 0.1
|
|
|
|
if elapsed >= timeout:
|
|
raise Exception(
|
|
f"{timeout}s timeout while waiting for {node.name} to start serving requests"
|
|
)
|
|
|
|
|
|
def wait_until_quorum_lost(cluster, node, port=9181):
|
|
while send_4lw_cmd(cluster, node, "mntr", port) != NOT_SERVING_REQUESTS_ERROR_MSG:
|
|
time.sleep(0.1)
|
|
|
|
|
|
def wait_nodes(cluster, nodes):
|
|
for node in nodes:
|
|
wait_until_connected(cluster, node)
|
|
|
|
|
|
def is_leader(cluster, node, port=9181):
|
|
stat = send_4lw_cmd(cluster, node, "stat", port)
|
|
return "Mode: leader" in stat
|
|
|
|
|
|
def get_leader(cluster, nodes):
|
|
for node in nodes:
|
|
if is_leader(cluster, node):
|
|
return node
|
|
raise Exception("No leader in Keeper cluster.")
|
|
|
|
|
|
def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
|
|
_fake = KazooClient(
|
|
hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout
|
|
)
|
|
_fake.start()
|
|
return _fake
|
|
|
|
|
|
def get_config_str(zk: KazooClient) -> str:
|
|
"""
|
|
Return decoded contents of /keeper/config node
|
|
"""
|
|
return zk.get("/keeper/config")[0].decode("utf-8")
|
|
|
|
|
|
def wait_configs_equal(left_config: str, right_zk: KazooClient, timeout: float = 30.0):
|
|
"""
|
|
Check whether get /keeper/config result in left_config is equal
|
|
to get /keeper/config on right_zk ZK connection.
|
|
"""
|
|
elapsed: float = 0.0
|
|
while sorted(left_config.split("\n")) != sorted(
|
|
get_config_str(right_zk).split("\n")
|
|
):
|
|
time.sleep(1)
|
|
elapsed += 1
|
|
if elapsed >= timeout:
|
|
raise Exception(
|
|
f"timeout while checking nodes configs to get equal. "
|
|
f"Left: {left_config}, right: {get_config_str(right_zk)}"
|
|
)
|