# ClickHouse/tests/integration/helpers/keeper_utils.py
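"""Helpers for ClickHouse Keeper integration tests: a thin wrapper around the
interactive ``clickhouse keeper-client`` binary, four-letter-word (4lw) command
helpers, and Kazoo-based connection utilities."""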
import io
import subprocess
import socket
import time
import typing as tp
import contextlib
import select
from kazoo.client import KazooClient
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.client import CommandRequest


def execute_keeper_client_query(
    cluster: ClickHouseCluster, node: ClickHouseInstance, query: str
) -> str:
    request = CommandRequest(
        [
            cluster.server_bin_path,
            "keeper-client",
            "--host",
            str(cluster.get_instance_ip(node.name)),
            "--port",
            str(cluster.zookeeper_port),
            "-q",
            query,
        ],
        stdin="",
    )

    return request.get_answer()
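
# Example (sketch, assuming the usual `cluster`/`node` pytest fixtures):
#   output = execute_keeper_client_query(cluster, node, "ls /")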


class KeeperException(Exception):
    pass


class KeeperClient(object):
    # Wrapper around an interactive `keeper-client` subprocess running with
    # --tests-mode; execute_query expects each response on stdout to be
    # terminated by SEPARATOR.
    SEPARATOR = b"\a\a\a\a\n"

    def __init__(self, bin_path: str, host: str, port: int):
        self.bin_path = bin_path
        self.host = host
        self.port = port

        self.proc = subprocess.Popen(
            [
                bin_path,
                "keeper-client",
                "--host",
                host,
                "--port",
                str(port),
                "--log-level",
                "error",
                "--tests-mode",
                "--no-confirmation",
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        self.poller = select.epoll()
        self.poller.register(self.proc.stdout)
        self.poller.register(self.proc.stderr)

        self._fd_nums = {
            self.proc.stdout.fileno(): self.proc.stdout,
            self.proc.stderr.fileno(): self.proc.stderr,
        }

        self.stopped = False

    def execute_query(self, query: str, timeout: float = 60.0) -> str:
        output = io.BytesIO()

        self.proc.stdin.write(query.encode() + b"\n")
        self.proc.stdin.flush()

        events = self.poller.poll(timeout)
        if not events:
            raise TimeoutError("Keeper client returned no output")

        for fd_num, event in events:
            if event & (select.EPOLLIN | select.EPOLLPRI):
                file = self._fd_nums[fd_num]

                if file == self.proc.stdout:
                    # Read stdout line by line until SEPARATOR marks the end
                    # of the response; the separator line itself is discarded.
                    while True:
                        chunk = file.readline()
                        if chunk.endswith(self.SEPARATOR):
                            break

                        output.write(chunk)

                elif file == self.proc.stderr:
                    # On error, stdout carries only the separator and the
                    # actual message is the next line on stderr.
                    assert self.proc.stdout.readline() == self.SEPARATOR
                    raise KeeperException(self.proc.stderr.readline().strip().decode())

            else:
                raise ValueError(f"Failed to read from pipe. Flag {event}")

        data = output.getvalue().strip().decode()
        return data

    def cd(self, path: str, timeout: float = 60.0):
        self.execute_query(f"cd {path}", timeout)

    def ls(self, path: str, timeout: float = 60.0) -> list[str]:
        return self.execute_query(f"ls {path}", timeout).split(" ")

    def create(self, path: str, value: str, timeout: float = 60.0):
        self.execute_query(f"create {path} {value}", timeout)

    def get(self, path: str, timeout: float = 60.0) -> str:
        return self.execute_query(f"get {path}", timeout)

    def set(self, path: str, value: str, version: tp.Optional[int] = None) -> None:
        self.execute_query(
            f"set {path} {value} {version if version is not None else ''}"
        )

    def rm(self, path: str, version: tp.Optional[int] = None) -> None:
        self.execute_query(f"rm {path} {version if version is not None else ''}")

    def exists(self, path: str, timeout: float = 60.0) -> bool:
        return bool(int(self.execute_query(f"exists {path}", timeout)))

    def stop(self):
        if not self.stopped:
            self.stopped = True
            self.proc.communicate(b"exit\n", timeout=10.0)

    def sync(self, path: str, timeout: float = 60.0):
        self.execute_query(f"sync {path}", timeout)

    def touch(self, path: str, timeout: float = 60.0):
        self.execute_query(f"touch {path}", timeout)

    def find_big_family(self, path: str, n: int = 10, timeout: float = 60.0) -> str:
        return self.execute_query(f"find_big_family {path} {n}", timeout)

    def find_super_nodes(self, threshold: int, timeout: float = 60.0) -> str:
        return self.execute_query(f"find_super_nodes {threshold}", timeout)

    def delete_stale_backups(self, timeout: float = 60.0) -> str:
        return self.execute_query("delete_stale_backups", timeout)

    def reconfig(
        self,
        joining: tp.Optional[str],
        leaving: tp.Optional[str],
        new_members: tp.Optional[str],
        timeout: float = 60.0,
    ) -> str:
        if bool(joining) + bool(leaving) + bool(new_members) != 1:
            raise ValueError(
                "Exactly one of joining, leaving or new_members must be specified"
            )

        if joining is not None:
            operation = "add"
        elif leaving is not None:
            operation = "remove"
        elif new_members is not None:
            operation = "set"
        else:
            raise ValueError(
                "At least one of joining, leaving or new_members must be specified"
            )

        return self.execute_query(
            f"reconfig {operation} {joining or leaving or new_members}", timeout
        )
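
    # Example (sketch; the server spec format is an assumption based on how
    # Keeper identifies raft participants, e.g. "server.<id>=<host>:<raft_port>"):
    #   client.reconfig(joining="server.4=node4:9234", leaving=None, new_members=None)
    #   client.reconfig(joining=None, leaving="4", new_members=None)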

    @classmethod
    @contextlib.contextmanager
    def from_cluster(
        cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None
    ) -> "KeeperClient":
        client = cls(
            cluster.server_bin_path,
            cluster.get_instance_ip(keeper_node),
            port or cluster.zookeeper_port,
        )

        try:
            yield client
        finally:
            client.stop()
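

# Example (sketch, assuming a started ClickHouseCluster with a "zoo1" keeper node):
#   with KeeperClient.from_cluster(cluster, "zoo1") as client:
#       client.create("/test", "value")
#       assert client.get("/test") == "value"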


def get_keeper_socket(cluster, node, port=9181):
    hosts = cluster.get_instance_ip(node.name)
    client = socket.socket()
    client.settimeout(10)
    client.connect((hosts, port))
    return client


def send_4lw_cmd(cluster, node, cmd="ruok", port=9181):
    client = None
    try:
        client = get_keeper_socket(cluster, node, port)
        client.send(cmd.encode())
        data = client.recv(100_000)
        data = data.decode()
        return data
    finally:
        if client is not None:
            client.close()
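
# Example (sketch): Keeper answers the standard ZooKeeper four-letter-word
# commands, so send_4lw_cmd(cluster, node, "ruok") is expected to return "imok"
# on a healthy instance.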


NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"


def wait_until_connected(cluster, node, port=9181, timeout=30.0):
    elapsed = 0.0

    while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
        time.sleep(0.1)
        elapsed += 0.1

        if elapsed >= timeout:
            raise Exception(
                f"{timeout}s timeout while waiting for {node.name} to start serving requests"
            )


def wait_until_quorum_lost(cluster, node, port=9181):
    while send_4lw_cmd(cluster, node, "mntr", port) != NOT_SERVING_REQUESTS_ERROR_MSG:
        time.sleep(0.1)


def wait_nodes(cluster, nodes):
    for node in nodes:
        wait_until_connected(cluster, node)


def is_leader(cluster, node, port=9181):
    stat = send_4lw_cmd(cluster, node, "stat", port)
    return "Mode: leader" in stat


def get_leader(cluster, nodes):
    for node in nodes:
        if is_leader(cluster, node):
            return node
    raise Exception("No leader in Keeper cluster.")


def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
    _fake = KazooClient(
        hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout
    )
    _fake.start()
    return _fake
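
# Example (sketch): the returned KazooClient talks the ZooKeeper protocol to Keeper.
#   zk = get_fake_zk(cluster, node)
#   zk.create("/test_node", b"data")
#   zk.stop()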


def get_config_str(zk: KeeperClient) -> str:
    """
    Return decoded contents of the /keeper/config node.
    """
    return zk.get("/keeper/config")


def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float = 30.0):
    """
    Wait until the /keeper/config contents seen through the right_zk connection
    become equal to left_config, or raise after the timeout.
    """
    elapsed: float = 0.0
    while sorted(left_config.split("\n")) != sorted(
        get_config_str(right_zk).split("\n")
    ):
        time.sleep(1)
        elapsed += 1
        if elapsed >= timeout:
            raise Exception(
                f"Timeout while waiting for node configs to become equal. "
                f"Left: {left_config}, right: {get_config_str(right_zk)}"
            )
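
# Typical usage (sketch), comparing configs across two clients after a reconfig:
#   config = get_config_str(zk1)
#   wait_configs_equal(config, zk2)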