ClickHouse/tests/integration/helpers/keeper_utils.py
2022-11-16 12:51:51 +08:00

54 lines
1.3 KiB
Python

import socket
import time
def get_keeper_socket(cluster, node, port=9181):
    """Open a TCP connection to a Keeper node and return the connected socket.

    Args:
        cluster: Object exposing ``get_instance_ip(name)``, used to resolve
            the node's address.
        node: Object with a ``name`` attribute identifying the instance.
        port: TCP port to connect to.

    Returns:
        A connected ``socket.socket`` with a 10 second timeout. The caller
        owns the socket and is responsible for closing it.

    Raises:
        OSError: If the connection cannot be established (this includes a
            connect timeout).
    """
    host = cluster.get_instance_ip(node.name)
    client = socket.socket()
    try:
        client.settimeout(10)
        client.connect((host, port))
    except BaseException:
        # Do not leak the file descriptor when the connection attempt fails;
        # the caller never receives the socket, so nobody else can close it.
        client.close()
        raise
    return client
def send_4lw_cmd(cluster, node, cmd="ruok", port=9181):
    """Send a four-letter-word command to a Keeper node and return the reply.

    Args:
        cluster: Passed through to ``get_keeper_socket`` to resolve the node.
        node: The instance to query.
        cmd: Command text to send (e.g. ``"ruok"``, ``"mntr"``, ``"stat"``).
        port: TCP port to connect to.

    Returns:
        The decoded text of the first chunk (up to 100_000 bytes) received
        in response.

    Raises:
        OSError: If connecting, sending or receiving fails or times out.
        UnicodeDecodeError: If the reply is not valid UTF-8.
    """
    # The socket is a context manager: it is closed on every exit path,
    # and nothing needs closing if get_keeper_socket() itself raises.
    with get_keeper_socket(cluster, node, port) as client:
        # sendall() keeps writing until the whole command has been handed to
        # the kernel; a plain send() may transmit only part of the buffer.
        client.sendall(cmd.encode())
        # A single recv() is issued, so the reply is whatever arrives first,
        # capped at 100_000 bytes.
        return client.recv(100_000).decode()
# Reply text that wait_until_connected() and wait_until_quorum_lost() compare
# the "mntr" response against to decide whether to keep polling.
NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"
def wait_until_connected(cluster, node, port=9181, timeout=None):
    """Block until the node's ``mntr`` reply is no longer the not-serving message.

    The node is polled every 0.1 seconds.

    Args:
        cluster: Passed through to ``send_4lw_cmd``.
        node: The instance to poll.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, matching the historical behaviour.

    Raises:
        TimeoutError: If ``timeout`` is given and the node still reports
            that it is not serving requests once that time has elapsed.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
        # Bound the wait so a node that never comes up fails the test with a
        # clear message instead of hanging the whole run.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"{node.name} did not start serving requests within {timeout}s"
            )
        time.sleep(0.1)
def wait_until_quorum_lost(cluster, node, port=9181, timeout=None):
    """Block until the node's ``mntr`` reply becomes the not-serving message.

    The node is polled every 0.1 seconds.

    Args:
        cluster: Passed through to ``send_4lw_cmd``.
        node: The instance to poll.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely, matching the historical behaviour.

    Raises:
        TimeoutError: If ``timeout`` is given and the node is still not
            reporting the not-serving message once that time has elapsed.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while send_4lw_cmd(cluster, node, "mntr", port) != NOT_SERVING_REQUESTS_ERROR_MSG:
        # Bound the wait so a node that keeps serving fails the test with a
        # clear message instead of hanging the whole run.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"{node.name} was still serving requests after {timeout}s"
            )
        time.sleep(0.1)
def wait_nodes(cluster, nodes):
    """Wait, one node after another, until each entry of *nodes* is connected.

    Every node is handed to ``wait_until_connected`` with its default port,
    in iteration order; the call returns once the last one has returned.
    """
    for instance in nodes:
        wait_until_connected(cluster, instance)
def is_leader(cluster, node, port=9181):
    """Return True when the node's ``stat`` reply contains ``"Mode: leader"``.

    Args:
        cluster: Passed through to ``send_4lw_cmd``.
        node: The instance to query.
        port: TCP port to connect to.
    """
    return "Mode: leader" in send_4lw_cmd(cluster, node, "stat", port)
def get_leader(cluster, nodes):
    """Return the first entry of *nodes* for which ``is_leader`` is true.

    Nodes are checked in iteration order and the search stops at the first
    match.

    Raises:
        Exception: If none of the nodes reports itself as leader.
    """
    for candidate in nodes:
        if is_leader(cluster, candidate):
            break
    else:
        # The loop ran to completion without finding a leader.
        raise Exception("No leader in Keeper cluster.")
    return candidate