ClickHouse/tests/ci/jepsen_check.py

313 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, List
import boto3 # type: ignore
import requests # type: ignore
from github import Github
from build_download_helper import get_build_name_for_check
from clickhouse_helper import ClickHouseHelper, prepare_tests_results_for_clickhouse
from commit_status_helper import RerunHelper, get_commit, post_commit_status
from compress_files import compress_fast
from env_helper import REPO_COPY, TEMP_PATH, S3_BUILDS_BUCKET, S3_DOWNLOAD
from get_robot_token import get_best_robot_token, get_parameter_from_ssm
from pr_info import PRInfo
from report import TestResults, TestResult
from s3_helper import S3Helper
from ssh import SSHKey
from stopwatch import Stopwatch
from tee_popen import TeePopen
from upload_result_helper import upload_results
from version_helper import get_version_from_repo
from build_check import get_release_or_pr
JEPSEN_GROUP_NAME = "jepsen_group"
KEEPER_DESIRED_INSTANCE_COUNT = 3
SERVER_DESIRED_INSTANCE_COUNT = 4
KEEPER_IMAGE_NAME = "clickhouse/keeper-jepsen-test"
KEEPER_CHECK_NAME = "ClickHouse Keeper Jepsen"
SERVER_IMAGE_NAME = "clickhouse/server-jepsen-test"
SERVER_CHECK_NAME = "ClickHouse Server Jepsen"
SUCCESSFUL_TESTS_ANCHOR = "# Successful tests"
INTERMINATE_TESTS_ANCHOR = "# Indeterminate tests"
CRASHED_TESTS_ANCHOR = "# Crashed tests"
FAILED_TESTS_ANCHOR = "# Failed tests"
def _parse_jepsen_output(path: Path) -> TestResults:
test_results = [] # type: TestResults
current_type = ""
with open(path, "r") as f:
for line in f:
if SUCCESSFUL_TESTS_ANCHOR in line:
current_type = "OK"
elif INTERMINATE_TESTS_ANCHOR in line or CRASHED_TESTS_ANCHOR in line:
current_type = "ERROR"
elif FAILED_TESTS_ANCHOR in line:
current_type = "FAIL"
if (
line.startswith("store/clickhouse") or line.startswith("clickhouse")
) and current_type:
test_results.append(TestResult(line.strip(), current_type))
return test_results
def get_autoscaling_group_instances_ids(asg_client, group_name):
group_description = asg_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[group_name]
)
our_group = group_description["AutoScalingGroups"][0]
instance_ids = []
for instance in our_group["Instances"]:
if (
instance["LifecycleState"] == "InService"
and instance["HealthStatus"] == "Healthy"
):
instance_ids.append(instance["InstanceId"])
return instance_ids
def get_instances_addresses(ec2_client, instance_ids):
ec2_response = ec2_client.describe_instances(InstanceIds=instance_ids)
instance_ips = []
for instances in ec2_response["Reservations"]:
for ip in instances["Instances"]:
instance_ips.append(ip["PrivateIpAddress"])
return instance_ips
def prepare_autoscaling_group_and_get_hostnames(count):
asg_client = boto3.client("autoscaling", region_name="us-east-1")
asg_client.set_desired_capacity(
AutoScalingGroupName=JEPSEN_GROUP_NAME, DesiredCapacity=count
)
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter = 0
while len(instances) < count:
time.sleep(5)
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter += 1
if counter > 30:
raise Exception("Cannot wait autoscaling group")
ec2_client = boto3.client("ec2", region_name="us-east-1")
return get_instances_addresses(ec2_client, instances)
def clear_autoscaling_group():
asg_client = boto3.client("autoscaling", region_name="us-east-1")
asg_client.set_desired_capacity(
AutoScalingGroupName=JEPSEN_GROUP_NAME, DesiredCapacity=0
)
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter = 0
while len(instances) > 0:
time.sleep(5)
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter += 1
if counter > 30:
raise Exception("Cannot wait autoscaling group")
def save_nodes_to_file(instances: List[Any], temp_path: Path) -> Path:
nodes_path = temp_path / "nodes.txt"
with open(nodes_path, "w") as f:
f.write("\n".join(instances))
f.flush()
return nodes_path
def get_run_command(
ssh_auth_sock,
ssh_sock_dir,
pr_info,
nodes_path,
repo_path,
build_url,
result_path,
extra_args,
docker_image,
):
return (
f"docker run --network=host -v '{ssh_sock_dir}:{ssh_sock_dir}' -e SSH_AUTH_SOCK={ssh_auth_sock} "
f"-e PR_TO_TEST={pr_info.number} -e SHA_TO_TEST={pr_info.sha} -v '{nodes_path}:/nodes.txt' -v {result_path}:/test_output "
f"-e 'CLICKHOUSE_PACKAGE={build_url}' -v '{repo_path}:/ch' -e 'CLICKHOUSE_REPO_PATH=/ch' -e NODES_USERNAME=ubuntu {extra_args} {docker_image}"
)
def main():
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
prog="Jepsen Check",
description="Check that uses Jepsen. Both Keeper and Server can be tested.",
)
parser.add_argument(
"program", help='What should be tested. Valid values "keeper", "server"'
)
args = parser.parse_args()
if args.program != "server" and args.program != "keeper":
logging.warning("Invalid argument '%s'", args.program)
sys.exit(0)
stopwatch = Stopwatch()
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
pr_info = PRInfo()
logging.info(
"Start at PR number %s, commit sha %s labels %s",
pr_info.number,
pr_info.sha,
pr_info.labels,
)
if pr_info.number != 0 and "jepsen-test" not in pr_info.labels:
logging.info("Not jepsen test label in labels list, skipping")
sys.exit(0)
gh = Github(get_best_robot_token(), per_page=100)
commit = get_commit(gh, pr_info.sha)
check_name = KEEPER_CHECK_NAME if args.program == "keeper" else SERVER_CHECK_NAME
rerun_helper = RerunHelper(commit, check_name)
if rerun_helper.is_already_finished_by_status():
logging.info("Check is already finished according to github status, exiting")
sys.exit(0)
if not os.path.exists(TEMP_PATH):
os.makedirs(TEMP_PATH)
result_path = temp_path / "result_path"
result_path.mkdir(parents=True, exist_ok=True)
instances = prepare_autoscaling_group_and_get_hostnames(
KEEPER_DESIRED_INSTANCE_COUNT
if args.program == "keeper"
else SERVER_DESIRED_INSTANCE_COUNT
)
nodes_path = save_nodes_to_file(
instances[:KEEPER_DESIRED_INSTANCE_COUNT], temp_path
)
# always use latest
docker_image = KEEPER_IMAGE_NAME if args.program == "keeper" else SERVER_IMAGE_NAME
build_name = get_build_name_for_check(check_name)
release_or_pr, _ = get_release_or_pr(pr_info, get_version_from_repo())
# This check run separately from other checks because it requires exclusive
# run (see .github/workflows/jepsen.yml) So we cannot add explicit
# dependency on a build job and using busy loop on it's results. For the
# same reason we are using latest docker image.
build_url = (
f"{S3_DOWNLOAD}/{S3_BUILDS_BUCKET}/{release_or_pr}/{pr_info.sha}/"
f"{build_name}/clickhouse"
)
head = requests.head(build_url)
counter = 0
while head.status_code != 200:
time.sleep(10)
head = requests.head(build_url)
counter += 1
if counter >= 180:
logging.warning("Cannot fetch build in 30 minutes, exiting")
sys.exit(0)
extra_args = ""
if args.program == "server":
extra_args = f"-e KEEPER_NODE={instances[-1]}"
with SSHKey(key_value=get_parameter_from_ssm("jepsen_ssh_key") + "\n"):
ssh_auth_sock = os.environ["SSH_AUTH_SOCK"]
auth_sock_dir = os.path.dirname(ssh_auth_sock)
cmd = get_run_command(
ssh_auth_sock,
auth_sock_dir,
pr_info,
nodes_path,
REPO_COPY,
build_url,
result_path,
extra_args,
docker_image,
)
logging.info("Going to run jepsen: %s", cmd)
run_log_path = temp_path / "run.log"
with TeePopen(cmd, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
status = "success"
description = "No invalid analysis found ヽ(‘ー`)"
jepsen_log_path = result_path / "jepsen_run_all_tests.log"
additional_data = []
try:
test_result = _parse_jepsen_output(jepsen_log_path)
if any(r.status == "FAIL" for r in test_result):
status = "failure"
description = "Found invalid analysis (ノಥ益ಥ)ノ ┻━┻"
compress_fast(result_path / "store", result_path / "jepsen_store.tar.zst")
additional_data.append(result_path / "jepsen_store.tar.zst")
except Exception as ex:
print("Exception", ex)
status = "failure"
description = "No Jepsen output log"
test_result = [TestResult("No Jepsen output log", "FAIL")]
s3_helper = S3Helper()
report_url = upload_results(
s3_helper,
pr_info.number,
pr_info.sha,
test_result,
[run_log_path] + additional_data,
check_name,
)
print(f"::notice ::Report url: {report_url}")
post_commit_status(commit, status, report_url, description, check_name, pr_info)
ch_helper = ClickHouseHelper()
prepared_events = prepare_tests_results_for_clickhouse(
pr_info,
test_result,
status,
stopwatch.duration_seconds,
stopwatch.start_time_str,
report_url,
check_name,
)
ch_helper.insert_events_into(db="default", table="checks", events=prepared_events)
clear_autoscaling_group()
if __name__ == "__main__":
main()