ClickHouse/tests/integration/test_keeper_persistent_log/test.py
2022-03-22 17:39:58 +01:00

210 lines
6.0 KiB
Python

#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper.xml", "configs/use_keeper.xml"],
stay_alive=True,
)
def random_string(length):
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
def create_random_path(prefix="", depth=1):
if depth == 0:
return prefix
return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_connection_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
_fake_zk_instance.start()
return _fake_zk_instance
def test_state_after_restart(started_cluster):
try:
node_zk = None
node_zk2 = None
node_zk = get_connection_zk("node")
node_zk.create("/test_state_after_restart", b"somevalue")
strs = []
for i in range(100):
strs.append(random_string(123).encode())
node_zk.create("/test_state_after_restart/node" + str(i), strs[i])
for i in range(100):
if i % 7 == 0:
node_zk.delete("/test_state_after_restart/node" + str(i))
node.restart_clickhouse(kill=True)
node_zk2 = get_connection_zk("node")
assert node_zk2.get("/test_state_after_restart")[0] == b"somevalue"
for i in range(100):
if i % 7 == 0:
assert (
node_zk2.exists("/test_state_after_restart/node" + str(i)) is None
)
else:
assert (
len(node_zk2.get("/test_state_after_restart/node" + str(i))[0])
== 123
)
assert (
node_zk2.get("/test_state_after_restart/node" + str(i))[0]
== strs[i]
)
finally:
try:
if node_zk is not None:
node_zk.stop()
node_zk.close()
if node_zk2 is not None:
node_zk2.stop()
node_zk2.close()
except:
pass
def test_state_duplicate_restart(started_cluster):
try:
node_zk = None
node_zk2 = None
node_zk3 = None
node_zk = get_connection_zk("node")
node_zk.create("/test_state_duplicated_restart", b"somevalue")
strs = []
for i in range(100):
strs.append(random_string(123).encode())
node_zk.create("/test_state_duplicated_restart/node" + str(i), strs[i])
for i in range(100):
if i % 7 == 0:
node_zk.delete("/test_state_duplicated_restart/node" + str(i))
node.restart_clickhouse(kill=True)
node_zk2 = get_connection_zk("node")
node_zk2.create("/test_state_duplicated_restart/just_test1")
node_zk2.create("/test_state_duplicated_restart/just_test2")
node_zk2.create("/test_state_duplicated_restart/just_test3")
node.restart_clickhouse(kill=True)
node_zk3 = get_connection_zk("node")
assert node_zk3.get("/test_state_duplicated_restart")[0] == b"somevalue"
for i in range(100):
if i % 7 == 0:
assert (
node_zk3.exists("/test_state_duplicated_restart/node" + str(i))
is None
)
else:
assert (
len(node_zk3.get("/test_state_duplicated_restart/node" + str(i))[0])
== 123
)
assert (
node_zk3.get("/test_state_duplicated_restart/node" + str(i))[0]
== strs[i]
)
finally:
try:
if node_zk is not None:
node_zk.stop()
node_zk.close()
if node_zk2 is not None:
node_zk2.stop()
node_zk2.close()
if node_zk3 is not None:
node_zk3.stop()
node_zk3.close()
except:
pass
# http://zookeeper-user.578899.n2.nabble.com/Why-are-ephemeral-nodes-written-to-disk-tp7583403p7583418.html
def test_ephemeral_after_restart(started_cluster):
try:
node_zk = None
node_zk2 = None
node_zk = get_connection_zk("node")
node_zk.create("/test_ephemeral_after_restart", b"somevalue")
strs = []
for i in range(100):
strs.append(random_string(123).encode())
node_zk.create(
"/test_ephemeral_after_restart/node" + str(i), strs[i], ephemeral=True
)
for i in range(100):
if i % 7 == 0:
node_zk.delete("/test_ephemeral_after_restart/node" + str(i))
node.restart_clickhouse(kill=True)
node_zk2 = get_connection_zk("node")
assert node_zk2.get("/test_ephemeral_after_restart")[0] == b"somevalue"
for i in range(100):
if i % 7 == 0:
assert (
node_zk2.exists("/test_ephemeral_after_restart/node" + str(i))
is None
)
else:
assert (
len(node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0])
== 123
)
assert (
node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0]
== strs[i]
)
finally:
try:
if node_zk is not None:
node_zk.stop()
node_zk.close()
if node_zk2 is not None:
node_zk2.stop()
node_zk2.close()
except:
pass