#!/usr/bin/env python3 #!/usr/bin/env python3 import pytest from helpers.cluster import ClickHouseCluster import random import string import os import time from kazoo.client import KazooClient, KazooState cluster = ClickHouseCluster(__file__) # clickhouse itself will use external zookeeper node = cluster.add_instance('node', main_configs=['configs/enable_keeper.xml', 'configs/logs_conf.xml'], stay_alive=True, with_zookeeper=True) def random_string(length): return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) def create_random_path(prefix="", depth=1): if depth == 0: return prefix return create_random_path(os.path.join(prefix, random_string(3)), depth - 1) @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() yield cluster finally: cluster.shutdown() def get_connection_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) def reset_listener(state): nonlocal _fake_zk_instance print("Fake zk callback called for state", state) if state != KazooState.CONNECTED: _fake_zk_instance._reset() _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance def test_state_after_restart(started_cluster): try: node_zk = None node_zk2 = None node_zk = get_connection_zk("node") node_zk.create("/test_state_after_restart", b"somevalue") strs = [] for i in range(100): strs.append(random_string(123).encode()) node_zk.create("/test_state_after_restart/node" + str(i), strs[i]) existing_children = [] for i in range(100): if i % 7 == 0: node_zk.delete("/test_state_after_restart/node" + str(i)) else: existing_children.append("node" + str(i)) node.restart_clickhouse(kill=True) node_zk2 = get_connection_zk("node") assert node_zk2.get("/test_state_after_restart")[0] == b"somevalue" for i in range(100): if i % 7 == 0: assert node_zk2.exists("/test_state_after_restart/node" + str(i)) is None else: data, stat = node_zk2.get("/test_state_after_restart/node" + str(i)) assert len(data) == 123 assert data == strs[i] assert stat.ephemeralOwner == 0 assert list(sorted(existing_children)) == list(sorted(node_zk2.get_children("/test_state_after_restart"))) finally: try: if node_zk is not None: node_zk.stop() node_zk.close() if node_zk2 is not None: node_zk2.stop() node_zk2.close() except: pass def test_ephemeral_after_restart(started_cluster): try: node_zk = None node_zk2 = None node_zk = get_connection_zk("node") session_id = node_zk._session_id node_zk.create("/test_ephemeral_after_restart", b"somevalue") strs = [] for i in range(100): strs.append(random_string(123).encode()) node_zk.create("/test_ephemeral_after_restart/node" + str(i), strs[i], ephemeral=True) existing_children = [] for i in range(100): if i % 7 == 0: node_zk.delete("/test_ephemeral_after_restart/node" + str(i)) else: existing_children.append("node" + str(i)) node.restart_clickhouse(kill=True) node_zk2 = get_connection_zk("node") assert node_zk2.get("/test_ephemeral_after_restart")[0] == b"somevalue" for i in range(100): if i % 7 == 0: assert node_zk2.exists("/test_ephemeral_after_restart/node" + str(i)) is None else: data, stat = node_zk2.get("/test_ephemeral_after_restart/node" + str(i)) assert len(data) == 123 assert data == strs[i] assert stat.ephemeralOwner == session_id assert list(sorted(existing_children)) == list(sorted(node_zk2.get_children("/test_ephemeral_after_restart"))) finally: try: if node_zk is not None: node_zk.stop() node_zk.close() if node_zk2 is not None: node_zk2.stop() node_zk2.close() except: pass