Clean even more os.path

This commit is contained in:
Mikhail f. Shiryaev 2023-09-27 16:27:37 +02:00
parent ff58e152d8
commit c5b1aa4aa5
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4
11 changed files with 197 additions and 209 deletions

View File

@ -2,7 +2,6 @@
import logging
import subprocess
import os
import sys
from pathlib import Path
@ -39,7 +38,7 @@ IMAGE_NAME = "clickhouse/fuzzer"
def get_run_command(
pr_info: PRInfo,
build_url: str,
workspace_path: str,
workspace_path: Path,
ci_logs_args: str,
image: DockerImage,
) -> str:
@ -69,14 +68,12 @@ def main():
stopwatch = Stopwatch()
temp_path = TEMP_PATH
reports_path = REPORTS_PATH
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
reports_path = Path(REPORTS_PATH)
check_name = sys.argv[1]
if not os.path.exists(temp_path):
os.makedirs(temp_path)
pr_info = PRInfo()
gh = Github(get_best_robot_token(), per_page=100)
@ -90,7 +87,6 @@ def main():
docker_image = get_image_with_version(reports_path, IMAGE_NAME)
build_name = get_build_name_for_check(check_name)
print(build_name)
urls = read_build_urls(build_name, reports_path)
if not urls:
raise Exception("No build URLs found")
@ -104,10 +100,10 @@ def main():
logging.info("Got build url %s", build_url)
workspace_path = os.path.join(temp_path, "workspace")
if not os.path.exists(workspace_path):
os.makedirs(workspace_path)
ci_logs_credentials = CiLogsCredentials(Path(temp_path) / "export-logs-config.sh")
workspace_path = temp_path / "workspace"
workspace_path.mkdir(parents=True, exist_ok=True)
ci_logs_credentials = CiLogsCredentials(temp_path / "export-logs-config.sh")
ci_logs_args = ci_logs_credentials.get_docker_arguments(
pr_info, stopwatch.start_time_str, check_name
)
@ -121,8 +117,8 @@ def main():
)
logging.info("Going to run %s", run_command)
run_log_path = os.path.join(temp_path, "run.log")
main_log_path = os.path.join(workspace_path, "main.log")
run_log_path = temp_path / "run.log"
main_log_path = workspace_path / "main.log"
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
@ -132,7 +128,7 @@ def main():
logging.info("Run failed")
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
ci_logs_credentials.clean_ci_logs_from_credentials(Path(run_log_path))
ci_logs_credentials.clean_ci_logs_from_credentials(run_log_path)
check_name_lower = (
check_name.lower().replace("(", "").replace(")", "").replace(" ", "")
@ -141,40 +137,39 @@ def main():
paths = {
"run.log": run_log_path,
"main.log": main_log_path,
"fuzzer.log": os.path.join(workspace_path, "fuzzer.log"),
"report.html": os.path.join(workspace_path, "report.html"),
"core.zst": os.path.join(workspace_path, "core.zst"),
"dmesg.log": os.path.join(workspace_path, "dmesg.log"),
"fuzzer.log": workspace_path / "fuzzer.log",
"report.html": workspace_path / "report.html",
"core.zst": workspace_path / "core.zst",
"dmesg.log": workspace_path / "dmesg.log",
}
compressed_server_log_path = os.path.join(workspace_path, "server.log.zst")
if os.path.exists(compressed_server_log_path):
compressed_server_log_path = workspace_path / "server.log.zst"
if compressed_server_log_path.exists():
paths["server.log.zst"] = compressed_server_log_path
# The script can fail before the invocation of `zstd`, but we are still interested in its log:
not_compressed_server_log_path = os.path.join(workspace_path, "server.log")
if os.path.exists(not_compressed_server_log_path):
not_compressed_server_log_path = workspace_path / "server.log"
if not_compressed_server_log_path.exists():
paths["server.log"] = not_compressed_server_log_path
s3_helper = S3Helper()
for f in paths:
urls = []
report_url = ""
for file, path in paths.items():
try:
paths[f] = s3_helper.upload_test_report_to_s3(Path(paths[f]), s3_prefix + f)
url = s3_helper.upload_test_report_to_s3(path, s3_prefix + file)
report_url = url if file == "report.html" else report_url
urls.append(url)
except Exception as ex:
logging.info("Exception uploading file %s text %s", f, ex)
paths[f] = ""
logging.info("Exception uploading file %s text %s", file, ex)
# Try to get status message saved by the fuzzer
try:
with open(
os.path.join(workspace_path, "status.txt"), "r", encoding="utf-8"
) as status_f:
with open(workspace_path / "status.txt", "r", encoding="utf-8") as status_f:
status = status_f.readline().rstrip("\n")
with open(
os.path.join(workspace_path, "description.txt"), "r", encoding="utf-8"
) as desc_f:
with open(workspace_path / "description.txt", "r", encoding="utf-8") as desc_f:
description = desc_f.readline().rstrip("\n")
except:
status = "failure"
@ -186,9 +181,7 @@ def main():
if "fail" in status:
test_result.status = "FAIL"
if paths["report.html"]:
report_url = paths["report.html"]
else:
if not report_url:
report_url = upload_results(
s3_helper,
pr_info.number,
@ -196,7 +189,7 @@ def main():
[test_result],
[],
check_name,
[url for url in paths.values() if url],
urls,
)
ch_helper = ClickHouseHelper()

View File

@ -1,10 +1,10 @@
#!/usr/bin/env python3
from pathlib import Path
from typing import List, Tuple
import argparse
import csv
import logging
import os
from github import Github
@ -16,13 +16,13 @@ from s3_helper import S3Helper
from upload_result_helper import upload_results
def parse_args():
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("status", nargs="+", help="Path to status file")
parser.add_argument("files", nargs="+", type=Path, help="Path to status files")
return parser.parse_args()
def post_commit_status_from_file(file_path: str) -> List[str]:
def post_commit_status_from_file(file_path: Path) -> List[str]:
with open(file_path, "r", encoding="utf-8") as f:
res = list(csv.reader(f, delimiter="\t"))
if len(res) < 1:
@ -32,22 +32,24 @@ def post_commit_status_from_file(file_path: str) -> List[str]:
return res[0]
def process_result(file_path: str) -> Tuple[bool, TestResults]:
def process_result(file_path: Path) -> Tuple[bool, TestResults]:
test_results = [] # type: TestResults
state, report_url, description = post_commit_status_from_file(file_path)
prefix = os.path.basename(os.path.dirname(file_path))
prefix = file_path.parent.name
is_ok = state == "success"
if is_ok and report_url == "null":
return is_ok, test_results
status = f'OK: Bug reproduced (<a href="{report_url}">Report</a>)'
if not is_ok:
status = f'Bug is not reproduced (<a href="{report_url}">Report</a>)'
status = (
f'OK: Bug reproduced (<a href="{report_url}">Report</a>)'
if is_ok
else f'Bug is not reproduced (<a href="{report_url}">Report</a>)'
)
test_results.append(TestResult(f"{prefix}: {description}", status))
return is_ok, test_results
def process_all_results(file_paths: str) -> Tuple[bool, TestResults]:
def process_all_results(file_paths: List[Path]) -> Tuple[bool, TestResults]:
any_ok = False
all_results = []
for status_path in file_paths:
@ -59,12 +61,14 @@ def process_all_results(file_paths: str) -> Tuple[bool, TestResults]:
return any_ok, all_results
def main(args):
def main():
logging.basicConfig(level=logging.INFO)
args = parse_args()
status_files = args.files # type: List[Path]
check_name_with_group = "Bugfix validate check"
is_ok, test_results = process_all_results(args.status)
is_ok, test_results = process_all_results(status_files)
if not test_results:
logging.info("No results to upload")
@ -76,7 +80,7 @@ def main(args):
pr_info.number,
pr_info.sha,
test_results,
args.status,
status_files,
check_name_with_group,
)
@ -93,4 +97,4 @@ def main(args):
if __name__ == "__main__":
main(parse_args())
main()

View File

@ -4,7 +4,6 @@ from pathlib import Path
from typing import Tuple
import subprocess
import logging
import os
import sys
import time
@ -54,7 +53,7 @@ def _can_export_binaries(build_config: BuildConfig) -> bool:
def get_packager_cmd(
build_config: BuildConfig,
packager_path: str,
packager_path: Path,
output_path: Path,
cargo_cache_dir: Path,
build_version: str,
@ -105,12 +104,12 @@ def build_clickhouse(
with TeePopen(packager_cmd, build_log_path) as process:
retcode = process.wait()
if build_output_path.exists():
build_results = os.listdir(build_output_path)
results_exists = any(build_output_path.iterdir())
else:
build_results = []
results_exists = False
if retcode == 0:
if len(build_results) > 0:
if results_exists:
success = True
logging.info("Built successfully")
else:
@ -130,7 +129,7 @@ def check_for_success_run(
) -> None:
# TODO: Remove after S3 artifacts
# the final empty argument is necessary for distinguish build and build_suffix
logged_prefix = os.path.join(S3_BUILDS_BUCKET, s3_prefix, "")
logged_prefix = "/".join((S3_BUILDS_BUCKET, s3_prefix, ""))
logging.info("Checking for artifacts in %s", logged_prefix)
try:
# Performance artifacts are now part of regular build, so we're safe
@ -223,11 +222,12 @@ def main():
build_config = CI_CONFIG.build_config[build_name]
temp_path = Path(TEMP_PATH)
os.makedirs(temp_path, exist_ok=True)
temp_path.mkdir(parents=True, exist_ok=True)
repo_path = Path(REPO_COPY)
pr_info = PRInfo()
logging.info("Repo copy path %s", REPO_COPY)
logging.info("Repo copy path %s", repo_path)
s3_helper = S3Helper()
@ -263,7 +263,7 @@ def main():
logging.info("Build short name %s", build_name)
build_output_path = temp_path / build_name
os.makedirs(build_output_path, exist_ok=True)
build_output_path.mkdir(parents=True, exist_ok=True)
cargo_cache = CargoCache(
temp_path / "cargo_cache" / "registry", temp_path, s3_helper
)
@ -271,7 +271,7 @@ def main():
packager_cmd = get_packager_cmd(
build_config,
os.path.join(REPO_COPY, "docker/packager"),
repo_path / "docker" / "packager",
build_output_path,
cargo_cache.directory,
version.string,
@ -282,7 +282,7 @@ def main():
logging.info("Going to run packager with %s", packager_cmd)
logs_path = temp_path / "build_log"
os.makedirs(logs_path, exist_ok=True)
logs_path.mkdir(parents=True, exist_ok=True)
start = time.time()
log_path, build_status = build_clickhouse(
@ -316,7 +316,7 @@ def main():
"Uploaded performance.tar.zst to %s, now delete to avoid duplication",
performance_urls[0],
)
os.remove(performance_path)
performance_path.unlink()
build_urls = (
s3_helper.upload_build_directory_to_s3(
@ -413,7 +413,7 @@ FORMAT JSONCompactEachRow"""
}
url = f"https://{ci_logs_credentials.host}/"
profiles_dir = temp_path / "profiles_source"
os.makedirs(profiles_dir, exist_ok=True)
profiles_dir.mkdir(parents=True, exist_ok=True)
logging.info("Processing profile JSON files from {GIT_REPO_ROOT}/build_docker")
git_runner(
"./utils/prepare-time-trace/prepare-time-trace.sh "
@ -421,7 +421,7 @@ FORMAT JSONCompactEachRow"""
)
profile_data_file = temp_path / "profile.json"
with open(profile_data_file, "wb") as profile_fd:
for profile_sourse in os.listdir(profiles_dir):
for profile_sourse in profiles_dir.iterdir():
with open(profiles_dir / profile_sourse, "rb") as ps_fd:
profile_fd.write(ps_fd.read())

View File

@ -58,10 +58,9 @@ def get_ccache_if_not_exists(
download_build_with_progress(url, compressed_cache)
path_to_decompress = path_to_ccache_dir.parent
if not path_to_decompress.exists():
os.makedirs(path_to_decompress)
path_to_decompress.mkdir(parents=True, exist_ok=True)
if os.path.exists(path_to_ccache_dir):
if path_to_ccache_dir.exists():
shutil.rmtree(path_to_ccache_dir)
logging.info("Ccache already exists, removing it")
@ -74,7 +73,7 @@ def get_ccache_if_not_exists(
if not cache_found:
logging.info("ccache not found anywhere, cannot download anything :(")
if os.path.exists(path_to_ccache_dir):
if path_to_ccache_dir.exists():
logging.info("But at least we have some local cache")
else:
logging.info("ccache downloaded")

View File

@ -28,6 +28,7 @@ import logging
import os
from contextlib import contextmanager
from datetime import date, datetime, timedelta
from pathlib import Path
from subprocess import CalledProcessError
from typing import List, Optional
@ -617,8 +618,8 @@ def stash():
def main():
if not os.path.exists(TEMP_PATH):
os.makedirs(TEMP_PATH)
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
args = parse_args()
if args.debug_helpers:
@ -636,7 +637,7 @@ def main():
args.backport_created_label,
)
# https://github.com/python/mypy/issues/3004
bp.gh.cache_path = f"{TEMP_PATH}/gh_cache" # type: ignore
bp.gh.cache_path = temp_path / "gh_cache"
bp.receive_release_prs()
bp.update_local_release_branches()
bp.receive_prs_for_backport()

View File

@ -5,7 +5,6 @@ from pathlib import Path
from typing import List, Tuple
import argparse
import logging
import os
import subprocess
import sys
@ -169,11 +168,10 @@ def main():
download_builds_filter(args.check_name, reports_path, packages_path, url_filter)
for f in os.listdir(packages_path):
if ".deb" in f:
full_path = os.path.join(packages_path, f)
for package in packages_path.iterdir():
if package.suffix == ".deb":
subprocess.check_call(
f"dpkg -x {full_path} {packages_path} && rm {full_path}", shell=True
f"dpkg -x {package} {packages_path} && rm {package}", shell=True
)
server_log_path = temp_path / "server_log"

View File

@ -5,7 +5,6 @@ This file is needed to avoid cicle import build_download_helper.py <=> env_helpe
import argparse
import logging
import os
from pathlib import Path
from build_download_helper import download_build_with_progress
@ -14,8 +13,6 @@ from env_helper import RUNNER_TEMP, S3_ARTIFACT_DOWNLOAD_TEMPLATE
from git_helper import Git, commit
from version_helper import get_version_from_repo, version_arg
TEMP_PATH = os.path.join(RUNNER_TEMP, "download_binary")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
@ -58,7 +55,7 @@ def parse_args() -> argparse.Namespace:
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
args = parse_args()
temp_path = Path(TEMP_PATH)
temp_path = Path(RUNNER_TEMP) / Path(__file__).name
temp_path.mkdir(parents=True, exist_ok=True)
for build in args.build_names:
# check if it's in CI_CONFIG

View File

@ -202,10 +202,12 @@ def main():
description = format_description(test_results[-1].name)
ch_helper = ClickHouseHelper()
s3_path_prefix = os.path.join(
get_release_or_pr(pr_info, get_version_from_repo())[0],
pr_info.sha,
"fast_tests",
s3_path_prefix = "/".join(
(
get_release_or_pr(pr_info, get_version_from_repo())[0],
pr_info.sha,
"fast_tests",
)
)
build_urls = s3_helper.upload_build_directory_to_s3(
output_path / "binaries",

View File

@ -15,7 +15,15 @@ from github import Github
from commit_status_helper import RerunHelper, get_commit, post_commit_status
from ci_config import CI_CONFIG
from docker_pull_helper import get_image_with_version
from env_helper import GITHUB_EVENT_PATH, GITHUB_RUN_URL, S3_BUILDS_BUCKET, S3_DOWNLOAD
from env_helper import (
GITHUB_EVENT_PATH,
GITHUB_RUN_URL,
REPO_COPY,
REPORTS_PATH,
S3_BUILDS_BUCKET,
S3_DOWNLOAD,
TEMP_PATH,
)
from get_robot_token import get_best_robot_token, get_parameter_from_ssm
from pr_info import PRInfo
from s3_helper import S3Helper
@ -61,42 +69,19 @@ def get_run_command(
)
class RamDrive:
def __init__(self, path, size):
self.path = path
self.size = size
def __enter__(self):
if not os.path.exists(self.path):
os.makedirs(self.path)
subprocess.check_call(
f"sudo mount -t tmpfs -o rw,size={self.size} tmpfs {self.path}", shell=True
)
def __exit__(self, exc_type, exc_val, exc_tb):
subprocess.check_call(f"sudo umount {self.path}", shell=True)
if __name__ == "__main__":
def main():
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = os.getenv("TEMP_PATH", os.path.abspath("."))
repo_path = os.getenv("REPO_COPY", os.path.abspath("../../"))
repo_tests_path = os.path.join(repo_path, "tests")
ramdrive_path = os.getenv("RAMDRIVE_PATH", os.path.join(temp_path, "ramdrive"))
# currently unused, doesn't make tests more stable
ramdrive_size = os.getenv("RAMDRIVE_SIZE", "0G")
reports_path = os.getenv("REPORTS_PATH", "./reports")
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
repo_tests_path = Path(REPO_COPY, "tests")
reports_path = Path(REPORTS_PATH)
check_name = sys.argv[1]
required_build = CI_CONFIG.test_configs[check_name].required_build
if not os.path.exists(temp_path):
os.makedirs(temp_path)
with open(GITHUB_EVENT_PATH, "r", encoding="utf-8") as event_file:
event = json.load(event_file)
@ -158,10 +143,8 @@ if __name__ == "__main__":
docker_image = get_image_with_version(reports_path, IMAGE_NAME)
# with RamDrive(ramdrive_path, ramdrive_size):
result_path = ramdrive_path
if not os.path.exists(result_path):
os.makedirs(result_path)
result_path = temp_path / "result"
result_path.mkdir(parents=True, exist_ok=True)
database_url = get_parameter_from_ssm("clickhouse-test-stat-url")
database_username = get_parameter_from_ssm("clickhouse-test-stat-login")
@ -190,8 +173,8 @@ if __name__ == "__main__":
)
logging.info("Going to run command %s", run_command)
run_log_path = os.path.join(temp_path, "run.log")
compare_log_path = os.path.join(result_path, "compare.log")
run_log_path = temp_path / "run.log"
compare_log_path = result_path / "compare.log"
popen_env = os.environ.copy()
popen_env.update(env_extra)
@ -206,13 +189,11 @@ if __name__ == "__main__":
paths = {
"compare.log": compare_log_path,
"output.7z": os.path.join(result_path, "output.7z"),
"report.html": os.path.join(result_path, "report.html"),
"all-queries.html": os.path.join(result_path, "all-queries.html"),
"queries.rep": os.path.join(result_path, "queries.rep"),
"all-query-metrics.tsv": os.path.join(
result_path, "report/all-query-metrics.tsv"
),
"output.7z": result_path / "output.7z",
"report.html": result_path / "report.html",
"all-queries.html": result_path / "all-queries.html",
"queries.rep": result_path / "queries.rep",
"all-query-metrics.tsv": result_path / "report/all-query-metrics.tsv",
"run.log": run_log_path,
}
@ -238,7 +219,8 @@ if __name__ == "__main__":
def too_many_slow(msg):
match = re.search(r"(|.* )(\d+) slower.*", msg)
# This threshold should be synchronized with the value in https://github.com/ClickHouse/ClickHouse/blob/master/docker/test/performance-comparison/report.py#L629
# This threshold should be synchronized with the value in
# https://github.com/ClickHouse/ClickHouse/blob/master/docker/test/performance-comparison/report.py#L629
threshold = 5
return int(match.group(2).strip()) > threshold if match else False
@ -246,9 +228,7 @@ if __name__ == "__main__":
status = ""
message = ""
try:
with open(
os.path.join(result_path, "report.html"), "r", encoding="utf-8"
) as report_fd:
with open(result_path / "report.html", "r", encoding="utf-8") as report_fd:
report_text = report_fd.read()
status_match = re.search("<!--[ ]*status:(.*)-->", report_text)
message_match = re.search("<!--[ ]*message:(.*)-->", report_text)
@ -276,17 +256,12 @@ if __name__ == "__main__":
report_url = GITHUB_RUN_URL
if uploaded["run.log"]:
report_url = uploaded["run.log"]
if uploaded["compare.log"]:
report_url = uploaded["compare.log"]
if uploaded["output.7z"]:
report_url = uploaded["output.7z"]
if uploaded["report.html"]:
report_url = uploaded["report.html"]
report_url = (
uploaded["report.html"]
or uploaded["output.7z"]
or uploaded["compare.log"]
or uploaded["run.log"]
)
post_commit_status(
commit, status, report_url, message, check_name_with_group, pr_info
@ -294,3 +269,7 @@ if __name__ == "__main__":
if status == "error":
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import hashlib
import logging
import os
import re
import shutil
import time
@ -131,11 +130,7 @@ class S3Helper:
def fast_parallel_upload_dir(
self, dir_path: Path, s3_dir_path: str, bucket_name: str
) -> List[str]:
all_files = []
for root, _, files in os.walk(dir_path):
for file in files:
all_files.append(os.path.join(root, file))
all_files = [file for file in dir_path.rglob("*") if file.is_file()]
logging.info("Files found %s", len(all_files))
@ -143,12 +138,13 @@ class S3Helper:
t = time.time()
sum_time = 0
def upload_task(file_path: str) -> str:
def upload_task(file_path: Path) -> str:
nonlocal counter
nonlocal t
nonlocal sum_time
file_str = file_path.as_posix()
try:
s3_path = file_path.replace(str(dir_path), s3_dir_path)
s3_path = file_str.replace(str(dir_path), s3_dir_path)
metadata = {}
if s3_path.endswith("html"):
metadata["ContentType"] = "text/html; charset=utf-8"
@ -214,11 +210,11 @@ class S3Helper:
def task(file_path: Path) -> Union[str, List[str]]:
full_fs_path = file_path.absolute()
if keep_dirs_in_s3_path:
full_s3_path = os.path.join(s3_directory_path, directory_path.name)
full_s3_path = "/".join((s3_directory_path, directory_path.name))
else:
full_s3_path = s3_directory_path
if os.path.isdir(full_fs_path):
if full_fs_path.is_dir():
return self._upload_directory_to_s3(
full_fs_path,
full_s3_path,

View File

@ -1,31 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""This script is used in docker images for stress tests and upgrade tests"""
from multiprocessing import cpu_count
from pathlib import Path
from subprocess import Popen, call, check_output, STDOUT, PIPE
import os
from typing import List
import argparse
import logging
import time
import random
import time
def get_options(i, upgrade_check):
def get_options(i: int, upgrade_check: bool) -> str:
options = []
client_options = []
if i > 0:
options.append("--order=random")
if i % 3 == 2 and not upgrade_check:
options.append(
'''--db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i)
)
options.append(f'''--db-engine="Replicated('/test/db/test_{i}', 's1', 'r1')"''')
client_options.append("allow_experimental_database_replicated=1")
client_options.append("enable_deflate_qpl_codec=1")
# If database name is not specified, new database is created for each functional test.
# Run some threads with one database for all tests.
if i % 2 == 1:
options.append(" --database=test_{}".format(i))
options.append(f" --database=test_{i}")
if i % 3 == 1:
client_options.append("join_use_nulls=1")
@ -66,45 +66,43 @@ def get_options(i, upgrade_check):
def run_func_test(
cmd,
output_prefix,
num_processes,
skip_tests_option,
global_time_limit,
upgrade_check,
):
cmd: str,
output_prefix: Path,
num_processes: int,
skip_tests_option: str,
global_time_limit: int,
upgrade_check: bool,
) -> List[Popen]:
upgrade_check_option = "--upgrade-check" if upgrade_check else ""
global_time_limit_option = ""
if global_time_limit:
global_time_limit_option = "--global_time_limit={}".format(global_time_limit)
global_time_limit_option = (
f"--global_time_limit={global_time_limit}" if global_time_limit else ""
)
output_paths = [
os.path.join(output_prefix, "stress_test_run_{}.txt".format(i))
for i in range(num_processes)
output_prefix / f"stress_test_run_{i}.txt" for i in range(num_processes)
]
pipes = []
for i, path in enumerate(output_paths):
f = open(path, "w")
full_command = "{} {} {} {} {}".format(
cmd,
get_options(i, upgrade_check),
global_time_limit_option,
skip_tests_option,
upgrade_check_option,
)
logging.info("Run func tests '%s'", full_command)
p = Popen(full_command, shell=True, stdout=f, stderr=f)
pipes.append(p)
time.sleep(0.5)
with open(path, "w") as op:
full_command = (
f"{cmd} {get_options(i, upgrade_check)} {global_time_limit_option} "
f"{skip_tests_option} {upgrade_check_option}"
)
logging.info("Run func tests '%s'", full_command)
pipes.append(Popen(full_command, shell=True, stdout=op, stderr=op))
time.sleep(0.5)
return pipes
def compress_stress_logs(output_path, files_prefix):
cmd = f"cd {output_path} && tar --zstd --create --file=stress_run_logs.tar.zst {files_prefix}* && rm {files_prefix}*"
def compress_stress_logs(output_path: Path, files_prefix: str) -> None:
cmd = (
f"cd {output_path} && tar --zstd --create --file=stress_run_logs.tar.zst "
f"{files_prefix}* && rm {files_prefix}*"
)
check_output(cmd, shell=True)
def call_with_retry(query, timeout=30, retry_count=5):
def call_with_retry(query: str, timeout: int = 30, retry_count: int = 5) -> None:
for i in range(retry_count):
code = call(query, shell=True, stderr=STDOUT, timeout=timeout)
if code != 0:
@ -113,11 +111,14 @@ def call_with_retry(query, timeout=30, retry_count=5):
break
def make_query_command(query):
return f"""clickhouse client -q "{query}" --max_untracked_memory=1Gi --memory_profiler_step=1Gi --max_memory_usage_for_user=0"""
def make_query_command(query: str) -> str:
return (
f'clickhouse client -q "{query}" --max_untracked_memory=1Gi '
"--memory_profiler_step=1Gi --max_memory_usage_for_user=0"
)
def prepare_for_hung_check(drop_databases):
def prepare_for_hung_check(drop_databases: bool) -> bool:
# FIXME this function should not exist, but...
# We attach gdb to clickhouse-server before running tests
@ -149,28 +150,33 @@ def prepare_for_hung_check(drop_databases):
call_with_retry(make_query_command("KILL QUERY WHERE upper(query) LIKE 'WATCH %'"))
# Kill other queries which known to be slow
# It's query from 01232_preparing_sets_race_condition_long, it may take up to 1000 seconds in slow builds
# It's query from 01232_preparing_sets_race_condition_long,
# it may take up to 1000 seconds in slow builds
call_with_retry(
make_query_command("KILL QUERY WHERE query LIKE 'insert into tableB select %'")
)
# Long query from 00084_external_agregation
call_with_retry(
make_query_command(
"KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u %'"
"KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM "
"test.hits GROUP BY URL ORDER BY u %'"
)
)
# Long query from 02136_kill_scalar_queries
call_with_retry(
make_query_command(
"KILL QUERY WHERE query LIKE 'SELECT (SELECT number FROM system.numbers WHERE number = 1000000000000)%'"
"KILL QUERY WHERE query LIKE "
"'SELECT (SELECT number FROM system.numbers WHERE number = 1000000000000)%'"
)
)
if drop_databases:
for i in range(5):
try:
# Here we try to drop all databases in async mode. If some queries really hung, than drop will hung too.
# Otherwise we will get rid of queries which wait for background pool. It can take a long time on slow builds (more than 900 seconds).
# Here we try to drop all databases in async mode.
# If some queries really hung, than drop will hung too.
# Otherwise we will get rid of queries which wait for background pool.
# It can take a long time on slow builds (more than 900 seconds).
#
# Also specify max_untracked_memory to allow 1GiB of memory to overcommit.
databases = (
@ -195,7 +201,8 @@ def prepare_for_hung_check(drop_databases):
time.sleep(i)
else:
raise Exception(
"Cannot drop databases after stress tests. Probably server consumed too much memory and cannot execute simple queries"
"Cannot drop databases after stress tests. Probably server consumed "
"too much memory and cannot execute simple queries"
)
# Wait for last queries to finish if any, not longer than 300 seconds
@ -220,7 +227,7 @@ def prepare_for_hung_check(drop_databases):
# Even if all clickhouse-test processes are finished, there are probably some sh scripts,
# which still run some new queries. Let's ignore them.
try:
query = """clickhouse client -q "SELECT count() FROM system.processes where elapsed > 300" """
query = 'clickhouse client -q "SELECT count() FROM system.processes where elapsed > 300" '
output = (
check_output(query, shell=True, stderr=STDOUT, timeout=30)
.decode("utf-8")
@ -233,9 +240,12 @@ def prepare_for_hung_check(drop_databases):
return True
def is_ubsan_build():
def is_ubsan_build() -> bool:
try:
query = """clickhouse client -q "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'" """
query = (
'clickhouse client -q "SELECT value FROM system.build_options '
"WHERE name = 'CXX_FLAGS'\" "
)
output = (
check_output(query, shell=True, stderr=STDOUT, timeout=30)
.decode("utf-8")
@ -247,27 +257,34 @@ def is_ubsan_build():
return False
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="ClickHouse script for running stresstest"
)
parser.add_argument("--test-cmd", default="/usr/bin/clickhouse-test")
parser.add_argument("--skip-func-tests", default="")
parser.add_argument("--server-log-folder", default="/var/log/clickhouse-server")
parser.add_argument("--output-folder")
parser.add_argument(
"--server-log-folder", default="/var/log/clickhouse-server", type=Path
)
parser.add_argument("--output-folder", type=Path)
parser.add_argument("--global-time-limit", type=int, default=1800)
parser.add_argument("--num-parallel", type=int, default=cpu_count())
parser.add_argument("--upgrade-check", action="store_true")
parser.add_argument("--hung-check", action="store_true", default=False)
# make sense only for hung check
parser.add_argument("--drop-databases", action="store_true", default=False)
return parser.parse_args()
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
args = parse_args()
args = parser.parse_args()
if args.drop_databases and not args.hung_check:
raise Exception("--drop-databases only used in hung check (--hung-check)")
# FIXME Hung check with ubsan is temporarily disabled due to https://github.com/ClickHouse/ClickHouse/issues/45372
# FIXME Hung check with ubsan is temporarily disabled due to
# https://github.com/ClickHouse/ClickHouse/issues/45372
suppress_hung_check = is_ubsan_build()
func_pipes = []
@ -329,7 +346,7 @@ if __name__ == "__main__":
"00001_select_1",
]
)
hung_check_log = os.path.join(args.output_folder, "hung_check.log")
hung_check_log = args.output_folder / "hung_check.log" # type: Path
tee = Popen(["/usr/bin/tee", hung_check_log], stdin=PIPE)
res = call(cmd, shell=True, stdout=tee.stdin, stderr=STDOUT)
if tee.stdin is not None:
@ -338,10 +355,12 @@ if __name__ == "__main__":
logging.info("Hung check failed with exit code %d", res)
else:
hung_check_status = "No queries hung\tOK\t\\N\t\n"
with open(
os.path.join(args.output_folder, "test_results.tsv"), "w+"
) as results:
with open(args.output_folder / "test_results.tsv", "w+") as results:
results.write(hung_check_status)
os.remove(hung_check_log)
hung_check_log.unlink()
logging.info("Stress test finished")
if __name__ == "__main__":
main()