mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 13:42:02 +00:00
220 lines
7.0 KiB
Python
Executable File
220 lines
7.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import subprocess
|
|
import signal
|
|
import sys
|
|
import time
|
|
import random
|
|
from argparse import ArgumentParser
|
|
|
|
|
|
XML_TEMPLATE = """
|
|
<clickhouse>
|
|
<logger>
|
|
<level>trace</level>
|
|
<log>{data_dir}/clickhouse-keeper.log</log>
|
|
<errorlog>{data_dir}/clickhouse-keeper.err.log</errorlog>
|
|
<stderr>{data_dir}/stderr.log</stderr>
|
|
<stdout>{data_dir}/stdout.log</stdout>
|
|
<size>1000M</size>
|
|
<count>10</count>
|
|
<console>0</console>
|
|
</logger>
|
|
|
|
<keeper_server>
|
|
<tcp_port>{client_port}</tcp_port>
|
|
<server_id>{server_id}</server_id>
|
|
<log_storage_path>{data_dir}/log</log_storage_path>
|
|
<snapshot_storage_path>{data_dir}/snapshots</snapshot_storage_path>
|
|
|
|
<coordination_settings>
|
|
<operation_timeout_ms>5000</operation_timeout_ms>
|
|
<session_timeout_ms>10000</session_timeout_ms>
|
|
<raft_logs_level>trace</raft_logs_level>
|
|
<snapshot_distance>1</snapshot_distance>
|
|
</coordination_settings>
|
|
|
|
<raft_configuration>
|
|
<server>
|
|
<id>1</id>
|
|
<hostname>localhost</hostname>
|
|
<port>9234</port>
|
|
</server>
|
|
<server>
|
|
<id>2</id>
|
|
<hostname>localhost</hostname>
|
|
<port>9235</port>
|
|
</server>
|
|
<server>
|
|
<id>3</id>
|
|
<hostname>localhost</hostname>
|
|
<port>9236</port>
|
|
</server>
|
|
</raft_configuration>
|
|
</keeper_server>
|
|
</clickhouse>
|
|
|
|
"""
|
|
|
|
|
|
class Keeper:
|
|
def __init__(
|
|
self, keeper_binary_path, server_id, client_port, workdir, with_thread_fuzzer
|
|
):
|
|
self.keeper_binary_path = keeper_binary_path
|
|
if not os.path.exists(self.keeper_binary_path):
|
|
raise Exception(
|
|
f"Path for keeper binary doesn't exist {self.keeper_binary_path}"
|
|
)
|
|
self.server_id = server_id
|
|
self.client_port = client_port
|
|
self.workdir = workdir
|
|
self.data_dir = os.path.join(self.workdir, f"data_server_{self.server_id}")
|
|
self.config_dir = os.path.join(self.workdir, f"config_server_{self.server_id}")
|
|
self.config_path = os.path.join(self.config_dir, "config.xml")
|
|
self.with_thread_fuzzer = with_thread_fuzzer
|
|
self.process = None
|
|
|
|
def _create_dirs(self):
|
|
if not os.path.exists(self.workdir):
|
|
os.makedirs(self.workdir)
|
|
for p in [self.data_dir, self.config_dir]:
|
|
if not os.path.exists(p):
|
|
os.mkdir(p)
|
|
|
|
def _prepare_and_write_config(self):
|
|
if not os.path.exists(self.config_path):
|
|
config = XML_TEMPLATE.format(
|
|
client_port=self.client_port,
|
|
server_id=self.server_id,
|
|
data_dir=self.data_dir,
|
|
)
|
|
with open(self.config_path, "w", encoding="utf-8") as f:
|
|
f.write(config)
|
|
|
|
def prepare(self):
|
|
self._create_dirs()
|
|
self._prepare_and_write_config()
|
|
|
|
def start(self):
|
|
env = os.environ.copy()
|
|
if self.with_thread_fuzzer:
|
|
env["THREAD_FUZZER_CPU_TIME_PERIOD_US"] = "1000"
|
|
env["THREAD_FUZZER_SLEEP_PROBABILITY"] = "0.1"
|
|
env["THREAD_FUZZER_SLEEP_TIME_US"] = "100000"
|
|
env["THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY"] = "1"
|
|
env["THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY"] = "1"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY"] = "1"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY"] = "1"
|
|
|
|
env["THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY"] = "0.001"
|
|
env["THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY"] = "0.001"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY"] = "0.001"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY"] = "0.001"
|
|
env["THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US"] = "10000"
|
|
env["THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US"] = "10000"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US"] = "10000"
|
|
env["THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US"] = "10000"
|
|
|
|
self.process = subprocess.Popen(
|
|
[
|
|
self.keeper_binary_path,
|
|
"--config",
|
|
self.config_path,
|
|
"--log-file",
|
|
f"{self.data_dir}/clickhouse-keeper.log",
|
|
"--errorlog-file",
|
|
f"{self.data_dir}/clickhouse-keeper.err.log",
|
|
],
|
|
env=env,
|
|
)
|
|
|
|
def restart(self):
|
|
print("Restarting keeper", self.server_id)
|
|
self.stop()
|
|
self.start()
|
|
|
|
def stop(self):
|
|
if self.process:
|
|
self.process.kill()
|
|
self.process = None
|
|
|
|
|
|
class KeeperBench:
|
|
def __init__(self, bench_binary_path, client_ports):
|
|
self.bench_binary_path = bench_binary_path
|
|
self.client_ports = client_ports
|
|
self.process = None
|
|
|
|
def start(self):
|
|
hosts = " ".join([f"--hosts localhost:{port}" for port in self.client_ports])
|
|
cmd = f"{self.bench_binary_path} {hosts} --continue_on_errors -c 32 --generator create_small_data"
|
|
self.process = subprocess.Popen(cmd, shell=True)
|
|
|
|
def stop(self):
|
|
if self.process:
|
|
self.process.kill()
|
|
self.process = None
|
|
|
|
|
|
def main(args):
|
|
PORTS = [9181, 9182, 9183]
|
|
SERVER_IDS = [1, 2, 3]
|
|
workdir = args.workdir
|
|
keeper_binary_path = args.keeper_binary_path
|
|
keeper_bench_path = args.keeper_bench_path
|
|
|
|
keepers = []
|
|
for port, server_id in zip(PORTS, SERVER_IDS):
|
|
keepers.append(
|
|
Keeper(
|
|
keeper_binary_path, server_id, port, workdir, args.with_thread_fuzzer
|
|
)
|
|
)
|
|
|
|
bench = KeeperBench(keeper_bench_path, PORTS)
|
|
|
|
for keeper in keepers:
|
|
keeper.prepare()
|
|
keeper.start()
|
|
|
|
time.sleep(10)
|
|
|
|
bench.start()
|
|
|
|
def signal_handler(sig, frame):
|
|
print("You pressed Ctrl+C!")
|
|
for keeper in keepers:
|
|
keeper.stop()
|
|
|
|
bench.stop()
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
for _ in range(100):
|
|
time.sleep(30)
|
|
sacrifice = random.choice(keepers)
|
|
sacrifice.restart()
|
|
time.sleep(60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = ArgumentParser(description="Simple tool to load three-nodes keeper")
|
|
parser.add_argument(
|
|
"--workdir", default="./keeper-overload-workdir", help="Path to workdir"
|
|
)
|
|
parser.add_argument(
|
|
"--keeper-binary-path", required=True, help="Path to keeper binary"
|
|
)
|
|
parser.add_argument(
|
|
"--keeper-bench-path", required=True, help="Path to keeper bench utility"
|
|
)
|
|
parser.add_argument(
|
|
"--with-thread-fuzzer", action="store_true", help="Path to keeper bench utility"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
main(args)
|