mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-15 20:24:07 +00:00
223 lines
6.3 KiB
Python
223 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import random
|
|
import string
|
|
import time
|
|
from multiprocessing.dummy import Pool
|
|
|
|
import pytest
|
|
from kazoo.client import KazooClient, KazooRetry
|
|
from kazoo.handlers.threading import KazooTimeoutError
|
|
|
|
import helpers.keeper_utils as keeper_utils
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance(
|
|
"node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
|
|
)
|
|
node2 = cluster.add_instance(
|
|
"node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
|
|
)
|
|
node3 = cluster.add_instance(
|
|
"node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
|
|
)
|
|
|
|
|
|
def start_zookeeper(node):
|
|
node.exec_in_container(["bash", "-c", "/opt/zookeeper/bin/zkServer.sh start"])
|
|
|
|
|
|
def stop_zookeeper(node):
|
|
node.exec_in_container(["bash", "-c", "/opt/zookeeper/bin/zkServer.sh stop"])
|
|
timeout = time.time() + 60
|
|
while node.get_process_pid("zookeeper") != None:
|
|
if time.time() > timeout:
|
|
raise Exception("Failed to stop ZooKeeper in 60 secs")
|
|
time.sleep(0.2)
|
|
|
|
|
|
def generate_zk_snapshot(node):
|
|
for _ in range(100):
|
|
stop_zookeeper(node)
|
|
start_zookeeper(node)
|
|
time.sleep(2)
|
|
stop_zookeeper(node)
|
|
|
|
# get last snapshot
|
|
last_snapshot = node.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"find /zookeeper/version-2 -name 'snapshot.*' -printf '%T@ %p\n' | sort -n | awk 'END {print $2}'",
|
|
]
|
|
).strip()
|
|
|
|
print(f"Latest snapshot: {last_snapshot}")
|
|
|
|
try:
|
|
# verify last snapshot
|
|
# zkSnapShotToolkit is a tool to inspect generated snapshots - if it's broken, an exception is thrown
|
|
node.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
f"/opt/zookeeper/bin/zkSnapShotToolkit.sh {last_snapshot}",
|
|
]
|
|
)
|
|
return
|
|
except Exception as err:
|
|
print(f"Got error while reading snapshot: {err}")
|
|
|
|
raise Exception("Failed to generate a ZooKeeper snapshot")
|
|
|
|
|
|
def clear_zookeeper(node):
|
|
node.exec_in_container(["bash", "-c", "rm -fr /zookeeper/*"])
|
|
|
|
|
|
def restart_and_clear_zookeeper(node):
|
|
stop_zookeeper(node)
|
|
clear_zookeeper(node)
|
|
start_zookeeper(node)
|
|
|
|
|
|
def restart_zookeeper(node):
|
|
stop_zookeeper(node)
|
|
start_zookeeper(node)
|
|
|
|
|
|
def clear_clickhouse_data(node):
|
|
node.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"rm -fr /var/lib/clickhouse/coordination/logs/* /var/lib/clickhouse/coordination/snapshots/*",
|
|
]
|
|
)
|
|
|
|
|
|
def convert_zookeeper_data(node):
|
|
node.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"tar -cvzf /var/lib/clickhouse/zk-data.tar.gz /zookeeper/version-2",
|
|
]
|
|
)
|
|
cmd = "/usr/bin/clickhouse keeper-converter --zookeeper-logs-dir /zookeeper/version-2/ --zookeeper-snapshots-dir /zookeeper/version-2/ --output-dir /var/lib/clickhouse/coordination/snapshots"
|
|
node.exec_in_container(["bash", "-c", cmd])
|
|
return os.path.join(
|
|
"/var/lib/clickhouse/coordination/snapshots",
|
|
node.exec_in_container(
|
|
["bash", "-c", "ls /var/lib/clickhouse/coordination/snapshots"]
|
|
).strip(),
|
|
)
|
|
|
|
|
|
def stop_clickhouse(node):
|
|
node.stop_clickhouse()
|
|
|
|
|
|
def start_clickhouse(node):
|
|
node.start_clickhouse()
|
|
keeper_utils.wait_until_connected(cluster, node)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def get_fake_zk(node, timeout=30.0):
|
|
_fake_zk_instance = KazooClient(
|
|
hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout
|
|
)
|
|
_fake_zk_instance.start()
|
|
return _fake_zk_instance
|
|
|
|
|
|
def get_genuine_zk(node, timeout=30.0):
|
|
CONNECTION_RETRIES = 100
|
|
for i in range(CONNECTION_RETRIES):
|
|
try:
|
|
_genuine_zk_instance = KazooClient(
|
|
hosts=cluster.get_instance_ip(node.name) + ":2181",
|
|
timeout=timeout,
|
|
connection_retry=KazooRetry(max_tries=20),
|
|
)
|
|
_genuine_zk_instance.start()
|
|
return _genuine_zk_instance
|
|
except KazooTimeoutError:
|
|
if i == CONNECTION_RETRIES - 1:
|
|
raise
|
|
|
|
print(
|
|
"Failed to connect to ZK cluster because of timeout. Restarting cluster and trying again."
|
|
)
|
|
time.sleep(0.2)
|
|
restart_zookeeper(node)
|
|
|
|
|
|
def test_snapshot_and_load(started_cluster):
|
|
genuine_connection = None
|
|
fake_zks = []
|
|
|
|
try:
|
|
restart_and_clear_zookeeper(node1)
|
|
genuine_connection = get_genuine_zk(node1)
|
|
|
|
for node in [node1, node2, node3]:
|
|
print("Stop and clear", node.name, "with dockerid", node.docker_id)
|
|
stop_clickhouse(node)
|
|
clear_clickhouse_data(node)
|
|
|
|
for i in range(1000):
|
|
genuine_connection.create("/test" + str(i), b"data")
|
|
|
|
print("Data loaded to zookeeper")
|
|
|
|
generate_zk_snapshot(node1)
|
|
|
|
print("Data copied to node1")
|
|
resulted_path = convert_zookeeper_data(node1)
|
|
print("Resulted path", resulted_path)
|
|
for node in [node2, node3]:
|
|
print("Copy snapshot from", node1.name, "to", node.name)
|
|
cluster.copy_file_from_container_to_container(
|
|
node1, resulted_path, node, "/var/lib/clickhouse/coordination/snapshots"
|
|
)
|
|
|
|
print("Starting clickhouses")
|
|
|
|
p = Pool(3)
|
|
result = p.map_async(start_clickhouse, [node1, node2, node3])
|
|
result.wait()
|
|
|
|
print("Loading additional data")
|
|
fake_zks = [get_fake_zk(node) for node in [node1, node2, node3]]
|
|
for i in range(1000):
|
|
fake_zk = random.choice(fake_zks)
|
|
try:
|
|
fake_zk.create("/test" + str(i + 1000), b"data")
|
|
except Exception as ex:
|
|
print("Got exception:" + str(ex))
|
|
|
|
print("Final")
|
|
fake_zks[0].create("/test10000", b"data")
|
|
finally:
|
|
for zk in fake_zks:
|
|
if zk:
|
|
zk.stop()
|
|
zk.close()
|
|
if genuine_connection:
|
|
genuine_connection.stop()
|
|
genuine_connection.close()
|